# Copyright (C) 2012 Canonical Ltd.
# Copyright (C) 2012, 2013 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2012 Yahoo! Inc.
#
# This file is part of cloud-init. See LICENSE file for license information.

import copy
import json
import os
import sys
from collections import namedtuple
from typing import Dict, Iterable, List, Optional, Set

from cloudinit import (
    atomic_helper,
    cloud,
    distros,
    features,
    handlers,
    helpers,
    importer,
)
from cloudinit import log as logging
from cloudinit import net, sources, type_utils, util
from cloudinit.event import EventScope, EventType, userdata_to_events

# Default handlers (used if not overridden)
from cloudinit.handlers.boot_hook import BootHookPartHandler
from cloudinit.handlers.cloud_config import CloudConfigPartHandler
from cloudinit.handlers.jinja_template import JinjaTemplatePartHandler
from cloudinit.handlers.shell_script import ShellScriptPartHandler
from cloudinit.handlers.shell_script_by_frequency import (
    ShellScriptByFreqPartHandler,
)
from cloudinit.net import cmdline
from cloudinit.reporting import events
from cloudinit.settings import (
    CLOUD_CONFIG,
    PER_ALWAYS,
    PER_INSTANCE,
    PER_ONCE,
    RUN_CLOUD_CONFIG,
)
from cloudinit.sources import NetworkConfigSource

LOG = logging.getLogger(__name__)

NO_PREVIOUS_INSTANCE_ID = "NO_PREVIOUS_INSTANCE_ID"


COMBINED_CLOUD_CONFIG_DOC = (
    "Aggregated cloud-config created by merging merged_system_cfg"
    " (/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d), metadata,"
    " vendordata and userdata. The combined_cloud_config represents"
    " the aggregated desired configuration acted upon by cloud-init."
)


def update_event_enabled(
    datasource: sources.DataSource,
    cfg: dict,
    event_source_type: EventType,
    scope: Optional[EventScope] = None,
) -> bool:
    """Determine if a particular EventType is enabled.

    For the `event_source_type` passed in, check whether this EventType
    is enabled in the `updates` section of the userdata. If `updates`
    is not enabled in userdata, check if defined as one of the
    `default_events` on the datasource. `scope` may be used to
    narrow the check to a particular `EventScope`.

    Note that on first boot, userdata may NOT be available yet. In this
    case, we only have the data source's `default_update_events`,
    so an event that should be enabled in userdata may be denied.
    """
    default_events: Dict[
        EventScope, Set[EventType]
    ] = datasource.default_update_events
    user_events: Dict[EventScope, Set[EventType]] = userdata_to_events(
        cfg.get("updates", {})
    )
    # A value in the first will override a value in the second
    allowed = util.mergemanydict(
        [
            copy.deepcopy(user_events),
            copy.deepcopy(default_events),
        ]
    )
    LOG.debug("Allowed events: %s", allowed)

    scopes: Iterable[EventScope]
    if not scope:
        scopes = allowed.keys()
    else:
        scopes = [scope]
    scope_values = [s.value for s in scopes]

    for evt_scope in scopes:
        if event_source_type in allowed.get(evt_scope, []):
            LOG.debug(
                "Event Allowed: scope=%s EventType=%s",
                evt_scope.value,
                event_source_type,
            )
            return True

    LOG.debug(
        "Event Denied: scopes=%s EventType=%s", scope_values, event_source_type
    )
    return False


class Init:
    def __init__(self, ds_deps: Optional[List[str]] = None, reporter=None):
        if ds_deps is not None:
            self.ds_deps = ds_deps
        else:
            self.ds_deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK]
        # Created on first use
        self._cfg: Optional[dict] = None
        self._paths: Optional[helpers.Paths] = None
        self._distro: Optional[distros.Distro] = None
        # Changed only when a fetch occurs
        self.datasource: Optional[sources.DataSource] = None
        self.ds_restored = False
        self._previous_iid = None

        if reporter is None:
            reporter = events.ReportEventStack(
                name="init-reporter",
                description="init-desc",
                reporting_enabled=False,
            )
        self.reporter = reporter

    def _reset(self, reset_ds=False):
        # Recreated on access
        self._cfg = None
        self._paths = None
        self._distro = None
        if reset_ds:
            self.datasource = None
            self.ds_restored = False

    @property
    def distro(self):
        if not self._distro:
            # Try to find the right class to use
            system_config = self._extract_cfg("system")
            distro_name = system_config.pop("distro", "ubuntu")
            distro_cls = distros.fetch(distro_name)
            LOG.debug("Using distro class %s", distro_cls)
            self._distro = distro_cls(distro_name, system_config, self.paths)
            # If we have an active datasource we need to adjust
            # said datasource and move its distro/system config
            # from whatever it was to a new set...
            if self.datasource is not None:
                self.datasource.distro = self._distro
                self.datasource.sys_cfg = self.cfg
        return self._distro

    @property
    def cfg(self):
        return self._extract_cfg("restricted")

    def _extract_cfg(self, restriction):
        # Ensure actually read
        self.read_cfg()
        # Nobody gets the real config
        ocfg = copy.deepcopy(self._cfg)
        if restriction == "restricted":
            ocfg.pop("system_info", None)
        elif restriction == "system":
            ocfg = util.get_cfg_by_path(ocfg, ("system_info",), {})
        elif restriction == "paths":
            ocfg = util.get_cfg_by_path(ocfg, ("system_info", "paths"), {})
        if not isinstance(ocfg, (dict)):
            ocfg = {}
        return ocfg

    @property
    def paths(self):
        if not self._paths:
            path_info = self._extract_cfg("paths")
            self._paths = helpers.Paths(path_info, self.datasource)
        return self._paths

    def _initial_subdirs(self):
        c_dir = self.paths.cloud_dir
        run_dir = self.paths.run_dir
        initial_dirs = [
            c_dir,
            os.path.join(c_dir, "scripts"),
            os.path.join(c_dir, "scripts", "per-instance"),
            os.path.join(c_dir, "scripts", "per-once"),
            os.path.join(c_dir, "scripts", "per-boot"),
            os.path.join(c_dir, "scripts", "vendor"),
            os.path.join(c_dir, "seed"),
            os.path.join(c_dir, "instances"),
            os.path.join(c_dir, "handlers"),
            os.path.join(c_dir, "sem"),
            os.path.join(c_dir, "data"),
            os.path.join(run_dir, "sem"),
        ]
        return initial_dirs

    def purge_cache(self, rm_instance_lnk=False):
        rm_list = [self.paths.boot_finished]
        if rm_instance_lnk:
            rm_list.append(self.paths.instance_link)
        for f in rm_list:
            util.del_file(f)
        return len(rm_list)

    def initialize(self):
        self._initialize_filesystem()

    def _initialize_filesystem(self):
        mode = 0o640
        fmode = None

        util.ensure_dirs(self._initial_subdirs())
        log_file = util.get_cfg_option_str(self.cfg, "def_log_file")
        if log_file:
            # At this point the log file should have already been created
            # in the setupLogging function of log.py

            try:
                fmode = util.get_permissions(log_file)
            except OSError:
                pass

            # if existing file mode fmode is stricter, do not change it.
            if fmode and util.compare_permission(fmode, mode) < 0:
                mode = fmode

            util.ensure_file(log_file, mode, preserve_mode=False)
            perms = self.cfg.get("syslog_fix_perms")
            if not perms:
                perms = {}
            if not isinstance(perms, list):
                perms = [perms]

            error = None
            for perm in perms:
                u, g = util.extract_usergroup(perm)
                try:
                    util.chownbyname(log_file, u, g)
                    return
                except OSError as e:
                    error = e

            LOG.warning(
                "Failed changing perms on '%s'. tried: %s. %s",
                log_file,
                ",".join(perms),
                error,
            )

    def read_cfg(self, extra_fns=None):
        # None check so that we don't keep on re-loading if empty
        if self._cfg is None:
            self._cfg = self._read_cfg(extra_fns)
            # LOG.debug("Loaded 'init' config %s", self._cfg)

    def _read_cfg(self, extra_fns):
        no_cfg_paths = helpers.Paths({}, self.datasource)
        instance_data_file = no_cfg_paths.get_runpath(
            "instance_data_sensitive"
        )
        merger = helpers.ConfigMerger(
            paths=no_cfg_paths,
            datasource=self.datasource,
            additional_fns=extra_fns,
            base_cfg=fetch_base_config(instance_data_file=instance_data_file),
        )
        return merger.cfg

    def _restore_from_cache(self):
        # We try to restore from a current link and static path
        # by using the instance link, if purge_cache was called
        # the file wont exist.
        return sources.pkl_load(self.paths.get_ipath_cur("obj_pkl"))

    def _write_to_cache(self):
        if self.datasource is None:
            return False
        if util.get_cfg_option_bool(self.cfg, "manual_cache_clean", False):
            # The empty file in instance/ dir indicates manual cleaning,
            # and can be read by ds-identify.
            util.write_file(
                self.paths.get_ipath_cur("manual_clean_marker"),
                omode="w",
                content="",
            )
        return sources.pkl_store(
            self.datasource, self.paths.get_ipath_cur("obj_pkl")
        )

    def _get_datasources(self):
        # Any config provided???
        pkg_list = self.cfg.get("datasource_pkg_list") or []
        # Add the defaults at the end
        for n in ["", type_utils.obj_name(sources)]:
            if n not in pkg_list:
                pkg_list.append(n)
        cfg_list = self.cfg.get("datasource_list") or []
        return (cfg_list, pkg_list)

    def _restore_from_checked_cache(self, existing):
        if existing not in ("check", "trust"):
            raise ValueError("Unexpected value for existing: %s" % existing)

        ds = self._restore_from_cache()
        if not ds:
            return (None, "no cache found")

        run_iid_fn = self.paths.get_runpath("instance_id")
        if os.path.exists(run_iid_fn):
            run_iid = util.load_file(run_iid_fn).strip()
        else:
            run_iid = None

        if run_iid == ds.get_instance_id():
            return (ds, "restored from cache with run check: %s" % ds)
        elif existing == "trust":
            return (ds, "restored from cache: %s" % ds)
        else:
            if hasattr(ds, "check_instance_id") and ds.check_instance_id(
                self.cfg
            ):
                return (ds, "restored from checked cache: %s" % ds)
            else:
                return (None, "cache invalid in datasource: %s" % ds)

    def _get_data_source(self, existing) -> sources.DataSource:
        if self.datasource is not None:
            return self.datasource

        with events.ReportEventStack(
            name="check-cache",
            description="attempting to read from cache [%s]" % existing,
            parent=self.reporter,
        ) as myrep:

            ds, desc = self._restore_from_checked_cache(existing)
            myrep.description = desc
            self.ds_restored = bool(ds)
            LOG.debug(myrep.description)

        if not ds:
            util.del_file(self.paths.instance_link)
            (cfg_list, pkg_list) = self._get_datasources()
            # Deep copy so that user-data handlers can not modify
            # (which will affect user-data handlers down the line...)
            (ds, dsname) = sources.find_source(
                self.cfg,
                self.distro,
                self.paths,
                copy.deepcopy(self.ds_deps),
                cfg_list,
                pkg_list,
                self.reporter,
            )
            LOG.info("Loaded datasource %s - %s", dsname, ds)
        self.datasource = ds
        # Ensure we adjust our path members datasource
        # now that we have one (thus allowing ipath to be used)
        self._reset()
        return ds

    def _get_instance_subdirs(self):
        return ["handlers", "scripts", "sem"]

    def _get_ipath(self, subname=None):
        # Force a check to see if anything
        # actually comes back, if not
        # then a datasource has not been assigned...
        instance_dir = self.paths.get_ipath(subname)
        if not instance_dir:
            raise RuntimeError(
                "No instance directory is available."
                " Has a datasource been fetched??"
            )
        return instance_dir

    def _reflect_cur_instance(self):
        # Remove the old symlink and attach a new one so
        # that further reads/writes connect into the right location
        idir = self._get_ipath()
        util.del_file(self.paths.instance_link)
        util.sym_link(idir, self.paths.instance_link)

        # Ensures these dirs exist
        dir_list = []
        for d in self._get_instance_subdirs():
            dir_list.append(os.path.join(idir, d))
        util.ensure_dirs(dir_list)

        # Write out information on what is being used for the current instance
        # and what may have been used for a previous instance...
        dp = self.paths.get_cpath("data")

        # Write what the datasource was and is..
        ds = "%s: %s" % (type_utils.obj_name(self.datasource), self.datasource)
        previous_ds = None
        ds_fn = os.path.join(idir, "datasource")
        try:
            previous_ds = util.load_file(ds_fn).strip()
        except Exception:
            pass
        if not previous_ds:
            previous_ds = ds
        util.write_file(ds_fn, "%s\n" % ds)
        util.write_file(
            os.path.join(dp, "previous-datasource"), "%s\n" % (previous_ds)
        )

        # What the instance id was and is...
        iid = self.datasource.get_instance_id()
        iid_fn = os.path.join(dp, "instance-id")

        previous_iid = self.previous_iid()
        util.write_file(iid_fn, "%s\n" % iid)
        util.write_file(self.paths.get_runpath("instance_id"), "%s\n" % iid)
        util.write_file(
            os.path.join(dp, "previous-instance-id"), "%s\n" % (previous_iid)
        )

        self._write_to_cache()
        # Ensure needed components are regenerated
        # after change of instance which may cause
        # change of configuration
        self._reset()
        return iid

    def previous_iid(self):
        if self._previous_iid is not None:
            return self._previous_iid

        dp = self.paths.get_cpath("data")
        iid_fn = os.path.join(dp, "instance-id")
        try:
            self._previous_iid = util.load_file(iid_fn).strip()
        except Exception:
            self._previous_iid = NO_PREVIOUS_INSTANCE_ID

        LOG.debug("previous iid found to be %s", self._previous_iid)
        return self._previous_iid

    def is_new_instance(self):
        """Return true if this is a new instance.

        If datasource has already been initialized, this will return False,
        even on first boot.
        """
        previous = self.previous_iid()
        ret = (
            previous == NO_PREVIOUS_INSTANCE_ID
            or previous != self.datasource.get_instance_id()
        )
        return ret

    def fetch(self, existing="check"):
        return self._get_data_source(existing=existing)

    def instancify(self):
        return self._reflect_cur_instance()

    def cloudify(self):
        # Form the needed options to cloudify our members
        return cloud.Cloud(
            self.datasource,
            self.paths,
            self.cfg,
            self.distro,
            helpers.Runners(self.paths),
            reporter=self.reporter,
        )

    def update(self):
        self._store_rawdata(self.datasource.get_userdata_raw(), "userdata")
        self._store_processeddata(self.datasource.get_userdata(), "userdata")
        self._store_raw_vendordata(
            self.datasource.get_vendordata_raw(), "vendordata"
        )
        self._store_processeddata(
            self.datasource.get_vendordata(), "vendordata"
        )
        self._store_raw_vendordata(
            self.datasource.get_vendordata2_raw(), "vendordata2"
        )
        self._store_processeddata(
            self.datasource.get_vendordata2(), "vendordata2"
        )

    def setup_datasource(self):
        with events.ReportEventStack(
            "setup-datasource", "setting up datasource", parent=self.reporter
        ):
            if self.datasource is None:
                raise RuntimeError("Datasource is None, cannot setup.")
            self.datasource.setup(is_new_instance=self.is_new_instance())

    def activate_datasource(self):
        with events.ReportEventStack(
            "activate-datasource",
            "activating datasource",
            parent=self.reporter,
        ):
            if self.datasource is None:
                raise RuntimeError("Datasource is None, cannot activate.")
            self.datasource.activate(
                cfg=self.cfg, is_new_instance=self.is_new_instance()
            )
            self._write_to_cache()

    def _store_rawdata(self, data, datasource):
        # Raw data is bytes, not a string
        if data is None:
            data = b""
        util.write_file(self._get_ipath("%s_raw" % datasource), data, 0o600)

    def _store_raw_vendordata(self, data, datasource):
        # Only these data types
        if data is not None and type(data) not in [bytes, str, list]:
            raise TypeError(
                "vendordata_raw is unsupported type '%s'" % str(type(data))
            )
        # This data may be a list, convert it to a string if so
        if isinstance(data, list):
            data = atomic_helper.json_dumps(data)
        self._store_rawdata(data, datasource)

    def _store_processeddata(self, processed_data, datasource):
        # processed is a Mime message, so write as string.
        if processed_data is None:
            processed_data = ""
        util.write_file(
            self._get_ipath(datasource), str(processed_data), 0o600
        )

    def _default_handlers(self, opts=None) -> List[handlers.Handler]:
        if opts is None:
            opts = {}

        opts.update(
            {
                "paths": self.paths,
                "datasource": self.datasource,
            }
        )
        # TODO(harlowja) Hmmm, should we dynamically import these??
        cloudconfig_handler = CloudConfigPartHandler(**opts)
        shellscript_handler = ShellScriptPartHandler(**opts)
        def_handlers = [
            cloudconfig_handler,
            shellscript_handler,
            ShellScriptByFreqPartHandler(PER_ALWAYS, **opts),
            ShellScriptByFreqPartHandler(PER_INSTANCE, **opts),
            ShellScriptByFreqPartHandler(PER_ONCE, **opts),
            BootHookPartHandler(**opts),
            JinjaTemplatePartHandler(
                **opts, sub_handlers=[cloudconfig_handler, shellscript_handler]
            ),
        ]
        return def_handlers

    def _default_vendordata_handlers(self):
        return self._default_handlers(
            opts={
                "script_path": "vendor_scripts",
                "cloud_config_path": "vendor_cloud_config",
            }
        )

    def _default_vendordata2_handlers(self):
        return self._default_handlers(
            opts={
                "script_path": "vendor_scripts",
                "cloud_config_path": "vendor2_cloud_config",
            }
        )

    def _do_handlers(
        self, data_msg, c_handlers_list, frequency, excluded=None
    ):
        """
        Generalized handlers suitable for use with either vendordata
        or userdata
        """
        if excluded is None:
            excluded = []

        cdir = self.paths.get_cpath("handlers")
        idir = self._get_ipath("handlers")

        # Add the path to the plugins dir to the top of our list for importing
        # new handlers.
        #
        # Note(harlowja): instance dir should be read before cloud-dir
        for d in [cdir, idir]:
            if d and d not in sys.path:
                sys.path.insert(0, d)

        def register_handlers_in_dir(path):
            # Attempts to register any handler modules under the given path.
            if not path or not os.path.isdir(path):
                return
            potential_handlers = util.get_modules_from_dir(path)
            for (fname, mod_name) in potential_handlers.items():
                try:
                    mod_locs, looked_locs = importer.find_module(
                        mod_name, [""], ["list_types", "handle_part"]
                    )
                    if not mod_locs:
                        LOG.warning(
                            "Could not find a valid user-data handler"
                            " named %s in file %s (searched %s)",
                            mod_name,
                            fname,
                            looked_locs,
                        )
                        continue
                    mod = importer.import_module(mod_locs[0])
                    mod = handlers.fixup_handler(mod)
                    types = c_handlers.register(mod)
                    if types:
                        LOG.debug(
                            "Added custom handler for %s [%s] from %s",
                            types,
                            mod,
                            fname,
                        )
                except Exception:
                    util.logexc(
                        LOG, "Failed to register handler from %s", fname
                    )

        # This keeps track of all the active handlers
        c_handlers = helpers.ContentHandlers()

        # Add any handlers in the cloud-dir
        register_handlers_in_dir(cdir)

        # Register any other handlers that come from the default set. This
        # is done after the cloud-dir handlers so that the cdir modules can
        # take over the default user-data handler content-types.
        for mod in c_handlers_list:
            types = c_handlers.register(mod, overwrite=False)
            if types:
                LOG.debug("Added default handler for %s from %s", types, mod)

        # Form our cloud interface
        data = self.cloudify()

        def init_handlers():
            # Init the handlers first
            for (_ctype, mod) in c_handlers.items():
                if mod in c_handlers.initialized:
                    # Avoid initiating the same module twice (if said module
                    # is registered to more than one content-type).
                    continue
                handlers.call_begin(mod, data, frequency)
                c_handlers.initialized.append(mod)

        def walk_handlers(excluded):
            # Walk the user data
            part_data = {
                "handlers": c_handlers,
                # Any new handlers that are encountered get writen here
                "handlerdir": idir,
                "data": data,
                # The default frequency if handlers don't have one
                "frequency": frequency,
                # This will be used when new handlers are found
                # to help write their contents to files with numbered
                # names...
                "handlercount": 0,
                "excluded": excluded,
            }
            handlers.walk(data_msg, handlers.walker_callback, data=part_data)

        def finalize_handlers():
            # Give callbacks opportunity to finalize
            for (_ctype, mod) in c_handlers.items():
                if mod not in c_handlers.initialized:
                    # Said module was never inited in the first place, so lets
                    # not attempt to finalize those that never got called.
                    continue
                c_handlers.initialized.remove(mod)
                try:
                    handlers.call_end(mod, data, frequency)
                except Exception:
                    util.logexc(LOG, "Failed to finalize handler: %s", mod)

        try:
            init_handlers()
            walk_handlers(excluded)
        finally:
            finalize_handlers()

    def consume_data(self, frequency=PER_INSTANCE):
        # Consume the userdata first, because we need want to let the part
        # handlers run first (for merging stuff)
        with events.ReportEventStack(
            "consume-user-data",
            "reading and applying user-data",
            parent=self.reporter,
        ):
            if util.get_cfg_option_bool(self.cfg, "allow_userdata", True):
                self._consume_userdata(frequency)
            else:
                LOG.debug("allow_userdata = False: discarding user-data")

        with events.ReportEventStack(
            "consume-vendor-data",
            "reading and applying vendor-data",
            parent=self.reporter,
        ):
            self._consume_vendordata("vendordata", frequency)

        with events.ReportEventStack(
            "consume-vendor-data2",
            "reading and applying vendor-data2",
            parent=self.reporter,
        ):
            self._consume_vendordata("vendordata2", frequency)

        # Perform post-consumption adjustments so that
        # modules that run during the init stage reflect
        # this consumed set.
        #
        # They will be recreated on future access...
        self._reset()
        # Note(harlowja): the 'active' datasource will have
        # references to the previous config, distro, paths
        # objects before the load of the userdata happened,
        # this is expected.
        combined_cloud_cfg = copy.deepcopy(self.cfg)
        combined_cloud_cfg["_doc"] = COMBINED_CLOUD_CONFIG_DOC
        # Persist system_info key from /etc/cloud/cloud.cfg in both
        # combined_cloud_config file and instance-data-sensitive.json's
        # merged_system_cfg key.
        combined_cloud_cfg["system_info"] = self._extract_cfg("system")
        # Add features information to allow for file-based discovery of
        # feature settings.
        combined_cloud_cfg["features"] = features.get_features()
        atomic_helper.write_json(
            self.paths.get_runpath("combined_cloud_config"),
            combined_cloud_cfg,
            mode=0o600,
        )
        json_sensitive_file = self.paths.get_runpath("instance_data_sensitive")
        try:
            instance_json = util.load_json(util.load_file(json_sensitive_file))
        except (OSError, IOError) as e:
            LOG.warning(
                "Skipping write of system_info/features to %s."
                " Unable to read file: %s",
                json_sensitive_file,
                e,
            )
            return
        except (json.JSONDecodeError, TypeError) as e:
            LOG.warning(
                "Skipping write of system_info/features to %s."
                " Invalid JSON found: %s",
                json_sensitive_file,
                e,
            )
            return
        instance_json["system_info"] = combined_cloud_cfg["system_info"]
        instance_json["features"] = combined_cloud_cfg["features"]
        atomic_helper.write_json(
            json_sensitive_file,
            instance_json,
            mode=0o600,
        )

    def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE):
        """
        Consume the vendordata and run the part handlers on it
        """

        # User-data should have been consumed first.
        # So we merge the other available cloud-configs (everything except
        # vendor provided), and check whether or not we should consume
        # vendor data at all. That gives user or system a chance to override.
        if vendor_source == "vendordata":
            if not self.datasource.get_vendordata_raw():
                LOG.debug("no vendordata from datasource")
                return
            cfg_name = "vendor_data"
        elif vendor_source == "vendordata2":
            if not self.datasource.get_vendordata2_raw():
                LOG.debug("no vendordata2 from datasource")
                return
            cfg_name = "vendor_data2"
        else:
            raise RuntimeError(
                "vendor_source arg must be either 'vendordata'"
                " or 'vendordata2'"
            )

        _cc_merger = helpers.ConfigMerger(
            paths=self._paths,
            datasource=self.datasource,
            additional_fns=[],
            base_cfg=self.cfg,
            include_vendor=False,
        )
        vdcfg = _cc_merger.cfg.get(cfg_name, {})

        if not isinstance(vdcfg, dict):
            vdcfg = {"enabled": False}
            LOG.warning(
                "invalid %s setting. resetting to: %s", cfg_name, vdcfg
            )

        enabled = vdcfg.get("enabled")
        no_handlers = vdcfg.get("disabled_handlers", None)

        if not util.is_true(enabled):
            LOG.debug("%s consumption is disabled.", vendor_source)
            return

        if isinstance(enabled, str):
            util.deprecate(
                deprecated=f"Use of string '{enabled}' for "
                "'vendor_data:enabled' field",
                deprecated_version="23.1",
                extra_message="Use boolean value instead.",
            )

        LOG.debug(
            "%s will be consumed. disabled_handlers=%s",
            vendor_source,
            no_handlers,
        )

        # Ensure vendordata source fetched before activation (just in case.)

        # c_handlers_list keeps track of all the active handlers, while
        # excluding what the users doesn't want run, i.e. boot_hook,
        # cloud_config, shell_script
        if vendor_source == "vendordata":
            vendor_data_msg = self.datasource.get_vendordata()
            c_handlers_list = self._default_vendordata_handlers()
        else:
            vendor_data_msg = self.datasource.get_vendordata2()
            c_handlers_list = self._default_vendordata2_handlers()

        # Run the handlers
        self._do_handlers(
            vendor_data_msg, c_handlers_list, frequency, excluded=no_handlers
        )

    def _consume_userdata(self, frequency=PER_INSTANCE):
        """
        Consume the userdata and run the part handlers
        """

        # Ensure datasource fetched before activation (just incase)
        user_data_msg = self.datasource.get_userdata(True)

        # This keeps track of all the active handlers
        c_handlers_list = self._default_handlers()

        # Run the handlers
        self._do_handlers(user_data_msg, c_handlers_list, frequency)

    def _get_network_key_contents(self, cfg) -> dict:
        """
        Network configuration can be passed as a dict under a "network" key, or
        optionally at the top level. In both cases, return the config.
        """
        if cfg and "network" in cfg:
            return cfg["network"]
        return cfg

    def _find_networking_config(self):
        disable_file = os.path.join(
            self.paths.get_cpath("data"), "upgraded-network"
        )
        if os.path.exists(disable_file):
            return (None, disable_file)

        available_cfgs = {
            NetworkConfigSource.CMD_LINE: cmdline.read_kernel_cmdline_config(),
            NetworkConfigSource.INITRAMFS: cmdline.read_initramfs_config(),
            NetworkConfigSource.DS: None,
            NetworkConfigSource.SYSTEM_CFG: self.cfg.get("network"),
        }

        if self.datasource and hasattr(self.datasource, "network_config"):
            available_cfgs[
                NetworkConfigSource.DS
            ] = self.datasource.network_config

        if self.datasource:
            order = self.datasource.network_config_sources
        else:
            order = sources.DataSource.network_config_sources
        for cfg_source in order:
            if not isinstance(cfg_source, NetworkConfigSource):
                LOG.warning(
                    "data source specifies an invalid network cfg_source: %s",
                    cfg_source,
                )
                continue
            if cfg_source not in available_cfgs:
                LOG.warning(
                    "data source specifies an unavailable network"
                    " cfg_source: %s",
                    cfg_source,
                )
                continue
            ncfg = self._get_network_key_contents(available_cfgs[cfg_source])
            if net.is_disabled_cfg(ncfg):
                LOG.debug("network config disabled by %s", cfg_source)
                return (None, cfg_source)
            if ncfg:
                return (ncfg, cfg_source)
        if not self.cfg.get("network", True):
            LOG.warning("Empty network config found")
        return (
            self.distro.generate_fallback_config(),
            NetworkConfigSource.FALLBACK,
        )

    def _apply_netcfg_names(self, netcfg):
        try:
            LOG.debug("applying net config names for %s", netcfg)
            self.distro.networking.apply_network_config_names(netcfg)
        except Exception as e:
            LOG.warning("Failed to rename devices: %s", e)

    def _get_per_boot_network_semaphore(self):
        return namedtuple("Semaphore", "semaphore args")(
            helpers.FileSemaphores(self.paths.get_runpath("sem")),
            ("apply_network_config", PER_ONCE),
        )

    def _network_already_configured(self) -> bool:
        sem = self._get_per_boot_network_semaphore()
        return sem.semaphore.has_run(*sem.args)

    def apply_network_config(self, bring_up):
        """Apply the network config.

        Find the config, determine whether to apply it, apply it via
        the distro, and optionally bring it up
        """
        netcfg, src = self._find_networking_config()
        if netcfg is None:
            LOG.info("network config is disabled by %s", src)
            return

        def event_enabled_and_metadata_updated(event_type):
            return update_event_enabled(
                datasource=self.datasource,
                cfg=self.cfg,
                event_source_type=event_type,
                scope=EventScope.NETWORK,
            ) and self.datasource.update_metadata_if_supported([event_type])

        def should_run_on_boot_event():
            return (
                not self._network_already_configured()
                and event_enabled_and_metadata_updated(EventType.BOOT)
            )

        if (
            self.datasource is not None
            and not self.is_new_instance()
            and not should_run_on_boot_event()
            and not event_enabled_and_metadata_updated(EventType.BOOT_LEGACY)
        ):
            LOG.debug(
                "No network config applied. Neither a new instance"
                " nor datasource network update allowed"
            )
            # nothing new, but ensure proper names
            self._apply_netcfg_names(netcfg)
            return

        # refresh netcfg after update
        netcfg, src = self._find_networking_config()

        # ensure all physical devices in config are present
        self.distro.networking.wait_for_physdevs(netcfg)

        # apply renames from config
        self._apply_netcfg_names(netcfg)

        # rendering config
        LOG.info(
            "Applying network configuration from %s bringup=%s: %s",
            src,
            bring_up,
            netcfg,
        )

        sem = self._get_per_boot_network_semaphore()
        try:
            with sem.semaphore.lock(*sem.args):
                return self.distro.apply_network_config(
                    netcfg, bring_up=bring_up
                )
        except net.RendererNotFoundError as e:
            LOG.error(
                "Unable to render networking. Network config is "
                "likely broken: %s",
                e,
            )
            return
        except NotImplementedError:
            LOG.warning(
                "distro '%s' does not implement apply_network_config. "
                "networking may not be configured properly.",
                self.distro,
            )
            return


def read_runtime_config():
    return util.read_conf(RUN_CLOUD_CONFIG)


def fetch_base_config(*, instance_data_file=None) -> dict:
    return util.mergemanydict(
        [
            # builtin config, hardcoded in settings.py.
            util.get_builtin_cfg(),
            # Anything in your conf.d or 'default' cloud.cfg location.
            util.read_conf_with_confd(
                CLOUD_CONFIG, instance_data_file=instance_data_file
            ),
            # runtime config. I.e., /run/cloud-init/cloud.cfg
            read_runtime_config(),
            # Kernel/cmdline parameters override system config
            util.read_conf_from_cmdline(),
        ],
        reverse=True,
    )
