Coverage for src/debputy/filesystem_scan.py: 74%
1104 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import stat
9import subprocess
10import tempfile
11import time
12from abc import ABC
13from contextlib import suppress
14from typing import (
15 List,
16 Iterable,
17 Dict,
18 Optional,
19 Tuple,
20 Union,
21 Iterator,
22 Mapping,
23 cast,
24 Any,
25 ContextManager,
26 TextIO,
27 BinaryIO,
28 NoReturn,
29 Type,
30 Generic,
31)
32from weakref import ref, ReferenceType
34from debputy.exceptions import (
35 PureVirtualPathError,
36 DebputyFSIsROError,
37 DebputyMetadataAccessError,
38 TestPathWithNonExistentFSPathError,
39 SymlinkLoopError,
40)
41from debputy.intermediate_manifest import PathType
42from debputy.manifest_parser.base_types import (
43 ROOT_DEFINITION,
44 StaticFileSystemOwner,
45 StaticFileSystemGroup,
46)
47from debputy.plugin.api.spec import (
48 VirtualPath,
49 PathDef,
50 PathMetadataReference,
51 PMT,
52)
53from debputy.types import VP
54from debputy.util import (
55 generated_content_dir,
56 _error,
57 escape_shell,
58 assume_not_none,
59 _normalize_path,
60)
62BY_BASENAME = operator.attrgetter("name")
65class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
66 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")
68 def __init__(
69 self,
70 owning_plugin: str,
71 current_plugin: str,
72 metadata_type: Type[PMT],
73 ) -> None:
74 self._owning_plugin = owning_plugin
75 self._current_plugin = current_plugin
76 self._metadata_type = metadata_type
78 @property
79 def is_present(self) -> bool:
80 return False
82 @property
83 def can_read(self) -> bool:
84 return self._owning_plugin == self._current_plugin
86 @property
87 def can_write(self) -> bool:
88 return False
90 @property
91 def value(self) -> Optional[PMT]:
92 if self.can_read: 92 ↛ 94line 92 didn't jump to line 94, because the condition on line 92 was never false
93 return None
94 raise DebputyMetadataAccessError(
95 f"Cannot read the metadata {self._metadata_type.__name__} owned by"
96 f" {self._owning_plugin} as the metadata has not been made"
97 f" readable to the plugin {self._current_plugin}."
98 )
100 @value.setter
101 def value(self, new_value: PMT) -> None:
102 if self._is_owner:
103 raise DebputyFSIsROError(
104 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
105 )
106 raise DebputyMetadataAccessError(
107 f"Cannot set the metadata {self._metadata_type.__name__} owned by"
108 f" {self._owning_plugin} as the metadata has not been made"
109 f" read-write to the plugin {self._current_plugin}."
110 )
112 @property
113 def _is_owner(self) -> bool:
114 return self._owning_plugin == self._current_plugin
117@dataclasses.dataclass(slots=True)
118class PathMetadataValue(Generic[PMT]):
119 owning_plugin: str
120 metadata_type: Type[PMT]
121 value: Optional[PMT] = None
123 def can_read_value(self, current_plugin: str) -> bool:
124 return self.owning_plugin == current_plugin
126 def can_write_value(self, current_plugin: str) -> bool:
127 return self.owning_plugin == current_plugin
130class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
131 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")
133 def __init__(
134 self,
135 owning_path: VirtualPath,
136 current_plugin: str,
137 path_metadata_value: PathMetadataValue[PMT],
138 ) -> None:
139 self._owning_path = owning_path
140 self._current_plugin = current_plugin
141 self._path_metadata_value = path_metadata_value
143 @property
144 def is_present(self) -> bool:
145 if not self.can_read: 145 ↛ 146line 145 didn't jump to line 146, because the condition on line 145 was never true
146 return False
147 return self._path_metadata_value.value is not None
149 @property
150 def can_read(self) -> bool:
151 return self._path_metadata_value.can_read_value(self._current_plugin)
153 @property
154 def can_write(self) -> bool:
155 if not self._path_metadata_value.can_write_value(self._current_plugin): 155 ↛ 156line 155 didn't jump to line 156, because the condition on line 155 was never true
156 return False
157 owning_path = self._owning_path
158 return owning_path.is_read_write and not owning_path.is_detached
160 @property
161 def value(self) -> Optional[PMT]:
162 if self.can_read: 162 ↛ 164line 162 didn't jump to line 164, because the condition on line 162 was never false
163 return self._path_metadata_value.value
164 raise DebputyMetadataAccessError(
165 f"Cannot read the metadata {self._metadata_type_name} owned by"
166 f" {self._owning_plugin} as the metadata has not been made"
167 f" readable to the plugin {self._current_plugin}."
168 )
170 @value.setter
171 def value(self, new_value: PMT) -> None:
172 if not self.can_write: 172 ↛ 173line 172 didn't jump to line 173, because the condition on line 172 was never true
173 m = "set" if new_value is not None else "delete"
174 raise DebputyMetadataAccessError(
175 f"Cannot {m} the metadata {self._metadata_type_name} owned by"
176 f" {self._owning_plugin} as the metadata has not been made"
177 f" read-write to the plugin {self._current_plugin}."
178 )
179 owning_path = self._owning_path
180 if not owning_path.is_read_write: 180 ↛ 181line 180 didn't jump to line 181, because the condition on line 180 was never true
181 raise DebputyFSIsROError(
182 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
183 )
184 if owning_path.is_detached: 184 ↛ 185line 184 didn't jump to line 185, because the condition on line 184 was never true
185 raise TypeError(
186 f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
187 )
188 self._path_metadata_value.value = new_value
190 @property
191 def _is_owner(self) -> bool:
192 return self._owning_plugin == self._current_plugin
194 @property
195 def _owning_plugin(self) -> str:
196 return self._path_metadata_value.owning_plugin
198 @property
199 def _metadata_type_name(self) -> str:
200 return self._path_metadata_value.metadata_type.__name__
203def _cp_a(source: str, dest: str) -> None:
204 cmd = ["cp", "-a", source, dest]
205 try:
206 subprocess.check_call(cmd)
207 except subprocess.CalledProcessError:
208 full_command = escape_shell(*cmd)
209 _error(
210 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
211 f" above to understand what went wrong. The full command was: {full_command}"
212 )
215def _split_path(path: str) -> Tuple[bool, bool, List[str]]:
216 must_be_dir = True if path.endswith("/") else False
217 absolute = False
218 if path.startswith("/"):
219 absolute = True
220 path = "." + path
221 path_parts = path.rstrip("/").split("/")
222 if must_be_dir:
223 path_parts.append(".")
224 return absolute, must_be_dir, path_parts
227def _root(path: VP) -> VP:
228 current = path
229 while True:
230 parent = current.parent_dir
231 if parent is None:
232 return current
233 current = parent
236def _check_fs_path_is_file(
237 fs_path: str,
238 unlink_on_error: Optional["FSPath"] = None,
239) -> None:
240 had_issue = False
241 try:
242 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
243 st = os.lstat(fs_path)
244 except FileNotFoundError:
245 had_issue = True
246 else:
247 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 247 ↛ 248line 247 didn't jump to line 248, because the condition on line 247 was never true
248 had_issue = True
249 if not had_issue: 249 ↛ 252line 249 didn't jump to line 252, because the condition on line 249 was never false
250 return
252 if unlink_on_error:
253 with suppress(FileNotFoundError):
254 os.unlink(fs_path)
255 raise TypeError(
256 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
257 " linked to another file. The entry has been disconnected."
258 )
261class CurrentPluginContextManager:
262 __slots__ = ("_plugin_names",)
264 def __init__(self, initial_plugin_name: str) -> None:
265 self._plugin_names = [initial_plugin_name]
267 @property
268 def current_plugin_name(self) -> str:
269 return self._plugin_names[-1]
271 @contextlib.contextmanager
272 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
273 self._plugin_names.append(new_plugin_name)
274 yield new_plugin_name
275 self._plugin_names.pop()
278class VirtualPathBase(VirtualPath, ABC):
279 __slots__ = ()
281 def _orphan_safe_path(self) -> str:
282 return self.path
284 def _rw_check(self) -> None:
285 if not self.is_read_write:
286 raise DebputyFSIsROError(
287 f'Attempt to write to "{self._orphan_safe_path()}" failed:'
288 " Debputy Virtual File system is R/O."
289 )
291 def lookup(self, path: str) -> Optional["VirtualPathBase"]:
292 match, missing = self.attempt_lookup(path)
293 if missing:
294 return None
295 return match
297 def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
298 if self.is_detached: 298 ↛ 299line 298 didn't jump to line 299, because the condition on line 298 was never true
299 raise ValueError(
300 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
301 )
302 absolute, must_be_dir, path_parts = _split_path(path)
303 current = _root(self) if absolute else self
304 path_parts.reverse()
305 link_expansions = set()
306 while path_parts:
307 dir_part = path_parts.pop()
308 if dir_part == ".":
309 continue
310 if dir_part == "..":
311 p = current.parent_dir
312 if p is None: 312 ↛ 313line 312 didn't jump to line 313, because the condition on line 312 was never true
313 raise ValueError(f'The path "{path}" escapes the root dir')
314 current = p
315 continue
316 try:
317 current = current[dir_part]
318 except KeyError:
319 path_parts.append(dir_part)
320 path_parts.reverse()
321 if must_be_dir:
322 path_parts.pop()
323 return current, path_parts
324 if current.is_symlink and path_parts:
325 if current.path in link_expansions:
326 # This is our loop detection for now. It might have some false positives where you
327 # could safely resolve the same symlink twice. However, given that this use-case is
328 # basically non-existent in practice for packaging, we just stop here for now.
329 raise SymlinkLoopError(
330 f'The path "{path}" traversed the symlink "{current.path}" multiple'
331 " times. Currently, traversing the same symlink twice is considered"
332 " a loop by `debputy` even if the path would eventually resolve."
333 " Consider filing a feature request if you have a benign case that"
334 " triggers this error."
335 )
336 link_expansions.add(current.path)
337 link_target = current.readlink()
338 link_absolute, _, link_path_parts = _split_path(link_target)
339 if link_absolute:
340 current = _root(current)
341 else:
342 current = assume_not_none(current.parent_dir)
343 link_path_parts.reverse()
344 path_parts.extend(link_path_parts)
345 return current, []
347 def mkdirs(self, path: str) -> "VirtualPath":
348 current: VirtualPath
349 current, missing_parts = self.attempt_lookup(
350 f"{path}/" if not path.endswith("/") else path
351 )
352 if not current.is_dir: 352 ↛ 353line 352 didn't jump to line 353, because the condition on line 352 was never true
353 raise ValueError(
354 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
355 " a directory. However, that path exist AND is a not directory."
356 )
357 for missing_part in missing_parts:
358 assert missing_part not in (".", "..")
359 current = current.mkdir(missing_part)
360 return current
362 def prune_if_empty_dir(self) -> None:
363 """Remove this and all (now) empty parent directories
365 Same as: `rmdir --ignore-fail-on-non-empty --parents`
367 This operation may cause the path (and any of its parent directories) to become "detached"
368 and therefore unsafe to use in further operations.
369 """
370 self._rw_check()
372 if not self.is_dir: 372 ↛ 373line 372 didn't jump to line 373, because the condition on line 372 was never true
373 raise TypeError(f"{self._orphan_safe_path()} is not a directory")
374 if any(self.iterdir):
375 return
376 parent_dir = assume_not_none(self.parent_dir)
378 # Recursive does not matter; we already know the directory is empty.
379 self.unlink()
381 # Note: The root dir must never be deleted. This works because when delegating it to the root
382 # directory, its implementation of this method is a no-op. If this is later rewritten to an
383 # inline loop (rather than recursion), be sure to preserve this feature.
384 parent_dir.prune_if_empty_dir()
386 def _current_plugin(self) -> str:
387 if self.is_detached: 387 ↛ 388line 387 didn't jump to line 388, because the condition on line 387 was never true
388 raise TypeError("Cannot resolve the current plugin; path is detached")
389 current = self
390 while True:
391 next_parent = current.parent_dir
392 if next_parent is None:
393 break
394 current = next_parent
395 assert current is not None
396 return cast("FSRootDir", current)._current_plugin()
399class FSPath(VirtualPathBase, ABC):
400 __slots__ = (
401 "_basename",
402 "_parent_dir",
403 "_children",
404 "_path_cache",
405 "_parent_path_cache",
406 "_last_known_parent_path",
407 "_mode",
408 "_owner",
409 "_group",
410 "_mtime",
411 "_stat_cache",
412 "_metadata",
413 "__weakref__",
414 )
416 def __init__(
417 self,
418 basename: str,
419 parent: Optional["FSPath"],
420 children: Optional[Dict[str, "FSPath"]] = None,
421 initial_mode: Optional[int] = None,
422 mtime: Optional[float] = None,
423 stat_cache: Optional[os.stat_result] = None,
424 ) -> None:
425 self._basename = basename
426 self._path_cache: Optional[str] = None
427 self._parent_path_cache: Optional[str] = None
428 self._children = children
429 self._last_known_parent_path: Optional[str] = None
430 self._mode = initial_mode
431 self._mtime = mtime
432 self._stat_cache = stat_cache
433 self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {}
434 self._owner = ROOT_DEFINITION
435 self._group = ROOT_DEFINITION
437 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
438 # is_orphaned, which assumes self._parent_dir is an attribute.
439 self._parent_dir: Optional[ReferenceType["FSPath"]] = None
440 if parent is not None:
441 self.parent_dir = parent
443 def __repr__(self) -> str:
444 return (
445 f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
446 f" is_file={self.is_file},"
447 f" is_dir={self.is_dir},"
448 f" is_symlink={self.is_symlink},"
449 f" has_fs_path={self.has_fs_path},"
450 f" children_len={len(self._children) if self._children else 0})"
451 )
453 @property
454 def name(self) -> str:
455 return self._basename
457 @name.setter
458 def name(self, new_name: str) -> None:
459 self._rw_check()
460 if new_name == self._basename: 460 ↛ 461line 460 didn't jump to line 461, because the condition on line 460 was never true
461 return
462 if self.is_detached: 462 ↛ 463line 462 didn't jump to line 463, because the condition on line 462 was never true
463 self._basename = new_name
464 return
465 self._rw_check()
466 parent = self.parent_dir
467 # This little parent_dir dance ensures the parent dir detects the rename properly
468 self.parent_dir = None
469 self._basename = new_name
470 self.parent_dir = parent
472 @property
473 def iterdir(self) -> Iterable["FSPath"]:
474 if self._children is not None:
475 yield from self._children.values()
477 def all_paths(self) -> Iterable["FSPath"]:
478 yield self
479 if not self.is_dir:
480 return
481 by_basename = BY_BASENAME
482 stack = sorted(self.iterdir, key=by_basename, reverse=True)
483 while stack:
484 current = stack.pop()
485 yield current
486 if current.is_dir and not current.is_detached:
487 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True))
489 def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]:
490 # FIXME: can this be more "os.walk"-like without making it harder to implement?
491 if not self.is_dir: 491 ↛ 492line 491 didn't jump to line 492, because the condition on line 491 was never true
492 yield self, []
493 return
494 by_basename = BY_BASENAME
495 stack = [self]
496 while stack:
497 current = stack.pop()
498 children = sorted(current.iterdir, key=by_basename)
499 assert not children or current.is_dir
500 yield current, children
501 # Removing the directory counts as discarding the children.
502 if not current.is_detached: 502 ↛ 496line 502 didn't jump to line 496, because the condition on line 502 was never false
503 stack.extend(reversed(children))
505 def _orphan_safe_path(self) -> str:
506 if not self.is_detached or self._last_known_parent_path is not None: 506 ↛ 508line 506 didn't jump to line 508, because the condition on line 506 was never false
507 return self.path
508 return f"<orphaned>/{self.name}"
510 @property
511 def is_detached(self) -> bool:
512 parent = self._parent_dir
513 if parent is None:
514 return True
515 resolved_parent = parent()
516 if resolved_parent is None: 516 ↛ 517line 516 didn't jump to line 517, because the condition on line 516 was never true
517 return True
518 return resolved_parent.is_detached
520 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
521 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
522 # behaviour to avoid surprises for now.
523 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
524 # to using it)
525 __iter__ = None
527 def __getitem__(self, key) -> "FSPath":
528 if self._children is None:
529 raise KeyError(
530 f"{key} (note: {self._orphan_safe_path()!r} has no children)"
531 )
532 if isinstance(key, FSPath): 532 ↛ 533line 532 didn't jump to line 533, because the condition on line 532 was never true
533 key = key.name
534 return self._children[key]
536 def __delitem__(self, key) -> None:
537 self._rw_check()
538 children = self._children
539 if children is None: 539 ↛ 540line 539 didn't jump to line 540, because the condition on line 539 was never true
540 raise KeyError(key)
541 del children[key]
543 def get(self, key: str) -> "Optional[FSPath]":
544 try:
545 return self[key]
546 except KeyError:
547 return None
549 def __contains__(self, item: object) -> bool:
550 if isinstance(item, VirtualPath): 550 ↛ 551line 550 didn't jump to line 551, because the condition on line 550 was never true
551 return item.parent_dir is self
552 if not isinstance(item, str): 552 ↛ 553line 552 didn't jump to line 553, because the condition on line 552 was never true
553 return False
554 m = self.get(item)
555 return m is not None
557 def _add_child(self, child: "FSPath") -> None:
558 self._rw_check()
559 if not self.is_dir: 559 ↛ 560line 559 didn't jump to line 560, because the condition on line 559 was never true
560 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
561 if self._children is None:
562 self._children = {}
564 conflict_child = self.get(child.name)
565 if conflict_child is not None: 565 ↛ 566line 565 didn't jump to line 566, because the condition on line 565 was never true
566 conflict_child.unlink(recursive=True)
567 self._children[child.name] = child
569 @property
570 def tar_path(self) -> str:
571 path = self.path
572 if self.is_dir:
573 return path + "/"
574 return path
576 @property
577 def path(self) -> str:
578 parent_path = self.parent_dir_path
579 if (
580 self._parent_path_cache is not None
581 and self._parent_path_cache == parent_path
582 ):
583 return assume_not_none(self._path_cache)
584 if parent_path is None: 584 ↛ 585line 584 didn't jump to line 585, because the condition on line 584 was never true
585 raise ReferenceError(
586 f"The path {self.name} is detached! {self.__class__.__name__}"
587 )
588 self._parent_path_cache = parent_path
589 ret = os.path.join(parent_path, self.name)
590 self._path_cache = ret
591 return ret
593 @property
594 def parent_dir(self) -> Optional["FSPath"]:
595 p_ref = self._parent_dir
596 p = p_ref() if p_ref is not None else None
597 if p is None: 597 ↛ 598line 597 didn't jump to line 598, because the condition on line 597 was never true
598 raise ReferenceError(
599 f"The path {self.name} is detached! {self.__class__.__name__}"
600 )
601 return p
603 @parent_dir.setter
604 def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
605 self._rw_check()
606 if new_parent is not None:
607 if not new_parent.is_dir: 607 ↛ 608line 607 didn't jump to line 608, because the condition on line 607 was never true
608 raise ValueError(
609 f"The parent {new_parent._orphan_safe_path()} must be a directory"
610 )
611 new_parent._rw_check()
612 old_parent = None
613 self._last_known_parent_path = None
614 if not self.is_detached:
615 old_parent = self.parent_dir
616 old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
617 del old_parent_children[self.name]
618 if new_parent is not None:
619 self._parent_dir = ref(new_parent)
620 new_parent._add_child(self)
621 else:
622 if old_parent is not None and not old_parent.is_detached: 622 ↛ 624line 622 didn't jump to line 624, because the condition on line 622 was never false
623 self._last_known_parent_path = old_parent.path
624 self._parent_dir = None
625 self._parent_path_cache = None
627 @property
628 def parent_dir_path(self) -> Optional[str]:
629 if self.is_detached: 629 ↛ 630line 629 didn't jump to line 630, because the condition on line 629 was never true
630 return self._last_known_parent_path
631 return assume_not_none(self.parent_dir).path
633 def chown(
634 self,
635 owner: Optional[StaticFileSystemOwner],
636 group: Optional[StaticFileSystemGroup],
637 ) -> None:
638 """Change the owner/group of this path
640 :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
641 :param group: The desired group definition for this path. If None, then no change of group is performed.
642 """
643 self._rw_check()
645 if owner is not None:
646 self._owner = owner.ownership_definition
647 if group is not None:
648 self._group = group.ownership_definition
650 def stat(self) -> os.stat_result:
651 st = self._stat_cache
652 if st is None:
653 st = self._uncached_stat()
654 self._stat_cache = st
655 return st
657 def _uncached_stat(self) -> os.stat_result:
658 return os.lstat(self.fs_path)
660 @property
661 def mode(self) -> int:
662 current_mode = self._mode
663 if current_mode is None: 663 ↛ 664line 663 didn't jump to line 664, because the condition on line 663 was never true
664 current_mode = stat.S_IMODE(self.stat().st_mode)
665 self._mode = current_mode
666 return current_mode
668 @mode.setter
669 def mode(self, new_mode: int) -> None:
670 self._rw_check()
671 min_bit = 0o500 if self.is_dir else 0o400
672 if (new_mode & min_bit) != min_bit: 672 ↛ 673line 672 didn't jump to line 673, because the condition on line 672 was never true
673 omode = oct(new_mode)[2:]
674 omin = oct(min_bit)[2:]
675 raise ValueError(
676 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
677 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
678 " There are no paths that do not need these requirements met and they can cause"
679 " problems during build or on the final system."
680 )
681 self._mode = new_mode
683 @property
684 def mtime(self) -> float:
685 mtime = self._mtime
686 if mtime is None:
687 mtime = self.stat().st_mtime
688 self._mtime = mtime
689 return mtime
691 @mtime.setter
692 def mtime(self, new_mtime: float) -> None:
693 self._rw_check()
694 self._mtime = new_mtime
696 @property
697 def tar_owner_info(self) -> Tuple[str, int, str, int]:
698 owner = self._owner
699 group = self._group
700 return (
701 owner.entity_name,
702 owner.entity_id,
703 group.entity_name,
704 group.entity_id,
705 )
707 @property
708 def _can_replace_inline(self) -> bool:
709 return False
711 @contextlib.contextmanager
712 def add_file(
713 self,
714 name: str,
715 *,
716 unlink_if_exists: bool = True,
717 use_fs_path_mode: bool = False,
718 mode: int = 0o0644,
719 mtime: Optional[float] = None,
720 # Special-case parameters that are not exposed in the API
721 fs_basename_matters: bool = False,
722 subdir_key: Optional[str] = None,
723 ) -> Iterator["FSPath"]:
724 if "/" in name or name in {".", ".."}: 724 ↛ 725line 724 didn't jump to line 725, because the condition on line 724 was never true
725 raise ValueError(f'Invalid file name: "{name}"')
726 if not self.is_dir: 726 ↛ 727line 726 didn't jump to line 727, because the condition on line 726 was never true
727 raise TypeError(
728 f"Cannot create {self._orphan_safe_path()}/{name}:"
729 f" {self._orphan_safe_path()} is not a directory"
730 )
731 self._rw_check()
732 existing = self.get(name)
733 if existing is not None: 733 ↛ 734line 733 didn't jump to line 734, because the condition on line 733 was never true
734 if not unlink_if_exists:
735 raise ValueError(
736 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
737 f" and exist_ok was False"
738 )
739 existing.unlink(recursive=False)
741 if fs_basename_matters and subdir_key is None: 741 ↛ 742line 741 didn't jump to line 742, because the condition on line 741 was never true
742 raise ValueError(
743 "When fs_basename_matters is True, a subdir_key must be provided"
744 )
746 directory = generated_content_dir(subdir_key=subdir_key)
748 if fs_basename_matters: 748 ↛ 749line 748 didn't jump to line 749, because the condition on line 748 was never true
749 fs_path = os.path.join(directory, name)
750 with open(fs_path, "xb") as _:
751 # Ensure that the fs_path exists
752 pass
753 child = FSBackedFilePath(
754 name,
755 self,
756 fs_path,
757 replaceable_inline=True,
758 mtime=mtime,
759 )
760 yield child
761 else:
762 with tempfile.NamedTemporaryFile(
763 dir=directory, suffix=f"__{name}", delete=False
764 ) as fd:
765 fs_path = fd.name
766 child = FSBackedFilePath(
767 name,
768 self,
769 fs_path,
770 replaceable_inline=True,
771 mtime=mtime,
772 )
773 fd.close()
774 yield child
776 if use_fs_path_mode: 776 ↛ 778line 776 didn't jump to line 778, because the condition on line 776 was never true
777 # Ensure the caller can see the current mode
778 os.chmod(fs_path, mode)
779 _check_fs_path_is_file(fs_path, unlink_on_error=child)
780 child._reset_caches()
781 if not use_fs_path_mode: 781 ↛ exitline 781 didn't return from function 'add_file', because the condition on line 781 was never false
782 child.mode = mode
784 def insert_file_from_fs_path(
785 self,
786 name: str,
787 fs_path: str,
788 *,
789 exist_ok: bool = True,
790 use_fs_path_mode: bool = False,
791 mode: int = 0o0644,
792 require_copy_on_write: bool = True,
793 follow_symlinks: bool = True,
794 reference_path: Optional[VirtualPath] = None,
795 ) -> "FSPath":
796 if "/" in name or name in {".", ".."}: 796 ↛ 797line 796 didn't jump to line 797, because the condition on line 796 was never true
797 raise ValueError(f'Invalid file name: "{name}"')
798 if not self.is_dir: 798 ↛ 799line 798 didn't jump to line 799, because the condition on line 798 was never true
799 raise TypeError(
800 f"Cannot create {self._orphan_safe_path()}/{name}:"
801 f" {self._orphan_safe_path()} is not a directory"
802 )
803 self._rw_check()
804 if name in self and not exist_ok: 804 ↛ 805line 804 didn't jump to line 805, because the condition on line 804 was never true
805 raise ValueError(
806 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
807 f" and exist_ok was False"
808 )
809 new_fs_path = fs_path
810 if follow_symlinks:
811 if reference_path is not None: 811 ↛ 812line 811 didn't jump to line 812, because the condition on line 811 was never true
812 raise ValueError(
813 "The reference_path cannot be used with follow_symlinks"
814 )
815 new_fs_path = os.path.realpath(new_fs_path, strict=True)
817 fmode: Optional[int] = mode
818 if use_fs_path_mode:
819 fmode = None
821 st = None
822 if reference_path is None:
823 st = os.lstat(new_fs_path)
824 if stat.S_ISDIR(st.st_mode): 824 ↛ 825line 824 didn't jump to line 825, because the condition on line 824 was never true
825 raise ValueError(
826 f'The provided path "{fs_path}" is a directory. However, this'
827 " method does not support directories"
828 )
830 if not stat.S_ISREG(st.st_mode): 830 ↛ 831line 830 didn't jump to line 831, because the condition on line 830 was never true
831 if follow_symlinks:
832 raise ValueError(
833 f"The resolved fs_path ({new_fs_path}) was not a file."
834 )
835 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")
836 return FSBackedFilePath(
837 name,
838 self,
839 new_fs_path,
840 initial_mode=fmode,
841 stat_cache=st,
842 replaceable_inline=not require_copy_on_write,
843 reference_path=reference_path,
844 )
846 def add_symlink(
847 self,
848 link_name: str,
849 link_target: str,
850 *,
851 reference_path: Optional[VirtualPath] = None,
852 ) -> "FSPath":
853 if "/" in link_name or link_name in {".", ".."}: 853 ↛ 854line 853 didn't jump to line 854, because the condition on line 853 was never true
854 raise ValueError(
855 f'Invalid file name: "{link_name}" (it must be a valid basename)'
856 )
857 if not self.is_dir: 857 ↛ 858line 857 didn't jump to line 858, because the condition on line 857 was never true
858 raise TypeError(
859 f"Cannot create {self._orphan_safe_path()}/{link_name}:"
860 f" {self._orphan_safe_path()} is not a directory"
861 )
862 self._rw_check()
864 existing = self.get(link_name)
865 if existing: 865 ↛ 867line 865 didn't jump to line 867, because the condition on line 865 was never true
866 # Emulate ln -sf with attempts a non-recursive unlink first.
867 existing.unlink(recursive=False)
869 return SymlinkVirtualPath(
870 link_name,
871 self,
872 link_target,
873 reference_path=reference_path,
874 )
876 def mkdir(
877 self,
878 name: str,
879 *,
880 reference_path: Optional[VirtualPath] = None,
881 ) -> "FSPath":
882 if "/" in name or name in {".", ".."}: 882 ↛ 883line 882 didn't jump to line 883, because the condition on line 882 was never true
883 raise ValueError(
884 f'Invalid file name: "{name}" (it must be a valid basename)'
885 )
886 if not self.is_dir: 886 ↛ 887line 886 didn't jump to line 887, because the condition on line 886 was never true
887 raise TypeError(
888 f"Cannot create {self._orphan_safe_path()}/{name}:"
889 f" {self._orphan_safe_path()} is not a directory"
890 )
891 if reference_path is not None and not reference_path.is_dir: 891 ↛ 892line 891 didn't jump to line 892, because the condition on line 891 was never true
892 raise ValueError(
893 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
894 )
895 self._rw_check()
897 existing = self.get(name)
898 if existing: 898 ↛ 899line 898 didn't jump to line 899, because the condition on line 898 was never true
899 raise ValueError(f"Path {existing.path} already exist")
900 return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
902 def mkdirs(self, path: str) -> "FSPath":
903 return cast("FSPath", super().mkdirs(path))
905 @property
906 def is_read_write(self) -> bool:
907 """When true, the file system entry may be mutated
909 :return: Whether file system mutations are permitted.
910 """
911 if self.is_detached:
912 return True
913 return assume_not_none(self.parent_dir).is_read_write
915 def unlink(self, *, recursive: bool = False) -> None:
916 """Unlink a file or a directory
918 This operation will detach the path from the file system (causing "is_detached" to return True).
920 Note that the root directory cannot be deleted.
922 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
923 as well. When False, an error is raised if the path is a non-empty directory
924 """
925 if self.is_detached: 925 ↛ 926line 925 didn't jump to line 926, because the condition on line 925 was never true
926 return
927 if not recursive and any(self.iterdir): 927 ↛ 928line 927 didn't jump to line 928, because the condition on line 927 was never true
928 raise ValueError(
929 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
930 )
931 # The .parent_dir setter does a _rw_check() for us.
932 self.parent_dir = None
934 def _reset_caches(self) -> None:
935 self._mtime = None
936 self._stat_cache = None
938 def metadata(
939 self,
940 metadata_type: Type[PMT],
941 *,
942 owning_plugin: Optional[str] = None,
943 ) -> PathMetadataReference[PMT]:
944 current_plugin = self._current_plugin()
945 if owning_plugin is None: 945 ↛ 947line 945 didn't jump to line 947, because the condition on line 945 was never false
946 owning_plugin = current_plugin
947 metadata_key = (owning_plugin, metadata_type)
948 metadata_value = self._metadata.get(metadata_key)
949 if metadata_value is None:
950 if self.is_detached: 950 ↛ 951line 950 didn't jump to line 951, because the condition on line 950 was never true
951 raise TypeError(
952 f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
953 )
954 if not self.is_read_write:
955 return AlwaysEmptyReadOnlyMetadataReference(
956 owning_plugin,
957 current_plugin,
958 metadata_type,
959 )
960 metadata_value = PathMetadataValue(owning_plugin, metadata_type)
961 self._metadata[metadata_key] = metadata_value
962 return PathMetadataReferenceImplementation(
963 self,
964 current_plugin,
965 metadata_value,
966 )
968 @contextlib.contextmanager
969 def replace_fs_path_content(
970 self,
971 *,
972 use_fs_path_mode: bool = False,
973 ) -> Iterator[str]:
974 if not self.is_file: 974 ↛ 975line 974 didn't jump to line 975, because the condition on line 974 was never true
975 raise TypeError(
976 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
977 )
978 self._rw_check()
979 fs_path = self.fs_path
980 if not self._can_replace_inline: 980 ↛ 992line 980 didn't jump to line 992, because the condition on line 980 was never false
981 fs_path = self.fs_path
982 directory = generated_content_dir()
983 with tempfile.NamedTemporaryFile(
984 dir=directory, suffix=f"__{self.name}", delete=False
985 ) as new_path_fd:
986 new_path_fd.close()
987 _cp_a(fs_path, new_path_fd.name)
988 fs_path = new_path_fd.name
989 self._replaced_path(fs_path)
990 assert self.fs_path == fs_path
992 current_mtime = self._mtime
993 if current_mtime is not None:
994 os.utime(fs_path, (current_mtime, current_mtime))
996 current_mode = self.mode
997 yield fs_path
998 _check_fs_path_is_file(fs_path, unlink_on_error=self)
999 if not use_fs_path_mode: 999 ↛ 1001line 999 didn't jump to line 1001, because the condition on line 999 was never false
1000 os.chmod(fs_path, current_mode)
1001 self._reset_caches()
1003 def _replaced_path(self, new_fs_path: str) -> None:
1004 raise NotImplementedError
1007class VirtualFSPathBase(FSPath, ABC):
1008 __slots__ = ()
1010 def __init__(
1011 self,
1012 basename: str,
1013 parent: Optional["FSPath"],
1014 children: Optional[Dict[str, "FSPath"]] = None,
1015 initial_mode: Optional[int] = None,
1016 mtime: Optional[float] = None,
1017 stat_cache: Optional[os.stat_result] = None,
1018 ) -> None:
1019 super().__init__(
1020 basename,
1021 parent,
1022 children,
1023 initial_mode=initial_mode,
1024 mtime=mtime,
1025 stat_cache=stat_cache,
1026 )
1028 @property
1029 def mtime(self) -> float:
1030 mtime = self._mtime
1031 if mtime is None:
1032 mtime = time.time()
1033 self._mtime = mtime
1034 return mtime
1036 @property
1037 def has_fs_path(self) -> bool:
1038 return False
1040 def stat(self) -> os.stat_result:
1041 if not self.has_fs_path:
1042 raise PureVirtualPathError(
1043 "stat() is only applicable to paths backed by the file system. The path"
1044 f" {self._orphan_safe_path()!r} is purely virtual"
1045 )
1046 return super().stat()
1048 @property
1049 def fs_path(self) -> str:
1050 if not self.has_fs_path:
1051 raise PureVirtualPathError(
1052 "fs_path is only applicable to paths backed by the file system. The path"
1053 f" {self._orphan_safe_path()!r} is purely virtual"
1054 )
1055 return self.fs_path
1058class FSRootDir(FSPath):
1059 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")
1061 def __init__(self, fs_path: Optional[str] = None) -> None:
1062 self._fs_path = fs_path
1063 self._fs_read_write = True
1064 super().__init__(
1065 ".",
1066 None,
1067 children={},
1068 initial_mode=0o755,
1069 )
1070 self._plugin_context = CurrentPluginContextManager("debputy")
1072 @property
1073 def is_detached(self) -> bool:
1074 return False
1076 def _orphan_safe_path(self) -> str:
1077 return self.name
1079 @property
1080 def path(self) -> str:
1081 return self.name
1083 @property
1084 def parent_dir(self) -> Optional["FSPath"]:
1085 return None
1087 @parent_dir.setter
1088 def parent_dir(self, new_parent: Optional[FSPath]) -> None:
1089 if new_parent is not None:
1090 raise ValueError("The root directory cannot become a non-root directory")
1092 @property
1093 def parent_dir_path(self) -> Optional[str]:
1094 return None
1096 @property
1097 def is_dir(self) -> bool:
1098 return True
1100 @property
1101 def is_file(self) -> bool:
1102 return False
1104 @property
1105 def is_symlink(self) -> bool:
1106 return False
1108 def readlink(self) -> str:
1109 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1111 @property
1112 def has_fs_path(self) -> bool:
1113 return self._fs_path is not None
1115 def stat(self) -> os.stat_result:
1116 if not self.has_fs_path:
1117 raise PureVirtualPathError(
1118 "stat() is only applicable to paths backed by the file system. The path"
1119 f" {self._orphan_safe_path()!r} is purely virtual"
1120 )
1121 return os.stat(self.fs_path)
1123 @property
1124 def fs_path(self) -> str:
1125 if not self.has_fs_path: 1125 ↛ 1126line 1125 didn't jump to line 1126, because the condition on line 1125 was never true
1126 raise PureVirtualPathError(
1127 "fs_path is only applicable to paths backed by the file system. The path"
1128 f" {self._orphan_safe_path()!r} is purely virtual"
1129 )
1130 return assume_not_none(self._fs_path)
1132 @property
1133 def is_read_write(self) -> bool:
1134 return self._fs_read_write
1136 @is_read_write.setter
1137 def is_read_write(self, new_value: bool) -> None:
1138 self._fs_read_write = new_value
1140 def prune_if_empty_dir(self) -> None:
1141 # No-op for the root directory. There is never a case where you want to delete this directory
1142 # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
1143 return
1145 def unlink(self, *, recursive: bool = False) -> None:
1146 # There is never a case where you want to delete this directory (and even if you could,
1147 # debputy will need it for technical reasons, so the root dir stays)
1148 raise TypeError("Cannot delete the root directory")
1150 def _current_plugin(self) -> str:
1151 return self._plugin_context.current_plugin_name
1153 @contextlib.contextmanager
1154 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
1155 with self._plugin_context.change_plugin_context(new_plugin) as r:
1156 yield r
1159class VirtualPathWithReference(VirtualFSPathBase, ABC):
1160 __slots__ = ("_reference_path",)
1162 def __init__(
1163 self,
1164 basename: str,
1165 parent: FSPath,
1166 *,
1167 default_mode: int,
1168 reference_path: Optional[VirtualPath] = None,
1169 ) -> None:
1170 super().__init__(
1171 basename,
1172 parent=parent,
1173 initial_mode=reference_path.mode if reference_path else default_mode,
1174 )
1175 self._reference_path = reference_path
1177 @property
1178 def has_fs_path(self) -> bool:
1179 ref_path = self._reference_path
1180 return ref_path is not None and ref_path.has_fs_path
1182 @property
1183 def mtime(self) -> float:
1184 mtime = self._mtime
1185 if mtime is None: 1185 ↛ 1192line 1185 didn't jump to line 1192, because the condition on line 1185 was never false
1186 ref_path = self._reference_path
1187 if ref_path: 1187 ↛ 1190line 1187 didn't jump to line 1190, because the condition on line 1187 was never false
1188 mtime = ref_path.mtime
1189 else:
1190 mtime = super().mtime
1191 self._mtime = mtime
1192 return mtime
1194 @mtime.setter
1195 def mtime(self, new_mtime: float) -> None:
1196 self._rw_check()
1197 self._mtime = new_mtime
1199 @property
1200 def fs_path(self) -> str:
1201 ref_path = self._reference_path
1202 if ref_path is not None and ( 1202 ↛ 1206line 1202 didn't jump to line 1206, because the condition on line 1202 was never false
1203 not super().has_fs_path or super().fs_path == ref_path.fs_path
1204 ):
1205 return ref_path.fs_path
1206 return super().fs_path
1208 def stat(self) -> os.stat_result:
1209 ref_path = self._reference_path
1210 if ref_path is not None and (
1211 not super().has_fs_path or super().fs_path == ref_path.fs_path
1212 ):
1213 return ref_path.stat()
1214 return super().stat()
1216 def open(
1217 self,
1218 *,
1219 byte_io: bool = False,
1220 buffering: int = -1,
1221 ) -> Union[TextIO, BinaryIO]:
1222 reference_path = self._reference_path
1223 if reference_path is not None and reference_path.fs_path == self.fs_path:
1224 return reference_path.open(byte_io=byte_io, buffering=buffering)
1225 return super().open(byte_io=byte_io, buffering=buffering)
1228class VirtualDirectoryFSPath(VirtualPathWithReference):
1229 __slots__ = ("_reference_path",)
1231 def __init__(
1232 self,
1233 basename: str,
1234 parent: FSPath,
1235 *,
1236 reference_path: Optional[VirtualPath] = None,
1237 ) -> None:
1238 super().__init__(
1239 basename,
1240 parent,
1241 reference_path=reference_path,
1242 default_mode=0o755,
1243 )
1244 self._reference_path = reference_path
1245 assert reference_path is None or reference_path.is_dir
1247 @property
1248 def is_dir(self) -> bool:
1249 return True
1251 @property
1252 def is_file(self) -> bool:
1253 return False
1255 @property
1256 def is_symlink(self) -> bool:
1257 return False
1259 def readlink(self) -> str:
1260 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1263class SymlinkVirtualPath(VirtualPathWithReference):
1264 __slots__ = ("_link_target",)
1266 def __init__(
1267 self,
1268 basename: str,
1269 parent_dir: FSPath,
1270 link_target: str,
1271 *,
1272 reference_path: Optional[VirtualPath] = None,
1273 ) -> None:
1274 super().__init__(
1275 basename,
1276 parent=parent_dir,
1277 default_mode=_SYMLINK_MODE,
1278 reference_path=reference_path,
1279 )
1280 self._link_target = link_target
1282 @property
1283 def is_dir(self) -> bool:
1284 return False
1286 @property
1287 def is_file(self) -> bool:
1288 return False
1290 @property
1291 def is_symlink(self) -> bool:
1292 return True
1294 def readlink(self) -> str:
1295 return self._link_target
1298class FSBackedFilePath(VirtualPathWithReference):
1299 __slots__ = ("_fs_path", "_replaceable_inline")
1301 def __init__(
1302 self,
1303 basename: str,
1304 parent_dir: FSPath,
1305 fs_path: str,
1306 *,
1307 replaceable_inline: bool = False,
1308 initial_mode: Optional[int] = None,
1309 mtime: Optional[float] = None,
1310 stat_cache: Optional[os.stat_result] = None,
1311 reference_path: Optional[VirtualPath] = None,
1312 ) -> None:
1313 super().__init__(
1314 basename,
1315 parent_dir,
1316 default_mode=0o644,
1317 reference_path=reference_path,
1318 )
1319 self._fs_path = fs_path
1320 self._replaceable_inline = replaceable_inline
1321 if initial_mode is not None:
1322 self.mode = initial_mode
1323 if mtime is not None:
1324 self._mtime = mtime
1325 self._stat_cache = stat_cache
1326 assert (
1327 not replaceable_inline or "debputy/scratch-dir/" in fs_path
1328 ), f"{fs_path} should not be inline-replaceable -- {self.path}"
1330 @property
1331 def is_dir(self) -> bool:
1332 return False
1334 @property
1335 def is_file(self) -> bool:
1336 return True
1338 @property
1339 def is_symlink(self) -> bool:
1340 return False
1342 def readlink(self) -> str:
1343 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')
1345 @property
1346 def has_fs_path(self) -> bool:
1347 return True
1349 @property
1350 def fs_path(self) -> str:
1351 return self._fs_path
1353 @property
1354 def _can_replace_inline(self) -> bool:
1355 return self._replaceable_inline
1357 def _replaced_path(self, new_fs_path: str) -> None:
1358 self._fs_path = new_fs_path
1359 self._reference_path = None
1360 self._replaceable_inline = True
1363_SYMLINK_MODE = 0o777
1366class VirtualTestPath(FSPath):
1367 __slots__ = (
1368 "_path_type",
1369 "_has_fs_path",
1370 "_fs_path",
1371 "_link_target",
1372 "_content",
1373 "_materialized_content",
1374 )
1376 def __init__(
1377 self,
1378 basename: str,
1379 parent_dir: Optional[FSPath],
1380 mode: Optional[int] = None,
1381 mtime: Optional[float] = None,
1382 is_dir: bool = False,
1383 has_fs_path: Optional[bool] = False,
1384 fs_path: Optional[str] = None,
1385 link_target: Optional[str] = None,
1386 content: Optional[str] = None,
1387 materialized_content: Optional[str] = None,
1388 ) -> None:
1389 if is_dir:
1390 self._path_type = PathType.DIRECTORY
1391 elif link_target is not None:
1392 self._path_type = PathType.SYMLINK
1393 if mode is not None and mode != _SYMLINK_MODE: 1393 ↛ 1394line 1393 didn't jump to line 1394, because the condition on line 1393 was never true
1394 raise ValueError(
1395 f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
1396 )
1397 assert mode is None or mode == _SYMLINK_MODE
1398 else:
1399 self._path_type = PathType.FILE
1401 if mode is not None:
1402 initial_mode = mode
1403 else:
1404 initial_mode = 0o755 if is_dir else 0o644
1406 self._link_target = link_target
1407 if has_fs_path is None:
1408 has_fs_path = bool(fs_path)
1409 self._has_fs_path = has_fs_path
1410 self._fs_path = fs_path
1411 self._materialized_content = materialized_content
1412 super().__init__(
1413 basename,
1414 parent=parent_dir,
1415 initial_mode=initial_mode,
1416 mtime=mtime,
1417 )
1418 self._content = content
1420 @property
1421 def is_dir(self) -> bool:
1422 return self._path_type == PathType.DIRECTORY
1424 @property
1425 def is_file(self) -> bool:
1426 return self._path_type == PathType.FILE
1428 @property
1429 def is_symlink(self) -> bool:
1430 return self._path_type == PathType.SYMLINK
1432 def readlink(self) -> str:
1433 if not self.is_symlink: 1433 ↛ 1434line 1433 didn't jump to line 1434, because the condition on line 1433 was never true
1434 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1435 link_target = self._link_target
1436 assert link_target is not None
1437 return link_target
1439 @property
1440 def mtime(self) -> float:
1441 if self._mtime is None:
1442 self._mtime = time.time()
1443 return self._mtime
1445 @mtime.setter
1446 def mtime(self, new_mtime: float) -> None:
1447 self._rw_check()
1448 self._mtime = new_mtime
1450 @property
1451 def has_fs_path(self) -> bool:
1452 return self._has_fs_path
1454 def stat(self) -> os.stat_result:
1455 if self.has_fs_path: 1455 ↛ 1470line 1455 didn't jump to line 1470, because the condition on line 1455 was never false
1456 path = self.fs_path
1457 if path is None: 1457 ↛ 1458line 1457 didn't jump to line 1458, because the condition on line 1457 was never true
1458 raise PureVirtualPathError(
1459 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1460 " cannot provide!"
1461 )
1462 try:
1463 return os.stat(path)
1464 except FileNotFoundError as e:
1465 raise PureVirtualPathError(
1466 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1467 " cannot provide! (An fs_path was provided, but it did not exist)"
1468 ) from e
1470 raise PureVirtualPathError(
1471 "stat() is only applicable to paths backed by the file system. The path"
1472 f" {self._orphan_safe_path()!r} is purely virtual"
1473 )
1475 @property
1476 def size(self) -> int:
1477 if self._content is not None: 1477 ↛ 1478line 1477 didn't jump to line 1478, because the condition on line 1477 was never true
1478 return len(self._content.encode("utf-8"))
1479 if not self.has_fs_path or self.fs_path is None:
1480 return 0
1481 return self.stat().st_size
1483 @property
1484 def fs_path(self) -> str:
1485 if self.has_fs_path:
1486 if self._fs_path is None and self._materialized_content is not None:
1487 with tempfile.NamedTemporaryFile(
1488 mode="w+t",
1489 encoding="utf-8",
1490 suffix=f"__{self.name}",
1491 delete=False,
1492 ) as fd:
1493 filepath = fd.name
1494 fd.write(self._materialized_content)
1495 self._fs_path = filepath
1496 atexit.register(lambda: os.unlink(filepath)) 1496 ↛ exitline 1496 didn't run the lambda on line 1496
1498 path = self._fs_path
1499 if path is None: 1499 ↛ 1500line 1499 didn't jump to line 1500, because the condition on line 1499 was never true
1500 raise PureVirtualPathError(
1501 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
1502 " mock path cannot provide!"
1503 )
1504 return path
1505 raise PureVirtualPathError(
1506 "fs_path is only applicable to paths backed by the file system. The path"
1507 f" {self._orphan_safe_path()!r} is purely virtual"
1508 )
1510 def replace_fs_path_content(
1511 self,
1512 *,
1513 use_fs_path_mode: bool = False,
1514 ) -> ContextManager[str]:
1515 if self._content is not None: 1515 ↛ 1516line 1515 didn't jump to line 1516, because the condition on line 1515 was never true
1516 raise TypeError(
1517 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
1518 " created with `content` but for this method to work, the path should have been"
1519 " created with `materialized_content`"
1520 )
1521 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)
1523 def open(
1524 self,
1525 *,
1526 byte_io: bool = False,
1527 buffering: int = -1,
1528 ) -> Union[TextIO, BinaryIO]:
1529 if self._content is None:
1530 try:
1531 return super().open(byte_io=byte_io, buffering=buffering)
1532 except FileNotFoundError as e:
1533 raise TestPathWithNonExistentFSPathError(
1534 "The test path {self.path} had an fs_path {self._fs_path}, which does not"
1535 " exist. This exception can only occur in the testsuite. Either have the"
1536 " test provide content for the path (`virtual_path_def(..., content=...) or,"
1537 " if that is too painful in general, have the code accept this error as a "
1538 " test only-case and provide a default."
1539 ) from e
1541 if byte_io:
1542 return io.BytesIO(self._content.encode("utf-8"))
1543 return io.StringIO(self._content)
1545 def _replaced_path(self, new_fs_path: str) -> None:
1546 self._fs_path = new_fs_path
1549class FSROOverlay(VirtualPathBase):
1550 __slots__ = (
1551 "_path",
1552 "_fs_path",
1553 "_parent",
1554 "_stat_cache",
1555 "_readlink_cache",
1556 "_children",
1557 "_stat_failed_cache",
1558 "__weakref__",
1559 )
1561 def __init__(
1562 self,
1563 path: str,
1564 fs_path: str,
1565 parent: Optional["FSROOverlay"],
1566 ) -> None:
1567 self._path: str = path
1568 self._fs_path: str = _normalize_path(fs_path, with_prefix=False)
1569 self._parent: Optional[ReferenceType[FSROOverlay]] = (
1570 ref(parent) if parent is not None else None
1571 )
1572 self._stat_cache: Optional[os.stat_result] = None
1573 self._readlink_cache: Optional[str] = None
1574 self._stat_failed_cache = False
1575 self._children: Optional[Mapping[str, FSROOverlay]] = None
1577 @classmethod
1578 def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
1579 return FSROOverlay(path, fs_path, None)
1581 @property
1582 def name(self) -> str:
1583 return os.path.basename(self._path)
1585 @property
1586 def iterdir(self) -> Iterable["FSROOverlay"]:
1587 if not self.is_dir:
1588 return
1589 if self._children is None:
1590 self._ensure_children_are_resolved()
1591 yield from assume_not_none(self._children).values()
1593 def lookup(self, path: str) -> Optional["FSROOverlay"]:
1594 if not self.is_dir:
1595 return None
1596 if self._children is None:
1597 self._ensure_children_are_resolved()
1599 absolute, _, path_parts = _split_path(path)
1600 current = cast("FSROOverlay", _root(self)) if absolute else self
1601 for no, dir_part in enumerate(path_parts):
1602 if dir_part == ".":
1603 continue
1604 if dir_part == "..":
1605 p = current.parent_dir
1606 if current is None:
1607 raise ValueError(f'The path "{path}" escapes the root dir')
1608 current = p
1609 continue
1610 try:
1611 current = current[dir_part]
1612 except KeyError:
1613 return None
1614 return current
1616 def all_paths(self) -> Iterable["FSROOverlay"]:
1617 yield self
1618 if not self.is_dir:
1619 return
1620 stack = list(self.iterdir)
1621 stack.reverse()
1622 while stack:
1623 current = stack.pop()
1624 yield current
1625 if current.is_dir:
1626 if current._children is None:
1627 current._ensure_children_are_resolved()
1628 stack.extend(reversed(current._children.values()))
1630 def _ensure_children_are_resolved(self) -> None:
1631 if not self.is_dir or self._children:
1632 return
1633 dir_path = self.path
1634 dir_fs_path = self.fs_path
1635 children = {}
1636 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
1637 child_path = os.path.join(dir_path, name) if dir_path != "." else name
1638 child_fs_path = (
1639 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
1640 )
1641 children[name] = FSROOverlay(
1642 child_path,
1643 child_fs_path,
1644 self,
1645 )
1646 self._children = children
1648 @property
1649 def is_detached(self) -> bool:
1650 return False
1652 def __getitem__(self, key) -> "VirtualPath":
1653 if not self.is_dir:
1654 raise KeyError(key)
1655 if self._children is None:
1656 self._ensure_children_are_resolved()
1657 if isinstance(key, FSPath):
1658 key = key.name
1659 return self._children[key]
1661 def __delitem__(self, key) -> None:
1662 self._error_ro_fs()
1664 @property
1665 def is_read_write(self) -> bool:
1666 return False
1668 def _rw_check(self) -> None:
1669 self._error_ro_fs()
1671 def _error_ro_fs(self) -> NoReturn:
1672 raise DebputyFSIsROError(
1673 f'Attempt to write to "{self.path}" failed:'
1674 " Debputy Virtual File system is R/O."
1675 )
1677 @property
1678 def path(self) -> str:
1679 return self._path
1681 @property
1682 def parent_dir(self) -> Optional["FSROOverlay"]:
1683 parent = self._parent
1684 if parent is None:
1685 return None
1686 resolved = parent()
1687 if resolved is None:
1688 raise RuntimeError("Parent was garbage collected!")
1689 return resolved
1691 def stat(self) -> os.stat_result:
1692 if self._stat_failed_cache:
1693 raise FileNotFoundError(
1694 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
1695 )
1697 if self._stat_cache is None:
1698 try:
1699 self._stat_cache = os.lstat(self.fs_path)
1700 except FileNotFoundError:
1701 self._stat_failed_cache = True
1702 raise
1703 return self._stat_cache
1705 @property
1706 def mode(self) -> int:
1707 return stat.S_IMODE(self.stat().st_mode)
1709 @mode.setter
1710 def mode(self, _unused: int) -> None:
1711 self._error_ro_fs()
1713 @property
1714 def mtime(self) -> float:
1715 return self.stat().st_mtime
1717 @mtime.setter
1718 def mtime(self, new_mtime: float) -> None:
1719 self._error_ro_fs()
1721 def readlink(self) -> str:
1722 if not self.is_symlink:
1723 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1724 if self._readlink_cache is None:
1725 self._readlink_cache = os.readlink(self.fs_path)
1726 return self._readlink_cache
1728 @property
1729 def fs_path(self) -> str:
1730 return self._fs_path
1732 @property
1733 def is_dir(self) -> bool:
1734 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1735 try:
1736 return stat.S_ISDIR(self.stat().st_mode)
1737 except FileNotFoundError:
1738 return False
1740 @property
1741 def is_file(self) -> bool:
1742 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1743 try:
1744 return stat.S_ISREG(self.stat().st_mode)
1745 except FileNotFoundError:
1746 return False
1748 @property
1749 def is_symlink(self) -> bool:
1750 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1751 try:
1752 return stat.S_ISLNK(self.stat().st_mode)
1753 except FileNotFoundError:
1754 return False
1756 @property
1757 def has_fs_path(self) -> bool:
1758 return True
1760 def open(
1761 self,
1762 *,
1763 byte_io: bool = False,
1764 buffering: int = -1,
1765 ) -> Union[TextIO, BinaryIO]:
1766 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
1767 # case.
1768 if not self.is_file and not self.is_symlink:
1769 raise TypeError(
1770 f"Cannot open {self.path} for reading: It is not a file nor a symlink"
1771 )
1773 if byte_io:
1774 return open(self.fs_path, "rb", buffering=buffering)
1775 return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)
1777 def chown(
1778 self,
1779 owner: Optional[StaticFileSystemOwner],
1780 group: Optional[StaticFileSystemGroup],
1781 ) -> None:
1782 self._error_ro_fs()
1784 def mkdir(self, name: str) -> "VirtualPath":
1785 self._error_ro_fs()
1787 def add_file(
1788 self,
1789 name: str,
1790 *,
1791 unlink_if_exists: bool = True,
1792 use_fs_path_mode: bool = False,
1793 mode: int = 0o0644,
1794 mtime: Optional[float] = None,
1795 ) -> ContextManager["VirtualPath"]:
1796 self._error_ro_fs()
1798 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
1799 self._error_ro_fs()
1801 def unlink(self, *, recursive: bool = False) -> None:
1802 self._error_ro_fs()
1804 def metadata(
1805 self,
1806 metadata_type: Type[PMT],
1807 *,
1808 owning_plugin: Optional[str] = None,
1809 ) -> PathMetadataReference[PMT]:
1810 current_plugin = self._current_plugin()
1811 if owning_plugin is None:
1812 owning_plugin = current_plugin
1813 return AlwaysEmptyReadOnlyMetadataReference(
1814 owning_plugin,
1815 current_plugin,
1816 metadata_type,
1817 )
1820class FSROOverlayRootDir(FSROOverlay):
1821 __slots__ = ("_plugin_context",)
1823 def __init__(self, path: str, fs_path: str) -> None:
1824 super().__init__(path, fs_path, None)
1825 self._plugin_context = CurrentPluginContextManager("debputy")
1827 def _current_plugin(self) -> str:
1828 return self._plugin_context.current_plugin_name
1830 @contextlib.contextmanager
1831 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
1832 with self._plugin_context.change_plugin_context(new_plugin) as r:
1833 yield r
1836def as_path_def(pd: Union[str, PathDef]) -> PathDef:
1837 return PathDef(pd) if isinstance(pd, str) else pd
1840def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
1841 yield from (as_path_def(p) for p in paths)
1844def build_virtual_fs(
1845 paths: Iterable[Union[str, PathDef]],
1846 read_write_fs: bool = False,
1847) -> "FSPath":
1848 root_dir: Optional[FSRootDir] = None
1849 directories: Dict[str, FSPath] = {}
1850 non_directories = set()
1852 def _ensure_parent_dirs(p: str) -> None:
1853 current = p.rstrip("/")
1854 missing_dirs = []
1855 while True:
1856 current = os.path.dirname(current)
1857 if current in directories:
1858 break
1859 if current in non_directories: 1859 ↛ 1860line 1859 didn't jump to line 1860, because the condition on line 1859 was never true
1860 raise ValueError(
1861 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
1862 ' but it is defined as a non-directory. (Ensure dirs end with "/")'
1863 )
1864 missing_dirs.append(current)
1865 for dir_path in reversed(missing_dirs):
1866 parent_dir = directories[os.path.dirname(dir_path)]
1867 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
1868 directories[dir_path] = d
1870 for path_def in as_path_defs(paths):
1871 path = path_def.path_name
1872 if path in directories or path in non_directories: 1872 ↛ 1873line 1872 didn't jump to line 1873, because the condition on line 1872 was never true
1873 raise ValueError(
1874 f'Duplicate definition of "{path}". Can be false positive if input is not in'
1875 ' "correct order" (ensure directories occur before their children)'
1876 )
1877 if root_dir is None:
1878 root_fs_path = None
1879 if path in (".", "./", "/"):
1880 root_fs_path = path_def.fs_path
1881 root_dir = FSRootDir(fs_path=root_fs_path)
1882 directories["."] = root_dir
1884 if path not in (".", "./", "/") and not path.startswith("./"):
1885 path = "./" + path
1886 if path not in (".", "./", "/"):
1887 _ensure_parent_dirs(path)
1888 if path in (".", "./"):
1889 assert "." in directories
1890 continue
1891 is_dir = False
1892 if path.endswith("/"):
1893 path = path[:-1]
1894 is_dir = True
1895 directory = directories[os.path.dirname(path)]
1896 assert not is_dir or not bool(
1897 path_def.link_target
1898 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
1899 fs_path = VirtualTestPath(
1900 os.path.basename(path),
1901 directory,
1902 is_dir=is_dir,
1903 mode=path_def.mode,
1904 mtime=path_def.mtime,
1905 has_fs_path=path_def.has_fs_path,
1906 fs_path=path_def.fs_path,
1907 link_target=path_def.link_target,
1908 content=path_def.content,
1909 materialized_content=path_def.materialized_content,
1910 )
1911 assert not fs_path.is_detached
1912 if fs_path.is_dir:
1913 directories[fs_path.path] = fs_path
1914 else:
1915 non_directories.add(fs_path.path)
1917 if root_dir is None:
1918 root_dir = FSRootDir()
1920 root_dir.is_read_write = read_write_fs
1921 return root_dir