diff options
Diffstat (limited to 'src/debputy/plugin/api/impl_types.py')
-rw-r--r-- | src/debputy/plugin/api/impl_types.py | 1161 |
1 files changed, 1161 insertions, 0 deletions
diff --git a/src/debputy/plugin/api/impl_types.py b/src/debputy/plugin/api/impl_types.py new file mode 100644 index 0000000..f32b008 --- /dev/null +++ b/src/debputy/plugin/api/impl_types.py @@ -0,0 +1,1161 @@ +import dataclasses +import os.path +import textwrap +from typing import ( + Optional, + Callable, + FrozenSet, + Dict, + List, + Tuple, + Generic, + TYPE_CHECKING, + TypeVar, + cast, + Any, + Sequence, + Union, + Type, + TypedDict, + Iterable, + Mapping, + NotRequired, + Literal, + Set, + Iterator, +) +from weakref import ref + +from debputy import DEBPUTY_DOC_ROOT_DIR +from debputy.exceptions import ( + DebputyFSIsROError, + PluginAPIViolationError, + PluginConflictError, + UnhandledOrUnexpectedErrorFromPluginError, +) +from debputy.filesystem_scan import as_path_def +from debputy.installations import InstallRule +from debputy.maintscript_snippet import DpkgMaintscriptHelperCommand +from debputy.manifest_conditions import ManifestCondition +from debputy.manifest_parser.base_types import DebputyParsedContent, TypeMapping +from debputy.manifest_parser.exceptions import ManifestParseException +from debputy.manifest_parser.util import AttributePath +from debputy.packages import BinaryPackage +from debputy.plugin.api import ( + VirtualPath, + BinaryCtrlAccessor, + PackageProcessingContext, +) +from debputy.plugin.api.spec import ( + DebputyPluginInitializer, + MetadataAutoDetector, + DpkgTriggerType, + ParserDocumentation, + PackageProcessor, + PathDef, + ParserAttributeDocumentation, + undocumented_attr, + documented_attr, + reference_documentation, + PackagerProvidedFileReferenceDocumentation, + TypeMappingDocumentation, +) +from debputy.substitution import VariableContext +from debputy.transformation_rules import TransformationRule +from debputy.util import _normalize_path, package_cross_check_precheck + +if TYPE_CHECKING: + from debputy.plugin.api.spec import ( + ServiceDetector, + ServiceIntegrator, + PackageTypeSelector, + ) + from 
debputy.manifest_parser.parser_data import ParserContextData + from debputy.highlevel_manifest import ( + HighLevelManifest, + PackageTransformationDefinition, + BinaryPackageData, + ) + + +_PACKAGE_TYPE_DEB_ONLY = frozenset(["deb"]) +_ALL_PACKAGE_TYPES = frozenset(["deb", "udeb"]) + + +TD = TypeVar("TD", bound="DebputyParsedContent") +PF = TypeVar("PF") +SF = TypeVar("SF") +TP = TypeVar("TP") +TTP = Type[TP] + +DIPKWHandler = Callable[[str, AttributePath, "ParserContextData"], TP] +DIPHandler = Callable[[str, PF, AttributePath, "ParserContextData"], TP] + + +def resolve_package_type_selectors( + package_type: "PackageTypeSelector", +) -> FrozenSet[str]: + if package_type is _ALL_PACKAGE_TYPES or package_type is _PACKAGE_TYPE_DEB_ONLY: + return cast("FrozenSet[str]", package_type) + if isinstance(package_type, str): + return ( + _PACKAGE_TYPE_DEB_ONLY + if package_type == "deb" + else frozenset([package_type]) + ) + else: + return frozenset(package_type) + + +@dataclasses.dataclass(slots=True) +class DebputyPluginMetadata: + plugin_name: str + api_compat_version: int + plugin_loader: Optional[Callable[[], Callable[["DebputyPluginInitializer"], None]]] + plugin_initializer: Optional[Callable[["DebputyPluginInitializer"], None]] + plugin_path: str + _is_initialized: bool = False + + @property + def is_loaded(self) -> bool: + return self.plugin_initializer is not None + + @property + def is_initialized(self) -> bool: + return self._is_initialized + + def initialize_plugin(self, api: "DebputyPluginInitializer") -> None: + if self.is_initialized: + raise RuntimeError("Cannot load plugins twice") + if not self.is_loaded: + self.load_plugin() + plugin_initializer = self.plugin_initializer + assert plugin_initializer is not None + plugin_initializer(api) + self._is_initialized = True + + def load_plugin(self) -> None: + plugin_loader = self.plugin_loader + assert plugin_loader is not None + self.plugin_initializer = plugin_loader() + assert self.plugin_initializer is not 
None + + +@dataclasses.dataclass(slots=True, frozen=True) +class PluginProvidedParser(Generic[PF, TP]): + parser: "DeclarativeInputParser[PF]" + handler: Callable[[str, PF, "AttributePath", "ParserContextData"], TP] + plugin_metadata: DebputyPluginMetadata + + def parse( + self, + name: str, + value: object, + attribute_path: "AttributePath", + *, + parser_context: "ParserContextData", + ) -> TP: + parsed_value = self.parser.parse_input( + value, attribute_path, parser_context=parser_context + ) + return self.handler(name, parsed_value, attribute_path, parser_context) + + +class PPFFormatParam(TypedDict): + priority: Optional[int] + name: str + owning_package: str + + +@dataclasses.dataclass(slots=True, frozen=True) +class PackagerProvidedFileClassSpec: + debputy_plugin_metadata: DebputyPluginMetadata + stem: str + installed_as_format: str + default_mode: int + default_priority: Optional[int] + allow_name_segment: bool + allow_architecture_segment: bool + post_formatting_rewrite: Optional[Callable[[str], str]] + packageless_is_fallback_for_all_packages: bool + reservation_only: bool + formatting_callback: Optional[Callable[[str, PPFFormatParam, VirtualPath], str]] = ( + None + ) + reference_documentation: Optional[PackagerProvidedFileReferenceDocumentation] = None + bug_950723: bool = False + + @property + def supports_priority(self) -> bool: + return self.default_priority is not None + + def compute_dest( + self, + assigned_name: str, + # Note this method is currently used 1:1 inside plugin tests. + *, + owning_package: Optional[str] = None, + assigned_priority: Optional[int] = None, + path: Optional[VirtualPath] = None, + ) -> Tuple[str, str]: + if assigned_priority is not None and not self.supports_priority: + raise ValueError( + f"Cannot assign priority to packager provided files with stem" + f' "{self.stem}" (e.g., "debian/foo.{self.stem}"). They' + " do not use priority at all." 
+ ) + + path_format = self.installed_as_format + if self.supports_priority and assigned_priority is None: + assigned_priority = self.default_priority + + if owning_package is None: + owning_package = assigned_name + + params: PPFFormatParam = { + "priority": assigned_priority, + "name": assigned_name, + "owning_package": owning_package, + } + + if self.formatting_callback is not None: + if path is None: + raise ValueError( + "The path parameter is required for PPFs with formatting_callback" + ) + dest_path = self.formatting_callback(path_format, params, path) + else: + dest_path = path_format.format(**params) + + dirname, basename = os.path.split(dest_path) + dirname = _normalize_path(dirname) + + if self.post_formatting_rewrite: + basename = self.post_formatting_rewrite(basename) + return dirname, basename + + +@dataclasses.dataclass(slots=True) +class MetadataOrMaintscriptDetector: + plugin_metadata: DebputyPluginMetadata + detector_id: str + detector: MetadataAutoDetector + applies_to_package_types: FrozenSet[str] + enabled: bool = True + + def applies_to(self, binary_package: BinaryPackage) -> bool: + return binary_package.package_type in self.applies_to_package_types + + def run_detector( + self, + fs_root: "VirtualPath", + ctrl: "BinaryCtrlAccessor", + context: "PackageProcessingContext", + ) -> None: + try: + self.detector(fs_root, ctrl, context) + except DebputyFSIsROError as e: + nv = self.plugin_metadata.plugin_name + raise PluginAPIViolationError( + f'The plugin {nv} violated the API contract for "metadata detectors"' + " by attempting to mutate the provided file system in its metadata detector" + f" with id {self.detector_id}. File system mutation is *not* supported at" + " this stage (file system layout is committed and the attempted changes" + " would be lost)." 
+ ) from e + except (ChildProcessError, RuntimeError, AttributeError) as e: + nv = f"{self.plugin_metadata.plugin_name}" + raise UnhandledOrUnexpectedErrorFromPluginError( + f"The plugin {nv} threw an unhandled or unexpected exception from its metadata" + f" detector with id {self.detector_id}." + ) from e + + +class DeclarativeInputParser(Generic[TD]): + @property + def inline_reference_documentation(self) -> Optional[ParserDocumentation]: + return None + + @property + def reference_documentation_url(self) -> Optional[str]: + doc = self.inline_reference_documentation + return doc.documentation_reference_url if doc is not None else None + + def parse_input( + self, + value: object, + path: "AttributePath", + *, + parser_context: Optional["ParserContextData"] = None, + ) -> TD: + raise NotImplementedError + + +class DispatchingParserBase(Generic[TP]): + def __init__(self, manifest_attribute_path_template: str) -> None: + self.manifest_attribute_path_template = manifest_attribute_path_template + self._parsers: Dict[str, PluginProvidedParser[Any, TP]] = {} + + def is_known_keyword(self, keyword: str) -> bool: + return keyword in self._parsers + + def registered_keywords(self) -> Iterable[str]: + yield from self._parsers + + def parser_for(self, keyword: str) -> PluginProvidedParser[Any, TP]: + return self._parsers[keyword] + + def register_keyword( + self, + keyword: Union[str, Sequence[str]], + handler: DIPKWHandler, + plugin_metadata: DebputyPluginMetadata, + *, + inline_reference_documentation: Optional[ParserDocumentation] = None, + ) -> None: + reference_documentation_url = None + if inline_reference_documentation: + if inline_reference_documentation.attribute_doc: + raise ValueError( + "Cannot provide per-attribute documentation for a value-less keyword!" + ) + if inline_reference_documentation.alt_parser_description: + raise ValueError( + "Cannot provide non-mapping-format documentation for a value-less keyword!" 
+ ) + reference_documentation_url = ( + inline_reference_documentation.documentation_reference_url + ) + parser = DeclarativeValuelessKeywordInputParser( + inline_reference_documentation, + documentation_reference=reference_documentation_url, + ) + + def _combined_handler( + name: str, + _ignored: Any, + attr_path: AttributePath, + context: "ParserContextData", + ) -> TP: + return handler(name, attr_path, context) + + p = PluginProvidedParser( + parser, + _combined_handler, + plugin_metadata, + ) + + self._add_parser(keyword, p) + + def register_parser( + self, + keyword: Union[str, List[str]], + parser: "DeclarativeInputParser[PF]", + handler: Callable[[str, PF, "AttributePath", "ParserContextData"], TP], + plugin_metadata: DebputyPluginMetadata, + ) -> None: + p = PluginProvidedParser( + parser, + handler, + plugin_metadata, + ) + self._add_parser(keyword, p) + + def _add_parser( + self, + keyword: Union[str, List[str]], + ppp: "PluginProvidedParser[PF, TP]", + ) -> None: + ks = [keyword] if isinstance(keyword, str) else keyword + for k in ks: + existing_parser = self._parsers.get(k) + if existing_parser is not None: + message = ( + f'The rule name "{k}" is already taken by the plugin' + f" {existing_parser.plugin_metadata.plugin_name}. This conflict was triggered" + f" when plugin {ppp.plugin_metadata.plugin_name} attempted to register its parser." 
+ ) + raise PluginConflictError( + message, + existing_parser.plugin_metadata, + ppp.plugin_metadata, + ) + self._new_parser(k, ppp) + + def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None: + self._parsers[keyword] = ppp + + def parse( + self, + orig_value: object, + attribute_path: "AttributePath", + *, + parser_context: "ParserContextData", + ) -> TP: + raise NotImplementedError + + +class DispatchingObjectParser( + DispatchingParserBase[Mapping[str, Any]], + DeclarativeInputParser[Mapping[str, Any]], +): + def __init__( + self, + manifest_attribute_path_template: str, + *, + parser_documentation: Optional[ParserDocumentation] = None, + ) -> None: + super().__init__(manifest_attribute_path_template) + self._attribute_documentation: List[ParserAttributeDocumentation] = [] + if parser_documentation is None: + parser_documentation = reference_documentation() + self._parser_documentation = parser_documentation + + @property + def reference_documentation_url(self) -> Optional[str]: + return self._parser_documentation.documentation_reference_url + + @property + def inline_reference_documentation(self) -> Optional[ParserDocumentation]: + ref_doc = self._parser_documentation + return reference_documentation( + title=ref_doc.title, + description=ref_doc.description, + attributes=self._attribute_documentation, + reference_documentation_url=self.reference_documentation_url, + ) + + def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None: + super()._new_parser(keyword, ppp) + doc = ppp.parser.inline_reference_documentation + if doc is None or doc.description is None: + self._attribute_documentation.append(undocumented_attr(keyword)) + else: + self._attribute_documentation.append( + documented_attr(keyword, doc.description) + ) + + def register_child_parser( + self, + keyword: str, + parser: "DispatchingObjectParser", + plugin_metadata: DebputyPluginMetadata, + *, + on_end_parse_step: Optional[ + Callable[ + [str, 
Optional[Mapping[str, Any]], AttributePath, "ParserContextData"], + None, + ] + ] = None, + ) -> None: + def _handler( + name: str, + value: Mapping[str, Any], + path: AttributePath, + parser_context: "ParserContextData", + ) -> Mapping[str, Any]: + on_end_parse_step(name, value, path, parser_context) + return value + + p = PluginProvidedParser( + parser, + _handler, + plugin_metadata, + ) + self._add_parser(keyword, p) + + # FIXME: Agree on naming (parse vs. parse_input) + def parse_input( + self, + value: object, + path: "AttributePath", + *, + parser_context: Optional["ParserContextData"] = None, + ) -> TD: + return self.parse(value, path, parser_context=parser_context) + + def parse( + self, + orig_value: object, + attribute_path: "AttributePath", + *, + parser_context: "ParserContextData", + ) -> TP: + doc_ref = "" + if self.reference_documentation_url is not None: + doc_ref = ( + f" Please see {self.reference_documentation_url} for the documentation." + ) + if not isinstance(orig_value, dict): + raise ManifestParseException( + f"The attribute {attribute_path.path} must be a non-empty mapping.{doc_ref}" + ) + if not orig_value: + raise ManifestParseException( + f"The attribute {attribute_path.path} must be a non-empty mapping.{doc_ref}" + ) + result = {} + unknown_keys = orig_value.keys() - self._parsers.keys() + if unknown_keys: + first_key = next(iter(unknown_keys)) + remaining_valid_attributes = self._parsers.keys() - orig_value.keys() + if not remaining_valid_attributes: + raise ManifestParseException( + f'The attribute "{first_key}" is not applicable at {attribute_path.path} (with the' + f" current set of plugins).{doc_ref}" + ) + remaining_valid_attribute_names = ", ".join(remaining_valid_attributes) + raise ManifestParseException( + f'The attribute "{first_key}" is not applicable at {attribute_path.path}(with the current set' + " of plugins). 
Possible attributes available (and not already used) are:" + f" {remaining_valid_attribute_names}.{doc_ref}" + ) + # Parse order is important for the root level (currently we use rule registration order) + for key, provided_parser in self._parsers.items(): + value = orig_value.get(key) + if value is None: + if isinstance(provided_parser.parser, DispatchingObjectParser): + provided_parser.handler( + key, {}, attribute_path[key], parser_context + ) + continue + value_path = attribute_path[key] + if provided_parser is None: + valid_keys = ", ".join(sorted(self._parsers.keys())) + raise ManifestParseException( + f'Unknown or unsupported option "{key}" at {value_path.path}.' + " Valid options at this location are:" + f" {valid_keys}\n{doc_ref}" + ) + parsed_value = provided_parser.parse( + key, value, value_path, parser_context=parser_context + ) + result[key] = parsed_value + return result + + +class DispatchingTableParser(DispatchingParserBase[TP]): + def __init__(self, base_type: TTP, manifest_attribute_path_template: str) -> None: + super().__init__(manifest_attribute_path_template) + self.base_type = base_type + + def parse( + self, + orig_value: object, + attribute_path: "AttributePath", + *, + parser_context: "ParserContextData", + ) -> TP: + if isinstance(orig_value, str): + key = orig_value + value = None + value_path = attribute_path + elif isinstance(orig_value, dict): + if len(orig_value) != 1: + valid_keys = ", ".join(sorted(self._parsers.keys())) + raise ManifestParseException( + f'The mapping "{attribute_path.path}" had two keys, but it should only have one top level key.' + " Maybe you are missing a list marker behind the second key or some indentation. The" + f" possible keys are: {valid_keys}" + ) + key, value = next(iter(orig_value.items())) + value_path = attribute_path[key] + else: + raise ManifestParseException( + f"The attribute {attribute_path.path} must be a string or a mapping." 
+ ) + provided_parser = self._parsers.get(key) + if provided_parser is None: + valid_keys = ", ".join(sorted(self._parsers.keys())) + raise ManifestParseException( + f'Unknown or unsupported action "{key}" at {value_path.path}.' + " Valid actions at this location are:" + f" {valid_keys}" + ) + return provided_parser.parse( + key, value, value_path, parser_context=parser_context + ) + + +@dataclasses.dataclass(slots=True) +class DeclarativeValuelessKeywordInputParser(DeclarativeInputParser[None]): + inline_reference_documentation: Optional[ParserDocumentation] = None + documentation_reference: Optional[str] = None + + def parse_input( + self, + value: object, + path: "AttributePath", + *, + parser_context: Optional["ParserContextData"] = None, + ) -> TD: + if value is None: + return cast("TD", value) + if self.documentation_reference is not None: + doc_ref = f" (Documentation: {self.documentation_reference})" + else: + doc_ref = "" + raise ManifestParseException( + f"Expected attribute {path.path} to be a string.{doc_ref}" + ) + + +SUPPORTED_DISPATCHABLE_TABLE_PARSERS = { + InstallRule: "installations", + TransformationRule: "packages.{{PACKAGE}}.transformations", + DpkgMaintscriptHelperCommand: "packages.{{PACKAGE}}.conffile-management", + ManifestCondition: "*.when", +} + +OPARSER_MANIFEST_ROOT = "<ROOT>" +OPARSER_PACKAGES = "packages.{{PACKAGE}}" +OPARSER_MANIFEST_DEFINITIONS = "definitions" + +SUPPORTED_DISPATCHABLE_OBJECT_PARSERS = { + OPARSER_MANIFEST_ROOT: reference_documentation( + reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md", + ), + OPARSER_MANIFEST_DEFINITIONS: reference_documentation( + title="Packager provided definitions", + description="Reusable packager provided definitions such as manifest variables.", + reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md#packager-provided-definitions", + ), + OPARSER_PACKAGES: reference_documentation( + title="Binary package rules", + description=textwrap.dedent( + 
"""\ + Inside the manifest, the `packages` mapping can be used to define requests for the binary packages + you want `debputy` to produce. Each key inside `packages` must be the name of a binary package + defined in `debian/control`. The value is a dictionary defining which features that `debputy` + should apply to that binary package. An example could be: + + packages: + foo: + transformations: + - create-symlink: + path: usr/share/foo/my-first-symlink + target: /usr/share/bar/symlink-target + - create-symlink: + path: usr/lib/{{DEB_HOST_MULTIARCH}}/my-second-symlink + target: /usr/lib/{{DEB_HOST_MULTIARCH}}/baz/symlink-target + bar: + transformations: + - create-directories: + - some/empty/directory.d + - another/empty/integration-point.d + - create-directories: + path: a/third-empty/directory.d + owner: www-data + group: www-data + + In this case, `debputy` will create some symlinks inside the `foo` package and some directories for + the `bar` package. The following subsections define the keys you can use under each binary package. 
+ """ + ), + reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md#binary-package-rules", + ), +} + + +@dataclasses.dataclass(slots=True) +class PluginProvidedManifestVariable: + plugin_metadata: DebputyPluginMetadata + variable_name: str + variable_value: Optional[Union[str, Callable[[VariableContext], str]]] + is_context_specific_variable: bool + variable_reference_documentation: Optional[str] = None + is_documentation_placeholder: bool = False + is_for_special_case: bool = False + + @property + def is_internal(self) -> bool: + return self.variable_name.startswith("_") or ":_" in self.variable_name + + @property + def is_token(self) -> bool: + return self.variable_name.startswith("token:") + + def resolve(self, variable_context: VariableContext) -> str: + value_resolver = self.variable_value + if isinstance(value_resolver, str): + res = value_resolver + else: + res = value_resolver(variable_context) + return res + + +@dataclasses.dataclass(slots=True, frozen=True) +class AutomaticDiscardRuleExample: + content: Sequence[Tuple[PathDef, bool]] + description: Optional[str] = None + + +def automatic_discard_rule_example( + *content: Union[str, PathDef, Tuple[Union[str, PathDef], bool]], + example_description: Optional[str] = None, +) -> AutomaticDiscardRuleExample: + """Provide an example for an automatic discard rule + + The return value of this method should be passed to the `examples` parameter of + `automatic_discard_rule` method - either directly for a single example or as a + part of a sequence of examples. + + >>> # Possible example for an exclude rule for ".la" files + >>> # Example shows two files; The ".la" file that will be removed and another file that + >>> # will be kept. + >>> automatic_discard_rule_example( # doctest: +ELLIPSIS + ... "usr/lib/libfoo.la", + ... ("usr/lib/libfoo.so.1.0.0", False), + ... ) + AutomaticDiscardRuleExample(...) 
+ + Keep in mind that you have to explicitly include directories that are relevant for the test + if you want them shown. Also, if a directory is excluded, all path beneath it will be + automatically excluded in the example as well. Your example data must account for that. + + >>> # Possible example for python cache file discard rule + >>> # In this example, we explicitly list the __pycache__ directory itself because we + >>> # want it shown in the output (otherwise, we could have omitted it) + >>> automatic_discard_rule_example( # doctest: +ELLIPSIS + ... (".../foo.py", False), + ... ".../__pycache__/", + ... ".../__pycache__/...", + ... ".../foo.pyc", + ... ".../foo.pyo", + ... ) + AutomaticDiscardRuleExample(...) + + Note: Even if `__pycache__` had been implicit, the result would have been the same. However, + the rendered example would not have shown the directory on its own. The use of `...` as + path names is useful for denoting "anywhere" or "anything". Though, there is nothing "magic" + about this name - it happens to be allowed as a path name (unlike `.` or `..`). + + These examples can be seen via `debputy plugin show automatic-discard-rules <name-here>`. + + :param content: The content of the example. Each element can be either a path definition or + a tuple of a path definition followed by a verdict (boolean). Each provided path definition + describes the paths to be presented in the example. Implicit paths such as parent + directories will be created but not shown in the example. Therefore, if a directory is + relevant to the example, be sure to explicitly list it. + + The verdict associated with a path determines whether the path should be discarded (when + True) or kept (when False). When a path is not explicitly associated with a verdict, the + verdict is assumed to be discarded (True). + :param example_description: An optional description displayed together with the example. + :return: An opaque data structure containing the example. 
+ """ + example = [] + for d in content: + if not isinstance(d, tuple): + pd = d + verdict = True + else: + pd, verdict = d + + path_def = as_path_def(pd) + example.append((path_def, verdict)) + + if not example: + raise ValueError("At least one path must be given for an example") + + return AutomaticDiscardRuleExample( + tuple(example), + description=example_description, + ) + + +@dataclasses.dataclass(slots=True, frozen=True) +class PluginProvidedPackageProcessor: + processor_id: str + applies_to_package_types: FrozenSet[str] + package_processor: PackageProcessor + dependencies: FrozenSet[Tuple[str, str]] + plugin_metadata: DebputyPluginMetadata + + def applies_to(self, binary_package: BinaryPackage) -> bool: + return binary_package.package_type in self.applies_to_package_types + + @property + def dependency_id(self) -> Tuple[str, str]: + return self.plugin_metadata.plugin_name, self.processor_id + + def run_package_processor( + self, + fs_root: "VirtualPath", + unused: None, + context: "PackageProcessingContext", + ) -> None: + self.package_processor(fs_root, unused, context) + + +@dataclasses.dataclass(slots=True, frozen=True) +class PluginProvidedDiscardRule: + name: str + plugin_metadata: DebputyPluginMetadata + discard_check: Callable[[VirtualPath], bool] + reference_documentation: Optional[str] + examples: Sequence[AutomaticDiscardRuleExample] = tuple() + + def should_discard(self, path: VirtualPath) -> bool: + return self.discard_check(path) + + +@dataclasses.dataclass(slots=True, frozen=True) +class ServiceManagerDetails: + service_manager: str + service_detector: "ServiceDetector" + service_integrator: "ServiceIntegrator" + plugin_metadata: DebputyPluginMetadata + + +ReferenceValue = TypedDict( + "ReferenceValue", + { + "description": str, + }, +) + + +def _reference_data_value( + *, + description: str, +) -> ReferenceValue: + return { + "description": description, + } + + +KnownPackagingFileCategories = Literal[ + "generated", + "generic-template", + 
"ppf-file", + "ppf-control-file", + "maint-config", + "pkg-metadata", + "pkg-helper-config", + "testing", + "lint-config", +] +KNOWN_PACKAGING_FILE_CATEGORY_DESCRIPTIONS: Mapping[ + KnownPackagingFileCategories, ReferenceValue +] = { + "generated": _reference_data_value( + description="The file is (likely) generated from another file" + ), + "generic-template": _reference_data_value( + description="The file is (likely) a generic template that generates a known packaging file. While the" + " file is annotated as if it was the target file, the file might uses a custom template" + " language inside it." + ), + "ppf-file": _reference_data_value( + description="Packager provided file to be installed on the file system - usually as-is." + " When `install-pattern` or `install-path` are provided, this is where the file is installed." + ), + "ppf-control-file": _reference_data_value( + description="Packager provided file that becomes a control file - possible after processing. " + " If `install-pattern` or `install-path` are provided, they denote where the is placed" + " (generally, this will be of the form `DEBIAN/<name>`)" + ), + "maint-config": _reference_data_value( + description="Maintenance configuration for a specific tool that the maintainer uses (tool / style preferences)" + ), + "pkg-metadata": _reference_data_value( + description="The file is related to standard package metadata (usually documented in Debian Policy)" + ), + "pkg-helper-config": _reference_data_value( + description="The file is packaging helper configuration or instruction file" + ), + "testing": _reference_data_value( + description="The file is related to automated testing (autopkgtests, salsa/gitlab CI)." 
+ ), + "lint-config": _reference_data_value( + description="The file is related to a linter (such as overrides for false-positives or style preferences)" + ), +} + +KnownPackagingConfigFeature = Literal[ + "dh-filearray", + "dh-filedoublearray", + "dh-hash-subst", + "dh-dollar-subst", + "dh-glob", + "dh-partial-glob", + "dh-late-glob", + "dh-glob-after-execute", + "dh-executable-config", + "dh-custom-format", + "dh-file-list", + "dh-install-list", + "dh-install-list-dest-dir-like-dh_install", + "dh-install-list-fixed-dest-dir", + "dh-fixed-dest-dir", + "dh-exec-rename", + "dh-docs-only", +] + +KNOWN_PACKAGING_FILE_CONFIG_FEATURE_DESCRIPTION: Mapping[ + KnownPackagingConfigFeature, ReferenceValue +] = { + "dh-filearray": _reference_data_value( + description="The file will be read as a list of space/newline separated tokens", + ), + "dh-filedoublearray": _reference_data_value( + description="Each line in the file will be read as a list of space-separated tokens", + ), + "dh-hash-subst": _reference_data_value( + description="Supports debhelper #PACKAGE# style substitutions (udebs often excluded)", + ), + "dh-dollar-subst": _reference_data_value( + description="Supports debhelper ${PACKAGE} style substitutions (usually requires compat 13+)", + ), + "dh-glob": _reference_data_value( + description="Supports standard debhelper globing", + ), + "dh-partial-glob": _reference_data_value( + description="Supports standard debhelper globing but only to a subset of the values (implies dh-late-glob)", + ), + "dh-late-glob": _reference_data_value( + description="Globbing is done separately instead of using the built-in function", + ), + "dh-glob-after-execute": _reference_data_value( + description="When the dh config file is executable, the generated output will be subject to globbing", + ), + "dh-executable-config": _reference_data_value( + description="If marked executable, debhelper will execute the file and read its output", + ), + "dh-custom-format": _reference_data_value( + 
        description="The dh tool will or may have a custom parser for this file",
    ),
    "dh-file-list": _reference_data_value(
        description="The dh file contains a list of paths to be processed",
    ),
    "dh-install-list": _reference_data_value(
        description="The dh file contains a list of paths/globs to be installed but the tool specific knowledge"
        " required to understand the file cannot be conveyed via this interface.",
    ),
    "dh-install-list-dest-dir-like-dh_install": _reference_data_value(
        description="The dh file is processed similar to dh_install (notably dest-dir handling derived"
        " from the path or the last token on the line)",
    ),
    "dh-install-list-fixed-dest-dir": _reference_data_value(
        description="The dh file is an install list and the dest-dir is always the same for all patterns"
        " (when `install-pattern` or `install-path` are provided, they identify the directory - not the file location)",
    ),
    "dh-exec-rename": _reference_data_value(
        description="When `dh-exec` is the interpreter of this dh config file, its renaming (=>) feature can be"
        " requested/used",
    ),
    "dh-docs-only": _reference_data_value(
        description="The dh config file is used for documentation only. Implicit <!nodocs> Build-Profiles support",
    ),
}

# Maps a config feature to the (feature, minimum-compat-level) pairs it expands to.
# Pairs gated on a compat level higher than the requested one are skipped during
# expansion (see expand_known_packaging_config_features below).
CONFIG_FEATURE_ALIASES: Dict[
    KnownPackagingConfigFeature, List[Tuple[KnownPackagingConfigFeature, int]]
] = {
    "dh-filearray": [
        ("dh-filearray", 0),
        ("dh-executable-config", 9),
        ("dh-dollar-subst", 13),
    ],
    "dh-filedoublearray": [
        ("dh-filedoublearray", 0),
        ("dh-executable-config", 9),
        ("dh-dollar-subst", 13),
    ],
}


def _implies(
    features: List[KnownPackagingConfigFeature],
    seen: Set[KnownPackagingConfigFeature],
    implying: Sequence[KnownPackagingConfigFeature],
    implied: KnownPackagingConfigFeature,
) -> None:
    """Add `implied` to `features`/`seen` when every feature in `implying` is present.

    No-op when `implied` is already present. Mutates both `features` and `seen`
    in place.
    """
    if implied in seen:
        return
    if all(f in seen for f in implying):
        seen.add(implied)
        features.append(implied)


def expand_known_packaging_config_features(
    compat_level: int,
    features: List[KnownPackagingConfigFeature],
) -> List[KnownPackagingConfigFeature]:
    """Expand config feature aliases for a given debhelper compat level.

    Each feature is expanded via CONFIG_FEATURE_ALIASES (a non-alias expands to
    itself). Expansions that require a compat level above `compat_level` and
    duplicates are dropped. When both glob variants are present, "dh-late-glob"
    supersedes "dh-glob", and a couple of implied features are added via
    `_implies`. Returns the resulting features as a sorted list.
    """
    final_features: List[KnownPackagingConfigFeature] = []
    seen: Set[KnownPackagingConfigFeature] = set()
    for feature in features:
        expanded = CONFIG_FEATURE_ALIASES.get(feature)
        if not expanded:
            # Not an alias; the feature expands to itself with no compat gate.
            expanded = [(feature, 0)]
        for v, c in expanded:
            if compat_level < c or v in seen:
                continue
            seen.add(v)
            final_features.append(v)
    if "dh-glob" in seen and "dh-late-glob" in seen:
        # The late glob replaces the (early) glob when both are requested.
        final_features.remove("dh-glob")

    _implies(final_features, seen, ["dh-partial-glob"], "dh-late-glob")
    _implies(
        final_features,
        seen,
        ["dh-late-glob", "dh-executable-config"],
        "dh-glob-after-execute",
    )
    return sorted(final_features)


class InstallPatternDHCompatRule(DebputyParsedContent):
    """Compat-level dependent override for how a packaging file is installed."""

    install_pattern: NotRequired[str]
    add_config_features: NotRequired[List[KnownPackagingConfigFeature]]
    # The compat level from which this rule takes effect.
    starting_with_compat_level: NotRequired[int]


class KnownPackagingFileInfo(DebputyParsedContent):
    """Declarative description of a known packaging file (e.g., a dh config file)."""

    # Exposed directly in the JSON plugin parsing; be careful with changes
    path: NotRequired[str]
    pkgfile: NotRequired[str]
    detection_method: NotRequired[Literal["path", "dh.pkgfile"]]
    file_categories: NotRequired[List[KnownPackagingFileCategories]]
    documentation_uris: NotRequired[List[str]]
    debputy_cmd_templates: NotRequired[List[List[str]]]
    debhelper_commands: NotRequired[List[str]]
    config_features: NotRequired[List[KnownPackagingConfigFeature]]
    install_pattern: NotRequired[str]
    dh_compat_rules: NotRequired[List[InstallPatternDHCompatRule]]
    default_priority: NotRequired[int]
    post_formatting_rewrite: NotRequired[Literal["period-to-underscore"]]
    packageless_is_fallback_for_all_packages: NotRequired[bool]


@dataclasses.dataclass(slots=True)
class PluginProvidedKnownPackagingFile:
    """A known packaging file as registered by a debputy plugin."""

    info: KnownPackagingFileInfo
    # How the file is recognized plus the value matched against.
    detection_method: Literal["path", "dh.pkgfile"]
    detection_value: str
    plugin_metadata: DebputyPluginMetadata


@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedTypeMapping:
    """A manifest type mapping contributed by a plugin, with optional documentation."""

    mapped_type: TypeMapping[Any, Any]
    reference_documentation: Optional[TypeMappingDocumentation]
    plugin_metadata: DebputyPluginMetadata


class PackageDataTable:
    """Lookup of per-binary-package data, keyed by package name."""

    def __init__(self, package_data_table: Mapping[str, "BinaryPackageData"]) -> None:
        self._package_data_table = package_data_table
        # This is enabled for metadata-detectors. But it is deliberately not enabled
        # for package processors, because it is not clear how it should interact with
        # dependencies. For metadata-detectors, things are read-only and there are no
        # dependencies, so we cannot "get them wrong".
        self.enable_cross_package_checks = False

    def __iter__(self) -> Iterator["BinaryPackageData"]:
        return iter(self._package_data_table.values())

    def __getitem__(self, item: str) -> "BinaryPackageData":
        return self._package_data_table[item]

    def __contains__(self, item: str) -> bool:
        return item in self._package_data_table


class PackageProcessingContextProvider(PackageProcessingContext):
    """PackageProcessingContext implementation backed by the manifest.

    Holds the PackageDataTable only via a weak reference (`ref`); methods that
    need it raise ReferenceError if the table has been garbage collected.
    """

    __slots__ = (
        "_manifest",
        "_binary_package",
        "_related_udeb_package",
        "_package_data_table",
        "_cross_check_cache",
    )

    def __init__(
        self,
        manifest: "HighLevelManifest",
        binary_package: BinaryPackage,
        related_udeb_package: Optional[BinaryPackage],
        package_data_table: PackageDataTable,
    ) -> None:
        self._manifest = manifest
        self._binary_package = binary_package
        self._related_udeb_package = related_udeb_package
        # Weak reference; see the class docstring.
        self._package_data_table = ref(package_data_table)
        # Lazily populated by accessible_package_roots().
        self._cross_check_cache: Optional[
            Sequence[Tuple[BinaryPackage, "VirtualPath"]]
        ] = None

    def _package_state_for(
        self,
        package: BinaryPackage,
    ) -> "PackageTransformationDefinition":
        return self._manifest.package_state_for(package.name)

    def _package_version_for(
        self,
        package: BinaryPackage,
    ) -> str:
        # Prefer an explicit per-package binary version from the manifest state;
        # otherwise fall back to the source version (the binNMU part is included
        # only for arch-specific packages).
        package_state = self._package_state_for(package)
        version = package_state.binary_version
        if version is not None:
            return version
        return self._manifest.source_version(
            include_binnmu_version=not package.is_arch_all
        )

    @property
    def binary_package(self) -> BinaryPackage:
        return self._binary_package

    @property
    def related_udeb_package(self) -> Optional[BinaryPackage]:
        return self._related_udeb_package

    @property
    def binary_package_version(self) -> str:
        return self._package_version_for(self._binary_package)

    @property
    def related_udeb_package_version(self) -> Optional[str]:
        udeb = self._related_udeb_package
        if udeb is None:
            return None
        return self._package_version_for(udeb)

    def accessible_package_roots(self) -> Iterable[Tuple[BinaryPackage, "VirtualPath"]]:
        """Return (package, fs_root) pairs for other packages this one may inspect.

        Raises PluginAPIViolationError when cross-package checks are disabled in
        the package data table. The result excludes this context's own package,
        is filtered through package_cross_check_precheck(), and is cached after
        the first call.
        """
        package_table = self._package_data_table()
        if package_table is None:
            # The weakly referenced table is expected to outlive this context.
            raise ReferenceError(
                "Internal error: package_table was garbage collected too early"
            )
        if not package_table.enable_cross_package_checks:
            raise PluginAPIViolationError(
                "Cross package content checks are not available at this time."
            )
        cache = self._cross_check_cache
        if cache is None:
            matches: List[Tuple[BinaryPackage, "VirtualPath"]] = []
            pkg = self.binary_package
            for pkg_data in package_table:
                if pkg_data.binary_package.name == pkg.name:
                    # Skip the package this context is for.
                    continue
                res = package_cross_check_precheck(pkg, pkg_data.binary_package)
                if not res[0]:
                    continue
                matches.append((pkg_data.binary_package, pkg_data.fs_root))
            cache = tuple(matches) if matches else tuple()
            self._cross_check_cache = cache
        return cache


@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedTrigger:
    """A dpkg trigger registered by a plugin, with its providing plugin recorded."""

    dpkg_trigger_type: DpkgTriggerType
    dpkg_trigger_target: str
    provider: DebputyPluginMetadata
    provider_source_id: str

    def serialized_format(self) -> str:
        """Return the trigger serialized as "<type> <target>"."""
        return f"{self.dpkg_trigger_type} {self.dpkg_trigger_target}"