diff options
Diffstat (limited to 'src/debputy/plugin/api/impl.py')
-rw-r--r-- | src/debputy/plugin/api/impl.py | 1926 |
1 files changed, 1926 insertions, 0 deletions
diff --git a/src/debputy/plugin/api/impl.py b/src/debputy/plugin/api/impl.py new file mode 100644 index 0000000..e25713f --- /dev/null +++ b/src/debputy/plugin/api/impl.py @@ -0,0 +1,1926 @@ +import contextlib +import dataclasses +import functools +import importlib +import importlib.util +import itertools +import json +import os +import re +import subprocess +import sys +from abc import ABC +from json import JSONDecodeError +from typing import ( + Optional, + Callable, + Dict, + Tuple, + Iterable, + Sequence, + Type, + List, + Union, + Set, + Iterator, + IO, + Mapping, + AbstractSet, + cast, + FrozenSet, + Any, +) + +from debputy import DEBPUTY_DOC_ROOT_DIR +from debputy.exceptions import ( + DebputySubstitutionError, + PluginConflictError, + PluginMetadataError, + PluginBaseError, + PluginInitializationError, + PluginAPIViolationError, + PluginNotFoundError, +) +from debputy.maintscript_snippet import ( + STD_CONTROL_SCRIPTS, + MaintscriptSnippetContainer, + MaintscriptSnippet, +) +from debputy.manifest_parser.base_types import TypeMapping +from debputy.manifest_parser.exceptions import ManifestParseException +from debputy.manifest_parser.parser_data import ParserContextData +from debputy.manifest_parser.util import AttributePath +from debputy.plugin.api.feature_set import PluginProvidedFeatureSet +from debputy.plugin.api.impl_types import ( + DebputyPluginMetadata, + PackagerProvidedFileClassSpec, + MetadataOrMaintscriptDetector, + PluginProvidedTrigger, + TTP, + DIPHandler, + PF, + SF, + DIPKWHandler, + PluginProvidedManifestVariable, + PluginProvidedPackageProcessor, + PluginProvidedDiscardRule, + AutomaticDiscardRuleExample, + PPFFormatParam, + ServiceManagerDetails, + resolve_package_type_selectors, + KnownPackagingFileInfo, + PluginProvidedKnownPackagingFile, + InstallPatternDHCompatRule, + PluginProvidedTypeMapping, +) +from debputy.plugin.api.plugin_parser import ( + PLUGIN_METADATA_PARSER, + PluginJsonMetadata, + PLUGIN_PPF_PARSER, + PackagerProvidedFileJsonDescription, + PLUGIN_MANIFEST_VARS_PARSER, + PLUGIN_KNOWN_PACKAGING_FILES_PARSER, +) +from debputy.plugin.api.spec import ( + MaintscriptAccessor, + Maintscript, + DpkgTriggerType, + BinaryCtrlAccessor, + PackageProcessingContext, + MetadataAutoDetector, + PluginInitializationEntryPoint, + DebputyPluginInitializer, + PackageTypeSelector, + FlushableSubstvars, + ParserDocumentation, + PackageProcessor, + VirtualPath, + ServiceIntegrator, + ServiceDetector, + ServiceRegistry, + ServiceDefinition, + DSD, + ServiceUpgradeRule, + PackagerProvidedFileReferenceDocumentation, + packager_provided_file_reference_documentation, + TypeMappingDocumentation, +) +from debputy.substitution import ( + Substitution, + VariableNameState, + SUBST_VAR_RE, + VariableContext, +) +from debputy.util import ( + _normalize_path, + POSTINST_DEFAULT_CONDITION, + _error, + print_command, + _warn, +) + +PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") + + +def _validate_known_packaging_file_dh_compat_rules( + dh_compat_rules: Optional[List[InstallPatternDHCompatRule]], +) -> None: + max_compat = None + if not dh_compat_rules: + return + dh_compat_rule: InstallPatternDHCompatRule + for idx, dh_compat_rule in enumerate(dh_compat_rules): + dh_version = dh_compat_rule.get("starting_with_debhelper_version") + compat = dh_compat_rule.get("starting_with_compat_level") + + remaining = dh_compat_rule.keys() - { + "after_debhelper_version", + "starting_with_compat_level", + } + if not remaining: + raise ValueError( + f"The dh compat-rule at index {idx} does not affect anything not have any rules!? So why have it?" + ) + if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: + raise ValueError( + f"The dh compat-rule at index {idx} is not the last and is missing either" + " before-debhelper-version or before-compat-level" + ) + if compat is not None and compat < 0: + raise ValueError( + f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" + f" for something that appeared when migrating from {compat} to {compat + 1}." + ) + + if max_compat is None: + max_compat = compat + elif compat is not None: + if compat >= max_compat: + raise ValueError( + f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." + ) + max_compat = compat + + install_pattern = dh_compat_rule.get("install_pattern") + if ( + install_pattern is not None + and _normalize_path(install_pattern, with_prefix=False) != install_pattern + ): + raise ValueError( + f"The install-pattern in dh compat-rule at {idx} must be normalized as" + f' "{_normalize_path(install_pattern, with_prefix=False)}".' + ) + + +class DebputyPluginInitializerProvider(DebputyPluginInitializer): + __slots__ = ( + "_plugin_metadata", + "_feature_set", + "_plugin_detector_ids", + "_substitution", + "_unloaders", + "_load_started", + ) + + def __init__( + self, + plugin_metadata: DebputyPluginMetadata, + feature_set: PluginProvidedFeatureSet, + substitution: Substitution, + ) -> None: + self._plugin_metadata: DebputyPluginMetadata = plugin_metadata + self._feature_set = feature_set + self._plugin_detector_ids: Set[str] = set() + self._substitution = substitution + self._unloaders: List[Callable[[], None]] = [] + self._load_started = False + + def unload_plugin(self) -> None: + if self._load_started: + for unloader in self._unloaders: + unloader() + del self._feature_set.plugin_data[self._plugin_name] + + def load_plugin(self) -> None: + metadata = self._plugin_metadata + if metadata.plugin_name in self._feature_set.plugin_data: + raise PluginConflictError( + f'The plugin "{metadata.plugin_name}" has already been loaded!?' + ) + assert ( + metadata.api_compat_version == 1 + ), f"Unsupported plugin API compat version {metadata.api_compat_version}" + self._feature_set.plugin_data[metadata.plugin_name] = metadata + self._load_started = True + assert not metadata.is_initialized + try: + metadata.initialize_plugin(self) + except Exception as e: + initializer = metadata.plugin_initializer + if ( + isinstance(e, TypeError) + and initializer is not None + and not callable(initializer) + ): + raise PluginMetadataError( + f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" + f" callable (callable returns False). The specified entry point identifies" + f' itself as "{initializer.__qualname__}".' + ) from e + elif isinstance(e, PluginBaseError): + raise + raise PluginInitializationError( + f"Exception while attempting to load plugin {metadata.plugin_name}" + ) from e + + def packager_provided_file( + self, + stem: str, + installed_path: str, + *, + default_mode: int = 0o0644, + default_priority: Optional[int] = None, + allow_name_segment: bool = True, + allow_architecture_segment: bool = False, + post_formatting_rewrite: Optional[Callable[[str], str]] = None, + packageless_is_fallback_for_all_packages: bool = False, + reservation_only: bool = False, + format_callback: Optional[ + Callable[[str, PPFFormatParam, VirtualPath], str] + ] = None, + reference_documentation: Optional[ + PackagerProvidedFileReferenceDocumentation + ] = None, + ) -> None: + packager_provided_files = self._feature_set.packager_provided_files + existing = packager_provided_files.get(stem) + + if format_callback is not None and self._plugin_name != "debputy": + raise ValueError( + "Sorry; Using format_callback is a debputy-internal" + f" API. Triggered by plugin {self._plugin_name}" + ) + + if installed_path.endswith("/"): + raise ValueError( + f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' + f" Triggered by plugin {self._plugin_name}." + ) + + installed_path = _normalize_path(installed_path) + + has_name_var = "{name}" in installed_path + + if installed_path.startswith("./DEBIAN") or reservation_only: + # Special-case, used for control files. + if self._plugin_name != "debputy": + raise ValueError( + "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" + f" API. Triggered by plugin {self._plugin_name}" + ) + elif not has_name_var and "{owning_package}" not in installed_path: + raise ValueError( + 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' + " substitution (or have installed_path end with a slash). Otherwise, the installed" + f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" + ) + + if allow_name_segment and not has_name_var: + raise ValueError( + 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' + " variable. Otherwise, the name segment will not work properly. Triggered by" + f" plugin {self._plugin_name}" + ) + + if ( + default_priority is not None + and "{priority}" not in installed_path + and "{priority:02}" not in installed_path + ): + raise ValueError( + 'When default_priority is not None, the installed_path should have a "{priority}"' + ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' + f" Triggered by plugin {self._plugin_name}" + ) + + if existing is not None: + if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: + message = ( + f'The stem "{stem}" is registered twice for packager provided files.' + f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" + f" by {self._plugin_name}" + ) + else: + message = ( + f"Bug in the plugin {self._plugin_name}: It tried to register the" + f' stem "{stem}" twice for packager provided files.' + ) + raise PluginConflictError( + message, existing.debputy_plugin_metadata, self._plugin_metadata + ) + packager_provided_files[stem] = PackagerProvidedFileClassSpec( + self._plugin_metadata, + stem, + installed_path, + default_mode=default_mode, + default_priority=default_priority, + allow_name_segment=allow_name_segment, + allow_architecture_segment=allow_architecture_segment, + post_formatting_rewrite=post_formatting_rewrite, + packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, + reservation_only=reservation_only, + formatting_callback=format_callback, + reference_documentation=reference_documentation, + ) + + def _unload() -> None: + del packager_provided_files[stem] + + self._unloaders.append(_unload) + + def metadata_or_maintscript_detector( + self, + auto_detector_id: str, + auto_detector: MetadataAutoDetector, + *, + package_type: PackageTypeSelector = "deb", + ) -> None: + if auto_detector_id in self._plugin_detector_ids: + raise ValueError( + f"The plugin {self._plugin_name} tried to register" + f' "{auto_detector_id}" twice' + ) + self._plugin_detector_ids.add(auto_detector_id) + all_detectors = self._feature_set.metadata_maintscript_detectors + if self._plugin_name not in all_detectors: + all_detectors[self._plugin_name] = [] + package_types = resolve_package_type_selectors(package_type) + all_detectors[self._plugin_name].append( + MetadataOrMaintscriptDetector( + detector_id=auto_detector_id, + detector=auto_detector, + plugin_metadata=self._plugin_metadata, + applies_to_package_types=package_types, + enabled=True, + ) + ) + + def _unload() -> None: + if self._plugin_name in all_detectors: + del all_detectors[self._plugin_name] + + self._unloaders.append(_unload) + + def document_builtin_variable( + self, + variable_name: str, + variable_reference_documentation: str, + *, + is_context_specific: bool = False, + is_for_special_case: bool = False, + ) -> None: + manifest_variables = self._feature_set.manifest_variables + self._restricted_api() + state = self._substitution.variable_state(variable_name) + if state == VariableNameState.UNDEFINED: + raise ValueError( + f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," + f" but it is not known to be a variable" + ) + + assert variable_name not in manifest_variables + + manifest_variables[variable_name] = PluginProvidedManifestVariable( + self._plugin_metadata, + variable_name, + None, + is_context_specific_variable=is_context_specific, + variable_reference_documentation=variable_reference_documentation, + is_documentation_placeholder=True, + is_for_special_case=is_for_special_case, + ) + + def _unload() -> None: + del manifest_variables[variable_name] + + self._unloaders.append(_unload) + + def manifest_variable_provider( + self, + provider: Callable[[VariableContext], Mapping[str, str]], + variables: Union[Sequence[str], Mapping[str, Optional[str]]], + ) -> None: + self._restricted_api() + cached_provider = functools.lru_cache(None)(provider) + permitted_variables = frozenset(variables) + variables_iter: Iterable[Tuple[str, Optional[str]]] + if not isinstance(variables, Mapping): + variables_iter = zip(variables, itertools.repeat(None)) + else: + variables_iter = variables.items() + + checked_vars = False + manifest_variables = self._feature_set.manifest_variables + plugin_name = self._plugin_name + + def _value_resolver_generator( + variable_name: str, + ) -> Callable[[VariableContext], str]: + def _value_resolver(variable_context: VariableContext) -> str: + res = cached_provider(variable_context) + nonlocal checked_vars + if not checked_vars: + if permitted_variables != res.keys(): + expected = ", ".join(sorted(permitted_variables)) + actual = ", ".join(sorted(res)) + raise PluginAPIViolationError( + f"The plugin {plugin_name} claimed to provide" + f" the following variables {expected}," + f" but when resolving the variables, the plugin provided" + f" {actual}. These two lists should have been the same." + ) + checked_vars = False + return res[variable_name] + + return _value_resolver + + for varname, vardoc in variables_iter: + self._check_variable_name(varname) + manifest_variables[varname] = PluginProvidedManifestVariable( + self._plugin_metadata, + varname, + _value_resolver_generator(varname), + is_context_specific_variable=False, + variable_reference_documentation=vardoc, + ) + + def _unload() -> None: + raise PluginInitializationError( + "Cannot unload manifest_variable_provider (not implemented)" + ) + + self._unloaders.append(_unload) + + def _check_variable_name(self, variable_name: str) -> None: + manifest_variables = self._feature_set.manifest_variables + existing = manifest_variables.get(variable_name) + + if existing is not None: + if existing.plugin_metadata.plugin_name == self._plugin_name: + message = ( + f"Bug in the plugin {self._plugin_name}: It tried to register the" + f' manifest variable "{variable_name}" twice.' + ) + else: + message = ( + f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" + f" both tried to provide the manifest variable {variable_name}" + ) + raise PluginConflictError( + message, existing.plugin_metadata, self._plugin_metadata + ) + if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): + raise ValueError( + f"The plugin {self._plugin_name} attempted to declare {variable_name}," + f" which is not a valid variable name" + ) + + namespace = "" + variable_basename = variable_name + if ":" in variable_name: + namespace, variable_basename = variable_name.rsplit(":", 1) + assert namespace != "" + assert variable_name != "" + + if namespace != "" and namespace not in ("token", "path"): + raise ValueError( + f"The plugin {self._plugin_name} attempted to declare {variable_name}," + f" which is in the reserved namespace {namespace}" + ) + + variable_name_upper = variable_name.upper() + if ( + variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) + or variable_basename.startswith("_") + or variable_basename.upper().startswith("DEBPUTY") + ) and self._plugin_name != "debputy": + raise ValueError( + f"The plugin {self._plugin_name} attempted to declare {variable_name}," + f" which is a variable name reserved by debputy" + ) + + state = self._substitution.variable_state(variable_name) + if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": + raise ValueError( + f"The plugin {self._plugin_name} attempted to declare {variable_name}," + f" which would shadow a built-in variable" + ) + + def package_processor( + self, + processor_id: str, + processor: PackageProcessor, + *, + depends_on_processor: Iterable[str] = tuple(), + package_type: PackageTypeSelector = "deb", + ) -> None: + self._restricted_api(allowed_plugins={"lua"}) + package_processors = self._feature_set.all_package_processors + dependencies = set() + processor_key = (self._plugin_name, processor_id) + + if processor_key in package_processors: + raise PluginConflictError( + f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", + self._plugin_metadata, + self._plugin_metadata, + ) + + for depends_ref in depends_on_processor: + if isinstance(depends_ref, str): + if (self._plugin_name, depends_ref) in package_processors: + depends_key = (self._plugin_name, depends_ref) + elif ("debputy", depends_ref) in package_processors: + depends_key = ("debputy", depends_ref) + else: + raise ValueError( + f'Could not resolve dependency "{depends_ref}" for' + f' "{processor_id}". It was not provided by the plugin itself' + f" ({self._plugin_name}) nor debputy." + ) + else: + # TODO: Add proper dependencies first, at which point we should probably resolve "name" + # via the direct dependencies. + assert False + + existing_processor = package_processors.get(depends_key) + if existing_processor is None: + # We currently require the processor to be declared already. If this ever changes, + # PluginProvidedFeatureSet.package_processors_in_order will need an update + dplugin_name, dprocessor_name = depends_key + available_processors = ", ".join( + n for p, n in package_processors.keys() if p == dplugin_name + ) + raise ValueError( + f"The plugin {dplugin_name} does not provide a processor called" + f" {dprocessor_name}. Available processors for that plugin are:" + f" {available_processors}" + ) + dependencies.add(depends_key) + + package_processors[processor_key] = PluginProvidedPackageProcessor( + processor_id, + resolve_package_type_selectors(package_type), + processor, + frozenset(dependencies), + self._plugin_metadata, + ) + + def _unload() -> None: + del package_processors[processor_key] + + self._unloaders.append(_unload) + + def automatic_discard_rule( + self, + name: str, + should_discard: Callable[[VirtualPath], bool], + *, + rule_reference_documentation: Optional[str] = None, + examples: Union[ + AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample] + ] = tuple(), + ) -> None: + """Register an automatic discard rule + + An automatic discard rule is basically applied to *every* path about to be installed in to any package. + If any discard rule concludes that a path should not be installed, then the path is not installed. + In the case where the discard path is a: + + * directory: Then the entire directory is excluded along with anything beneath it. + * symlink: Then the symlink itself (but not its target) is excluded. + * hardlink: Then the current hardlink will not be installed, but other instances of it will be. + + Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file. + + Automatic discard rules should be written with the assumption that directories will be tested + before their content *when it is relevant* for the discard rule to examine whether the directory + can be excluded. + + The packager can via the manifest overrule automatic discard rules by explicitly listing the path + without any globs. As example: + + installations: + - install: + sources: + - usr/lib/libfoo.la # <-- This path is always installed + # (Discard rules are never asked in this case) + # + - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches + # Though, they will not examine `libfoo.la` as it has already been installed + # + # Note: usr/lib itself is never tested in this case (it is assumed to be + # explicitly requested). But any subdir of usr/lib will be examined. + + When an automatic discard rule is evaluated, it can see the source path currently being considered + for installation. While it can look at "surrounding" context (like parent directory), it will not + know whether those paths are to be installed or will be installed. + + :param name: A user visible name discard rule. It can be used on the command line, so avoid shell + metacharacters and spaces. + :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive + a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the + path is discarded. If it returns `False`, the path is not discarded (by this rule at least). + A source path will either be from the root of the source tree or the root of a search directory such as + `debian/tmp`. Where the path will be installed is not available at the time the discard rule is + evaluated. + :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user + looks up this automatic discard rule. + :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to + generate the examples. + + """ + self._restricted_api() + auto_discard_rules = self._feature_set.auto_discard_rules + existing = auto_discard_rules.get(name) + if existing is not None: + if existing.plugin_metadata.plugin_name == self._plugin_name: + message = ( + f"Bug in the plugin {self._plugin_name}: It tried to register the" + f' automatic discard rule "{name}" twice.' + ) + else: + message = ( + f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" + f" both tried to provide the automatic discard rule {name}" + ) + raise PluginConflictError( + message, existing.plugin_metadata, self._plugin_metadata + ) + examples = ( + (examples,) + if isinstance(examples, AutomaticDiscardRuleExample) + else tuple(examples) + ) + auto_discard_rules[name] = PluginProvidedDiscardRule( + name, + self._plugin_metadata, + should_discard, + rule_reference_documentation, + examples, + ) + + def _unload() -> None: + del auto_discard_rules[name] + + self._unloaders.append(_unload) + + def service_provider( + self, + service_manager: str, + detector: ServiceDetector, + integrator: ServiceIntegrator, + ) -> None: + self._restricted_api() + service_managers = self._feature_set.service_managers + existing = service_managers.get(service_manager) + if existing is not None: + if existing.plugin_metadata.plugin_name == self._plugin_name: + message = ( + f"Bug in the plugin {self._plugin_name}: It tried to register the" + f' service manager "{service_manager}" twice.' + ) + else: + message = ( + f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" + f' both tried to provide the service manager "{service_manager}"' + ) + raise PluginConflictError( + message, existing.plugin_metadata, self._plugin_metadata + ) + service_managers[service_manager] = ServiceManagerDetails( + service_manager, + detector, + integrator, + self._plugin_metadata, + ) + + def _unload() -> None: + del service_managers[service_manager] + + self._unloaders.append(_unload) + + def manifest_variable( + self, + variable_name: str, + value: str, + variable_reference_documentation: Optional[str] = None, + ) -> None: + self._check_variable_name(variable_name) + manifest_variables = self._feature_set.manifest_variables + try: + resolved_value = self._substitution.substitute( + value, "Plugin initialization" + ) + depends_on_variable = resolved_value != value + except DebputySubstitutionError: + depends_on_variable = True + if depends_on_variable: + raise ValueError( + f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." + f" This value depends on another variable, which is not supported. This restriction may be" + f" lifted in the future." + ) + + manifest_variables[variable_name] = PluginProvidedManifestVariable( + self._plugin_metadata, + variable_name, + value, + is_context_specific_variable=False, + variable_reference_documentation=variable_reference_documentation, + ) + + def _unload() -> None: + # We need to check it was never resolved + raise PluginInitializationError( + "Cannot unload manifest_variable (not implemented)" + ) + + self._unloaders.append(_unload) + + @property + def _plugin_name(self) -> str: + return self._plugin_metadata.plugin_name + + def provide_manifest_keyword( + self, + rule_type: TTP, + rule_name: Union[str, List[str]], + handler: DIPKWHandler, + *, + inline_reference_documentation: Optional[ParserDocumentation] = None, + ) -> None: + self._restricted_api() + if rule_type not in self._feature_set.dispatchable_table_parsers: + types = ", ".join( + sorted(x.__name__ for x in self._feature_set.dispatchable_table_parsers) + ) + raise ValueError( + f"The rule_type was not a supported type. It must be one of {types}" + ) + dispatching_parser = self._feature_set.dispatchable_table_parsers[rule_type] + dispatching_parser.register_keyword( + rule_name, + handler, + self._plugin_metadata, + inline_reference_documentation=inline_reference_documentation, + ) + + def _unload() -> None: + raise PluginInitializationError( + "Cannot unload provide_manifest_keyword (not implemented)" + ) + + self._unloaders.append(_unload) + + def plugable_object_parser( + self, + rule_type: str, + rule_name: str, + *, + object_parser_key: Optional[str] = None, + on_end_parse_step: Optional[ + Callable[ + [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData], + None, + ] + ] = None, + ) -> None: + self._restricted_api() + if object_parser_key is None: + object_parser_key = rule_name + dispatchable_object_parsers = self._feature_set.dispatchable_object_parsers + if rule_type not in dispatchable_object_parsers: + types = ", ".join(sorted(dispatchable_object_parsers)) + raise ValueError( + f"The rule_type was not a supported type. It must be one of {types}" + ) + if object_parser_key not in dispatchable_object_parsers: + types = ", ".join(sorted(dispatchable_object_parsers)) + raise ValueError( + f"The object_parser_key was not a supported type. It must be one of {types}" + ) + parent_dispatcher = dispatchable_object_parsers[rule_type] + child_dispatcher = dispatchable_object_parsers[object_parser_key] + parent_dispatcher.register_child_parser( + rule_name, + child_dispatcher, + self._plugin_metadata, + on_end_parse_step=on_end_parse_step, + ) + + def _unload() -> None: + raise PluginInitializationError( + "Cannot unload plugable_object_parser (not implemented)" + ) + + self._unloaders.append(_unload) + + def plugable_manifest_rule( + self, + rule_type: Union[TTP, str], + rule_name: Union[str, List[str]], + parsed_format: Type[PF], + handler: DIPHandler, + *, + source_format: Optional[SF] = None, + inline_reference_documentation: Optional[ParserDocumentation] = None, + ) -> None: + self._restricted_api() + feature_set = self._feature_set + if isinstance(rule_type, str): + if rule_type not in feature_set.dispatchable_object_parsers: + types = ", ".join(sorted(feature_set.dispatchable_object_parsers)) + raise ValueError( + f"The rule_type was not a supported type. It must be one of {types}" + ) + dispatching_parser = feature_set.dispatchable_object_parsers[rule_type] + else: + if rule_type not in feature_set.dispatchable_table_parsers: + types = ", ".join( + sorted(x.__name__ for x in feature_set.dispatchable_table_parsers) + ) + raise ValueError( + f"The rule_type was not a supported type. It must be one of {types}" + ) + dispatching_parser = feature_set.dispatchable_table_parsers[rule_type] + + parser = feature_set.manifest_parser_generator.parser_from_typed_dict( + parsed_format, + source_content=source_format, + inline_reference_documentation=inline_reference_documentation, + ) + dispatching_parser.register_parser( + rule_name, + parser, + handler, + self._plugin_metadata, + ) + + def _unload() -> None: + raise PluginInitializationError( + "Cannot unload plugable_manifest_rule (not implemented)" + ) + + self._unloaders.append(_unload) + + def known_packaging_files( + self, + packaging_file_details: KnownPackagingFileInfo, + ) -> None: + known_packaging_files = self._feature_set.known_packaging_files + detection_method = packaging_file_details.get( + "detection_method", cast("Literal['path']", "path") + ) + path = packaging_file_details.get("path") + dhpkgfile = packaging_file_details.get("pkgfile") + + packaging_file_details: KnownPackagingFileInfo = packaging_file_details.copy() + + if detection_method == "path": + if dhpkgfile is not None: + raise ValueError( + 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' + ) + if path != _normalize_path(path, with_prefix=False): + raise ValueError( + f"The path for known packaging files must be normalized. Please replace" + f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' + ) + detection_value = path + else: + assert detection_method == "dh.pkgfile" + if path is not None: + raise ValueError( + 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' + ) + if "/" in dhpkgfile: + raise ValueError( + 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' + ) + detection_value = dhpkgfile + key = f"{detection_method}::{detection_value}" + existing = known_packaging_files.get(key) + if existing is not None: + if existing.plugin_metadata.plugin_name != self._plugin_name: + message = ( + f'The key "{key}" is registered twice for known packaging files.' + f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" + ) + else: + message = ( + f"Bug in the plugin {self._plugin_name}: It tried to register the" + f' key "{key}" twice for known packaging files.' + ) + raise PluginConflictError( + message, existing.plugin_metadata, self._plugin_metadata + ) + _validate_known_packaging_file_dh_compat_rules( + packaging_file_details.get("dh_compat_rules") + ) + known_packaging_files[key] = PluginProvidedKnownPackagingFile( + packaging_file_details, + detection_method, + detection_value, + self._plugin_metadata, + ) + + def _unload() -> None: + del known_packaging_files[key] + + self._unloaders.append(_unload) + + def register_mapped_type( + self, + type_mapping: TypeMapping, + *, + reference_documentation: Optional[TypeMappingDocumentation] = None, + ) -> None: + self._restricted_api() + target_type = type_mapping.target_type + mapped_types = self._feature_set.mapped_types + existing = mapped_types.get(target_type) + if existing is not None: + if existing.plugin_metadata.plugin_name != self._plugin_name: + message = ( + f'The key "{target_type.__name__}" is registered twice for known packaging files.' + f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" + ) + else: + message = ( + f"Bug in the plugin {self._plugin_name}: It tried to register the" + f' key "{target_type.__name__}" twice for known packaging files.' + ) + raise PluginConflictError( + message, existing.plugin_metadata, self._plugin_metadata + ) + parser_generator = self._feature_set.manifest_parser_generator + mapped_types[target_type] = PluginProvidedTypeMapping( + type_mapping, reference_documentation, self._plugin_metadata + ) + parser_generator.register_mapped_type(type_mapping) + + def _restricted_api( + self, + *, + allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(), + ) -> None: + if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: + raise PluginAPIViolationError( + f"Plugin {self._plugin_name} attempted to access a debputy-only API." + " If you are the maintainer of this plugin and want access to this" + " API, please file a feature request to make this public." + " (The API is currently private as it is unstable.)" + ) + + +class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): + __slots__ = () + + def _append_script( + self, + caller_name: str, + maintscript: Maintscript, + full_script: str, + /, + perform_substitution: bool = True, + ) -> None: + raise NotImplementedError + + @classmethod + def _apply_condition_to_script( + cls, + condition: str, + run_snippet: str, + /, + indent: Optional[bool] = None, + ) -> str: + if indent is None: + # We auto-determine this based on heredocs currently + indent = "<<" not in run_snippet + + if indent: + run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) + if not run_snippet.endswith("\n"): + run_snippet += "\n" + condition_line = f"if {condition}; then\n" + end_line = "fi\n" + return "".join((condition_line, run_snippet, end_line)) + + def on_configure( + self, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + skip_on_rollback: bool = False, + ) -> None: + condition = POSTINST_DEFAULT_CONDITION + if skip_on_rollback: + condition = '[ "$1" = "configure" ]' + return self._append_script( + "on_configure", + "postinst", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def on_initial_install( + self, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + ) -> None: + condition = '[ "$1" = "configure" -a -z "$2" ]' + return self._append_script( + "on_initial_install", + "postinst", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def on_upgrade( + self, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + ) -> None: + condition = '[ "$1" = "configure" -a -n "$2" ]' + return self._append_script( + "on_upgrade", + "postinst", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def on_upgrade_from( + self, + version: str, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + ) -> None: + condition = '[ "$1" = "configure" ] && dpkg --compare-versions le-nl "$2"' + return self._append_script( + "on_upgrade_from", + "postinst", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def on_before_removal( + self, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + ) -> None: + condition = '[ "$1" = "remove" ]' + return self._append_script( + "on_before_removal", + "prerm", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def on_removed( + self, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + ) -> None: + condition = '[ "$1" = "remove" ]' + return self._append_script( + "on_removed", + "postrm", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def on_purge( + self, + run_snippet: str, + /, + indent: Optional[bool] = None, + perform_substitution: bool = True, + ) -> None: + condition = '[ "$1" = "purge" ]' + return self._append_script( + "on_purge", + "postrm", + self._apply_condition_to_script(condition, run_snippet, indent=indent), + perform_substitution=perform_substitution, + ) + + def unconditionally_in_script( + self, + maintscript: Maintscript, + run_snippet: str, + /, + perform_substitution: bool = True, + ) -> None: + if maintscript not in STD_CONTROL_SCRIPTS: + raise ValueError( + f'Unknown script "{maintscript}". Should have been one of:' + f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' + ) + return self._append_script( + "unconditionally_in_script", + maintscript, + run_snippet, + perform_substitution=perform_substitution, + ) + + +class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): + __slots__ = ( + "_plugin_metadata", + "_maintscript_snippets", + "_plugin_source_id", + "_package_substitution", + ) + + def __init__( + self, + plugin_metadata: DebputyPluginMetadata, + plugin_source_id: str, + maintscript_snippets: Dict[str, MaintscriptSnippetContainer], + package_substitution: Substitution, + ): + self._plugin_metadata = plugin_metadata + self._plugin_source_id = plugin_source_id + self._maintscript_snippets = maintscript_snippets + self._package_substitution = package_substitution + + def _append_script( + self, + caller_name: str, + maintscript: Maintscript, + full_script: str, + /, + perform_substitution: bool = True, + ) -> None: + def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" + if perform_substitution: + full_script = self._package_substitution.substitute(full_script, def_source) + + snippet = MaintscriptSnippet(snippet=full_script, definition_source=def_source) + self._maintscript_snippets[maintscript].append(snippet) + + +class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): + __slots__ = ( + "_plugin_metadata", + "_plugin_source_id", + "_package_metadata_context", + "_triggers", + "_substvars", + "_maintscript", + "_shlibs_details", + ) + + def __init__( + self, + plugin_metadata: DebputyPluginMetadata, + plugin_source_id: str, + package_metadata_context: PackageProcessingContext, + triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], + substvars: FlushableSubstvars, + shlibs_details: Tuple[Optional[str], Optional[List[str]]], + ) -> None: + self._plugin_metadata = plugin_metadata + self._plugin_source_id = plugin_source_id + self._package_metadata_context = package_metadata_context + self._triggers = triggers + self._substvars = substvars + self._maintscript: Optional[MaintscriptAccessor] = None + self._shlibs_details = shlibs_details + + def _create_maintscript_accessor(self) -> MaintscriptAccessor: + raise NotImplementedError + + def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: + """Register a declarative dpkg level trigger + + The provided trigger will be added to the package's metadata (the triggers file of the control.tar). + + If the trigger has already been added previously, a second call with the same trigger data will be ignored. + """ + key = (trigger_type, trigger_target) + if key in self._triggers: + return + self._triggers[key] = PluginProvidedTrigger( + dpkg_trigger_type=trigger_type, + dpkg_trigger_target=trigger_target, + provider=self._plugin_metadata, + provider_source_id=self._plugin_source_id, + ) + + @property + def maintscript(self) -> MaintscriptAccessor: + maintscript = self._maintscript + if maintscript is None: + maintscript = self._create_maintscript_accessor() + self._maintscript = maintscript + return maintscript + + @property + def substvars(self) -> FlushableSubstvars: + return self._substvars + + def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: + binary_package = self._package_metadata_context.binary_package + with self.substvars.flush() as substvars_file: + dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] + if binary_package.is_udeb: + dpkg_cmd.append("-tudeb") + if binary_package.is_essential: + dpkg_cmd.append("-dPre-Depends") + shlibs_local, shlib_dirs = self._shlibs_details + if shlibs_local is not None: + dpkg_cmd.append(f"-L{shlibs_local}") + if shlib_dirs: + dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) + dpkg_cmd.extend(p.fs_path for p in paths) + print_command(*dpkg_cmd) + try: + subprocess.check_call(dpkg_cmd) + except subprocess.CalledProcessError: + _error( + f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" + " review the output from dpkg-shlibdeps above to understand what went wrong." + ) + + +class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): + __slots__ = ( + "_maintscript", + "_maintscript_snippets", + "_package_substitution", + ) + + def __init__( + self, + plugin_metadata: DebputyPluginMetadata, + plugin_source_id: str, + package_metadata_context: PackageProcessingContext, + triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], + substvars: FlushableSubstvars, + maintscript_snippets: Dict[str, MaintscriptSnippetContainer], + package_substitution: Substitution, + shlibs_details: Tuple[Optional[str], Optional[List[str]]], + ) -> None: + super().__init__( + plugin_metadata, + plugin_source_id, + package_metadata_context, + triggers, + substvars, + shlibs_details, + ) + self._maintscript_snippets = maintscript_snippets + self._package_substitution = package_substitution + self._maintscript = MaintscriptAccessorProvider( + plugin_metadata, + plugin_source_id, + maintscript_snippets, + package_substitution, + ) + + def _create_maintscript_accessor(self) -> MaintscriptAccessor: + return MaintscriptAccessorProvider( + self._plugin_metadata, + self._plugin_source_id, + self._maintscript_snippets, + self._package_substitution, + ) + + +class BinaryCtrlAccessorProviderCreator: + def __init__( + self, + package_metadata_context: PackageProcessingContext, + substvars: FlushableSubstvars, + maintscript_snippets: Dict[str, MaintscriptSnippetContainer], + substitution: Substitution, + ) -> None: + self._package_metadata_context = package_metadata_context + self._substvars = substvars + self._maintscript_snippets = maintscript_snippets + self._substitution = substitution + self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} + self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = None, None + + def for_plugin( + self, + plugin_metadata: DebputyPluginMetadata, + plugin_source_id: str, + ) -> BinaryCtrlAccessor: + return BinaryCtrlAccessorProvider( + plugin_metadata, + plugin_source_id, + self._package_metadata_context, + self._triggers, + self._substvars, + self._maintscript_snippets, + self._substitution, + self.shlibs_details, + ) + + def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: + return self._triggers.values() + + +def plugin_metadata_for_debputys_own_plugin( + loader: Optional[PluginInitializationEntryPoint] = None, +) -> DebputyPluginMetadata: + if loader is None: + from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features + + loader = initialize_debputy_features + return DebputyPluginMetadata( + plugin_name="debputy", + api_compat_version=1, + plugin_initializer=loader, + plugin_loader=None, + plugin_path="<bundled>", + ) + + +def load_plugin_features( + plugin_search_dirs: Sequence[str], + substitution: Substitution, + requested_plugins_only: Optional[Sequence[str]] = None, + required_plugins: Optional[Set[str]] = None, + plugin_feature_set: Optional[PluginProvidedFeatureSet] = None, + debug_mode: bool = False, +) -> PluginProvidedFeatureSet: + if plugin_feature_set is None: + plugin_feature_set = PluginProvidedFeatureSet() + plugins = [plugin_metadata_for_debputys_own_plugin()] + unloadable_plugins = set() + if required_plugins: + plugins.extend( + find_json_plugins( + plugin_search_dirs, + required_plugins, + ) + ) + if requested_plugins_only is not None: + plugins.extend( + find_json_plugins( + plugin_search_dirs, + requested_plugins_only, + ) + ) + else: + auto_loaded = _find_all_json_plugins( + plugin_search_dirs, + required_plugins if required_plugins is not None else frozenset(), + debug_mode=debug_mode, + ) + for plugin_metadata in auto_loaded: + plugins.append(plugin_metadata) + unloadable_plugins.add(plugin_metadata.plugin_name) + + for plugin_metadata in plugins: + api = DebputyPluginInitializerProvider( + plugin_metadata, plugin_feature_set, substitution + ) + try: + api.load_plugin() + except PluginBaseError as e: + if plugin_metadata.plugin_name not in unloadable_plugins: + raise + if debug_mode: + raise + try: + api.unload_plugin() + except Exception: + _warn( + f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" + " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" + " module might have tainted the feature set." + ) + raise e from None + else: + if debug_mode: + _warn( + f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" + f" to --debug/-d." + ) + _warn( + f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" + f" deactivated. Use debug mode (--debug) to show the stacktrace (the warning will become an error)" + ) + + return plugin_feature_set + + +def find_json_plugin( + search_dirs: Sequence[str], + requested_plugin: str, +) -> DebputyPluginMetadata: + r = list(find_json_plugins(search_dirs, [requested_plugin])) + assert len(r) == 1 + return r[0] + + +def find_related_implementation_files_for_plugin( + plugin_metadata: DebputyPluginMetadata, +) -> List[str]: + plugin_path = plugin_metadata.plugin_path + if not os.path.isfile(plugin_path): + plugin_name = plugin_metadata.plugin_name + _error( + f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" + " or loaded via a mechanism that does not support detecting its tests." + ) + files = [] + module_name, module_file = _find_plugin_implementation_file( + plugin_metadata.plugin_name, + plugin_metadata.plugin_path, + ) + if os.path.isfile(module_file): + files.append(module_file) + else: + if not plugin_metadata.is_loaded: + plugin_metadata.load_plugin() + if module_name in sys.modules: + _error( + f'The plugin {plugin_metadata.plugin_name} uses the "module"" key in its' + f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " + f" installed via this method. The related Python would not be installed" + f" (which would result in a plugin that would fail to load)" + ) + + return files + + +def find_tests_for_plugin( + plugin_metadata: DebputyPluginMetadata, +) -> List[str]: + plugin_name = plugin_metadata.plugin_name + plugin_path = plugin_metadata.plugin_path + + if not os.path.isfile(plugin_path): + _error( + f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" + " mechanism that does not support detecting its tests." + ) + + plugin_dir = os.path.dirname(plugin_path) + test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") + tests = [] + with os.scandir(plugin_dir) as dir_iter: + for p in dir_iter: + if ( + p.is_file() + and p.name.startswith(test_basename_prefix) + and PLUGIN_TEST_SUFFIX.search(p.name) + ): + tests.append(p.path) + return tests + + +def find_json_plugins( + search_dirs: Sequence[str], + requested_plugins: Iterable[str], +) -> Iterable[DebputyPluginMetadata]: + for plugin_name_or_path in requested_plugins: + found = False + if "/" in plugin_name_or_path: + if not os.path.isfile(plugin_name_or_path): + raise PluginNotFoundError( + f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." + ' (Because the plugin name contains "/", it is assumed to be a path and search path' + " is not used." + ) + yield parse_json_plugin_desc(plugin_name_or_path) + return + for search_dir in search_dirs: + path = os.path.join( + search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" + ) + if not os.path.isfile(path): + continue + found = True + yield parse_json_plugin_desc(path) + if not found: + search_dir_str = ":".join(search_dirs) + raise PluginNotFoundError( + f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" + f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" + ) + + +def _find_all_json_plugins( + search_dirs: Sequence[str], + required_plugins: AbstractSet[str], + debug_mode: bool = False, +) -> Iterable[DebputyPluginMetadata]: + seen = set(required_plugins) + error_seen = False + for search_dir in search_dirs: + try: + dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) + except FileNotFoundError: + continue + with dir_fd: + for entry in dir_fd: + if ( + not entry.is_file(follow_symlinks=True) + or not entry.name.endswith(".json") + or entry.name in seen + ): + continue + try: + plugin_metadata = parse_json_plugin_desc(entry.path) + except PluginBaseError as e: + if debug_mode: + raise + if not error_seen: + error_seen = True + _warn( + f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" + ) + else: + _warn( + f"Failed to load plugin in {entry.path} due to errors (not shown)." + ) + else: + yield plugin_metadata + + +def _find_plugin_implementation_file( + plugin_name: str, + json_file_path: str, +) -> Tuple[str, str]: + guessed_module_basename = plugin_name.replace("-", "_") + module_name = f"debputy.plugin.{guessed_module_basename}" + module_fs_path = os.path.join( + os.path.dirname(json_file_path), f"{guessed_module_basename}.py" + ) + return module_name, module_fs_path + + +def _resolve_module_initializer( + plugin_name: str, + plugin_initializer_name: str, + module_name: Optional[str], + json_file_path: str, +) -> PluginInitializationEntryPoint: + module = None + module_fs_path = None + if module_name is None: + module_name, module_fs_path = _find_plugin_implementation_file( + plugin_name, json_file_path + ) + if os.path.isfile(module_fs_path): + spec = importlib.util.spec_from_file_location(module_name, module_fs_path) + if spec is None: + raise PluginInitializationError( + f"Failed to load {plugin_name} (path: {module_fs_path})." + " The spec_from_file_location function returned None." + ) + mod = importlib.util.module_from_spec(spec) + loader = spec.loader + if loader is None: + raise PluginInitializationError( + f"Failed to load {plugin_name} (path: {module_fs_path})." + " Python could not find a suitable loader (spec.loader was None)" + ) + sys.modules[module_name] = mod + try: + loader.exec_module(mod) + except (Exception, GeneratorExit) as e: + raise PluginInitializationError( + f"Failed to load {plugin_name} (path: {module_fs_path})." + " The module threw an exception while being loaded." + ) from e + module = mod + + if module is None: + try: + module = importlib.import_module(module_name) + except ModuleNotFoundError as e: + if module_fs_path is None: + raise PluginMetadataError( + f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' + " this module is not available in the python search path" + ) from e + raise PluginInitializationError( + f"Failed to load {plugin_name}. Tried loading it from" + f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' + f" {module_name} (where it was not found either). Please ensure" + " the module code is installed in the correct spot or provide an" + f' explicit "module" definition in {json_file_path}.' + ) from e + + plugin_initializer = getattr(module, plugin_initializer_name) + + if plugin_initializer is None: + raise PluginMetadataError( + f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' + f" attribute called {plugin_initializer}. However, it does not. Please correct the plugin" + f" metadata or initializer name in the Python module." + ) + return cast("PluginInitializationEntryPoint", plugin_initializer) + + +def _json_plugin_loader( + plugin_name: str, + plugin_json_metadata: PluginJsonMetadata, + json_file_path: str, + attribute_path: AttributePath, +) -> Callable[["DebputyPluginInitializer"], None]: + api_compat = plugin_json_metadata["api_compat_version"] + module_name = plugin_json_metadata.get("module") + plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") + packager_provided_files_raw = plugin_json_metadata.get( + "packager_provided_files", [] + ) + manifest_variables_raw = plugin_json_metadata.get("manifest_variables") + known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") + if api_compat != 1: + raise PluginMetadataError( + f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' + f" version of debputy only supports API compat version of 1" + ) + if plugin_initializer_name is not None and "." in plugin_initializer_name: + p = attribute_path["plugin_initializer"] + raise PluginMetadataError( + f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' + ) + + plugin_initializers = [] + + if plugin_initializer_name is not None: + plugin_initializer = _resolve_module_initializer( + plugin_name, + plugin_initializer_name, + module_name, + json_file_path, + ) + plugin_initializers.append(plugin_initializer) + + if known_packaging_files_raw: + kpf_root_path = attribute_path["known_packaging_files"] + known_packaging_files = [] + for k, v in enumerate(known_packaging_files_raw): + kpf_path = kpf_root_path[k] + p = v.get("path") + if isinstance(p, str): + kpf_path.path_hint = p + if plugin_name.startswith("debputy-") and isinstance(v, dict): + docs = v.get("documentation-uris") + if docs is not None and isinstance(docs, list): + docs = [ + ( + d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR) + if isinstance(d, str) + else d + ) + for d in docs + ] + v["documentation-uris"] = docs + known_packaging_file: KnownPackagingFileInfo = ( + PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( + v, + kpf_path, + ) + ) + known_packaging_files.append((kpf_path, known_packaging_file)) + + def _initialize_json_provided_known_packaging_files( + api: DebputyPluginInitializerProvider, + ) -> None: + for p, details in known_packaging_files: + try: + api.known_packaging_files(details) + except ValueError as ex: + raise PluginMetadataError( + f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" + ) + + plugin_initializers.append(_initialize_json_provided_known_packaging_files) + + if manifest_variables_raw: + manifest_var_path = attribute_path["manifest_variables"] + manifest_variables = [ + PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) + for i, p in enumerate(manifest_variables_raw) + ] + + def _initialize_json_provided_manifest_vars( + api: DebputyPluginInitializer, + ) -> None: + for idx, manifest_variable in enumerate(manifest_variables): + name = manifest_variable["name"] + value = manifest_variable["value"] + doc = manifest_variable.get("reference_documentation") + try: + api.manifest_variable( + name, value, variable_reference_documentation=doc + ) + except ValueError as ex: + var_path = manifest_var_path[idx] + raise PluginMetadataError( + f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" + ) + + plugin_initializers.append(_initialize_json_provided_manifest_vars) + + if packager_provided_files_raw: + ppf_path = attribute_path["packager_provided_files"] + ppfs = [ + PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) + for i, p in enumerate(packager_provided_files_raw) + ] + + def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: + ppf: PackagerProvidedFileJsonDescription + for idx, ppf in enumerate(ppfs): + c = dict(ppf) + stem = ppf["stem"] + installed_path = ppf["installed_path"] + default_mode = ppf.get("default_mode") + ref_doc_dict = ppf.get("reference_documentation") + if default_mode is not None: + c["default_mode"] = default_mode.octal_mode + + if ref_doc_dict is not None: + ref_doc = packager_provided_file_reference_documentation( + **ref_doc_dict + ) + else: + ref_doc = None + + for k in [ + "stem", + "installed_path", + "reference_documentation", + ]: + try: + del c[k] + except KeyError: + pass + + try: + api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore + except ValueError as ex: + p_path = ppf_path[idx] + raise PluginMetadataError( + f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" + ) + + plugin_initializers.append(_initialize_json_provided_ppfs) + + if not plugin_initializers: + raise PluginMetadataError( + f"The plugin defined in {json_file_path} does not seem to provide features, " + f" such as module + plugin-initializer or packager-provided-files." + ) + + if len(plugin_initializers) == 1: + return plugin_initializers[0] + + def _chain_loader(api: DebputyPluginInitializer) -> None: + for initializer in plugin_initializers: + initializer(api) + + return _chain_loader + + +@contextlib.contextmanager +def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]: + if fd is not None: + yield fd + else: + with open(path, "rb") as fd: + yield fd + + +def parse_json_plugin_desc( + path: str, *, fd: Optional[IO[bytes]] = None +) -> DebputyPluginMetadata: + with _open(path, fd=fd) as rfd: + try: + raw = json.load(rfd) + except JSONDecodeError as e: + raise PluginMetadataError( + f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' + ) from e + plugin_name = os.path.basename(path) + if plugin_name.endswith(".json"): + plugin_name = plugin_name[:-5] + elif plugin_name.endswith(".json.in"): + plugin_name = plugin_name[:-8] + + if plugin_name == "debputy": + # Provide a better error message than "The plugin has already loaded!?" + raise PluginMetadataError( + f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' + f" clash with the bundled plugin of same name." + ) + + attribute_path = AttributePath.root_path() + + try: + plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( + raw, + attribute_path, + ) + except ManifestParseException as e: + raise PluginMetadataError( + f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' + ) from e + api_compat = plugin_json_metadata["api_compat_version"] + + return DebputyPluginMetadata( + plugin_name=plugin_name, + plugin_loader=lambda: _json_plugin_loader( + plugin_name, + plugin_json_metadata, + path, + attribute_path, + ), + api_compat_version=api_compat, + plugin_initializer=None, + plugin_path=path, + ) + + +@dataclasses.dataclass(slots=True, frozen=True) +class ServiceDefinitionImpl(ServiceDefinition[DSD]): + name: str + names: Sequence[str] + path: VirtualPath + type_of_service: str + service_scope: str + auto_enable_on_install: bool + auto_start_in_install: bool + on_upgrade: ServiceUpgradeRule + definition_source: str + is_plugin_provided_definition: bool + service_context: Optional[DSD] + + +class ServiceRegistryImpl(ServiceRegistry[DSD]): + __slots__ = ("_service_manager_details", "_service_definitions") + + def __init__(self, service_manager_details: ServiceManagerDetails) -> None: + self._service_manager_details = service_manager_details + self._service_definitions: List[ServiceDefinition[DSD]] = [] + + @property + def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: + return self._service_definitions + + def register_service( + self, + path: VirtualPath, + name: Union[str, List[str]], + *, + type_of_service: str = "service", # "timer", etc. + service_scope: str = "system", + enable_by_default: bool = True, + start_by_default: bool = True, + default_upgrade_rule: ServiceUpgradeRule = "restart", + service_context: Optional[DSD] = None, + ) -> None: + names = name if isinstance(name, list) else [name] + if len(names) < 1: + raise ValueError( + f"The service must have at least one name - {path.absolute} did not have any" + ) + # TODO: We cannot create a service definition immediate once the manifest is involved + self._service_definitions.append( + ServiceDefinitionImpl( + names[0], + names, + path, + type_of_service, + service_scope, + enable_by_default, + start_by_default, + default_upgrade_rule, + f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", + True, + service_context, + ) + ) |