diff options
Diffstat (limited to 'src/debputy/highlevel_manifest.py')
-rw-r--r-- | src/debputy/highlevel_manifest.py | 1608 |
1 files changed, 1608 insertions, 0 deletions
diff --git a/src/debputy/highlevel_manifest.py b/src/debputy/highlevel_manifest.py new file mode 100644 index 0000000..bae5cdb --- /dev/null +++ b/src/debputy/highlevel_manifest.py @@ -0,0 +1,1608 @@ +import dataclasses +import functools +import os +import textwrap +from contextlib import suppress +from dataclasses import dataclass, field +from typing import ( + List, + Dict, + Iterable, + Mapping, + Any, + Union, + Optional, + TypeVar, + Generic, + cast, + Set, + Tuple, + Sequence, + FrozenSet, +) + +from debian.debian_support import DpkgArchTable +from ruamel.yaml import YAML +from ruamel.yaml.comments import CommentedMap, CommentedSeq + +from ._deb_options_profiles import DebBuildOptionsAndProfiles +from ._manifest_constants import * +from .architecture_support import DpkgArchitectureBuildProcessValuesTable +from .builtin_manifest_rules import builtin_mode_normalization_rules +from .debhelper_emulation import ( + dhe_dbgsym_root_dir, + assert_no_dbgsym_migration, + read_dbgsym_file, +) +from .exceptions import DebputySubstitutionError +from .filesystem_scan import FSPath, FSRootDir, FSROOverlay +from .installations import ( + InstallRule, + SourcePathMatcher, + PathAlreadyInstalledOrDiscardedError, + NoMatchForInstallPatternError, + InstallRuleContext, + BinaryPackageInstallRuleContext, + InstallSearchDirContext, + SearchDir, +) +from .intermediate_manifest import TarMember, PathType, IntermediateManifest +from .maintscript_snippet import ( + DpkgMaintscriptHelperCommand, + MaintscriptSnippetContainer, +) +from .manifest_conditions import ConditionContext +from .manifest_parser.base_types import FileSystemMatchRule, FileSystemExactMatchRule +from .manifest_parser.util import AttributePath +from .packager_provided_files import PackagerProvidedFile +from .packages import BinaryPackage, SourcePackage +from .plugin.api.impl import BinaryCtrlAccessorProviderCreator +from .plugin.api.impl_types import ( + PackageProcessingContextProvider, + PackageDataTable, +) +from .plugin.api.feature_set import PluginProvidedFeatureSet +from .plugin.api.spec import FlushableSubstvars, VirtualPath +from .substitution import Substitution +from .transformation_rules import ( + TransformationRule, + ModeNormalizationTransformationRule, + NormalizeShebangLineTransformation, +) +from .util import ( + _error, + _warn, + debian_policy_normalize_symlink_target, + generated_content_dir, + _info, +) + +MANIFEST_YAML = YAML() + + +@dataclass(slots=True) +class DbgsymInfo: + dbgsym_fs_root: FSPath + dbgsym_ids: List[str] + + +@dataclass(slots=True, frozen=True) +class BinaryPackageData: + source_package: SourcePackage + binary_package: BinaryPackage + binary_staging_root_dir: str + control_output_dir: Optional[str] + fs_root: FSPath + substvars: FlushableSubstvars + package_metadata_context: PackageProcessingContextProvider + ctrl_creator: BinaryCtrlAccessorProviderCreator + dbgsym_info: DbgsymInfo + + +@dataclass(slots=True) +class PackageTransformationDefinition: + binary_package: BinaryPackage + substitution: Substitution + is_auto_generated_package: bool + binary_version: Optional[str] = None + search_dirs: Optional[List[FileSystemExactMatchRule]] = None + dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field( + default_factory=list + ) + maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field( + default_factory=dict + ) + transformations: List[TransformationRule] = field(default_factory=list) + reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field( + default_factory=dict + ) + install_rules: List[InstallRule] = field(default_factory=list) + + +def _path_to_tar_member( + path: FSPath, + clamp_mtime_to: int, +) -> TarMember: + mtime = float(clamp_mtime_to) + owner, uid, group, gid = path.tar_owner_info + mode = path.mode + + if path.has_fs_path: + mtime = min(mtime, path.mtime) + + if path.is_dir: + path_type = PathType.DIRECTORY + elif path.is_file: + # TODO: someday we will need to deal with hardlinks and it might appear here. + path_type = PathType.FILE + elif path.is_symlink: + # Special-case that we resolve immediately (since we need to normalize the target anyway) + link_target = debian_policy_normalize_symlink_target( + path.path, + path.readlink(), + ) + return TarMember.virtual_path( + path.tar_path, + PathType.SYMLINK, + mtime, + link_target=link_target, + # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set + # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777. + mode=0o0777, + owner=owner, + uid=uid, + group=group, + gid=gid, + ) + else: + assert not path.is_symlink + raise AssertionError( + f"Unsupported file type: {path.path} - not a file, dir nor a symlink!" + ) + + if not path.has_fs_path: + assert not path.is_file + return TarMember.virtual_path( + path.tar_path, + path_type, + mtime, + mode=mode, + owner=owner, + uid=uid, + group=group, + gid=gid, + ) + may_steal_fs_path = path._can_replace_inline + return TarMember.from_file( + path.tar_path, + path.fs_path, + mode=mode, + uid=uid, + owner=owner, + gid=gid, + group=group, + path_type=path_type, + path_mtime=mtime, + clamp_mtime_to=clamp_mtime_to, + may_steal_fs_path=may_steal_fs_path, + ) + + +def _generate_intermediate_manifest( + fs_root: FSPath, + clamp_mtime_to: int, +) -> Iterable[TarMember]: + symlinks = [] + for path in fs_root.all_paths(): + tar_member = _path_to_tar_member(path, clamp_mtime_to) + if tar_member.path_type == PathType.SYMLINK: + symlinks.append(tar_member) + continue + yield tar_member + yield from symlinks + + +ST = TypeVar("ST") +T = TypeVar("T") + + +class AbstractYAMLSubStore(Generic[ST]): + def __init__( + self, + parent_store: Any, + parent_key: Optional[Union[int, str]], + store: Optional[ST] = None, + ) -> None: + if parent_store is not None and parent_key is not None: + try: + from_parent_store = parent_store[parent_key] + except (KeyError, IndexError): + from_parent_store = None + if ( + store is not None + and from_parent_store is not None + and store is not parent_store + ): + raise ValueError( + "Store is provided but is not the one already in the parent store" + ) + if store is None: + store = from_parent_store + self._parent_store = parent_store + self._parent_key = parent_key + self._is_detached = ( + parent_key is None or parent_store is None or parent_key not in parent_store + ) + assert self._is_detached or store is not None + if store is None: + store = self._create_new_instance() + self._store: ST = store + + def _create_new_instance(self) -> ST: + raise NotImplementedError + + def create_definition_if_missing(self) -> None: + if self._is_detached: + self.create_definition() + + def create_definition(self) -> None: + if not self._is_detached: + raise RuntimeError("Definition is already present") + parent_store = self._parent_store + if parent_store is None: + raise RuntimeError( + f"Definition is not attached to any parent!? ({self.__class__.__name__})" + ) + if isinstance(parent_store, list): + assert self._parent_key is None + self._parent_key = len(parent_store) + self._parent_store.append(self._store) + else: + parent_store[self._parent_key] = self._store + self._is_detached = False + + def remove_definition(self) -> None: + self._ensure_attached() + del self._parent_store[self._parent_key] + if isinstance(self._parent_store, list): + self._parent_key = None + self._is_detached = True + + def _ensure_attached(self) -> None: + if self._is_detached: + raise RuntimeError("The definition has been removed!") + + +class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]): + def _create_new_instance(self) -> List[T]: + return CommentedSeq() + + +class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]): + def _create_new_instance(self) -> Dict[str, T]: + return CommentedMap() + + +class MutableCondition: + @classmethod + def arch_matches(cls, arch_filter: str) -> CommentedMap: + return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter}) + + @classmethod + def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap: + return CommentedMap( + {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches} + ) + + +class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]): + @classmethod + def new_symlink( + cls, link_path: str, link_target: str, condition: Optional[Any] + ) -> "MutableYAMLSymlink": + inner = { + MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path, + MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target, + } + content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner} + if condition is not None: + inner["when"] = condition + return cls(None, None, store=CommentedMap(content)) + + @property + def symlink_path(self) -> str: + return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ + MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH + ] + + @symlink_path.setter + def symlink_path(self, path: str) -> None: + self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ + MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH + ] = path + + @property + def symlink_target(self) -> Optional[str]: + return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ + MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET + ] + + @symlink_target.setter + def symlink_target(self, target: str) -> None: + self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ + MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET + ] = target + + +class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]): + @classmethod + def rm_conffile( + cls, + conffile: str, + prior_to_version: Optional[str], + owning_package: Optional[str], + ) -> "MutableYAMLConffileManagementItem": + r = cls( + None, + None, + store=CommentedMap( + { + MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap( + {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile} + ) + } + ), + ) + r.prior_to_version = prior_to_version + r.owning_package = owning_package + return r + + @classmethod + def mv_conffile( + cls, + old_conffile: str, + new_conffile: str, + prior_to_version: Optional[str], + owning_package: Optional[str], + ) -> "MutableYAMLConffileManagementItem": + r = cls( + None, + None, + store=CommentedMap( + { + MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap( + { + MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile, + MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile, + } + ) + } + ), + ) + r.prior_to_version = prior_to_version + r.owning_package = owning_package + return r + + @property + def _container(self) -> Dict[str, Any]: + assert len(self._store) == 1 + return next(iter(self._store.values())) + + @property + def command(self) -> str: + assert len(self._store) == 1 + return next(iter(self._store)) + + @property + def obsolete_conffile(self) -> str: + if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: + return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] + assert self.command == MK_CONFFILE_MANAGEMENT_RENAME + return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] + + @obsolete_conffile.setter + def obsolete_conffile(self, value: str) -> None: + if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: + self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value + else: + assert self.command == MK_CONFFILE_MANAGEMENT_RENAME + self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value + + @property + def new_conffile(self) -> str: + if self.command != MK_CONFFILE_MANAGEMENT_RENAME: + raise TypeError( + f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." + f" This is a {self.command}" + ) + return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] + + @new_conffile.setter + def new_conffile(self, value: str) -> None: + if self.command != MK_CONFFILE_MANAGEMENT_RENAME: + raise TypeError( + f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." + f" This is a {self.command}" + ) + self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value + + @property + def prior_to_version(self) -> Optional[str]: + return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION) + + @prior_to_version.setter + def prior_to_version(self, value: Optional[str]) -> None: + if value is None: + try: + del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] + except KeyError: + pass + else: + self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value + + @property + def owning_package(self) -> Optional[str]: + return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] + + @owning_package.setter + def owning_package(self, value: Optional[str]) -> None: + if value is None: + try: + del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] + except KeyError: + pass + else: + self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value + + +class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore): + def _list_store( + self, key, *, create_if_absent: bool = False + ) -> Optional[List[Dict[str, Any]]]: + if self._is_detached or key not in self._store: + if create_if_absent: + return None + self.create_definition_if_missing() + self._store[key] = [] + return self._store[key] + + def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None: + parent_store = self._list_store(key, create_if_absent=True) + assert parent_store is not None + if not item._is_detached or ( + item._parent_store is not None and item._parent_store is not parent_store + ): + raise RuntimeError( + "Item is already attached or associated with a different container" + ) + item._parent_store = parent_store + item.create_definition() + + def add_symlink(self, symlink: MutableYAMLSymlink) -> None: + self._insert_item(MK_TRANSFORMATIONS, symlink) + + def symlinks(self) -> Iterable[MutableYAMLSymlink]: + store = self._list_store(MK_TRANSFORMATIONS) + if store is None: + return + for i in range(len(store)): + d = store[i] + if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d: + yield MutableYAMLSymlink(store, i) + + def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]: + store = self._list_store(MK_CONFFILE_MANAGEMENT) + if store is None: + return + yield from ( + MutableYAMLConffileManagementItem(store, i) for i in range(len(store)) + ) + + def add_conffile_management( + self, conffile_management_item: MutableYAMLConffileManagementItem + ) -> None: + self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item) + + +class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore): + @property + def _container(self) -> Dict[str, Any]: + assert len(self._store) == 1 + return next(iter(self._store.values())) + + @property + def into(self) -> Optional[List[str]]: + v = self._container[MK_INSTALLATIONS_INSTALL_INTO] + if v is None: + return None + if isinstance(v, str): + return [v] + return v + + @into.setter + def into(self, new_value: Optional[Union[str, List[str]]]) -> None: + if new_value is None: + with suppress(KeyError): + del self._container[MK_INSTALLATIONS_INSTALL_INTO] + return + if isinstance(new_value, str): + self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value + return + new_list = CommentedSeq(new_value) + self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list + + @property + def when(self) -> Optional[Union[str, Mapping[str, Any]]]: + return self._container[MK_CONDITION_WHEN] + + @when.setter + def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None: + if new_value is None: + with suppress(KeyError): + del self._container[MK_CONDITION_WHEN] + return + if isinstance(new_value, str): + self._container[MK_CONDITION_WHEN] = new_value + return + new_map = CommentedMap(new_value) + self._container[MK_CONDITION_WHEN] = new_map + + @classmethod + def install_dest( + cls, + sources: Union[str, List[str]], + into: Optional[Union[str, List[str]]], + *, + dest_dir: Optional[str] = None, + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleInstall": + k = MK_INSTALLATIONS_INSTALL_SOURCES + if isinstance(sources, str): + k = MK_INSTALLATIONS_INSTALL_SOURCE + r = MutableYAMLInstallRuleInstall( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_INSTALL: CommentedMap( + { + k: sources, + } + ) + } + ), + ) + r.dest_dir = dest_dir + r.into = into + if when is not None: + r.when = when + return r + + @classmethod + def multi_dest_install( + cls, + sources: Union[str, List[str]], + dest_dirs: Sequence[str], + into: Optional[Union[str, List[str]]], + *, + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleInstall": + k = MK_INSTALLATIONS_INSTALL_SOURCES + if isinstance(sources, str): + k = MK_INSTALLATIONS_INSTALL_SOURCE + r = MutableYAMLInstallRuleInstall( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap( + { + k: sources, + "dest-dirs": dest_dirs, + } + ) + } + ), + ) + r.into = into + if when is not None: + r.when = when + return r + + @classmethod + def install_as( + cls, + source: str, + install_as: str, + into: Optional[Union[str, List[str]]], + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleInstall": + r = MutableYAMLInstallRuleInstall( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_INSTALL: CommentedMap( + { + MK_INSTALLATIONS_INSTALL_SOURCE: source, + MK_INSTALLATIONS_INSTALL_AS: install_as, + } + ) + } + ), + ) + r.into = into + if when is not None: + r.when = when + return r + + @classmethod + def install_doc_as( + cls, + source: str, + install_as: str, + into: Optional[Union[str, List[str]]], + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleInstall": + r = MutableYAMLInstallRuleInstall( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( + { + MK_INSTALLATIONS_INSTALL_SOURCE: source, + MK_INSTALLATIONS_INSTALL_AS: install_as, + } + ) + } + ), + ) + r.into = into + if when is not None: + r.when = when + return r + + @classmethod + def install_docs( + cls, + sources: Union[str, List[str]], + into: Optional[Union[str, List[str]]], + *, + dest_dir: Optional[str] = None, + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleInstall": + k = MK_INSTALLATIONS_INSTALL_SOURCES + if isinstance(sources, str): + k = MK_INSTALLATIONS_INSTALL_SOURCE + r = MutableYAMLInstallRuleInstall( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( + { + k: sources, + } + ) + } + ), + ) + r.into = into + r.dest_dir = dest_dir + if when is not None: + r.when = when + return r + + @classmethod + def install_examples( + cls, + sources: Union[str, List[str]], + into: Optional[Union[str, List[str]]], + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleInstallExamples": + k = MK_INSTALLATIONS_INSTALL_SOURCES + if isinstance(sources, str): + k = MK_INSTALLATIONS_INSTALL_SOURCE + r = MutableYAMLInstallRuleInstallExamples( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap( + { + k: sources, + } + ) + } + ), + ) + r.into = into + if when is not None: + r.when = when + return r + + @classmethod + def install_man( + cls, + sources: Union[str, List[str]], + into: Optional[Union[str, List[str]]], + language: Optional[str], + when: Optional[Union[str, Mapping[str, Any]]] = None, + ) -> "MutableYAMLInstallRuleMan": + k = MK_INSTALLATIONS_INSTALL_SOURCES + if isinstance(sources, str): + k = MK_INSTALLATIONS_INSTALL_SOURCE + r = MutableYAMLInstallRuleMan( + None, + None, + store=CommentedMap( + { + MK_INSTALLATIONS_INSTALL_MAN: CommentedMap( + { + k: sources, + } + ) + } + ), + ) + r.language = language + r.into = into + if when is not None: + r.when = when + return r + + @classmethod + def discard( + cls, + sources: Union[str, List[str]], + ) -> "MutableYAMLInstallRuleDiscard": + return MutableYAMLInstallRuleDiscard( + None, + None, + store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}), + ) + + +class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule): + pass + + +class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule): + @property + def language(self) -> Optional[str]: + return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] + + @language.setter + def language(self, new_value: Optional[str]) -> None: + if new_value is not None: + self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value + return + with suppress(KeyError): + del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] + + +class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule): + pass + + +class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule): + @property + def sources(self) -> List[str]: + v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] + if isinstance(v, str): + return [v] + return v + + @sources.setter + def sources(self, new_value: Union[str, List[str]]) -> None: + if isinstance(new_value, str): + self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value + return + new_list = CommentedSeq(new_value) + self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list + + @property + def dest_dir(self) -> Optional[str]: + return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR) + + @dest_dir.setter + def dest_dir(self, new_value: Optional[str]) -> None: + if new_value is not None and self.dest_as is not None: + raise ValueError( + f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' + f' "{MK_INSTALLATIONS_INSTALL_AS}"' + ) + if new_value is not None: + self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value + else: + with suppress(KeyError): + del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] + + @property + def dest_as(self) -> Optional[str]: + return self._container.get(MK_INSTALLATIONS_INSTALL_AS) + + @dest_as.setter + def dest_as(self, new_value: Optional[str]) -> None: + if new_value is not None: + if self.dest_dir is not None: + raise ValueError( + f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' + f' "{MK_INSTALLATIONS_INSTALL_AS}"' + ) + + sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] + if isinstance(sources, list): + if len(sources) != 1: + raise ValueError( + f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when' + f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item' + ) + self.sources = sources[0] + self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value + else: + with suppress(KeyError): + del self._container[MK_INSTALLATIONS_INSTALL_AS] + + +class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]): + def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None: + parent_store = self._store + if not install_rule._is_detached or ( + install_rule._parent_store is not None + and install_rule._parent_store is not parent_store + ): + raise RuntimeError( + "Item is already attached or associated with a different container" + ) + self.create_definition_if_missing() + install_rule._parent_store = parent_store + install_rule.create_definition() + + def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None: + parent_store = self._store + for install_rule in install_rules: + if not install_rule._is_detached or ( + install_rule._parent_store is not None + and install_rule._parent_store is not parent_store + ): + raise RuntimeError( + "Item is already attached or associated with a different container" + ) + self.create_definition_if_missing() + install_rule._parent_store = parent_store + install_rule.create_definition() + + +class MutableYAMLManifestVariables(AbstractYAMLDictSubStore): + @property + def variables(self) -> Dict[str, Any]: + return self._store + + def __setitem__(self, key: str, value: Any) -> None: + self._store[key] = value + self.create_definition_if_missing() + + +class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore): + def manifest_variables( + self, *, create_if_absent: bool = True + ) -> MutableYAMLManifestVariables: + d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES) + if create_if_absent: + d.create_definition_if_missing() + return d + + +class MutableYAMLManifest: + def __init__(self, store: Any) -> None: + self._store = store + + @classmethod + def empty_manifest(cls) -> "MutableYAMLManifest": + return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION})) + + @property + def manifest_version(self) -> str: + return self._store[MK_MANIFEST_VERSION] + + @manifest_version.setter + def manifest_version(self, version: str) -> None: + if version not in SUPPORTED_MANIFEST_VERSIONS: + raise ValueError("Unsupported version") + self._store[MK_MANIFEST_VERSION] = version + + def installations( + self, + *, + create_if_absent: bool = True, + ) -> MutableYAMLInstallationsDefinition: + d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS) + if create_if_absent: + d.create_definition_if_missing() + return d + + def manifest_definitions( + self, + *, + create_if_absent: bool = True, + ) -> MutableYAMLManifestDefinitions: + d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS) + if create_if_absent: + d.create_definition_if_missing() + return d + + def package( + self, name: str, *, create_if_absent: bool = True + ) -> MutableYAMLPackageDefinition: + if MK_PACKAGES not in self._store: + self._store[MK_PACKAGES] = CommentedMap() + packages_store = self._store[MK_PACKAGES] + package = packages_store.get(name) + if package is None: + if not create_if_absent: + raise KeyError(name) + assert packages_store is not None + d = MutableYAMLPackageDefinition(packages_store, name) + d.create_definition() + else: + d = MutableYAMLPackageDefinition(packages_store, name) + return d + + def write_to(self, fd) -> None: + MANIFEST_YAML.dump(self._store, fd) + + +def _describe_missing_path(entry: VirtualPath) -> str: + if entry.is_dir: + return f"{entry.fs_path}/ (empty directory; possible integration point)" + if entry.is_symlink: + target = os.readlink(entry.fs_path) + return f"{entry.fs_path} (symlink; links to {target})" + if entry.is_file: + return f"{entry.fs_path} (file)" + return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" + + +def _detect_missing_installations( + path_matcher: SourcePathMatcher, + search_dir: VirtualPath, +) -> None: + if not os.path.isdir(search_dir.fs_path): + return + missing = list(path_matcher.detect_missing(search_dir)) + if not missing: + return + + _warn( + f"The following paths were present in {search_dir.fs_path}, but not installed (nor explicitly discarded)." + ) + _warn("") + for entry in missing: + desc = _describe_missing_path(entry) + _warn(f" * {desc}") + _warn("") + + excl = textwrap.dedent( + """\ + - discard: "*" + """ + ) + + _error( + "Please review the list and add either install rules or exclusions to `installations` in" + " debian/debputy.manifest. If you do not need any of these paths, add the following to the" + f" end of your 'installations`:\n\n{excl}\n" + ) + + +def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: + used_discard_rules = path_matcher.used_auto_discard_rules + # Discard rules can match and then be overridden. In that case, they appear + # but have 0 matches. + if not sum((len(v) for v in used_discard_rules.values()), 0): + return + _info("The following automatic discard rules were triggered:") + example_path: Optional[str] = None + for rule in sorted(used_discard_rules): + for fs_path in sorted(used_discard_rules[rule]): + if example_path is None: + example_path = fs_path + _info(f" * {rule} -> {fs_path}") + assert example_path is not None + _info("") + _info( + "Note that some of these may have been overruled. The overrule detection logic is not" + ) + _info("100% reliable.") + _info("") + _info( + "You can overrule an automatic discard rule by explicitly listing the path. As an example:" + ) + _info(" installations:") + _info(" - install:") + _info(f" source: {example_path}") + + +def _install_everything_from_source_dir_if_present( + dctrl_bin: BinaryPackage, + substitution: Substitution, + path_matcher: SourcePathMatcher, + install_rule_context: InstallRuleContext, + source_condition_context: ConditionContext, + source_dir: VirtualPath, + *, + into_dir: Optional[VirtualPath] = None, +) -> None: + attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"] + pkg_set = frozenset([dctrl_bin]) + install_rule = InstallRule.install_dest( + [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)], + None, + pkg_set, + f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}", + None, + ) + pkg_search_dir: Tuple[SearchDir] = ( + SearchDir( + source_dir, + pkg_set, + ), + ) + replacements = { + "search_dirs": pkg_search_dir, + } + if into_dir is not None: + binary_package_contexts = dict(install_rule_context.binary_package_contexts) + updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir) + binary_package_contexts[dctrl_bin.name] = updated + replacements["binary_package_contexts"] = binary_package_contexts + + fake_install_rule_context = install_rule_context.replace(**replacements) + try: + install_rule.perform_install( + path_matcher, + fake_install_rule_context, + source_condition_context, + ) + except ( + NoMatchForInstallPatternError, + PathAlreadyInstalledOrDiscardedError, + ): + # Empty directory or everything excluded by default; ignore the error + pass + + +class HighLevelManifest: + def __init__( + self, + manifest_path: str, + mutable_manifest: Optional[MutableYAMLManifest], + install_rules: Optional[List[InstallRule]], + source_package: SourcePackage, + binary_packages: Mapping[str, BinaryPackage], + substitution: Substitution, + package_transformations: Mapping[str, PackageTransformationDefinition], + dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, + dpkg_arch_query_table: DpkgArchTable, + build_env: DebBuildOptionsAndProfiles, + plugin_provided_feature_set: PluginProvidedFeatureSet, + debian_dir: VirtualPath, + ) -> None: + self.manifest_path = manifest_path + self.mutable_manifest = mutable_manifest + self._install_rules = install_rules + self._source_package = source_package + self._binary_packages = binary_packages + self.substitution = substitution + self.package_transformations = package_transformations + self._dpkg_architecture_variables = dpkg_architecture_variables + self._dpkg_arch_query_table = dpkg_arch_query_table + self._build_env = build_env + self._used_for: Set[str] = set() + self._plugin_provided_feature_set = plugin_provided_feature_set + self._debian_dir = debian_dir + + def source_version(self, include_binnmu_version: bool = True) -> str: + # TODO: There should an easier way to determine the source version; really. + version_var = "{{DEB_VERSION}}" + if not include_binnmu_version: + version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" + try: + return self.substitution.substitute( + version_var, "internal (resolve version)" + ) + except DebputySubstitutionError as e: + raise AssertionError(f"Could not resolve {version_var}") from e + + @property + def debian_dir(self) -> VirtualPath: + return self._debian_dir + + @property + def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: + return self._dpkg_architecture_variables + + @property + def build_env(self) -> DebBuildOptionsAndProfiles: + return self._build_env + + @property + def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: + return self._plugin_provided_feature_set + + @property + def active_packages(self) -> Iterable[BinaryPackage]: + yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) + + @property + def all_packages(self) -> Iterable[BinaryPackage]: + yield from self._binary_packages.values() + + def package_state_for(self, package: str) -> PackageTransformationDefinition: + return self.package_transformations[package] + + def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: + name = package.name + # If it is not a -doc package, then docs should be installed + # under its own package name. + if not name.endswith("-doc"): + return package + name = name[:-4] + main_package = self._binary_packages.get(name) + if main_package: + return main_package + if name.startswith("lib"): + dev_pkg = self._binary_packages.get(f"{name}-dev") + if dev_pkg: + return dev_pkg + + # If we found no better match; default to the doc package itself. + return package + + def perform_installations( + self, + *, + install_request_context: Optional[InstallSearchDirContext] = None, + enable_manifest_installation_feature: bool = True, + ) -> PackageDataTable: + package_data_dict = {} + package_data_table = PackageDataTable(package_data_dict) + if install_request_context is None: + + @functools.lru_cache(None) + def _as_path(fs_path: str) -> VirtualPath: + return FSROOverlay.create_root_dir(".", fs_path) + + dtmp_dir = _as_path("debian/tmp") + source_root_dir = _as_path(".") + into = frozenset(self._binary_packages.values()) + default_search_dirs = [dtmp_dir] + per_package_search_dirs = { + t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] + for t in self.package_transformations.values() + if t.search_dirs is not None + } + search_dirs = _determine_search_dir_order( + per_package_search_dirs, + into, + default_search_dirs, + source_root_dir, + ) + check_for_uninstalled_dirs = tuple( + s.search_dir + for s in search_dirs + if s.search_dir.fs_path != source_root_dir.fs_path + ) + _present_installation_dirs(search_dirs, check_for_uninstalled_dirs, into) + else: + dtmp_dir = None + search_dirs = install_request_context.search_dirs + into = frozenset(self._binary_packages.values()) + seen = set() + for search_dir in search_dirs: + seen.update(search_dir.applies_to) + + missing = into - seen + if missing: + names = ", ".join(p.name for p in missing) + raise ValueError( + f"The following package(s) had no search dirs: {names}." + " (Generally, the source root would be applicable to all packages)" + ) + extra_names = seen - into + if extra_names: + names = ", ".join(p.name for p in extra_names) + raise ValueError( + f"The install_request_context referenced the following unknown package(s): {names}" + ) + + check_for_uninstalled_dirs = ( + install_request_context.check_for_uninstalled_dirs + ) + + install_rule_context = InstallRuleContext(search_dirs) + + if ( + enable_manifest_installation_feature + and self._install_rules is None + and dtmp_dir is not None + and os.path.isdir(dtmp_dir.fs_path) + ): + msg = ( + "The build system appears to have provided the output of upstream build system's" + " install in debian/tmp. However, these are no provisions for debputy to install" + " any of that into any of the debian packages listed in debian/control." + " To avoid accidentally creating empty packages, debputy will insist that you " + " explicitly define an empty installation definition if you did not want to " + " install any of those files even though they have been provided." + ' Example: "installations: []"' + ) + _error(msg) + elif ( + not enable_manifest_installation_feature and self._install_rules is not None + ): + _error( + f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." + f" Please remove or comment out the `installations` keyword." + ) + + for dctrl_bin in self.all_packages: + package = dctrl_bin.name + doc_main_package = self._detect_doc_main_package_for(dctrl_bin) + + install_rule_context[package] = BinaryPackageInstallRuleContext( + dctrl_bin, + FSRootDir(), + doc_main_package, + ) + + if enable_manifest_installation_feature: + discard_rules = list( + self.plugin_provided_feature_set.auto_discard_rules.values() + ) + else: + discard_rules = [ + self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] + ] + path_matcher = SourcePathMatcher(discard_rules) + + source_condition_context = ConditionContext( + binary_package=None, + substitution=self.substitution, + build_env=self._build_env, + dpkg_architecture_variables=self._dpkg_architecture_variables, + dpkg_arch_query_table=self._dpkg_arch_query_table, + ) + + for dctrl_bin in self.active_packages: + package = dctrl_bin.name + if install_request_context: + build_system_staging_dir = install_request_context.debian_pkg_dirs.get( + package + ) + else: + build_system_staging_dir_fs_path = os.path.join("debian", package) + if os.path.isdir(build_system_staging_dir_fs_path): + build_system_staging_dir = FSROOverlay.create_root_dir( + ".", + build_system_staging_dir_fs_path, + ) + else: + build_system_staging_dir = None + + if build_system_staging_dir is not None: + _install_everything_from_source_dir_if_present( + dctrl_bin, + self.substitution, + path_matcher, + install_rule_context, + source_condition_context, + build_system_staging_dir, + ) + + if self._install_rules: + # FIXME: Check that every install rule remains used after transformations have run. + # What we want to check is transformations do not exclude everything from an install + # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string + # match. + for install_rule in self._install_rules: + install_rule.perform_install( + path_matcher, + install_rule_context, + source_condition_context, + ) + + if enable_manifest_installation_feature: + for search_dir in check_for_uninstalled_dirs: + _detect_missing_installations(path_matcher, search_dir) + + for dctrl_bin in self.all_packages: + package = dctrl_bin.name + binary_install_rule_context = install_rule_context[package] + build_system_pkg_staging_dir = os.path.join("debian", package) + fs_root = binary_install_rule_context.fs_root + + context = self.package_transformations[package] + if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: + for special_install_rule in context.install_rules: + special_install_rule.perform_install( + path_matcher, + install_rule_context, + source_condition_context, + ) + + if dctrl_bin.should_be_acted_on: + self.apply_fs_transformations(package, fs_root) + substvars_file = f"debian/{package}.substvars" + substvars = FlushableSubstvars.load_from_path( + substvars_file, missing_ok=True + ) + # We do not want to touch the substvars file (non-clean rebuild contamination) + substvars.substvars_path = None + control_output_dir = generated_content_dir( + package=dctrl_bin, subdir_key="DEBIAN" + ) + else: + substvars = FlushableSubstvars() + control_output_dir = None + + udeb_package = self._binary_packages.get(f"{package}-udeb") + if udeb_package and not udeb_package.is_udeb: + udeb_package = None + + package_metadata_context = PackageProcessingContextProvider( + self, + dctrl_bin, + udeb_package, + package_data_table, + # FIXME: source_package + ) + + ctrl_creator = BinaryCtrlAccessorProviderCreator( + package_metadata_context, + substvars, + context.maintscript_snippets, + context.substitution, + ) + + if not enable_manifest_installation_feature: + assert_no_dbgsym_migration(dctrl_bin) + dh_dbgsym_root_fs = FSROOverlay.create_root_dir( + "", dhe_dbgsym_root_dir(dctrl_bin) + ) + dbgsym_root_fs = FSRootDir() + _install_everything_from_source_dir_if_present( + dctrl_bin, + self.substitution, + path_matcher, + install_rule_context, + source_condition_context, + dh_dbgsym_root_fs, + into_dir=dbgsym_root_fs, + ) + dbgsym_build_ids = read_dbgsym_file(dctrl_bin) + dbgsym_info = DbgsymInfo( + dbgsym_root_fs, + dbgsym_build_ids, + ) + else: + dbgsym_info = DbgsymInfo( + FSRootDir(), + [], + ) + + package_data_dict[package] = BinaryPackageData( + self._source_package, + dctrl_bin, + build_system_pkg_staging_dir, + control_output_dir, + fs_root, + substvars, + package_metadata_context, + ctrl_creator, + dbgsym_info, + ) + + _list_automatic_discard_rules(path_matcher) + + return package_data_table + + def condition_context( + self, binary_package: Optional[Union[BinaryPackage, str]] + ) -> ConditionContext: + if binary_package is None: + return ConditionContext( + binary_package=None, + substitution=self.substitution, + build_env=self._build_env, + dpkg_architecture_variables=self._dpkg_architecture_variables, + dpkg_arch_query_table=self._dpkg_arch_query_table, + ) + if not isinstance(binary_package, str): + binary_package = binary_package.name + + package_transformation = self.package_transformations[binary_package] + return ConditionContext( + binary_package=package_transformation.binary_package, + substitution=package_transformation.substitution, + build_env=self._build_env, + dpkg_architecture_variables=self._dpkg_architecture_variables, + dpkg_arch_query_table=self._dpkg_arch_query_table, + ) + + def apply_fs_transformations( + self, + package: str, + fs_root: FSPath, + ) -> None: + if package in self._used_for: + raise ValueError( + f"data.tar contents for {package} has already been finalized!?" + ) + if package not in self.package_transformations: + raise ValueError( + f'The package "{package}" was not relevant for the manifest!?' + ) + package_transformation = self.package_transformations[package] + condition_context = ConditionContext( + binary_package=package_transformation.binary_package, + substitution=package_transformation.substitution, + build_env=self._build_env, + dpkg_architecture_variables=self._dpkg_architecture_variables, + dpkg_arch_query_table=self._dpkg_arch_query_table, + ) + norm_rules = list( + builtin_mode_normalization_rules( + self._dpkg_architecture_variables, + package_transformation.binary_package, + package_transformation.substitution, + ) + ) + norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) + norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) + for transformation in package_transformation.transformations: + transformation.transform_file_system(fs_root, condition_context) + interpreter_normalization = NormalizeShebangLineTransformation() + interpreter_normalization.transform_file_system(fs_root, condition_context) + + def finalize_data_tar_contents( + self, + package: str, + fs_root: FSPath, + clamp_mtime_to: int, + ) -> IntermediateManifest: + if package in self._used_for: + raise ValueError( + f"data.tar contents for {package} has already been finalized!?" + ) + if package not in self.package_transformations: + raise ValueError( + f'The package "{package}" was not relevant for the manifest!?' + ) + self._used_for.add(package) + + # At this point, there so be no further mutations to the file system (because the will not + # be present in the intermediate manifest) + cast("FSRootDir", fs_root).is_read_write = False + + intermediate_manifest = list( + _generate_intermediate_manifest( + fs_root, + clamp_mtime_to, + ) + ) + return intermediate_manifest + + def apply_to_binary_staging_directory( + self, + package: str, + fs_root: FSPath, + clamp_mtime_to: int, + ) -> IntermediateManifest: + self.apply_fs_transformations(package, fs_root) + return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) + + +@dataclasses.dataclass(slots=True) +class SearchDirOrderState: + search_dir: VirtualPath + applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field( + default_factory=set + ) + after: Set[str] = dataclasses.field(default_factory=set) + + +def _present_installation_dirs( + search_dirs: Sequence[SearchDir], + checked_missing_dirs: Sequence[VirtualPath], + all_pkgs: FrozenSet[BinaryPackage], +) -> None: + _info("The following directories are considered search dirs (in order):") + max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1) + for search_dir in search_dirs: + applies_to = "" + if search_dir.applies_to < all_pkgs: + names = ", ".join(p.name for p in search_dir.applies_to) + applies_to = f" [only applicable to: {names}]" + remark = "" + if not os.path.isdir(search_dir.search_dir.fs_path): + remark = " (skipped; absent)" + _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}") + + if checked_missing_dirs: + _info('The following directories are considered for "not-installed" paths;') + for d in checked_missing_dirs: + remark = "" + if not os.path.isdir(d.fs_path): + remark = " (skipped; absent)" + _info(f" * {d.fs_path:{max_len}}{remark}") + + +def _determine_search_dir_order( + requested: Mapping[BinaryPackage, List[VirtualPath]], + all_pkgs: FrozenSet[BinaryPackage], + default_search_dirs: List[VirtualPath], + source_root: VirtualPath, +) -> Sequence[SearchDir]: + search_dir_table = {} + assert requested.keys() <= all_pkgs + for pkg in all_pkgs: + paths = requested.get(pkg, default_search_dirs) + previous_search_dir: Optional[SearchDirOrderState] = None + for path in paths: + try: + search_dir_state = search_dir_table[path.fs_path] + except KeyError: + search_dir_state = SearchDirOrderState(path) + search_dir_table[path.fs_path] = search_dir_state + search_dir_state.applies_to.add(pkg) + if previous_search_dir is not None: + search_dir_state.after.add(previous_search_dir.search_dir.fs_path) + previous_search_dir = search_dir_state + + search_dirs_in_order = [] + released = set() + remaining = set() + for search_dir_state in search_dir_table.values(): + if not (search_dir_state.after <= released): + remaining.add(search_dir_state.search_dir.fs_path) + continue + search_dirs_in_order.append(search_dir_state) + released.add(search_dir_state.search_dir.fs_path) + + while remaining: + current_released = len(released) + for fs_path in remaining: + search_dir_state = search_dir_table[fs_path] + if not search_dir_state.after.issubset(released): + remaining.add(search_dir_state.search_dir.fs_path) + continue + search_dirs_in_order.append(search_dir_state) + released.add(search_dir_state.search_dir.fs_path) + + if current_released == len(released): + names = ", ".join(remaining) + _error( + f"There is a circular dependency (somewhere) between the search dirs: {names}." + " Note that the search directories across all packages have to be ordered (and the" + " source root should generally be last)" + ) + remaining -= released + + search_dirs_in_order.append( + SearchDirOrderState( + source_root, + all_pkgs, + ) + ) + + return tuple( + # Avoid duplicating all_pkgs + SearchDir( + s.search_dir, + frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs, + ) + for s in search_dirs_in_order + ) |