from collections import defaultdict
from collections import deque
from itertools import chain
import linecache
import os
from pathlib import Path
import sys
import threading
from types import CoroutineType
from types import FunctionType
from types import ModuleType
from typing import Any
from typing import Deque
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import cast

import ddtrace
from ddtrace import config as ddconfig
from ddtrace._trace.tracer import Tracer
from ddtrace.debugging._async import dd_coroutine_wrapper
from ddtrace.debugging._config import di_config
from ddtrace.debugging._config import ed_config
from ddtrace.debugging._encoding import LogSignalJsonEncoder
from ddtrace.debugging._encoding import SignalQueue
from ddtrace.debugging._exception.auto_instrument import SpanExceptionProcessor
from ddtrace.debugging._function.discovery import FunctionDiscovery
from ddtrace.debugging._function.store import FullyNamedWrappedFunction
from ddtrace.debugging._function.store import FunctionStore
from ddtrace.debugging._metrics import metrics
from ddtrace.debugging._probe.model import FunctionLocationMixin
from ddtrace.debugging._probe.model import FunctionProbe
from ddtrace.debugging._probe.model import LineLocationMixin
from ddtrace.debugging._probe.model import LineProbe
from ddtrace.debugging._probe.model import LogFunctionProbe
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._probe.model import MetricFunctionProbe
from ddtrace.debugging._probe.model import MetricLineProbe
from ddtrace.debugging._probe.model import Probe
from ddtrace.debugging._probe.model import SpanDecorationFunctionProbe
from ddtrace.debugging._probe.model import SpanDecorationLineProbe
from ddtrace.debugging._probe.model import SpanFunctionProbe
from ddtrace.debugging._probe.registry import ProbeRegistry
from ddtrace.debugging._probe.remoteconfig import ProbePollerEvent
from ddtrace.debugging._probe.remoteconfig import ProbePollerEventType
from ddtrace.debugging._probe.remoteconfig import ProbeRCAdapter
from ddtrace.debugging._probe.status import ProbeStatusLogger
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.collector import SignalContext
from ddtrace.debugging._signal.metric_sample import MetricSample
from ddtrace.debugging._signal.model import Signal
from ddtrace.debugging._signal.model import SignalState
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._signal.tracing import DynamicSpan
from ddtrace.debugging._signal.tracing import SpanDecoration
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.internal import atexit
from ddtrace.internal import compat
from ddtrace.internal import forksafe
from ddtrace.internal.logger import get_logger
from ddtrace.internal.metrics import Metrics
from ddtrace.internal.module import ModuleHookType
from ddtrace.internal.module import ModuleWatchdog
from ddtrace.internal.module import origin
from ddtrace.internal.module import register_post_run_module_hook
from ddtrace.internal.module import unregister_post_run_module_hook
from ddtrace.internal.rate_limiter import BudgetRateLimiterWithJitter as RateLimiter
from ddtrace.internal.rate_limiter import RateLimitExceeded
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.safety import _isinstance
from ddtrace.internal.service import Service
from ddtrace.internal.wrapping import Wrapper


log = get_logger(__name__)

_probe_metrics = Metrics(namespace="dynamic.instrumentation.metric")
_probe_metrics.enable()


class DebuggerError(Exception):
    """Generic debugger error."""

    pass


class DebuggerModuleWatchdog(ModuleWatchdog):
    _locations: Set[str] = set()

    @classmethod
    def register_origin_hook(cls, origin: Path, hook: ModuleHookType) -> None:
        if origin in cls._locations:
            # We already have a hook for this origin, don't register a new one
            # but invoke it directly instead, if the module was already loaded.
            module = cls.get_by_origin(origin)
            if module is not None:
                hook(module)

            return

        cls._locations.add(str(origin))

        super().register_origin_hook(origin, hook)

    @classmethod
    def unregister_origin_hook(cls, origin: Path, hook: ModuleHookType) -> None:
        try:
            cls._locations.remove(str(origin))
        except KeyError:
            # Nothing to unregister.
            return

        return super().unregister_origin_hook(origin, hook)

    @classmethod
    def register_module_hook(cls, module_name: str, hook: ModuleHookType) -> None:
        if module_name in cls._locations:
            # We already have a hook for this origin, don't register a new one
            # but invoke it directly instead, if the module was already loaded.
            module = sys.modules.get(module_name)
            if module is not None:
                hook(module)

            return

        cls._locations.add(module_name)

        super().register_module_hook(module_name, hook)

    @classmethod
    def unregister_module_hook(cls, module_name: str, hook: ModuleHookType) -> None:
        try:
            cls._locations.remove(module_name)
        except KeyError:
            # Nothing to unregister.
            return

        return super().unregister_module_hook(module_name, hook)

    @classmethod
    def on_run_module(cls, module: ModuleType) -> None:
        if cls._instance is not None:
            # Treat run module as an import to trigger import hooks and register
            # the module's origin.
            cls._instance.after_import(module)


class Debugger(Service):
    _instance: Optional["Debugger"] = None
    _probe_meter = _probe_metrics.get_meter("probe")
    _span_processor: Optional[SpanExceptionProcessor] = None

    __rc_adapter__ = ProbeRCAdapter
    __uploader__ = LogsIntakeUploaderV1
    __collector__ = SignalCollector
    __watchdog__ = DebuggerModuleWatchdog
    __logger__ = ProbeStatusLogger

    @classmethod
    def enable(cls, run_module: bool = False) -> None:
        """Enable dynamic instrumentation

        This class method is idempotent. Dynamic instrumentation will be
        disabled automatically at exit.
        """
        if cls._instance is not None:
            log.debug("%s already enabled", cls.__name__)
            return

        log.debug("Enabling %s", cls.__name__)

        di_config.enabled = True

        cls.__watchdog__.install()

        if di_config.metrics:
            metrics.enable()

        cls._instance = debugger = cls()

        debugger.start()

        forksafe.register(cls._restart)
        atexit.register(cls.disable)
        register_post_run_module_hook(cls._on_run_module)

        log.debug("%s enabled", cls.__name__)

    @classmethod
    def disable(cls, join: bool = True) -> None:
        """Disable dynamic instrumentation.

        This class method is idempotent. Called automatically at exit, if
        dynamic instrumentation was enabled.
        """
        if cls._instance is None:
            log.debug("%s not enabled", cls.__name__)
            return

        log.debug("Disabling %s", cls.__name__)

        remoteconfig_poller.unregister("LIVE_DEBUGGING")

        forksafe.unregister(cls._restart)
        atexit.unregister(cls.disable)
        unregister_post_run_module_hook(cls._on_run_module)

        if cls._instance._span_processor:
            cls._instance._span_processor.unregister()

        cls._instance.stop(join=join)
        cls._instance = None

        cls.__watchdog__.uninstall()
        if di_config.metrics:
            metrics.disable()

        di_config.enabled = False

        log.debug("%s disabled", cls.__name__)

    def __init__(self, tracer: Optional[Tracer] = None) -> None:
        super().__init__()

        self._tracer = tracer or ddtrace.tracer
        service_name = di_config.service_name

        self._signal_queue = SignalQueue(
            encoder=LogSignalJsonEncoder(service_name),
            on_full=self._on_encoder_buffer_full,
        )
        self._status_logger = status_logger = self.__logger__(service_name)

        self._probe_registry = ProbeRegistry(status_logger=status_logger)
        self._uploader = self.__uploader__(self._signal_queue)
        self._collector = self.__collector__(self._signal_queue)
        self._services = [self._uploader]

        self._function_store = FunctionStore(extra_attrs=["__dd_wrappers__"])

        log_limiter = RateLimiter(limit_rate=1.0, raise_on_exceed=False)
        self._global_rate_limiter = RateLimiter(
            limit_rate=di_config.global_rate_limit,  # TODO: Make it configurable. Note that this is per-process!
            on_exceed=lambda: log_limiter.limit(log.warning, "Global rate limit exceeded"),
            call_once=True,
            raise_on_exceed=False,
        )

        if ed_config.enabled:
            from ddtrace.debugging._exception.auto_instrument import SpanExceptionProcessor

            self._span_processor = SpanExceptionProcessor(collector=self._collector)
            self._span_processor.register()
        else:
            self._span_processor = None

        if di_config.enabled:
            # TODO: this is only temporary and will be reverted once the DD_REMOTE_CONFIGURATION_ENABLED variable
            #  has been removed
            if ddconfig._remote_config_enabled is False:
                ddconfig._remote_config_enabled = True
                log.info("Disabled Remote Configuration enabled by Dynamic Instrumentation.")

            # Register the debugger with the RCM client.
            if not remoteconfig_poller.update_product_callback("LIVE_DEBUGGING", self._on_configuration):
                di_callback = self.__rc_adapter__(None, self._on_configuration, status_logger=status_logger)
                remoteconfig_poller.register("LIVE_DEBUGGING", di_callback)

        log.debug("%s initialized (service name: %s)", self.__class__.__name__, service_name)

    def _on_encoder_buffer_full(self, item, encoded):
        # type (Any, bytes) -> None
        # Send upload request
        self._uploader.upload()

    def _dd_debugger_hook(self, probe: Probe) -> None:
        """Debugger probe hook.

        This gets called with a reference to the probe. We only check whether
        the probe is active. If so, we push the collected data to the collector
        for bulk processing. This way we avoid adding delay while the
        instrumented code is running.
        """
        try:
            actual_frame = sys._getframe(1)
            signal: Optional[Signal] = None
            if isinstance(probe, MetricLineProbe):
                signal = MetricSample(
                    probe=probe,
                    frame=actual_frame,
                    thread=threading.current_thread(),
                    trace_context=self._tracer.current_trace_context(),
                    meter=self._probe_meter,
                )
            elif isinstance(probe, LogLineProbe):
                if probe.take_snapshot:
                    # TODO: Global limit evaluated before probe conditions
                    if self._global_rate_limiter.limit() is RateLimitExceeded:
                        return

                signal = Snapshot(
                    probe=probe,
                    frame=actual_frame,
                    thread=threading.current_thread(),
                    trace_context=self._tracer.current_trace_context(),
                )
            elif isinstance(probe, SpanDecorationLineProbe):
                signal = SpanDecoration(
                    probe=probe,
                    frame=actual_frame,
                    thread=threading.current_thread(),
                )
            else:
                log.error("Unsupported probe type: %r", type(probe))
                return

            signal.line()

            log.debug("[%s][P: %s] Debugger. Report signal %s", os.getpid(), os.getppid(), signal)
            self._collector.push(signal)

            if signal.state is SignalState.DONE:
                self._probe_registry.set_emitting(probe)

        except Exception:
            log.error("Failed to execute probe hook", exc_info=True)

    def _dd_debugger_wrapper(self, wrappers: Dict[str, FunctionProbe]) -> Wrapper:
        """Debugger wrapper.

        This gets called with a reference to the wrapped function and the probe,
        together with the arguments to pass. We only check
        whether the probe is active and the debugger is enabled. If so, we
        capture all the relevant debugging context.
        """

        def _(wrapped: FunctionType, args: Tuple[Any], kwargs: Dict[str, Any]) -> Any:
            if not wrappers:
                return wrapped(*args, **kwargs)

            argnames = wrapped.__code__.co_varnames
            actual_frame = sys._getframe(1)
            allargs = list(chain(zip(argnames, args), kwargs.items()))
            thread = threading.current_thread()

            open_contexts: Deque[SignalContext] = deque()
            signal: Optional[Signal] = None

            # Group probes on the basis of whether they create new context.
            context_creators: List[Probe] = []
            context_consumers: List[Probe] = []
            for p in wrappers.values():
                (context_creators if p.__context_creator__ else context_consumers).append(p)

            # Trigger the context creators first, so that the new context can be
            # consumed by the consumers.
            for probe in chain(context_creators, context_consumers):
                # Because new context might be created, we need to recompute it
                # for each probe.
                trace_context = self._tracer.current_trace_context()

                if isinstance(probe, MetricFunctionProbe):
                    signal = MetricSample(
                        probe=probe,
                        frame=actual_frame,
                        thread=thread,
                        args=allargs,
                        trace_context=trace_context,
                        meter=self._probe_meter,
                    )
                elif isinstance(probe, LogFunctionProbe):
                    signal = Snapshot(
                        probe=probe,
                        frame=actual_frame,
                        thread=thread,
                        args=allargs,
                        trace_context=trace_context,
                    )
                elif isinstance(probe, SpanFunctionProbe):
                    signal = DynamicSpan(
                        probe=probe,
                        frame=actual_frame,
                        thread=thread,
                        args=allargs,
                        trace_context=trace_context,
                    )
                elif isinstance(probe, SpanDecorationFunctionProbe):
                    signal = SpanDecoration(
                        probe=probe,
                        frame=actual_frame,
                        thread=thread,
                        args=allargs,
                    )
                else:
                    log.error("Unsupported probe type: %s", type(probe))
                    continue

                # Open probe signal contexts are ordered, with those that have
                # created new tracing context first. We need to finalise them in
                # reverse order, so we append them to the beginning.
                open_contexts.appendleft(self._collector.attach(signal))

            if not open_contexts:
                return wrapped(*args, **kwargs)

            start_time = compat.monotonic_ns()
            try:
                retval = wrapped(*args, **kwargs)
                end_time = compat.monotonic_ns()
                exc_info = (None, None, None)
            except Exception:
                end_time = compat.monotonic_ns()
                retval = None
                exc_info = sys.exc_info()  # type: ignore[assignment]
            else:
                # DEV: We do not unwind generators here as they might result in
                # tight loops. We return the result as a generator object
                # instead.
                if _isinstance(retval, CoroutineType):
                    return dd_coroutine_wrapper(retval, open_contexts)

            for context in open_contexts:
                context.exit(retval, exc_info, end_time - start_time)
                signal = context.signal
                if signal.state is SignalState.DONE:
                    self._probe_registry.set_emitting(signal.probe)

            exc = exc_info[1]
            if exc is not None:
                raise exc

            return retval

        return _

    def _probe_injection_hook(self, module: ModuleType) -> None:
        # This hook is invoked by the ModuleWatchdog or the post run module hook
        # to inject probes.

        # Group probes by function so that we decompile each function once and
        # bulk-inject the probes.
        probes_for_function: Dict[FullyNamedWrappedFunction, List[Probe]] = defaultdict(list)
        for probe in self._probe_registry.get_pending(str(origin(module))):
            if not isinstance(probe, LineLocationMixin):
                continue
            line = probe.line
            assert line is not None  # nosec
            functions = FunctionDiscovery.from_module(module).at_line(line)
            if not functions:
                module_origin = str(origin(module))
                if linecache.getline(module_origin, line):
                    # The source actually has a line at the given line number
                    message = (
                        f"Cannot install probe {probe.probe_id}: "
                        f"function at line {line} within source file {module_origin} "
                        "is likely decorated with an unsupported decorator."
                    )
                else:
                    message = (
                        f"Cannot install probe {probe.probe_id}: "
                        f"no functions at line {line} within source file {module_origin} found"
                    )
                log.error(message)
                self._probe_registry.set_error(probe, "NoFunctionsAtLine", message)
                continue
            for function in (cast(FullyNamedWrappedFunction, _) for _ in functions):
                probes_for_function[function].append(cast(LineProbe, probe))

        for function, probes in probes_for_function.items():
            failed = self._function_store.inject_hooks(
                function, [(self._dd_debugger_hook, cast(LineProbe, probe).line, probe) for probe in probes]
            )

            for probe in probes:
                if probe.probe_id in failed:
                    self._probe_registry.set_error(probe, "InjectionFailure", "Failed to inject")
                else:
                    self._probe_registry.set_installed(probe)

            if failed:
                log.error("[%s][P: %s] Failed to inject probes %r", os.getpid(), os.getppid(), failed)

            log.debug(
                "[%s][P: %s] Injected probes %r in %r",
                os.getpid(),
                os.getppid(),
                [probe.probe_id for probe in probes if probe.probe_id not in failed],
                function,
            )

    def _inject_probes(self, probes: List[LineProbe]) -> None:
        for probe in probes:
            if probe not in self._probe_registry:
                if len(self._probe_registry) >= di_config.max_probes:
                    log.warning("Too many active probes. Ignoring new ones.")
                    return
                log.debug("[%s][P: %s] Received new %s.", os.getpid(), os.getppid(), probe)
                self._probe_registry.register(probe)

            resolved_source = probe.source_file
            if resolved_source is None:
                log.error(
                    "Cannot inject probe %s: source file %s cannot be resolved", probe.probe_id, probe.source_file
                )
                self._probe_registry.set_error(probe, "NoSourceFile", "Source file location cannot be resolved")
                continue

        for source in {probe.source_file for probe in probes if probe.source_file is not None}:
            try:
                self.__watchdog__.register_origin_hook(source, self._probe_injection_hook)
            except Exception as exc:
                for probe in probes:
                    if probe.source_file != source:
                        continue
                    exc_type = type(exc)
                    self._probe_registry.set_error(probe, exc_type.__name__, str(exc))
                log.error("Cannot register probe injection hook on source '%s'", source, exc_info=True)

    def _eject_probes(self, probes_to_eject: List[LineProbe]) -> None:
        # TODO[perf]: Bulk-collect probes as for injection. This is lower
        # priority as probes are normally removed manually by users.
        unregistered_probes: List[LineProbe] = []
        for probe in probes_to_eject:
            if probe not in self._probe_registry:
                log.error("Attempted to eject unregistered probe %r", probe)
                continue

            (registered_probe,) = self._probe_registry.unregister(probe)
            unregistered_probes.append(cast(LineProbe, registered_probe))

        probes_for_source: Dict[Path, List[LineProbe]] = defaultdict(list)
        for probe in unregistered_probes:
            if probe.source_file is None:
                continue
            probes_for_source[probe.source_file].append(probe)

        for resolved_source, probes in probes_for_source.items():
            module = self.__watchdog__.get_by_origin(resolved_source)
            if module is not None:
                # The module is still loaded, so we can try to eject the hooks
                probes_for_function: Dict[FullyNamedWrappedFunction, List[LineProbe]] = defaultdict(list)
                for probe in probes:
                    if not isinstance(probe, LineLocationMixin):
                        continue
                    line = probe.line
                    assert line is not None, probe  # nosec
                    functions = FunctionDiscovery.from_module(module).at_line(line)
                    for function in (cast(FullyNamedWrappedFunction, _) for _ in functions):
                        probes_for_function[function].append(probe)

                for function, ps in probes_for_function.items():
                    failed = self._function_store.eject_hooks(
                        cast(FunctionType, function),
                        [(self._dd_debugger_hook, probe.line, probe) for probe in ps if probe.line is not None],
                    )
                    for probe in ps:
                        if probe.probe_id in failed:
                            log.error("Failed to eject %r from %r", probe, function)
                        else:
                            log.debug("Ejected %r from %r", probe, function)

            if not self._probe_registry.has_probes(str(resolved_source)):
                try:
                    self.__watchdog__.unregister_origin_hook(resolved_source, self._probe_injection_hook)
                    log.debug("Unregistered injection hook on source '%s'", resolved_source)
                except ValueError:
                    log.error("Cannot unregister injection hook for %r", probe, exc_info=True)

    def _probe_wrapping_hook(self, module: ModuleType) -> None:
        probes = self._probe_registry.get_pending(module.__name__)
        for probe in probes:
            if not isinstance(probe, FunctionLocationMixin):
                continue

            try:
                assert probe.module is not None and probe.func_qname is not None  # nosec
                function = FunctionDiscovery.from_module(module).by_name(probe.func_qname)
            except ValueError:
                message = (
                    f"Cannot install probe {probe.probe_id}: no function '{probe.func_qname}' in module {probe.module}"
                    "found (note: if the function exists, it might be decorated with an unsupported decorator)"
                )
                self._probe_registry.set_error(probe, "NoFunctionInModule", message)
                log.error(message)
                continue

            if hasattr(function, "__dd_wrappers__"):
                # TODO: Check if this can be made into a set instead
                wrapper = cast(FullyNamedWrappedFunction, function)
                assert wrapper.__dd_wrappers__, "Function has debugger wrappers"  # nosec
                wrapper.__dd_wrappers__[probe.probe_id] = probe
                log.debug(
                    "[%s][P: %s] Function probe %r added to already wrapped %r",
                    os.getpid(),
                    os.getppid(),
                    probe.probe_id,
                    function,
                )
            else:
                wrappers = cast(FullyNamedWrappedFunction, function).__dd_wrappers__ = {probe.probe_id: probe}
                self._function_store.wrap(cast(FunctionType, function), self._dd_debugger_wrapper(wrappers))
                log.debug(
                    "[%s][P: %s] Function probe %r wrapped around %r",
                    os.getpid(),
                    os.getppid(),
                    probe.probe_id,
                    function,
                )
            self._probe_registry.set_installed(probe)

    def _wrap_functions(self, probes: List[FunctionProbe]) -> None:
        for probe in probes:
            if len(self._probe_registry) >= di_config.max_probes:
                log.warning("Too many active probes. Ignoring new ones.")
                return

            self._probe_registry.register(probe)
            try:
                assert probe.module is not None  # nosec
                self.__watchdog__.register_module_hook(probe.module, self._probe_wrapping_hook)
            except Exception as exc:
                exc_type = type(exc)
                self._probe_registry.set_error(probe, exc_type.__name__, str(exc))
                log.error("Cannot register probe wrapping hook on module '%s'", probe.module, exc_info=True)

    def _unwrap_functions(self, probes: List[FunctionProbe]) -> None:
        # Keep track of all the modules involved to see if there are any import
        # hooks that we can clean up at the end.
        touched_modules: Set[str] = set()

        for probe in probes:
            registered_probes = self._probe_registry.unregister(probe)
            if not registered_probes:
                log.error("Attempted to eject unregistered probe %r", probe)
                continue

            (registered_probe,) = registered_probes

            assert probe.module is not None  # nosec
            module = sys.modules.get(probe.module, None)
            if module is not None:
                # The module is still loaded, so we can try to unwrap the function
                touched_modules.add(probe.module)
                assert probe.func_qname is not None  # nosec
                function = FunctionDiscovery.from_module(module).by_name(probe.func_qname)
                if hasattr(function, "__dd_wrappers__"):
                    wrapper = cast(FullyNamedWrappedFunction, function)
                    assert wrapper.__dd_wrappers__, "Function has debugger wrappers"  # nosec
                    del wrapper.__dd_wrappers__[probe.probe_id]
                    if not wrapper.__dd_wrappers__:
                        del wrapper.__dd_wrappers__
                        self._function_store.unwrap(wrapper)
                    log.debug("Unwrapped %r", registered_probe)
                else:
                    log.error("Attempted to unwrap %r, but no wrapper found", registered_probe)

        # Clean up import hooks.
        for module_name in touched_modules:
            if not self._probe_registry.has_probes(module_name):
                try:
                    self.__watchdog__.unregister_module_hook(module_name, self._probe_wrapping_hook)
                    log.debug("Unregistered wrapping import hook on module %s", module_name)
                except ValueError:
                    log.error("Cannot unregister wrapping import hook for module %r", module_name, exc_info=True)

    def _on_configuration(self, event: ProbePollerEventType, probes: Iterable[Probe]) -> None:
        log.debug("[%s][P: %s] Received poller event %r with probes %r", os.getpid(), os.getppid(), event, probes)

        if event == ProbePollerEvent.STATUS_UPDATE:
            self._probe_registry.log_probes_status()
            return

        if event == ProbePollerEvent.MODIFIED_PROBES:
            for probe in probes:
                if probe in self._probe_registry:
                    registered_probe = self._probe_registry.get(probe.probe_id)
                    if registered_probe is None:
                        # We didn't have the probe. This shouldn't have happened!
                        log.error("Modified probe %r was not found in registry.", probe)
                        continue
                    self._probe_registry.update(probe)

            return

        line_probes: List[LineProbe] = []
        function_probes: List[FunctionProbe] = []
        for probe in probes:
            if isinstance(probe, LineLocationMixin):
                line_probes.append(cast(LineProbe, probe))
            elif isinstance(probe, FunctionLocationMixin):
                function_probes.append(cast(FunctionProbe, probe))
            else:
                log.warning("Skipping probe '%r': not supported.", probe)

        if event == ProbePollerEvent.NEW_PROBES:
            self._inject_probes(line_probes)
            self._wrap_functions(function_probes)
        elif event == ProbePollerEvent.DELETED_PROBES:
            self._eject_probes(line_probes)
            self._unwrap_functions(function_probes)
        else:
            raise ValueError("Unknown probe poller event %r" % event)

    def _stop_service(self, join: bool = True) -> None:
        self._function_store.restore_all()
        for service in self._services:
            service.stop()
            if join:
                service.join()

    def _start_service(self) -> None:
        for service in self._services:
            log.debug("[%s][P: %s] Debugger. Start service %s", os.getpid(), os.getppid(), service)
            service.start()

    @classmethod
    def _restart(cls):
        log.info("[%s][P: %s] Restarting the debugger in child process", os.getpid(), os.getppid())
        cls.disable(join=False)
        cls.enable()

    @classmethod
    def _on_run_module(cls, module: ModuleType) -> None:
        debugger = cls._instance
        if debugger is not None:
            debugger.__watchdog__.on_run_module(module)
