Coverage for src/debputy/transformation_rules.py: 73%
271 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import dataclasses
2import os
3from typing import (
4 NoReturn,
5 Optional,
6 Callable,
7 Sequence,
8 Tuple,
9 List,
10 Literal,
11 Dict,
12 TypeVar,
13 cast,
14)
16from debputy.exceptions import (
17 DebputyRuntimeError,
18 PureVirtualPathError,
19 TestPathWithNonExistentFSPathError,
20)
21from debputy.filesystem_scan import FSPath
22from debputy.interpreter import (
23 extract_shebang_interpreter_from_file,
24)
25from debputy.manifest_conditions import ConditionContext, ManifestCondition
26from debputy.manifest_parser.base_types import (
27 FileSystemMode,
28 StaticFileSystemOwner,
29 StaticFileSystemGroup,
30 DebputyDispatchableType,
31)
32from debputy.manifest_parser.util import AttributePath
33from debputy.path_matcher import MatchRule
34from debputy.plugin.api import VirtualPath
35from debputy.plugin.debputy.types import DebputyCapability
36from debputy.util import _warn
class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised when a transformation rule cannot be applied.

    Raised by `TransformationRule._error` and by `_apply_owner_and_mode`
    in this module.
    """
# Policy names accepted by `CreateSymlinkPathTransformationRule` for the case
# where the path that should become a symlink already exists.
CreateSymlinkReplacementRule = Literal[
    # Never replace: any existing path is an error.
    "error-if-exists",
    # Replace existing non-directories; an existing directory is an error.
    "error-if-directory",
    # Replace non-directories and empty directories; a non-empty directory is an error.
    "abort-on-non-empty-directory",
    # Always remove whatever exists (recursively) before creating the symlink.
    "discard-existing",
]
# Type variable for helpers that hand back the same concrete `VirtualPath`
# subtype they were given (see `TransformationRule._fs_path_as_dir`).
VP = TypeVar("VP", bound=VirtualPath)
54@dataclasses.dataclass(frozen=True, slots=True)
55class PreProvidedExclusion:
56 tag: str
57 description: str
58 pruner: Callable[[FSPath], None]
class TransformationRule(DebputyDispatchableType):
    """Base class for rules that modify a file system tree in place.

    Subclasses implement `transform_file_system`; the remaining methods are
    shared helpers for condition handling, error reporting and directory
    creation.
    """

    __slots__ = ()

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Apply the rule to the tree rooted at ``fs_root``.

        The base implementation always raises `NotImplementedError`;
        subclasses are expected to override it.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: Optional[ManifestCondition],
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate, or None if the rule has none.
        :param condition_context: Context passed to ``condition.evaluate``.
        :param result_if_condition_is_missing: Returned when ``condition`` is None.
        :return: The evaluation result (or the fallback for a missing condition).
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Abort the transformation by raising `TransformationRuntimeError`.

        :param msg: The error message.
        :param caused_by: Optional exception to chain as the cause.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Report (via `_error`) that ``match_rule`` did not match any path.

        :raises TransformationRuntimeError: Always.
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return ``path`` unchanged if it is a directory; otherwise raise.

        :param path: The path that is required to be a directory.
        :param definition_source: Where the rule was defined (used in the error).
        :raises TransformationRuntimeError: If ``path`` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # Fix: the last two fragments used to produce "... to to make ...".
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source}"
            " to make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Look up ``path_to_directory`` below ``fs_root``, creating missing parts.

        The deepest existing path found by ``fs_root.attempt_lookup`` must be a
        directory; any missing trailing segments are then created with
        ``mkdirs``.

        :return: The directory at ``path_to_directory``.
        :raises TransformationRuntimeError: If the deepest existing path is not
          a directory.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current
class RemoveTransformationRule(TransformationRule):
    """Delete every path matched by a sequence of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """Store the rule configuration.

        :param match_rules: Rules selecting the paths to remove.
        :param keep_empty_parent_dirs: When false, the parent of each removed
          path is pruned if the removal left it empty.
        :param definition_source: Location of the rule definition; only its
          ``path`` is kept (for error messages).
        """
        self._match_rules = match_rules
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._definition_source = definition_source.path

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Recursively unlink all matches below ``fs_root``.

        :raises TransformationRuntimeError: If a match is the root directory, or
          if no path has matched by the time a rule has been processed.
        """
        prune_parents = not self._keep_empty_parent_dirs
        source = self._definition_source
        # NOTE(review): this flag is shared by all rules and never reset, so only
        # rules processed before the first successful match can trigger the
        # "no matches" error - confirm whether that is intended.
        matched_any = False
        for match_rule in self._match_rules:
            # Snapshot the matches before removing anything: unlinking while the
            # matcher is still iterating triggers a RuntimeError (collection
            # changed size): https://salsa.debian.org/debian/debputy/-/issues/52
            for doomed in tuple(match_rule.finditer(fs_root)):
                matched_any = True
                parent = doomed.parent_dir
                if parent is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                doomed.unlink(recursive=True)
                if prune_parents:
                    parent.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not matched_any:
                self._match_rule_had_no_matches(match_rule, source)
class MoveTransformationRule(TransformationRule):
    """Move the paths matched by one match rule to a destination path.

    If the destination is (or is declared to be) a directory, every match is
    re-parented into it and keeps its own name. Otherwise exactly one match is
    allowed and it is renamed to the destination path.
    """

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param match_rule: Rule selecting the source path(s).
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a
          directory (it is created if missing).
        :param definition_source: Location of the rule definition; only its
          ``path`` is kept (for error messages).
        :param condition: Optional condition guarding the rule.
        """
        # Same base-class initialisation as the other rules in this module.
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Apply the move below ``fs_root`` unless the condition is false.

        :raises TransformationRuntimeError: If nothing matched, or the move is
          ambiguous, redundant or would replace an existing directory.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
            self._move_into_directory(matches, target_dir)
            return

        dir_part, basename = os.path.split(self._dest_path)
        target_parent_dir = self._ensure_is_directory(
            fs_root,
            dir_part,
            self._definition_source,
        )
        existing_dest = target_parent_dir.get(basename)
        if existing_dest is not None and existing_dest.is_dir:
            # The destination already exists as a directory: move into it.
            self._move_into_directory(matches, existing_dest)
        else:
            self._rename_single_match(matches, target_parent_dir, basename)

    def _rename_single_match(
        self,
        matches: Sequence[FSPath],
        target_parent_dir: FSPath,
        basename: str,
    ) -> None:
        """Rename the only match to ``basename`` inside ``target_parent_dir``."""
        if len(matches) > 1:
            self._error(
                f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                " destination was not a directory. Either correct the pattern to match only one source"
                " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                f' "{self._dest_path}/")'
            )
        source = matches[0]
        if source.path == self._dest_path:
            self._error(
                f"Error in {self._definition_source}, the source"
                f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                " rename redundant!?"
            )
        source.parent_dir = target_parent_dir
        source.name = basename

    def _move_into_directory(
        self,
        matches: Sequence[FSPath],
        target_dir: VirtualPath,
    ) -> None:
        """Re-parent every match into ``target_dir``, keeping each basename."""
        assert target_dir is not None and target_dir.is_dir
        basenames: Dict[str, VirtualPath] = {}
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # Fix: look for the name collision in the *destination* directory.
            # The previous `m.get(m.name)` searched inside the source path, so the
            # "would replace the existing directory" check never saw the real
            # conflict. A source already located in the destination is not a
            # conflict with itself.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir
class CreateSymlinkPathTransformationRule(TransformationRule):
    """Create a symlink, handling an existing path per the replacement rule."""

    __slots__ = (
        "_link_dest",
        "_link_target",
        "_replacement_rule",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        link_target: str,
        link_dest: str,
        replacement_rule: CreateSymlinkReplacementRule,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param link_target: What the symlink points to.
        :param link_dest: Path at which the symlink is created.
        :param replacement_rule: Policy applied when ``link_dest`` already exists.
        :param definition_source: Location of the rule definition; only its
          ``path`` is kept (for error messages).
        :param condition: Optional condition guarding the rule.
        """
        self._link_target = link_target
        self._link_dest = link_dest
        self._replacement_rule = replacement_rule
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create the symlink below ``fs_root`` unless the condition is false.

        The parent directory of the link is created when missing.

        :raises TransformationRuntimeError: If an existing path may not be
          replaced under the active replacement rule.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        parent_part, link_name = os.path.split(self._link_dest)
        parent_dir = self._ensure_is_directory(
            fs_root,
            parent_part,
            self._definition_source,
        )
        occupant = parent_dir.get(link_name)
        if occupant:
            # Either removes the occupant or raises.
            self._handle_existing_path(occupant)
        parent_dir.add_symlink(link_name, self._link_target)

    def _handle_existing_path(self, existing: VirtualPath) -> None:
        """Remove ``existing`` if the replacement rule permits it; raise otherwise.

        :raises TransformationRuntimeError: If the rule forbids the replacement.
        """
        rule = self._replacement_rule
        if rule == "abort-on-non-empty-directory":
            # Only a directory that still has content blocks the replacement.
            may_replace = not (existing.is_dir and any(existing.iterdir))
            reason = "the path is a non-empty directory"
        elif rule == "discard-existing":
            may_replace = True
            reason = "<<internal error: you should not see an error with this message>>"
        elif rule == "error-if-directory":
            may_replace = not existing.is_dir
            reason = "the path is a directory"
        else:
            assert rule == "error-if-exists"
            may_replace = False
            reason = "the path exists"

        if may_replace:
            existing.unlink(recursive=True)
            return
        self._error(
            f"Refusing to replace {existing.path} with a symlink; {reason} and"
            f" the active replacement-rule was {self._replacement_rule}. You can"
            f' set the replacement-rule to "discard-existing", if you are not interested'
            f" in the contents of {existing.path}. This error was triggered by {self._definition_source}."
        )
class CreateDirectoryTransformationRule(TransformationRule):
    """Ensure directories exist and apply the requested mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param directories: Paths of the directories to ensure.
        :param owner: Owner passed to ``chown`` (may be None).
        :param group: Group passed to ``chown`` (may be None).
        :param mode: Mode specification; the mode is left untouched when None.
        :param definition_source: Where the rule was defined (for error messages).
        :param condition: Optional condition guarding the rule.
        """
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create/adjust each directory below ``fs_root`` unless the condition is false.

        :raises TransformationRuntimeError: If a path exists but is not a
          directory, or the desired mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner, group, mode = self._owner, self._group, self._mode
        for directory in self._directories:
            target = self._ensure_is_directory(
                fs_root,
                directory,
                self._definition_source,
            )

            if mode is not None:
                try:
                    new_mode = mode.compute_mode(target.mode, target.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {target.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                target.mode = new_mode
            # chown is invoked for every directory, even when both values are None.
            target.chown(owner, group)
394def _apply_owner_and_mode(
395 path: VirtualPath,
396 owner: Optional[StaticFileSystemOwner],
397 group: Optional[StaticFileSystemGroup],
398 mode: Optional[FileSystemMode],
399 capabilities: Optional[str],
400 capability_mode: Optional[FileSystemMode],
401 definition_source: str,
402) -> None:
403 if owner is not None or group is not None: 403 ↛ 405line 403 didn't jump to line 405, because the condition on line 403 was never false
404 path.chown(owner, group)
405 if mode is not None: 405 ↛ 415line 405 didn't jump to line 415, because the condition on line 405 was never false
406 try:
407 desired_mode = mode.compute_mode(path.mode, path.is_dir)
408 except ValueError as e:
409 raise TransformationRuntimeError(
410 f"Could not compute desired mode for {path.path} as"
411 f" requested in {definition_source}: {e.args[0]}"
412 ) from e
413 path.mode = desired_mode
415 if path.is_file and capabilities is not None: 415 ↛ 416line 415 didn't jump to line 416, because the condition on line 415 was never true
416 cap_ref = path.metadata(DebputyCapability)
417 cap_value = cap_ref.value
418 if cap_value is not None:
419 _warn(
420 f"Replacing the capabilities set on path {path.path} from {cap_value.definition_source} due"
421 f" to {definition_source}."
422 )
423 assert capability_mode is not None
424 cap_ref.value = DebputyCapability(
425 capabilities,
426 capability_mode,
427 definition_source,
428 )
class PathMetadataTransformationRule(TransformationRule):
    """Apply owner, group, mode and capabilities to the matched paths.

    Symlinks are always skipped. With ``recursive`` set, the same metadata is
    also applied to everything below each matched directory.
    """

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        recursive: bool,
        capabilities: Optional[str],
        capability_mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param match_rules: Rules selecting the paths to update.
        :param owner: Owner to apply (optional).
        :param group: Group to apply (optional).
        :param mode: Mode specification to apply (optional).
        :param recursive: Also apply the metadata below matched directories.
        :param capabilities: Capabilities to record on files (optional).
        :param capability_mode: Mode recorded with the capabilities; must be
          given exactly when ``capabilities`` is given.
        :param definition_source: Where the rule was defined (for messages).
        :param condition: Optional condition guarding the rule.
        :raises ValueError: If only one of ``capabilities`` and
          ``capability_mode`` is provided.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        has_caps = self._capabilities is not None
        has_cap_mode = self._capability_mode is not None
        if has_cap_mode and not has_caps:
            raise ValueError("capability_mode without capabilities")
        if has_caps and not has_cap_mode:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata below ``fs_root`` unless the condition is false.

        :raises TransformationRuntimeError: If a match rule yields no acceptable
          match, or a desired mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner = self._owner
        group = self._group
        mode = self._mode
        capabilities = self._capabilities
        capability_mode = self._capability_mode
        definition_source = self._definition_source
        # Matched directories to descend into afterwards; None disables recursion.
        recursive_dirs: Optional[List[FSPath]] = [] if self._recursive else None
        # NOTE(review): "a file match is required" is enabled by owner/group/mode
        # here, although the warning below talks about attributes that "can only
        # apply to files" - confirm this is the intended trigger.
        needs_file_match = owner is not None or group is not None or mode is not None

        for match_rule in self._match_rules:
            match_ok = False
            saw_symlink = False
            saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                if path.is_dir:
                    saw_directory = True
                    # A recursive rule is satisfied by a file anywhere below the directory.
                    if not match_ok and needs_file_match and self._recursive:
                        match_ok = any(p.is_file for p in path.all_paths())
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
                if path.is_dir and recursive_dirs is not None:
                    recursive_dirs.append(path)

            if not match_ok:
                # Explain the likely cause before raising the generic error.
                if needs_file_match and (saw_directory or saw_symlink):
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        " did not match any files, but given the attributes it can only apply to files."
                    )
                elif saw_symlink:
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                    )
                self._match_rule_had_no_matches(match_rule, self._definition_source)

        # Second pass: everything below the matched directories (recursive rules only).
        for recurse_dir in recursive_dirs or ():
            for path in recurse_dir.all_paths():
                if path.is_symlink:
                    continue
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
class ModeNormalizationTransformationRule(TransformationRule):
    """Apply mode normalizations; the first matching rule for a path wins."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[Tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """Store the ``(match rule, mode)`` pairs, applied in the given order."""
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Normalize the mode of every matched non-symlink path below ``fs_root``.

        Each path is handled at most once: paths already processed by an earlier
        normalization are ignored by later ones.

        :raises AssertionError: If a mode cannot be computed.
        """
        handled = set()

        def _already_handled(candidate) -> bool:
            return candidate.path in handled

        for match_rule, fs_mode in self._normalizations:
            for path in match_rule.finditer(fs_root, ignore_paths=_already_handled):
                if path.is_symlink or path.path in handled:
                    continue
                handled.add(path.path)
                try:
                    new_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                path.mode = new_mode
class NormalizeShebangLineTransformation(TransformationRule):
    """Rewrite the shebang line of files whose interpreter needs a fixup."""

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every file below ``fs_root`` and fix its shebang line if needed.

        Files that cannot be opened because they are purely virtual (or are
        test paths without a backing file) are skipped.
        """
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    interpreter = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            if interpreter is not None and interpreter.fixup_needed:
                interpreter.replace_shebang_line(candidate)