import re
import warnings
from http.cookies import SimpleCookie
from pathlib import Path
from typing import Any, List, Literal, Optional, Tuple, Union, cast

import certifi

from ._wrapper import ffi, lib
from .const import CurlECode, CurlHttpVersion, CurlInfo, CurlOpt, CurlWsFlag

DEFAULT_CACERT = certifi.where()
REASON_PHRASE_RE = re.compile(rb"HTTP/\d\.\d [0-9]{3} (.*)")
STATUS_LINE_RE = re.compile(rb"HTTP/(\d\.\d) ([0-9]{3}) (.*)")


class CurlError(Exception):
    """Base exception for curl_cffi package"""

    def __init__(self, msg, code: Union[CurlECode, Literal[0]] = 0, *args, **kwargs):
        super().__init__(msg, *args, **kwargs)
        self.code: Union[CurlECode, Literal[0]] = code


CURLINFO_TEXT = 0
CURLINFO_HEADER_IN = 1
CURLINFO_HEADER_OUT = 2
CURLINFO_DATA_IN = 3
CURLINFO_DATA_OUT = 4
CURLINFO_SSL_DATA_IN = 5
CURLINFO_SSL_DATA_OUT = 6

CURL_WRITEFUNC_PAUSE = 0x10000001
CURL_WRITEFUNC_ERROR = 0xFFFFFFFF


@ffi.def_extern()
def debug_function(curl, type: int, data, size, clientp) -> int:
    """ffi callback for curl debug info"""
    text = ffi.buffer(data, size)[:]
    if type in (CURLINFO_SSL_DATA_IN, CURLINFO_SSL_DATA_OUT):
        print("SSL OUT", text)
    elif type in (CURLINFO_DATA_IN, CURLINFO_DATA_OUT):
        print(text.decode("utf-8", "replace"))
    else:
        print(text.decode(), end="")
    return 0


@ffi.def_extern()
def buffer_callback(ptr, size, nmemb, userdata):
    """ffi callback for curl write function, directly writes to a buffer"""
    # assert size == 1
    buffer = ffi.from_handle(userdata)
    buffer.write(ffi.buffer(ptr, nmemb)[:])
    return nmemb * size


def ensure_int(s):
    if not s:
        return 0
    return int(s)


@ffi.def_extern()
def write_callback(ptr, size, nmemb, userdata):
    """ffi callback for curl write function, calls the callback python function"""
    # although similar enough to the function above, kept here for performance reasons
    callback = ffi.from_handle(userdata)
    wrote = callback(ffi.buffer(ptr, nmemb)[:])
    wrote = ensure_int(wrote)
    if wrote == CURL_WRITEFUNC_PAUSE or wrote == CURL_WRITEFUNC_ERROR:  # noqa: SIM109
        return wrote
    # should make this an exception in future versions
    if wrote != nmemb * size:
        warnings.warn("Wrote bytes != received bytes.", RuntimeWarning, stacklevel=2)
    return nmemb * size


# Credits: @alexio777 on https://github.com/lexiforest/curl_cffi/issues/4
def slist_to_list(head) -> List[bytes]:
    """Converts curl slist to a python list."""
    result = []
    ptr = head
    while ptr:
        result.append(ffi.string(ptr.data))
        ptr = ptr.next
    lib.curl_slist_free_all(head)
    return result


class Curl:
    """
    Wrapper for ``curl_easy_*`` functions of libcurl.
    """

    def __init__(self, cacert: str = "", debug: bool = False, handle=None) -> None:
        """
        Parameters:
            cacert: CA cert path to use, by default, curl_cffi uses certs from ``certifi``.
            debug: whether to show curl debug messages.
            handle: a curl handle instance from ``curl_easy_init``.
        """
        self._curl = handle if handle else lib.curl_easy_init()
        self._headers = ffi.NULL
        self._proxy_headers = ffi.NULL
        self._resolve = ffi.NULL
        self._cacert = cacert or DEFAULT_CACERT
        self._is_cert_set = False
        self._write_handle = None
        self._header_handle = None
        self._body_handle = None
        # TODO: use CURL_ERROR_SIZE
        self._error_buffer = ffi.new("char[]", 256)
        self._debug = debug
        self._set_error_buffer()

    def _set_error_buffer(self) -> None:
        ret = lib._curl_easy_setopt(self._curl, CurlOpt.ERRORBUFFER, self._error_buffer)
        if ret != 0:
            warnings.warn("Failed to set error buffer", stacklevel=2)
        if self._debug:
            self.setopt(CurlOpt.VERBOSE, 1)
            lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)

    def debug(self) -> None:
        """Set debug to True"""
        self.setopt(CurlOpt.VERBOSE, 1)
        lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)

    def __del__(self) -> None:
        self.close()

    def _check_error(self, errcode: int, *args: Any) -> None:
        error = self._get_error(errcode, *args)
        if error is not None:
            raise error

    def _get_error(self, errcode: int, *args: Any):
        if errcode != 0:
            errmsg = ffi.string(self._error_buffer).decode()
            action = " ".join([str(a) for a in args])
            return CurlError(
                f"Failed to {action}, curl: ({errcode}) {errmsg}. "
                "See https://curl.se/libcurl/c/libcurl-errors.html first for more details.",
                code=cast(CurlECode, errcode),
            )

    def setopt(self, option: CurlOpt, value: Any) -> int:
        """Wrapper for ``curl_easy_setopt``.

        Parameters:
            option: option to set, using constants from CurlOpt enum
            value: value to set, strings will be handled automatically

        Returns:
            0 if no error, see ``CurlECode``.
        """
        input_option = {
            # this should be int in curl, but cffi requires pointer for void*
            # it will be convert back in the glue c code.
            0: "long*",
            10000: "char*",
            20000: "void*",
            30000: "int64_t*",  # offset type
            40000: "void*",  # blob type
        }
        # print("option", option, "value", value)

        # Convert value
        value_type = input_option.get((option // 10000) * 10000)
        if value_type == "long*" or value_type == "int64_t*":
            c_value = ffi.new(value_type, value)
        elif option == CurlOpt.WRITEDATA:
            c_value = ffi.new_handle(value)
            self._write_handle = c_value
            lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.buffer_callback)
        elif option == CurlOpt.HEADERDATA:
            c_value = ffi.new_handle(value)
            self._header_handle = c_value
            lib._curl_easy_setopt(self._curl, CurlOpt.HEADERFUNCTION, lib.buffer_callback)
        elif option == CurlOpt.WRITEFUNCTION:
            c_value = ffi.new_handle(value)
            self._write_handle = c_value
            lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.write_callback)
            option = CurlOpt.WRITEDATA
        elif option == CurlOpt.HEADERFUNCTION:
            c_value = ffi.new_handle(value)
            self._header_handle = c_value
            lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.write_callback)
            option = CurlOpt.HEADERDATA
        elif value_type == "char*":
            c_value = value.encode() if isinstance(value, str) else value
            # Must keep a reference, otherwise may be GCed.
            if option == CurlOpt.POSTFIELDS:
                self._body_handle = c_value
        else:
            raise NotImplementedError(f"Option unsupported: {option}")

        if option == CurlOpt.HTTPHEADER:
            for header in value:
                self._headers = lib.curl_slist_append(self._headers, header)
            ret = lib._curl_easy_setopt(self._curl, option, self._headers)
        elif option == CurlOpt.PROXYHEADER:
            for proxy_header in value:
                self._proxy_headers = lib.curl_slist_append(self._proxy_headers, proxy_header)
            ret = lib._curl_easy_setopt(self._curl, option, self._proxy_headers)
        elif option == CurlOpt.RESOLVE:
            for resolve in value:
                if isinstance(resolve, str):
                    resolve = resolve.encode()
                self._resolve = lib.curl_slist_append(self._resolve, resolve)
            ret = lib._curl_easy_setopt(self._curl, option, self._resolve)
        else:
            ret = lib._curl_easy_setopt(self._curl, option, c_value)
        self._check_error(ret, "setopt", option, value)

        if option == CurlOpt.CAINFO:
            self._is_cert_set = True

        return ret

    def getinfo(self, option: CurlInfo) -> Union[bytes, int, float, List]:
        """Wrapper for ``curl_easy_getinfo``. Gets information in response after curl perform.

        Parameters:
            option: option to get info of, using constants from ``CurlInfo`` enum

        Returns:
            value retrieved from last perform.
        """
        ret_option = {
            0x100000: "char**",
            0x200000: "long*",
            0x300000: "double*",
            0x400000: "struct curl_slist **",
        }
        ret_cast_option = {
            0x100000: ffi.string,
            0x200000: int,
            0x300000: float,
        }
        c_value = ffi.new(ret_option[option & 0xF00000])
        ret = lib.curl_easy_getinfo(self._curl, option, c_value)
        self._check_error(ret, "getinfo", option)
        # cookielist and ssl_engines starts with 0x400000, see also: const.py
        if option & 0xF00000 == 0x400000:
            return slist_to_list(c_value[0])
        if c_value[0] == ffi.NULL:
            return b""
        return ret_cast_option[option & 0xF00000](c_value[0])

    def version(self) -> bytes:
        """Get the underlying libcurl version."""
        return ffi.string(lib.curl_version())

    def impersonate(self, target: str, default_headers: bool = True) -> int:
        """Set the browser type to impersonate.

        Parameters:
            target: browser to impersonate.
            default_headers: whether to add default headers, like User-Agent.

        Returns:
            0 if no error.
        """
        return lib.curl_easy_impersonate(self._curl, target.encode(), int(default_headers))

    def _ensure_cacert(self) -> None:
        if not self._is_cert_set:
            ret = self.setopt(CurlOpt.CAINFO, self._cacert)
            self._check_error(ret, "set cacert")
            ret = self.setopt(CurlOpt.PROXY_CAINFO, self._cacert)
            self._check_error(ret, "set proxy cacert")

    def perform(self, clear_headers: bool = True) -> None:
        """Wrapper for ``curl_easy_perform``, performs a curl request.

        Parameters:
            clear_headers: clear header slist used in this perform

        Raises:
            CurlError: if the perform was not successful.
        """
        # make sure we set a cacert store
        self._ensure_cacert()

        # here we go
        ret = lib.curl_easy_perform(self._curl)

        try:
            self._check_error(ret, "perform")
        finally:
            # cleaning
            self.clean_after_perform(clear_headers)

    def clean_after_perform(self, clear_headers: bool = True) -> None:
        """Clean up handles and buffers after perform, called at the end of `perform`."""
        self._write_handle = None
        self._header_handle = None
        self._body_handle = None
        if clear_headers:
            if self._headers != ffi.NULL:
                lib.curl_slist_free_all(self._headers)
            self._headers = ffi.NULL

            if self._proxy_headers != ffi.NULL:
                lib.curl_slist_free_all(self._proxy_headers)
            self._proxy_headers = ffi.NULL

    def duphandle(self) -> "Curl":
        """Wrapper for ``curl_easy_duphandle``.

        This is not a full copy of entire curl object in python. For example, headers
        handle is not copied, you have to set them again."""
        new_handle = lib.curl_easy_duphandle(self._curl)
        c = Curl(cacert=self._cacert, debug=self._debug, handle=new_handle)
        return c

    def reset(self) -> None:
        """Reset all curl options, wrapper for ``curl_easy_reset``."""
        self._is_cert_set = False
        if self._curl is not None:
            lib.curl_easy_reset(self._curl)
            self._set_error_buffer()
        self._resolve = ffi.NULL

    def parse_cookie_headers(self, headers: List[bytes]) -> SimpleCookie:
        """Extract ``cookies.SimpleCookie`` from header lines.

        Parameters:
            headers: list of headers in bytes.

        Returns:
            A parsed cookies.SimpleCookie instance.
        """
        cookie: SimpleCookie = SimpleCookie()
        for header in headers:
            if header.lower().startswith(b"set-cookie: "):
                cookie.load(header[12:].decode())  # len("set-cookie: ") == 12
        return cookie

    @staticmethod
    def get_reason_phrase(status_line: bytes) -> bytes:
        """Extract reason phrase, like ``OK``, ``Not Found`` from response status line."""
        m = REASON_PHRASE_RE.match(status_line)
        return m.group(1) if m else b""

    @staticmethod
    def parse_status_line(status_line: bytes) -> Tuple[CurlHttpVersion, int, bytes]:
        """Parse status line.

        Returns:
            http_version, status_code, and reason phrase
        """
        m = STATUS_LINE_RE.match(status_line)
        if not m:
            return CurlHttpVersion.V1_0, 0, b""
        if m.group(1) == "2.0":
            http_version = CurlHttpVersion.V2_0
        elif m.group(1) == "1.1":
            http_version = CurlHttpVersion.V1_1
        elif m.group(1) == "1.0":
            http_version = CurlHttpVersion.V1_0
        else:
            http_version = CurlHttpVersion.NONE
        status_code = int(m.group(2))
        reason = m.group(3)

        return http_version, status_code, reason

    def close(self) -> None:
        """Close and cleanup curl handle, wrapper for ``curl_easy_cleanup``."""
        if self._curl:
            lib.curl_easy_cleanup(self._curl)
            self._curl = None
        ffi.release(self._error_buffer)
        self._resolve = ffi.NULL

    def ws_recv(self, n: int = 1024) -> Tuple[bytes, Any]:
        """Receive a frame from a websocket connection.

        Args:
            n: maximum data to receive.

        Returns:
            a tuple of frame content and curl frame meta struct.

        Raises:
            CurlError: if failed.
        """
        buffer = ffi.new("char[]", n)
        n_recv = ffi.new("int *")
        p_frame = ffi.new("struct curl_ws_frame **")

        ret = lib.curl_ws_recv(self._curl, buffer, n, n_recv, p_frame)
        self._check_error(ret, "WS_RECV")

        # Frame meta explained: https://curl.se/libcurl/c/curl_ws_meta.html
        frame = p_frame[0]

        return ffi.buffer(buffer)[: n_recv[0]], frame

    def ws_send(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY) -> int:
        """Send data to a websocket connection.

        Args:
            payload: content to send.
            flags: websocket flag to set for the frame, default: binary.

        Returns:
            0 if no error.

        Raises:
            CurlError: if failed.
        """
        n_sent = ffi.new("int *")
        buffer = ffi.from_buffer(payload)
        ret = lib.curl_ws_send(self._curl, buffer, len(buffer), n_sent, 0, flags)
        self._check_error(ret, "WS_SEND")
        return n_sent[0]

    def ws_close(self) -> None:
        """Send the close frame."""
        self.ws_send(b"", CurlWsFlag.CLOSE)


class CurlMime:
    """Wrapper for the ``curl_mime_`` API."""

    def __init__(self, curl: Optional[Curl] = None):
        """
        Args:
            curl: Curl instance to use.
        """
        self._curl = curl if curl else Curl()
        self._form = lib.curl_mime_init(self._curl._curl)

    def addpart(
        self,
        name: str,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        local_path: Optional[Union[str, bytes, Path]] = None,
        data: Optional[bytes] = None,
    ) -> None:
        """Add a mime part for a mutlipart html form.

        Note: You can only use either local_path or data, not both.

        Args:
            name: name of the field.
            content_type: content_type for the field. for example: ``image/png``.
            filename: filename for the server.
            local_path: file to upload on local disk.
            data: file content to upload.
        """
        part = lib.curl_mime_addpart(self._form)

        ret = lib.curl_mime_name(part, name.encode())
        if ret != 0:
            raise CurlError("Add field failed.")

        # mime type
        if content_type is not None:
            ret = lib.curl_mime_type(part, content_type.encode())
            if ret != 0:
                raise CurlError("Add field failed.")

        # remote file name
        if filename is not None:
            ret = lib.curl_mime_filename(part, filename.encode())
            if ret != 0:
                raise CurlError("Add field failed.")

        if local_path and data:
            raise CurlError("Can not use local_path and data at the same time.")

        # this is a filename
        if local_path is not None:
            if isinstance(local_path, Path):
                local_path_str = str(local_path)
            elif isinstance(local_path, bytes):
                local_path_str = local_path.decode()
            else:
                local_path_str = local_path

            if not Path(local_path_str).exists():
                raise FileNotFoundError(f"File not found at {local_path_str}")
            ret = lib.curl_mime_filedata(part, local_path_str.encode())
            if ret != 0:
                raise CurlError("Add field failed.")

        if data is not None:
            if not isinstance(data, bytes):
                data = str(data).encode()
            ret = lib.curl_mime_data(part, data, len(data))

    @classmethod
    def from_list(cls, files: List[dict]):
        """Create a multipart instance from a list of dict, for keys, see ``addpart``"""
        form = cls()
        for file in files:
            form.addpart(**file)
        return form

    def attach(self, curl: Optional[Curl] = None) -> None:
        """Attach the mime instance to a curl instance."""
        c = curl if curl else self._curl
        c.setopt(CurlOpt.MIMEPOST, self._form)

    def close(self) -> None:
        """Close the mime instance and underlying files. This method must be called after
        ``perform`` or ``request``."""
        lib.curl_mime_free(self._form)
        self._form = ffi.NULL

    def __del__(self) -> None:
        self.close()
