summaryrefslogtreecommitdiffstats
path: root/src/debputy/installations.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/installations.py')
-rw-r--r--src/debputy/installations.py1162
1 files changed, 1162 insertions, 0 deletions
diff --git a/src/debputy/installations.py b/src/debputy/installations.py
new file mode 100644
index 0000000..2310cfa
--- /dev/null
+++ b/src/debputy/installations.py
@@ -0,0 +1,1162 @@
+import collections
+import dataclasses
+import os.path
+import re
+from enum import IntEnum
+from typing import (
+ List,
+ Dict,
+ FrozenSet,
+ Callable,
+ Union,
+ Iterator,
+ Tuple,
+ Set,
+ Sequence,
+ Optional,
+ Iterable,
+ TYPE_CHECKING,
+ cast,
+ Any,
+ Mapping,
+)
+
+from debputy.exceptions import DebputyRuntimeError
+from debputy.filesystem_scan import FSPath
+from debputy.manifest_conditions import (
+ ConditionContext,
+ ManifestCondition,
+ _BUILD_DOCS_BDO,
+)
+from debputy.manifest_parser.base_types import (
+ FileSystemMatchRule,
+ FileSystemExactMatchRule,
+ DebputyDispatchableType,
+)
+from debputy.packages import BinaryPackage
+from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING
+from debputy.substitution import Substitution
+from debputy.util import _error, _warn
+
+if TYPE_CHECKING:
+ from debputy.packager_provided_files import PackagerProvidedFile
+ from debputy.plugin.api import VirtualPath
+ from debputy.plugin.api.impl_types import PluginProvidedDiscardRule
+
+
+_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
+_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
+_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
+_MAN_REAL_SECTION = re.compile(r"^(\d+)")
+_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
+MAN_GUESS_LANG_FROM_PATH = re.compile(
+ r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
+)
+MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)")
+
+
+class InstallRuleError(DebputyRuntimeError):
+ pass
+
+
+class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
+ @property
+ def path(self) -> str:
+ return cast("str", self.args[0])
+
+ @property
+ def into(self) -> FrozenSet[BinaryPackage]:
+ return cast("FrozenSet[BinaryPackage]", self.args[1])
+
+ @property
+ def definition_source(self) -> str:
+ return cast("str", self.args[2])
+
+
+class ExactPathMatchTwiceError(InstallRuleError):
+ @property
+ def path(self) -> str:
+ return cast("str", self.args[1])
+
+ @property
+ def into(self) -> BinaryPackage:
+ return cast("BinaryPackage", self.args[2])
+
+ @property
+ def definition_source(self) -> str:
+ return cast("str", self.args[3])
+
+
+class NoMatchForInstallPatternError(InstallRuleError):
+ @property
+ def pattern(self) -> str:
+ return cast("str", self.args[1])
+
+ @property
+ def search_dirs(self) -> Sequence["SearchDir"]:
+ return cast("Sequence[SearchDir]", self.args[2])
+
+ @property
+ def definition_source(self) -> str:
+ return cast("str", self.args[3])
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class SearchDir:
+ search_dir: "VirtualPath"
+ applies_to: FrozenSet[BinaryPackage]
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class BinaryPackageInstallRuleContext:
+ binary_package: BinaryPackage
+ fs_root: FSPath
+ doc_main_package: BinaryPackage
+
+ def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
+ return dataclasses.replace(self, **changes)
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class InstallSearchDirContext:
+ search_dirs: Sequence[SearchDir]
+ check_for_uninstalled_dirs: Sequence["VirtualPath"]
+ # TODO: Support search dirs per-package
+ debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
+ default_factory=dict
+ )
+
+
+@dataclasses.dataclass(slots=True)
+class InstallRuleContext:
+ # TODO: Search dirs should be per-package
+ search_dirs: Sequence[SearchDir]
+ binary_package_contexts: Dict[str, BinaryPackageInstallRuleContext] = (
+ dataclasses.field(default_factory=dict)
+ )
+
+ def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
+ return self.binary_package_contexts[item]
+
+ def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
+ self.binary_package_contexts[key] = value
+
+ def replace(self, **changes: Any) -> "InstallRuleContext":
+ return dataclasses.replace(self, **changes)
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class PathMatch:
+ path: "VirtualPath"
+ search_dir: "VirtualPath"
+ is_exact_match: bool
+ into: FrozenSet[BinaryPackage]
+
+
+class DiscardState(IntEnum):
+ UNCHECKED = 0
+ NOT_DISCARDED = 1
+ DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
+ DISCARDED_BY_MANIFEST_RULE = 3
+
+
+def _determine_manpage_section(
+ match_rule: PathMatch,
+ provided_section: Optional[int],
+ definition_source: str,
+) -> Optional[str]:
+ section = str(provided_section) if provided_section is not None else None
+ if section is None:
+ detected_section = None
+ with open(match_rule.path.fs_path, "r") as fd:
+ for line in fd:
+ if not line.startswith((".TH", ".Dt")):
+ continue
+
+ m = _MAN_DT_LINE.match(line)
+ if not m:
+ m = _MAN_TH_LINE.match(line)
+ if not m:
+ continue
+ detected_section = m.group(1)
+ if "." in detected_section:
+ _warn(
+ f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
+ f" (detected via {definition_source}): It looks too much like a version"
+ )
+ detected_section = None
+ break
+ if detected_section is None:
+ m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
+ if m:
+ detected_section = m.group(1)
+ section = detected_section
+
+ return section
+
+
+def _determine_manpage_real_section(
+ match_rule: PathMatch,
+ section: Optional[str],
+ definition_source: str,
+) -> int:
+ real_section = None
+ if section is not None:
+ m = _MAN_REAL_SECTION.match(section)
+ if m:
+ real_section = int(m.group(1))
+ if real_section is None or real_section < 0 or real_section > 9:
+ if real_section is not None:
+ _warn(
+ f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
+ f" which is not a valid section (must be between 1 and 9 incl.)"
+ )
+ _error(
+ f"Could not determine the section for {match_rule.path.fs_path} automatically. The manpage"
+ f" was detected via {definition_source}. Consider using `section: <number>` to"
+ " explicitly declare the section. Keep in mind that it applies to all manpages for that"
+ " rule and you may have to split the rule into two for this reason."
+ )
+ return real_section
+
+
+def _determine_manpage_language(
+ match_rule: PathMatch,
+ provided_language: Optional[str],
+) -> Optional[str]:
+ if provided_language is not None:
+ if provided_language not in ("derive-from-basename", "derive-from-path"):
+ return provided_language if provided_language != "C" else None
+ if provided_language == "derive-from-basename":
+ m = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
+ if m is None:
+ return None
+ return m.group(1)
+ # Fall-through for derive-from-path case
+ m = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
+ if m is None:
+ return None
+ return m.group(1)
+
+
+def _dest_path_for_manpage(
+ provided_section: Optional[int],
+ provided_language: Optional[str],
+ definition_source: str,
+) -> Callable[["PathMatch"], str]:
+ def _manpage_dest_path(match_rule: PathMatch) -> str:
+ inst_basename = _MAN_INST_BASENAME.sub("", match_rule.path.name)
+ section = _determine_manpage_section(
+ match_rule, provided_section, definition_source
+ )
+ real_section = _determine_manpage_real_section(
+ match_rule, section, definition_source
+ )
+ assert section is not None
+ language = _determine_manpage_language(match_rule, provided_language)
+ if language is None:
+ maybe_language = ""
+ else:
+ maybe_language = f"{language}/"
+ lang_suffix = f".{language}"
+ if inst_basename.endswith(lang_suffix):
+ inst_basename = inst_basename[: -len(lang_suffix)]
+
+ return (
+ f"usr/share/man/{maybe_language}man{real_section}/{inst_basename}.{section}"
+ )
+
+ return _manpage_dest_path
+
+
+class SourcePathMatcher:
+ def __init__(self, auto_discard_rules: List["PluginProvidedDiscardRule"]) -> None:
+ self._already_matched: Dict[
+ str,
+ Tuple[FrozenSet[BinaryPackage], str],
+ ] = {}
+ self._exact_match_request: Set[Tuple[str, str]] = set()
+ self._discarded: Dict[str, DiscardState] = {}
+ self._auto_discard_rules = auto_discard_rules
+ self.used_auto_discard_rules: Dict[str, Set[str]] = collections.defaultdict(set)
+
+ def is_reserved(self, path: "VirtualPath") -> bool:
+ fs_path = path.fs_path
+ if fs_path in self._already_matched:
+ return True
+ result = self._discarded.get(fs_path, DiscardState.UNCHECKED)
+ if result == DiscardState.UNCHECKED:
+ result = self._check_plugin_provided_exclude_state_for(path)
+ if result == DiscardState.NOT_DISCARDED:
+ return False
+
+ return True
+
+ def exclude(self, path: str) -> None:
+ self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE
+
+ def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
+ for dr in self._auto_discard_rules:
+ verdict = dr.should_discard(path)
+ if verdict:
+ self.used_auto_discard_rules[dr.name].add(path.fs_path)
+ return True
+ return False
+
+ def _check_plugin_provided_exclude_state_for(
+ self,
+ path: "VirtualPath",
+ ) -> DiscardState:
+ cache_misses = []
+ current_path = path
+ while True:
+ fs_path = current_path.fs_path
+ exclude_state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
+ if exclude_state != DiscardState.UNCHECKED:
+ verdict = exclude_state
+ break
+ cache_misses.append(fs_path)
+ if self._run_plugin_provided_discard_rules_on(current_path):
+ verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
+ break
+ # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
+ # be excluded without the files in it triggering the rule).
+ parent_dir = current_path.parent_dir
+ if not parent_dir:
+ verdict = DiscardState.NOT_DISCARDED
+ break
+ current_path = parent_dir
+ if cache_misses:
+ for p in cache_misses:
+ self._discarded[p] = verdict
+ return verdict
+
+ def may_match(
+ self,
+ match: PathMatch,
+ *,
+ is_exact_match: bool = False,
+ ) -> Tuple[FrozenSet[BinaryPackage], bool]:
+ m = self._already_matched.get(match.path.fs_path)
+ if m:
+ return m[0], False
+ current_path = match.path.fs_path
+ discard_state = self._discarded.get(current_path, DiscardState.UNCHECKED)
+
+ if discard_state == DiscardState.UNCHECKED:
+ discard_state = self._check_plugin_provided_exclude_state_for(match.path)
+
+ assert discard_state is not None and discard_state != DiscardState.UNCHECKED
+
+ is_discarded = discard_state != DiscardState.NOT_DISCARDED
+ if (
+ is_exact_match
+ and discard_state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
+ ):
+ is_discarded = False
+ return frozenset(), is_discarded
+
+ def reserve(
+ self,
+ path: "VirtualPath",
+ reserved_by: FrozenSet[BinaryPackage],
+ definition_source: str,
+ *,
+ is_exact_match: bool = False,
+ ) -> None:
+ fs_path = path.fs_path
+ self._already_matched[fs_path] = reserved_by, definition_source
+ if not is_exact_match:
+ return
+ for pkg in reserved_by:
+ m_key = (pkg.name, fs_path)
+ self._exact_match_request.add(m_key)
+ try:
+ del self._discarded[fs_path]
+ except KeyError:
+ pass
+ for discarded_paths in self.used_auto_discard_rules.values():
+ discarded_paths.discard(fs_path)
+
+ def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
+ stack = list(search_dir.iterdir)
+ while stack:
+ m = stack.pop()
+ if m.is_dir:
+ s_len = len(stack)
+ stack.extend(m.iterdir)
+
+ if s_len == len(stack) and not self.is_reserved(m):
+ # "Explicitly" empty dir
+ yield m
+ elif not self.is_reserved(m):
+ yield m
+
+ def find_and_reserve_all_matches(
+ self,
+ match_rule: MatchRule,
+ search_dirs: Sequence[SearchDir],
+ dir_only_match: bool,
+ match_filter: Optional[Callable[["VirtualPath"], bool]],
+ reserved_by: FrozenSet[BinaryPackage],
+ definition_source: str,
+ ) -> Tuple[List[PathMatch], Tuple[int, ...]]:
+ matched = []
+ already_installed_paths = 0
+ already_excluded_paths = 0
+ glob_expand = False if isinstance(match_rule, ExactFileSystemPath) else True
+
+ for match in _resolve_path(
+ match_rule,
+ search_dirs,
+ dir_only_match,
+ match_filter,
+ reserved_by,
+ ):
+ installed_into, excluded = self.may_match(
+ match, is_exact_match=not glob_expand
+ )
+ if installed_into:
+ if glob_expand:
+ already_installed_paths += 1
+ continue
+ packages = ", ".join(p.name for p in installed_into)
+ raise PathAlreadyInstalledOrDiscardedError(
+ f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
+ f" The definition that triggered this issue is {definition_source}.",
+ match,
+ installed_into,
+ definition_source,
+ )
+ if excluded:
+ if glob_expand:
+ already_excluded_paths += 1
+ continue
+ raise PathAlreadyInstalledOrDiscardedError(
+ f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
+ f" above the exclusion rule that excluded it. The definition that triggered this"
+ f" issue is {definition_source}.",
+ match,
+ installed_into,
+ definition_source,
+ )
+ if not glob_expand:
+ for pkg in match.into:
+ m_key = (pkg.name, match.path.fs_path)
+ if m_key in self._exact_match_request:
+ raise ExactPathMatchTwiceError(
+ f'The path "{match.path.fs_path}" (via exact match) has already been installed'
+ f" into {pkg.name}. The second installation triggered by {definition_source}",
+ match.path,
+ pkg,
+ definition_source,
+ )
+ self._exact_match_request.add(m_key)
+
+ if reserved_by:
+ self._already_matched[match.path.fs_path] = (
+ match.into,
+ definition_source,
+ )
+ else:
+ self.exclude(match.path.fs_path)
+ matched.append(match)
+ exclude_counts = already_installed_paths, already_excluded_paths
+ return matched, exclude_counts
+
+
+def _resolve_path(
+ match_rule: MatchRule,
+ search_dirs: Iterable["SearchDir"],
+ dir_only_match: bool,
+ match_filter: Optional[Callable[["VirtualPath"], bool]],
+ into: FrozenSet[BinaryPackage],
+) -> Iterator[PathMatch]:
+ missing_matches = set(into)
+ for sdir in search_dirs:
+ matched = False
+ if into and missing_matches.isdisjoint(sdir.applies_to):
+ # All the packages, where this search dir applies, already got a match
+ continue
+ applicable = sdir.applies_to & missing_matches
+ for matched_path in match_rule.finditer(
+ sdir.search_dir,
+ ignore_paths=match_filter,
+ ):
+ if dir_only_match and not matched_path.is_dir:
+ continue
+ if matched_path.parent_dir is None:
+ if match_rule is MATCH_ANYTHING:
+ continue
+ _error(
+ f"The pattern {match_rule.describe_match_short()} matched the root dir."
+ )
+ yield PathMatch(matched_path, sdir.search_dir, False, applicable)
+ matched = True
+ # continue; we want to match everything we can from this search directory.
+
+ if matched:
+ missing_matches -= applicable
+ if into and not missing_matches:
+ # For install rules, we can stop as soon as all packages had a match
+ # For discard rules, all search directories must be visited. Otherwise,
+ # you would have to repeat the discard rule once per search dir to be
+ # sure something is fully discarded
+ break
+
+
+def _resolve_dest_paths(
+ match: PathMatch,
+ dest_paths: Sequence[Tuple[str, bool]],
+ install_context: "InstallRuleContext",
+) -> Sequence[Tuple[str, "FSPath"]]:
+ dest_and_roots = []
+ for dest_path, dest_path_is_format in dest_paths:
+ if dest_path_is_format:
+ for pkg in match.into:
+ parent_dir = match.path.parent_dir
+ pkg_install_context = install_context[pkg.name]
+ fs_root = pkg_install_context.fs_root
+ dpath = dest_path.format(
+ basename=match.path.name,
+ dirname=parent_dir.path if parent_dir is not None else "",
+ package_name=pkg.name,
+ doc_main_package_name=pkg_install_context.doc_main_package.name,
+ )
+ if dpath.endswith("/"):
+ raise ValueError(
+ f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
+ f' with "/" ("{dest_path}"), which it must not!'
+ )
+ dest_and_roots.append((dpath, fs_root))
+ else:
+ if dest_path.endswith("/"):
+ raise ValueError(
+ f'Provided destination for "{match.path.path}" ended with "/" ("{dest_path}"),'
+ " which it must not!"
+ )
+ dest_and_roots.extend(
+ (dest_path, install_context[pkg.name].fs_root) for pkg in match.into
+ )
+ return dest_and_roots
+
+
+def _resolve_matches(
+ matches: List[PathMatch],
+ dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
+ install_context: "InstallRuleContext",
+) -> Iterator[Tuple[PathMatch, Sequence[Tuple[str, "FSPath"]]]]:
+ if callable(dest_paths):
+ compute_dest_path = dest_paths
+ for match in matches:
+ dpath = compute_dest_path(match)
+ if dpath.endswith("/"):
+ raise ValueError(
+ f'Provided destination for "{match.path.path}" ended with "/" ("{dpath}"), which it must not!'
+ )
+ dest_and_roots = [
+ (dpath, install_context[pkg.name].fs_root) for pkg in match.into
+ ]
+ yield match, dest_and_roots
+ else:
+ for match in matches:
+ dest_and_roots = _resolve_dest_paths(
+ match,
+ dest_paths,
+ install_context,
+ )
+ yield match, dest_and_roots
+
+
+class InstallRule(DebputyDispatchableType):
+ __slots__ = (
+ "_already_matched",
+ "_exact_match_request",
+ "_condition",
+ "_match_filter",
+ "_definition_source",
+ )
+
+ def __init__(
+ self,
+ condition: Optional[ManifestCondition],
+ definition_source: str,
+ *,
+ match_filter: Optional[Callable[["VirtualPath"], bool]] = None,
+ ) -> None:
+ self._condition = condition
+ self._definition_source = definition_source
+ self._match_filter = match_filter
+
+ def _check_single_match(
+ self, source: FileSystemMatchRule, matches: List[PathMatch]
+ ) -> None:
+ seen_pkgs = set()
+ problem_pkgs = frozenset()
+ for m in matches:
+ problem_pkgs = seen_pkgs & m.into
+ if problem_pkgs:
+ break
+ seen_pkgs.update(problem_pkgs)
+ if problem_pkgs:
+ pkg_names = ", ".join(sorted(p.name for p in problem_pkgs))
+ _error(
+ f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.'
+ "However, it should matched exactly one item. Please tighten the pattern defined"
+ f" in {self._definition_source}"
+ )
+
+ def _match_pattern(
+ self,
+ path_matcher: SourcePathMatcher,
+ fs_match_rule: FileSystemMatchRule,
+ condition_context: ConditionContext,
+ search_dirs: Sequence[SearchDir],
+ into: FrozenSet[BinaryPackage],
+ ) -> List[PathMatch]:
+ (matched, exclude_counts) = path_matcher.find_and_reserve_all_matches(
+ fs_match_rule.match_rule,
+ search_dirs,
+ fs_match_rule.raw_match_rule.endswith("/"),
+ self._match_filter,
+ into,
+ self._definition_source,
+ )
+
+ already_installed_paths, already_excluded_paths = exclude_counts
+
+ if into:
+ allow_empty_match = all(not p.should_be_acted_on for p in into)
+ else:
+ # discard rules must match provided at least one search dir exist. If none of them
+ # exist, then we assume the discard rule is for a package that will not be built
+ allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
+ if self._condition is not None and not self._condition.evaluate(
+ condition_context
+ ):
+ allow_empty_match = True
+
+ if not matched and not allow_empty_match:
+ search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
+ if already_excluded_paths and already_installed_paths:
+ total_paths = already_excluded_paths + already_installed_paths
+ msg = (
+ f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
+ f" {total_paths} path(s) already been matched previously either by install or"
+ f" exclude rules. If you wanted to install some of these paths into multiple"
+ f" packages, please tweak the definition that installed them to install them"
+ f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
+ f" If you wanted to install these paths and exclude rules are getting in your"
+ f" way, then please move this install rule before the exclusion rule that causes"
+ f" issue or, in case of built-in excludes, list the paths explicitly (without"
+ f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
+ f" {fs_match_rule.match_rule.describe_match_exact()}"
+ )
+ elif already_excluded_paths:
+ msg = (
+ f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
+ f" {already_excluded_paths} path(s) that have been excluded."
+ " If you wanted to install some of these paths, please move the install rule"
+ " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
+ f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
+ f" {fs_match_rule.match_rule.describe_match_exact()}"
+ )
+ elif already_installed_paths:
+ msg = (
+ f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
+ f" {already_installed_paths} path(s) already been matched previously."
+ " If you wanted to install some of these paths into multiple packages,"
+ f" please tweak the definition that installed them to install them into"
+ f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
+ f" Source for this issue is {self._definition_source}. Match rule:"
+ f" {fs_match_rule.match_rule.describe_match_exact()}"
+ )
+ else:
+ # TODO: Try harder to find the match and point out possible typos
+ msg = (
+ f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
+ f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
+ )
+ raise NoMatchForInstallPatternError(
+ msg,
+ fs_match_rule,
+ search_dirs,
+ self._definition_source,
+ )
+ return matched
+
+ def _install_matches(
+ self,
+ path_matcher: SourcePathMatcher,
+ matches: List[PathMatch],
+ dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
+ install_context: "InstallRuleContext",
+ into: FrozenSet[BinaryPackage],
+ condition_context: ConditionContext,
+ ) -> None:
+ if (
+ self._condition is not None
+ and not self._condition.evaluate(condition_context)
+ ) or not any(p.should_be_acted_on for p in into):
+ # Rule is disabled; skip all its actions - also allow empty matches
+ # for this particular case.
+ return
+
+ if not matches:
+ raise ValueError("matches must not be empty")
+
+ for match, dest_paths_and_roots in _resolve_matches(
+ matches,
+ dest_paths,
+ install_context,
+ ):
+ install_recursively_into_dirs = []
+ for dest, fs_root in dest_paths_and_roots:
+ dir_part, basename = os.path.split(dest)
+ # We do not associate these with the FS path. First off,
+ # it is complicated to do in most cases (indeed, debhelper
+ # does not preserve these directories either) and secondly,
+ # it is "only" mtime and mode - mostly irrelevant as the
+ # directory is 99.9% likely to be 0755 (we are talking
+ # directories like "/usr", "/usr/share").
+ dir_path = fs_root.mkdirs(dir_part)
+ existing_path = dir_path.get(basename)
+
+ if match.path.is_dir:
+ if existing_path is not None and not existing_path.is_dir:
+ existing_path.unlink()
+ existing_path = None
+ current_dir = existing_path
+
+ if current_dir is None:
+ current_dir = dir_path.mkdir(
+ basename, reference_path=match.path
+ )
+ install_recursively_into_dirs.append(current_dir)
+ else:
+ if existing_path is not None and existing_path.is_dir:
+ _error(
+ f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist"
+ f" and is a directory. This error was triggered via {self._definition_source}."
+ )
+
+ if match.path.is_symlink:
+ dir_path.add_symlink(
+ basename, match.path.readlink(), reference_path=match.path
+ )
+ else:
+ dir_path.insert_file_from_fs_path(
+ basename,
+ match.path.fs_path,
+ follow_symlinks=False,
+ use_fs_path_mode=True,
+ reference_path=match.path,
+ )
+ if install_recursively_into_dirs:
+ self._install_dir_recursively(
+ path_matcher, install_recursively_into_dirs, match, into
+ )
+
+ def _install_dir_recursively(
+ self,
+ path_matcher: SourcePathMatcher,
+ parent_dirs: Sequence[FSPath],
+ match: PathMatch,
+ into: FrozenSet[BinaryPackage],
+ ) -> None:
+ stack = [
+ (parent_dirs, e)
+ for e in match.path.iterdir
+ if not path_matcher.is_reserved(e)
+ ]
+
+ while stack:
+ current_dirs, dir_entry = stack.pop()
+ path_matcher.reserve(
+ dir_entry,
+ into,
+ self._definition_source,
+ is_exact_match=False,
+ )
+ if dir_entry.is_dir:
+ new_dirs = [
+ d.mkdir(dir_entry.name, reference_path=dir_entry)
+ for d in current_dirs
+ ]
+ stack.extend(
+ (new_dirs, de)
+ for de in dir_entry.iterdir
+ if not path_matcher.is_reserved(de)
+ )
+ elif dir_entry.is_symlink:
+ for current_dir in current_dirs:
+ current_dir.add_symlink(
+ dir_entry.name,
+ dir_entry.readlink(),
+ reference_path=dir_entry,
+ )
+ elif dir_entry.is_file:
+ for current_dir in current_dirs:
+ current_dir.insert_file_from_fs_path(
+ dir_entry.name,
+ dir_entry.fs_path,
+ use_fs_path_mode=True,
+ follow_symlinks=False,
+ reference_path=dir_entry,
+ )
+ else:
+ _error(
+ f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink"
+ )
+
+ def perform_install(
+ self,
+ path_matcher: SourcePathMatcher,
+ install_context: InstallRuleContext,
+ condition_context: ConditionContext,
+ ) -> None:
+ raise NotImplementedError
+
+ @classmethod
+ def install_as(
+ cls,
+ source: FileSystemMatchRule,
+ dest_path: str,
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ return GenericInstallationRule(
+ [source],
+ [(dest_path, False)],
+ into,
+ condition,
+ definition_source,
+ require_single_match=True,
+ )
+
+ @classmethod
+ def install_dest(
+ cls,
+ sources: Sequence[FileSystemMatchRule],
+ dest_dir: Optional[str],
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ if dest_dir is None:
+ dest_dir = "{dirname}/{basename}"
+ else:
+ dest_dir = os.path.join(dest_dir, "{basename}")
+ return GenericInstallationRule(
+ sources,
+ [(dest_dir, True)],
+ into,
+ condition,
+ definition_source,
+ )
+
+ @classmethod
+ def install_multi_as(
+ cls,
+ source: FileSystemMatchRule,
+ dest_paths: Sequence[str],
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ if len(dest_paths) < 2:
+ raise ValueError(
+ "Please use `install_as` when there is less than 2 dest path"
+ )
+ dps = tuple((dp, False) for dp in dest_paths)
+ return GenericInstallationRule(
+ [source],
+ dps,
+ into,
+ condition,
+ definition_source,
+ require_single_match=True,
+ )
+
+ @classmethod
+ def install_multi_dest(
+ cls,
+ sources: Sequence[FileSystemMatchRule],
+ dest_dirs: Sequence[str],
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ if len(dest_dirs) < 2:
+ raise ValueError(
+ "Please use `install_dest` when there is less than 2 dest dir"
+ )
+ dest_paths = tuple((os.path.join(dp, "{basename}"), True) for dp in dest_dirs)
+ return GenericInstallationRule(
+ sources,
+ dest_paths,
+ into,
+ condition,
+ definition_source,
+ )
+
+ @classmethod
+ def install_doc(
+ cls,
+ sources: Sequence[FileSystemMatchRule],
+ dest_dir: Optional[str],
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ cond: ManifestCondition = _BUILD_DOCS_BDO
+ if condition is not None:
+ cond = ManifestCondition.all_of([cond, condition])
+ dest_path_is_format = False
+ if dest_dir is None:
+ dest_dir = "usr/share/doc/{doc_main_package_name}/{basename}"
+ dest_path_is_format = True
+
+ return GenericInstallationRule(
+ sources,
+ [(dest_dir, dest_path_is_format)],
+ into,
+ cond,
+ definition_source,
+ )
+
+ @classmethod
+ def install_doc_as(
+ cls,
+ source: FileSystemMatchRule,
+ dest_path: str,
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ cond: ManifestCondition = _BUILD_DOCS_BDO
+ if condition is not None:
+ cond = ManifestCondition.all_of([cond, condition])
+
+ return GenericInstallationRule(
+ [source],
+ [(dest_path, False)],
+ into,
+ cond,
+ definition_source,
+ require_single_match=True,
+ )
+
+ @classmethod
+ def install_examples(
+ cls,
+ sources: Sequence[FileSystemMatchRule],
+ into: FrozenSet[BinaryPackage],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ cond: ManifestCondition = _BUILD_DOCS_BDO
+ if condition is not None:
+ cond = ManifestCondition.all_of([cond, condition])
+ return GenericInstallationRule(
+ sources,
+ [("usr/share/doc/{doc_main_package_name}/examples/{basename}", True)],
+ into,
+ cond,
+ definition_source,
+ )
+
+ @classmethod
+ def install_man(
+ cls,
+ sources: Sequence[FileSystemMatchRule],
+ into: FrozenSet[BinaryPackage],
+ section: Optional[int],
+ language: Optional[str],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ ) -> "InstallRule":
+ cond: ManifestCondition = _BUILD_DOCS_BDO
+ if condition is not None:
+ cond = ManifestCondition.all_of([cond, condition])
+
+ dest_path_computer = _dest_path_for_manpage(
+ section, language, definition_source
+ )
+
+ return GenericInstallationRule(
+ sources,
+ dest_path_computer,
+ into,
+ cond,
+ definition_source,
+ match_filter=lambda m: not m.is_file,
+ )
+
+ @classmethod
+ def discard_paths(
+ cls,
+ paths: Sequence[FileSystemMatchRule],
+ definition_source: str,
+ condition: Optional[ManifestCondition],
+ *,
+ limit_to: Optional[Sequence[FileSystemExactMatchRule]] = None,
+ ) -> "InstallRule":
+ return DiscardRule(
+ paths,
+ condition,
+ tuple(limit_to) if limit_to is not None else tuple(),
+ definition_source,
+ )
+
+
+class PPFInstallRule(InstallRule):
+ __slots__ = (
+ "_ppfs",
+ "_substitution",
+ "_into",
+ )
+
+ def __init__(
+ self,
+ into: BinaryPackage,
+ substitution: Substitution,
+ ppfs: Sequence["PackagerProvidedFile"],
+ ) -> None:
+ super().__init__(
+ None,
+ "<built-in; PPF install rule>",
+ )
+ self._substitution = substitution
+ self._ppfs = ppfs
+ self._into = into
+
+ def perform_install(
+ self,
+ path_matcher: SourcePathMatcher,
+ install_context: InstallRuleContext,
+ condition_context: ConditionContext,
+ ) -> None:
+ binary_install_context = install_context[self._into.name]
+ fs_root = binary_install_context.fs_root
+ for ppf in self._ppfs:
+ source_path = ppf.path.fs_path
+ dest_dir, name = ppf.compute_dest()
+ dir_path = fs_root.mkdirs(dest_dir)
+
+ dir_path.insert_file_from_fs_path(
+ name,
+ source_path,
+ follow_symlinks=True,
+ use_fs_path_mode=False,
+ mode=ppf.definition.default_mode,
+ )
+
+
+class GenericInstallationRule(InstallRule):
+ __slots__ = (
+ "_sources",
+ "_into",
+ "_dest_paths",
+ "_require_single_match",
+ )
+
+ def __init__(
+ self,
+ sources: Sequence[FileSystemMatchRule],
+ dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
+ into: FrozenSet[BinaryPackage],
+ condition: Optional[ManifestCondition],
+ definition_source: str,
+ *,
+ require_single_match: bool = False,
+ match_filter: Optional[Callable[["VirtualPath"], bool]] = None,
+ ) -> None:
+ super().__init__(
+ condition,
+ definition_source,
+ match_filter=match_filter,
+ )
+ self._sources = sources
+ self._into = into
+ self._dest_paths = dest_paths
+ self._require_single_match = require_single_match
+ if self._require_single_match and len(sources) != 1:
+ raise ValueError("require_single_match implies sources must have len 1")
+
+ def perform_install(
+ self,
+ path_matcher: SourcePathMatcher,
+ install_context: InstallRuleContext,
+ condition_context: ConditionContext,
+ ) -> None:
+ for source in self._sources:
+ matches = self._match_pattern(
+ path_matcher,
+ source,
+ condition_context,
+ install_context.search_dirs,
+ self._into,
+ )
+ if self._require_single_match and len(matches) > 1:
+ self._check_single_match(source, matches)
+ self._install_matches(
+ path_matcher,
+ matches,
+ self._dest_paths,
+ install_context,
+ self._into,
+ condition_context,
+ )
+
+
+class DiscardRule(InstallRule):
+ __slots__ = ("_fs_match_rules", "_limit_to")
+
+ def __init__(
+ self,
+ fs_match_rules: Sequence[FileSystemMatchRule],
+ condition: Optional[ManifestCondition],
+ limit_to: Sequence[FileSystemExactMatchRule],
+ definition_source: str,
+ ) -> None:
+ super().__init__(condition, definition_source)
+ self._fs_match_rules = fs_match_rules
+ self._limit_to = limit_to
+
+ def perform_install(
+ self,
+ path_matcher: SourcePathMatcher,
+ install_context: InstallRuleContext,
+ condition_context: ConditionContext,
+ ) -> None:
+ into = frozenset()
+ limit_to = self._limit_to
+ if limit_to:
+ matches = {x.match_rule.path for x in limit_to}
+ search_dirs = tuple(
+ s
+ for s in install_context.search_dirs
+ if s.search_dir.fs_path in matches
+ )
+ if len(limit_to) != len(search_dirs):
+ matches.difference(s.search_dir.fs_path for s in search_dirs)
+ paths = ":".join(matches)
+ _error(
+ f"The discard rule defined at {self._definition_source} mentions the following"
+ f" search directories that were not known to debputy: {paths}."
+ " Either the search dir is missing somewhere else or it should be removed from"
+ " the discard rule."
+ )
+ else:
+ search_dirs = install_context.search_dirs
+
+ for fs_match_rule in self._fs_match_rules:
+ self._match_pattern(
+ path_matcher,
+ fs_match_rule,
+ condition_context,
+ search_dirs,
+ into,
+ )