from __future__ import annotations

import dataclasses
import functools
import logging
import time
import urllib.parse

from os.path import commonprefix
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

import requests
import requests.adapters
import requests.auth
import requests.exceptions

from cachecontrol import CacheControlAdapter
from cachecontrol.caches import FileCache
from requests_toolbelt import user_agent

from poetry.__version__ import __version__
from poetry.config.config import Config
from poetry.console.exceptions import ConsoleMessage
from poetry.console.exceptions import PoetryRuntimeError
from poetry.exceptions import PoetryError
from poetry.utils.constants import REQUESTS_TIMEOUT
from poetry.utils.constants import RETRY_AFTER_HEADER
from poetry.utils.constants import STATUS_FORCELIST
from poetry.utils.password_manager import HTTPAuthCredential
from poetry.utils.password_manager import PasswordManager


if TYPE_CHECKING:
    from cleo.io.io import IO


logger = logging.getLogger(__name__)


@dataclasses.dataclass(frozen=True)
class RepositoryCertificateConfig:
    cert: Path | None = dataclasses.field(default=None)
    client_cert: Path | None = dataclasses.field(default=None)
    verify: bool = dataclasses.field(default=True)

    @classmethod
    def create(
        cls, repository: str, config: Config | None
    ) -> RepositoryCertificateConfig:
        config = config if config else Config.create()

        verify: str | bool = config.get(
            f"certificates.{repository}.verify",
            config.get(f"certificates.{repository}.cert", True),
        )
        client_cert: str = config.get(f"certificates.{repository}.client-cert")

        return cls(
            cert=Path(verify) if isinstance(verify, str) else None,
            client_cert=Path(client_cert) if client_cert else None,
            verify=verify if isinstance(verify, bool) else True,
        )


@dataclasses.dataclass
class AuthenticatorRepositoryConfig:
    name: str
    url: str
    netloc: str = dataclasses.field(init=False)
    path: str = dataclasses.field(init=False)

    def __post_init__(self) -> None:
        parsed_url = urllib.parse.urlsplit(self.url)
        self.netloc = parsed_url.netloc
        self.path = parsed_url.path

    def certs(self, config: Config) -> RepositoryCertificateConfig:
        return RepositoryCertificateConfig.create(self.name, config)

    def get_http_credentials(
        self, password_manager: PasswordManager
    ) -> HTTPAuthCredential:
        # try with the repository name via the password manager
        credential = password_manager.get_http_auth(self.name)

        if credential.password is not None:
            return credential

        if password_manager.use_keyring:
            # fallback to url and netloc based keyring entries
            credential = password_manager.get_credential(
                self.url, self.netloc, username=credential.username
            )

        return credential


class Authenticator:
    def __init__(
        self,
        config: Config | None = None,
        io: IO | None = None,
        cache_id: str | None = None,
        disable_cache: bool = False,
        pool_size: int = requests.adapters.DEFAULT_POOLSIZE,
    ) -> None:
        self._config = config or Config.create()
        self._io = io
        self._sessions_for_netloc: dict[str, requests.Session] = {}
        self._credentials: dict[str, HTTPAuthCredential] = {}
        self._certs: dict[str, RepositoryCertificateConfig] = {}
        self._configured_repositories: (
            dict[str, AuthenticatorRepositoryConfig] | None
        ) = None
        self._password_manager = PasswordManager(self._config)
        self._cache_control = (
            FileCache(
                self._config.repository_cache_directory
                / (cache_id or "_default_cache")
                / "_http"
            )
            if not disable_cache
            else None
        )
        self.get_repository_config_for_url = functools.lru_cache(maxsize=None)(
            self._get_repository_config_for_url
        )
        self._pool_size = pool_size
        self._user_agent = user_agent("poetry", __version__)

    def create_session(self) -> requests.Session:
        session = requests.Session()
        session.headers["User-Agent"] = self._user_agent

        if self._cache_control is None:
            return session

        adapter = CacheControlAdapter(
            cache=self._cache_control,
            pool_maxsize=self._pool_size,
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def get_session(self, url: str | None = None) -> requests.Session:
        if not url:
            return self.create_session()

        parsed_url = urllib.parse.urlsplit(url)
        netloc = parsed_url.netloc

        if netloc not in self._sessions_for_netloc:
            logger.debug("Creating new session for %s", netloc)
            self._sessions_for_netloc[netloc] = self.create_session()

        return self._sessions_for_netloc[netloc]

    def close(self) -> None:
        for session in self._sessions_for_netloc.values():
            if session is not None:
                session.close()

    def __del__(self) -> None:
        self.close()

    def delete_cache(self, url: str) -> None:
        if self._cache_control is not None:
            self._cache_control.delete(key=url)

    def authenticated_url(self, url: str) -> str:
        parsed = urllib.parse.urlparse(url)
        credential = self.get_credentials_for_url(url)

        if credential.username is not None and credential.password is not None:
            username = urllib.parse.quote(credential.username, safe="")
            password = urllib.parse.quote(credential.password, safe="")

            return (
                f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}"
            )

        return url

    def request(
        self, method: str, url: str, raise_for_status: bool = True, **kwargs: Any
    ) -> requests.Response:
        headers = kwargs.get("headers")
        request = requests.Request(method, url, headers=headers)
        credential = self.get_credentials_for_url(url)

        if credential.username is not None or credential.password is not None:
            request = requests.auth.HTTPBasicAuth(
                credential.username or "", credential.password or ""
            )(request)

        session = self.get_session(url=url)
        prepared_request = session.prepare_request(request)

        proxies: dict[str, str] = kwargs.get("proxies", {})
        stream: bool | None = kwargs.get("stream")

        certs = self.get_certs_for_url(url)
        verify: bool | str | Path = kwargs.get("verify") or certs.cert or certs.verify
        cert: str | Path | None = kwargs.get("cert") or certs.client_cert

        if cert is not None:
            cert = str(cert)

        verify = str(verify) if isinstance(verify, Path) else verify

        settings = session.merge_environment_settings(
            prepared_request.url, proxies, stream, verify, cert
        )

        # Send the request.
        send_kwargs = {
            "timeout": kwargs.get("timeout", REQUESTS_TIMEOUT),
            "allow_redirects": kwargs.get("allow_redirects", True),
        }
        send_kwargs.update(settings)

        attempt = 0
        resp = None

        while True:
            is_last_attempt = attempt >= 5
            try:
                resp = session.send(prepared_request, **send_kwargs)
            except (requests.exceptions.ConnectionError, OSError) as e:
                if is_last_attempt:
                    parsed_url = urllib.parse.urlsplit(url)
                    exc = PoetryRuntimeError.create(
                        reason=f"<error>All attempts to connect to <c1>{parsed_url.netloc}</> failed.</>",
                        exception=e,
                    )
                    exc.append(
                        ConsoleMessage(
                            "the server is not responding to requests at the moment\n"
                            "the hostname cannot be resolved by your DNS\n"
                            "your network is not connected to the internet\n"
                        )
                        .indent("    - ")
                        .make_section("Probable Causes")
                        .wrap("warning")
                    )
                    exc.append(
                        ConsoleMessage(
                            f"<b>Note:</> The path requested was <c1>{parsed_url.path}</>.",
                            debug=True,
                        )
                    )
                    raise exc
            else:
                if resp.status_code not in STATUS_FORCELIST or is_last_attempt:
                    if raise_for_status:
                        resp.raise_for_status()
                    return resp

            if not is_last_attempt:
                attempt += 1
                delay = self._get_backoff(resp, attempt)
                logger.debug("Retrying HTTP request in %s seconds.", delay)
                time.sleep(delay)
                continue

        # this should never really be hit under any sane circumstance
        raise PoetryError(f"Failed HTTP request: {method.upper()} {url}")

    def _get_backoff(self, response: requests.Response | None, attempt: int) -> float:
        if response is not None:
            retry_after = response.headers.get(RETRY_AFTER_HEADER, "")
            if retry_after:
                return float(retry_after)

        return 0.5 * attempt

    def get(self, url: str, **kwargs: Any) -> requests.Response:
        return self.request("get", url, **kwargs)

    def head(self, url: str, **kwargs: Any) -> requests.Response:
        kwargs.setdefault("allow_redirects", False)
        return self.request("head", url, **kwargs)

    def post(self, url: str, **kwargs: Any) -> requests.Response:
        return self.request("post", url, **kwargs)

    def _get_credentials_for_repository(
        self, repository: AuthenticatorRepositoryConfig
    ) -> HTTPAuthCredential:
        # cache repository credentials by repository url to avoid multiple keyring
        # backend queries when packages are being downloaded from the same source
        key = repository.url

        if key not in self._credentials:
            self._credentials[key] = repository.get_http_credentials(
                password_manager=self._password_manager
            )

        return self._credentials[key]

    def _get_credentials_for_url(
        self, url: str, exact_match: bool = False
    ) -> HTTPAuthCredential:
        repository = self.get_repository_config_for_url(url, exact_match)

        credential = (
            self._get_credentials_for_repository(repository=repository)
            if repository is not None
            else HTTPAuthCredential()
        )

        if credential.password is None:
            parsed_url = urllib.parse.urlsplit(url)
            netloc = parsed_url.netloc
            credential = self._password_manager.get_credential(
                url, netloc, username=credential.username
            )

            return HTTPAuthCredential(
                username=credential.username, password=credential.password
            )

        return credential

    def get_credentials_for_git_url(self, url: str) -> HTTPAuthCredential:
        parsed_url = urllib.parse.urlsplit(url)

        if parsed_url.scheme not in {"http", "https"}:
            return HTTPAuthCredential()

        key = f"git+{url}"

        if key not in self._credentials:
            self._credentials[key] = self._get_credentials_for_url(url, True)

        return self._credentials[key]

    def get_credentials_for_url(self, url: str) -> HTTPAuthCredential:
        parsed_url = urllib.parse.urlsplit(url)
        netloc = parsed_url.netloc

        if url not in self._credentials:
            if "@" not in netloc:
                # no credentials were provided in the url, try finding the
                # best repository configuration
                self._credentials[url] = self._get_credentials_for_url(url)
            else:
                # Split from the right because that's how urllib.parse.urlsplit()
                # behaves if more than one @ is present (which can be checked using
                # the password attribute of urlsplit()'s return value).
                auth, netloc = netloc.rsplit("@", 1)
                # Split from the left because that's how urllib.parse.urlsplit()
                # behaves if more than one : is present (which again can be checked
                # using the password attribute of the return value)
                user, password = auth.split(":", 1) if ":" in auth else (auth, "")
                self._credentials[url] = HTTPAuthCredential(
                    urllib.parse.unquote(user),
                    urllib.parse.unquote(password),
                )

        return self._credentials[url]

    def get_pypi_token(self, name: str) -> str | None:
        return self._password_manager.get_pypi_token(name)

    def get_http_auth(self, name: str) -> HTTPAuthCredential | None:
        if name == "pypi":
            repository = AuthenticatorRepositoryConfig(
                name, "https://upload.pypi.org/legacy/"
            )
        else:
            if name not in self.configured_repositories:
                return None
            repository = self.configured_repositories[name]

        return self._get_credentials_for_repository(repository=repository)

    def get_certs_for_repository(self, name: str) -> RepositoryCertificateConfig:
        if name.lower() == "pypi" or name not in self.configured_repositories:
            return RepositoryCertificateConfig()
        return self.configured_repositories[name].certs(self._config)

    @property
    def configured_repositories(self) -> dict[str, AuthenticatorRepositoryConfig]:
        if self._configured_repositories is None:
            self._configured_repositories = {}
            for repository_name in self._config.get("repositories", []):
                url = self._config.get(f"repositories.{repository_name}.url")
                self._configured_repositories[repository_name] = (
                    AuthenticatorRepositoryConfig(repository_name, url)
                )

        return self._configured_repositories

    def reset_credentials_cache(self) -> None:
        self.get_repository_config_for_url.cache_clear()
        self._credentials = {}

    def add_repository(self, name: str, url: str) -> None:
        self.configured_repositories[name] = AuthenticatorRepositoryConfig(name, url)
        self.reset_credentials_cache()

    def get_certs_for_url(self, url: str) -> RepositoryCertificateConfig:
        if url not in self._certs:
            self._certs[url] = self._get_certs_for_url(url)
        return self._certs[url]

    def _get_repository_config_for_url(
        self, url: str, exact_match: bool = False
    ) -> AuthenticatorRepositoryConfig | None:
        parsed_url = urllib.parse.urlsplit(url)
        candidates_netloc_only = []
        candidates_path_match = []

        for repository in self.configured_repositories.values():
            if exact_match:
                if parsed_url.path == repository.path:
                    return repository
                continue

            if repository.netloc == parsed_url.netloc:
                if parsed_url.path.startswith(repository.path) or commonprefix(
                    (parsed_url.path, repository.path)
                ):
                    candidates_path_match.append(repository)
                    continue
                candidates_netloc_only.append(repository)

        if candidates_path_match:
            candidates = candidates_path_match
        elif candidates_netloc_only:
            candidates = candidates_netloc_only
        else:
            return None

        if len(candidates) > 1:
            logger.debug(
                "Multiple source configurations found for %s - %s",
                parsed_url.netloc,
                ", ".join(c.name for c in candidates),
            )
            # prefer the more specific path
            candidates.sort(
                key=lambda c: len(commonprefix([parsed_url.path, c.path])), reverse=True
            )

        return candidates[0]

    def _get_certs_for_url(self, url: str) -> RepositoryCertificateConfig:
        selected = self.get_repository_config_for_url(url)
        if selected:
            return selected.certs(config=self._config)
        return RepositoryCertificateConfig()


_authenticator: Authenticator | None = None


def get_default_authenticator() -> Authenticator:
    global _authenticator

    if _authenticator is None:
        _authenticator = Authenticator()

    return _authenticator
