summary refs log tree commit diff stats
path: root/src/debputy/transformation_rules.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/transformation_rules.py')
-rw-r--r-- src/debputy/transformation_rules.py 596
1 files changed, 596 insertions, 0 deletions
diff --git a/src/debputy/transformation_rules.py b/src/debputy/transformation_rules.py
new file mode 100644
index 0000000..8d9caae
--- /dev/null
+++ b/src/debputy/transformation_rules.py
@@ -0,0 +1,596 @@
+import dataclasses
+import os
+from typing import (
+ NoReturn,
+ Optional,
+ Callable,
+ Sequence,
+ Tuple,
+ List,
+ Literal,
+ Dict,
+ TypeVar,
+ cast,
+)
+
+from debputy.exceptions import (
+ DebputyRuntimeError,
+ PureVirtualPathError,
+ TestPathWithNonExistentFSPathError,
+)
+from debputy.filesystem_scan import FSPath
+from debputy.interpreter import (
+ extract_shebang_interpreter_from_file,
+)
+from debputy.manifest_conditions import ConditionContext, ManifestCondition
+from debputy.manifest_parser.base_types import (
+ FileSystemMode,
+ StaticFileSystemOwner,
+ StaticFileSystemGroup,
+ DebputyDispatchableType,
+)
+from debputy.manifest_parser.util import AttributePath
+from debputy.path_matcher import MatchRule
+from debputy.plugin.api import VirtualPath
+from debputy.plugin.debputy.types import DebputyCapability
+from debputy.util import _warn
+
+
class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised when a transformation rule cannot be applied.

    This is the exception type raised by `TransformationRule._error` and by
    `_apply_owner_and_mode` in this module.
    """
+
+
# Policies understood by `CreateSymlinkPathTransformationRule` for the case
# where a path already exists at the location of the symlink to be created:
#  * "error-if-exists": never replace; always an error.
#  * "error-if-directory": replace unless the existing path is a directory.
#  * "abort-on-non-empty-directory": replace unless the existing path is a
#    directory with at least one entry.
#  * "discard-existing": always replace.
CreateSymlinkReplacementRule = Literal[
    "error-if-exists",
    "error-if-directory",
    "abort-on-non-empty-directory",
    "discard-existing",
]
+
+
# Type variable for helpers that return the very same (sub)type of
# `VirtualPath` they were given (see `TransformationRule._fs_path_as_dir`).
VP = TypeVar("VP", bound=VirtualPath)
+
+
@dataclasses.dataclass(frozen=True, slots=True)
class PreProvidedExclusion:
    """Immutable record bundling a tag, a description and a pruner callback.

    NOTE(review): nothing in this module consumes this type; the exact meaning
    of the fields is presumably defined by its users - confirm there.
    """

    # Short identifier of the exclusion.
    tag: str
    # Human readable text describing the exclusion.
    description: str
    # Callback invoked with an `FSPath`; it returns nothing.
    pruner: Callable[[FSPath], None]
+
+
class TransformationRule(DebputyDispatchableType):
    """Base class for rules that transform the file system rooted at an `FSPath`.

    Subclasses implement `transform_file_system`. The remaining methods are
    shared helpers for evaluating conditions, reporting errors and ensuring
    that a given path is a directory.
    """

    __slots__ = ()

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Apply the rule to the tree rooted at `fs_root`.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context used when evaluating manifest conditions.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: Optional[ManifestCondition],
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate (or None if there is no condition).
        :param condition_context: Context passed to `condition.evaluate`.
        :param result_if_condition_is_missing: Value returned when `condition` is None.
        :return: The result of the condition, or the fallback when there is no condition.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Abort the transformation by raising a `TransformationRuntimeError`.

        :param msg: The error message.
        :param caused_by: Optional exception to chain as the cause of the error.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Report (as an error) that `match_rule` did not match any path.

        :param match_rule: The rule without matches.
        :param definition_source: Where the transformation was defined (used in the message).
        :raises TransformationRuntimeError: Always.
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return `path` unchanged if it is a directory; otherwise raise.

        :param path: The path that must be a directory.
        :param definition_source: Where the transformation was defined (used in the message).
        :return: The `path` given as argument.
        :raises TransformationRuntimeError: If `path` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # Note: The final fragment deliberately does not start with "to"; the previous
        # fragment already ends with it (the message used to read "to to make").
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source} to"
            " make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Look up `path_to_directory` below `fs_root`, creating missing directories.

        :param fs_root: Root of the file system to look up the path in.
        :param path_to_directory: Path of the desired directory.
        :param definition_source: Where the transformation was defined (used in error messages).
        :return: The directory for `path_to_directory`.
        :raises TransformationRuntimeError: If the path found by the lookup is not a directory.
        """
        # NOTE(review): `attempt_lookup` presumably returns the deepest existing path along
        # with the path segments that were missing below it - confirm with `FSPath`.
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current
+
+
class RemoveTransformationRule(TransformationRule):
    """Transformation that removes every path matched by a set of match rules.

    Each match rule must match at least one path; a rule without matches is
    reported as an error.
    """

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """
        :param match_rules: The rules selecting the paths to remove.
        :param keep_empty_parent_dirs: If False, the parent directory of a removed path is
          pruned (via `prune_if_empty_dir`) after the removal.
        :param definition_source: Where the transformation was defined (used in error messages).
        """
        super().__init__()
        self._match_rules = match_rules
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._definition_source = definition_source.path

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Remove all paths matched by the match rules.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Unused; this rule has no condition.
        :raises TransformationRuntimeError: If a match rule matched nothing or matched the
          root directory.
        """
        keep_empty_parent_dirs = self._keep_empty_parent_dirs
        for match_rule in self._match_rules:
            # Fully resolve the matches to avoid RuntimeError caused by collection changing size as a
            # consequence of the removal: https://salsa.debian.org/debian/debputy/-/issues/52
            matches = list(match_rule.finditer(fs_root))
            # The check is done per match rule. Previously, a single flag was shared between all
            # the rules, which meant that a rule without matches went unnoticed when any earlier
            # rule had matched something.
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not matches:
                self._match_rule_had_no_matches(match_rule, self._definition_source)
            for m in matches:
                parent = m.parent_dir
                if parent is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                m.unlink(recursive=True)
                if not keep_empty_parent_dirs:
                    parent.prune_if_empty_dir()
+
+
class MoveTransformationRule(TransformationRule):
    """Transformation that moves (renames) the paths matched by one match rule.

    If the destination is a directory (declared as such or already existing as
    one), every match is moved into it under its current basename. Otherwise,
    exactly one match is accepted and it is renamed to the destination path.
    """

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param match_rule: The rule selecting the paths to move.
        :param dest_path: The destination path.
        :param dest_is_dir: Whether the destination must be treated as a directory
          (it is created if missing).
        :param definition_source: Where the transformation was defined (used in error messages).
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Move the matched paths to the destination.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context used for evaluating the condition of the rule.
        :raises TransformationRuntimeError: If nothing matched, if the move is ambiguous
          or redundant, or if it would replace an existing directory.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: Optional[VirtualPath]
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

            # The plain rename case lives in this branch, since it is the only one where the
            # destination can be something else than a directory (and where
            # `target_parent_dir` / `basename` are defined).
            if target_dir is None or not target_dir.is_dir:
                if len(matches) > 1:
                    self._error(
                        f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                        f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                        " destination was not a directory. Either correct the pattern to only match one source"
                        " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                        f' "{self._dest_path}/")'
                    )
                p = matches[0]
                if p.path == self._dest_path:
                    self._error(
                        f"Error in {self._definition_source}, the source"
                        f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                        " rename redundant!?"
                    )
                p.parent_dir = target_parent_dir
                p.name = basename
                return

        assert target_dir is not None and target_dir.is_dir
        basenames: Dict[str, VirtualPath] = dict()
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # The conflict check must look inside the *destination* directory. Looking up the
            # name below the match itself (as done previously) never finds the path that the
            # move would replace. A match that already resides in the destination directory is
            # not in conflict with itself.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir
+
+
class CreateSymlinkPathTransformationRule(TransformationRule):
    """Transformation that creates a symlink, optionally replacing an existing path.

    What happens when a path already exists where the symlink should be created
    is decided by the replacement rule (see `_handle_existing_path`).
    """

    __slots__ = (
        "_link_dest",
        "_link_target",
        "_replacement_rule",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        link_target: str,
        link_dest: str,
        replacement_rule: CreateSymlinkReplacementRule,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param link_target: The target that the symlink points to.
        :param link_dest: The path at which the symlink is created.
        :param replacement_rule: Policy for handling an already existing path at `link_dest`.
        :param definition_source: Where the transformation was defined (used in error messages).
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        """
        self._link_target = link_target
        self._link_dest = link_dest
        self._replacement_rule = replacement_rule
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create the symlink (and any missing parent directory of it).

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context used for evaluating the condition of the rule.
        :raises TransformationRuntimeError: If the replacement rule forbids replacing an
          existing path or the parent of the symlink is not a directory.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        parent_path, link_name = os.path.split(self._link_dest)
        parent_dir = self._ensure_is_directory(
            fs_root,
            parent_path,
            self._definition_source,
        )
        existing = parent_dir.get(link_name)
        if existing:
            # Either removes the existing path or aborts with an error.
            self._handle_existing_path(existing)
        parent_dir.add_symlink(link_name, self._link_target)

    def _handle_existing_path(self, existing: VirtualPath) -> None:
        """Remove `existing` if the replacement rule permits it; otherwise raise.

        :param existing: The path currently occupying the location of the symlink.
        :raises TransformationRuntimeError: If the replacement rule forbids the removal.
        """
        rule = self._replacement_rule
        if rule == "abort-on-non-empty-directory":
            may_replace = not (existing.is_dir and any(existing.iterdir))
            reason = "the path is a non-empty directory"
        elif rule == "discard-existing":
            may_replace = True
            reason = "<<internal error: you should not see an error with this message>>"
        elif rule == "error-if-directory":
            may_replace = not existing.is_dir
            reason = "the path is a directory"
        else:
            assert rule == "error-if-exists"
            may_replace = False
            reason = "the path exists"

        if not may_replace:
            # `_error` never returns, so the removal below is only reached when permitted.
            self._error(
                f"Refusing to replace {existing.path} with a symlink; {reason} and"
                f" the active replacement-rule was {rule}. You can"
                f' set the replacement-rule to "discard-existing", if you are not interested'
                f" in the contents of {existing.path}. This error was triggered by {self._definition_source}."
            )
        existing.unlink(recursive=True)
+
+
class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures a set of directories exist with the given metadata."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param directories: Paths of the directories to ensure.
        :param owner: Owner passed to `chown` for each directory (may be None).
        :param group: Group passed to `chown` for each directory (may be None).
        :param mode: Mode definition applied to each directory; None leaves the mode as-is.
        :param definition_source: Where the transformation was defined (used in error messages).
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        """
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Ensure each directory exists and apply the mode and ownership to it.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context used for evaluating the condition of the rule.
        :raises TransformationRuntimeError: If a path exists as a non-directory or the
          desired mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        source = self._definition_source
        mode_definition = self._mode
        for directory in self._directories:
            dir_path = self._ensure_is_directory(fs_root, directory, source)

            if mode_definition is not None:
                try:
                    new_mode = mode_definition.compute_mode(
                        dir_path.mode, dir_path.is_dir
                    )
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {dir_path.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                dir_path.mode = new_mode
            # Called even when both owner and group are None.
            dir_path.chown(self._owner, self._group)
+
+
def _apply_owner_and_mode(
    path: VirtualPath,
    owner: Optional[StaticFileSystemOwner],
    group: Optional[StaticFileSystemGroup],
    mode: Optional[FileSystemMode],
    capabilities: Optional[str],
    capability_mode: Optional[FileSystemMode],
    definition_source: str,
) -> None:
    """Apply ownership, mode and capabilities to a single path.

    :param path: The path to update.
    :param owner: New owner; only applied when owner or group is not None.
    :param group: New group; only applied when owner or group is not None.
    :param mode: Mode definition used to compute the new mode; None leaves the mode as-is.
    :param capabilities: Capabilities to record on the path; only applied to files.
    :param capability_mode: Mode accompanying the capabilities; must not be None when
      capabilities are applied.
    :param definition_source: Where the request was defined (used in messages and recorded
      with the capabilities).
    :raises TransformationRuntimeError: If the desired mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        path.mode = new_mode

    # Capabilities are only ever recorded on files.
    if not path.is_file or capabilities is None:
        return

    cap_ref = path.metadata(DebputyCapability)
    previous = cap_ref.value
    if previous is not None:
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_ref.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )
+
+
class PathMetadataTransformationRule(TransformationRule):
    """Transformation that sets owner, group, mode and/or capabilities on matched paths.

    Symlinks are always skipped. Owner, group and mode are applied to any other
    matched path, while capabilities are only applied to files (see
    `_apply_owner_and_mode`). With `recursive`, the content of matched
    directories is updated as well.
    """

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        recursive: bool,
        capabilities: Optional[str],
        capability_mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param match_rules: The rules selecting the paths to update.
        :param owner: New owner of the paths (None for no change).
        :param group: New group of the paths (None for no change).
        :param mode: Mode definition applied to the paths (None for no change).
        :param recursive: Whether to also update everything beneath matched directories.
        :param capabilities: Capabilities to set on matched files (None for no change).
        :param capability_mode: Mode accompanying the capabilities. Must be given if and
          only if `capabilities` is given.
        :param definition_source: Where the transformation was defined (used in messages).
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        :raises ValueError: If only one of `capabilities` and `capability_mode` is given.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        if self._capabilities is None and self._capability_mode is not None:
            raise ValueError("capability_mode without capabilities")
        if self._capabilities is not None and self._capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata to every path matched by the match rules.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context used for evaluating the condition of the rule.
        :raises TransformationRuntimeError: If a match rule did not match any path that
          the requested attributes can apply to, or if a mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner = self._owner
        group = self._group
        mode = self._mode
        capabilities = self._capabilities
        capability_mode = self._capability_mode
        definition_source = self._definition_source
        d: Optional[List[FSPath]] = [] if self._recursive else None
        # Owner, group and mode can be applied to directories as well, so any non-symlink
        # match is sufficient when one of them is requested. Only when none of them are
        # requested (that is, capabilities only) is a *file* match required, because the
        # capabilities are only applied to files. (This test used to be inverted.)
        needs_file_match = owner is None and group is None and mode is None

        for match_rule in self._match_rules:
            match_ok = False
            saw_symlink = False
            saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                if path.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and self._recursive:
                        # A directory counts as a match if the recursion will reach a file.
                        match_ok = any(p.is_file for p in path.all_paths())
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
                if path.is_dir and d is not None:
                    d.append(path)

            if not match_ok:
                # Provide a hint about why the matches (if any) were not good enough before
                # reporting the error.
                if needs_file_match and (saw_directory or saw_symlink):
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        " did not match any files, but given the attributes it can only apply to files."
                    )
                elif saw_symlink:
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                    )
                self._match_rule_had_no_matches(match_rule, self._definition_source)

        if not d:
            return
        # Recursive mode: update everything found via the matched directories.
        for recurse_dir in d:
            for path in recurse_dir.all_paths():
                if path.is_symlink:
                    continue
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
+
+
class ModeNormalizationTransformationRule(TransformationRule):
    """Transformation that applies a sequence of (match rule, mode) normalizations.

    Every path is normalized at most once; the first normalization that matches
    a path is the one that is applied. Symlinks are skipped.
    """

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[Tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """
        :param normalizations: Pairs of a match rule and the mode definition to apply to
          the paths matched by that rule, in order of precedence.
        """
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the mode normalizations to the tree rooted at `fs_root`.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Unused; this rule has no condition.
        :raises AssertionError: If a mode cannot be computed (the rules are built-in, so
          this is treated as an internal error).
        """
        handled_paths = set()

        def _already_handled(p):
            # Passed to `finditer` as `ignore_paths`, so handled paths are ignored.
            return p.path in handled_paths

        for match_rule, fs_mode in self._normalizations:
            candidates = match_rule.finditer(fs_root, ignore_paths=_already_handled)
            for path in candidates:
                if path.is_symlink or path.path in handled_paths:
                    continue
                handled_paths.add(path.path)
                try:
                    new_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                path.mode = new_mode
+
+
class NormalizeShebangLineTransformation(TransformationRule):
    """Transformation that rewrites the shebang line of files where a fixup is needed."""

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Check the interpreter line of every file below `fs_root` and fix it if needed.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Unused; this rule has no condition.
        """
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    interpreter = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            if interpreter is not None and interpreter.fixup_needed:
                interpreter.replace_shebang_line(candidate)