diff options
Diffstat (limited to 'src/debputy/filesystem_scan.py')
-rw-r--r-- | src/debputy/filesystem_scan.py | 1921 |
1 files changed, 1921 insertions, 0 deletions
diff --git a/src/debputy/filesystem_scan.py b/src/debputy/filesystem_scan.py new file mode 100644 index 0000000..f7f97c2 --- /dev/null +++ b/src/debputy/filesystem_scan.py @@ -0,0 +1,1921 @@ +import atexit +import contextlib +import dataclasses +import errno +import io +import operator +import os +import stat +import subprocess +import tempfile +import time +from abc import ABC +from contextlib import suppress +from typing import ( + List, + Iterable, + Dict, + Optional, + Tuple, + Union, + Iterator, + Mapping, + cast, + Any, + ContextManager, + TextIO, + BinaryIO, + NoReturn, + Type, + Generic, +) +from weakref import ref, ReferenceType + +from debputy.exceptions import ( + PureVirtualPathError, + DebputyFSIsROError, + DebputyMetadataAccessError, + TestPathWithNonExistentFSPathError, + SymlinkLoopError, +) +from debputy.intermediate_manifest import PathType +from debputy.manifest_parser.base_types import ( + ROOT_DEFINITION, + StaticFileSystemOwner, + StaticFileSystemGroup, +) +from debputy.plugin.api.spec import ( + VirtualPath, + PathDef, + PathMetadataReference, + PMT, +) +from debputy.types import VP +from debputy.util import ( + generated_content_dir, + _error, + escape_shell, + assume_not_none, + _normalize_path, +) + +BY_BASENAME = operator.attrgetter("name") + + +class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]): + __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin") + + def __init__( + self, + owning_plugin: str, + current_plugin: str, + metadata_type: Type[PMT], + ) -> None: + self._owning_plugin = owning_plugin + self._current_plugin = current_plugin + self._metadata_type = metadata_type + + @property + def is_present(self) -> bool: + return False + + @property + def can_read(self) -> bool: + return self._owning_plugin == self._current_plugin + + @property + def can_write(self) -> bool: + return False + + @property + def value(self) -> Optional[PMT]: + if self.can_read: + return None + raise DebputyMetadataAccessError( + f"Cannot read the metadata {self._metadata_type.__name__} owned by" + f" {self._owning_plugin} as the metadata has not been made" + f" readable to the plugin {self._current_plugin}." + ) + + @value.setter + def value(self, new_value: PMT) -> None: + if self._is_owner: + raise DebputyFSIsROError( + f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only" + ) + raise DebputyMetadataAccessError( + f"Cannot set the metadata {self._metadata_type.__name__} owned by" + f" {self._owning_plugin} as the metadata has not been made" + f" read-write to the plugin {self._current_plugin}." + ) + + @property + def _is_owner(self) -> bool: + return self._owning_plugin == self._current_plugin + + +@dataclasses.dataclass(slots=True) +class PathMetadataValue(Generic[PMT]): + owning_plugin: str + metadata_type: Type[PMT] + value: Optional[PMT] = None + + def can_read_value(self, current_plugin: str) -> bool: + return self.owning_plugin == current_plugin + + def can_write_value(self, current_plugin: str) -> bool: + return self.owning_plugin == current_plugin + + +class PathMetadataReferenceImplementation(PathMetadataReference[PMT]): + __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value") + + def __init__( + self, + owning_path: VirtualPath, + current_plugin: str, + path_metadata_value: PathMetadataValue[PMT], + ) -> None: + self._owning_path = owning_path + self._current_plugin = current_plugin + self._path_metadata_value = path_metadata_value + + @property + def is_present(self) -> bool: + if not self.can_read: + return False + return self._path_metadata_value.value is not None + + @property + def can_read(self) -> bool: + return self._path_metadata_value.can_read_value(self._current_plugin) + + @property + def can_write(self) -> bool: + if not self._path_metadata_value.can_write_value(self._current_plugin): + return False + owning_path = self._owning_path + return owning_path.is_read_write and not owning_path.is_detached + + @property + def value(self) -> Optional[PMT]: + if self.can_read: + return self._path_metadata_value.value + raise DebputyMetadataAccessError( + f"Cannot read the metadata {self._metadata_type_name} owned by" + f" {self._owning_plugin} as the metadata has not been made" + f" readable to the plugin {self._current_plugin}." + ) + + @value.setter + def value(self, new_value: PMT) -> None: + if not self.can_write: + m = "set" if new_value is not None else "delete" + raise DebputyMetadataAccessError( + f"Cannot {m} the metadata {self._metadata_type_name} owned by" + f" {self._owning_plugin} as the metadata has not been made" + f" read-write to the plugin {self._current_plugin}." + ) + owning_path = self._owning_path + if not owning_path.is_read_write: + raise DebputyFSIsROError( + f"Cannot set the metadata {self._metadata_type_name} as the path is read-only" + ) + if owning_path.is_detached: + raise TypeError( + f"Cannot set the metadata {self._metadata_type_name} as the path is detached" + ) + self._path_metadata_value.value = new_value + + @property + def _is_owner(self) -> bool: + return self._owning_plugin == self._current_plugin + + @property + def _owning_plugin(self) -> str: + return self._path_metadata_value.owning_plugin + + @property + def _metadata_type_name(self) -> str: + return self._path_metadata_value.metadata_type.__name__ + + +def _cp_a(source: str, dest: str) -> None: + cmd = ["cp", "-a", source, dest] + try: + subprocess.check_call(cmd) + except subprocess.CalledProcessError: + full_command = escape_shell(*cmd) + _error( + f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp" + f" above to understand what went wrong. The full command was: {full_command}" + ) + + +def _split_path(path: str) -> Tuple[bool, bool, List[str]]: + must_be_dir = True if path.endswith("/") else False + absolute = False + if path.startswith("/"): + absolute = True + path = "." + path + path_parts = path.rstrip("/").split("/") + if must_be_dir: + path_parts.append(".") + return absolute, must_be_dir, path_parts + + +def _root(path: VP) -> VP: + current = path + while True: + parent = current.parent_dir + if parent is None: + return current + current = parent + + +def _check_fs_path_is_file( + fs_path: str, + unlink_on_error: Optional["FSPath"] = None, +) -> None: + had_issue = False + try: + # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect + st = os.lstat(fs_path) + except FileNotFoundError: + had_issue = True + else: + if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: + had_issue = True + if not had_issue: + return + + if unlink_on_error: + with suppress(FileNotFoundError): + os.unlink(fs_path) + raise TypeError( + "The provided FS backing file was deleted, replaced with a non-file entry or it was hard" + " linked to another file. The entry has been disconnected." + ) + + +class CurrentPluginContextManager: + __slots__ = ("_plugin_names",) + + def __init__(self, initial_plugin_name: str) -> None: + self._plugin_names = [initial_plugin_name] + + @property + def current_plugin_name(self) -> str: + return self._plugin_names[-1] + + @contextlib.contextmanager + def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]: + self._plugin_names.append(new_plugin_name) + yield new_plugin_name + self._plugin_names.pop() + + +class VirtualPathBase(VirtualPath, ABC): + __slots__ = () + + def _orphan_safe_path(self) -> str: + return self.path + + def _rw_check(self) -> None: + if not self.is_read_write: + raise DebputyFSIsROError( + f'Attempt to write to "{self._orphan_safe_path()}" failed:' + " Debputy Virtual File system is R/O." + ) + + def lookup(self, path: str) -> Optional["VirtualPathBase"]: + match, missing = self.attempt_lookup(path) + if missing: + return None + return match + + def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]: + if self.is_detached: + raise ValueError( + f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached' + ) + absolute, must_be_dir, path_parts = _split_path(path) + current = _root(self) if absolute else self + path_parts.reverse() + link_expansions = set() + while path_parts: + dir_part = path_parts.pop() + if dir_part == ".": + continue + if dir_part == "..": + p = current.parent_dir + if p is None: + raise ValueError(f'The path "{path}" escapes the root dir') + current = p + continue + try: + current = current[dir_part] + except KeyError: + path_parts.append(dir_part) + path_parts.reverse() + if must_be_dir: + path_parts.pop() + return current, path_parts + if current.is_symlink and path_parts: + if current.path in link_expansions: + # This is our loop detection for now. It might have some false positives where you + # could safely resolve the same symlink twice. However, given that this use-case is + # basically none existent in practice for packaging, we just stop here for now. + raise SymlinkLoopError( + f'The path "{path}" traversed the symlink "{current.path}" multiple' + " times. Currently, traversing the same symlink twice is considered" + " a loop by `debputy` even if the path would eventually resolve." + " Consider filing a feature request if you have a benign case that" + " triggers this error." + ) + link_expansions.add(current.path) + link_target = current.readlink() + link_absolute, _, link_path_parts = _split_path(link_target) + if link_absolute: + current = _root(current) + else: + current = assume_not_none(current.parent_dir) + link_path_parts.reverse() + path_parts.extend(link_path_parts) + return current, [] + + def mkdirs(self, path: str) -> "VirtualPath": + current: VirtualPath + current, missing_parts = self.attempt_lookup( + f"{path}/" if not path.endswith("/") else path + ) + if not current.is_dir: + raise ValueError( + f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be' + " a directory. However, that path exist AND is a not directory." + ) + for missing_part in missing_parts: + assert missing_part not in (".", "..") + current = current.mkdir(missing_part) + return current + + def prune_if_empty_dir(self) -> None: + """Remove this and all (now) empty parent directories + + Same as: `rmdir --ignore-fail-on-non-empty --parents` + + This operation may cause the path (and any of its parent directories) to become "detached" + and therefore unsafe to use in further operations. + """ + self._rw_check() + + if not self.is_dir: + raise TypeError(f"{self._orphan_safe_path()} is not a directory") + if any(self.iterdir): + return + parent_dir = assume_not_none(self.parent_dir) + + # Recursive does not matter; we already know the directory is empty. + self.unlink() + + # Note: The root dir must never be deleted. This works because when delegating it to the root + # directory, its implementation of this method is a no-op. If this is later rewritten to an + # inline loop (rather than recursion), be sure to preserve this feature. + parent_dir.prune_if_empty_dir() + + def _current_plugin(self) -> str: + if self.is_detached: + raise TypeError("Cannot resolve the current plugin; path is detached") + current = self + while True: + next_parent = current.parent_dir + if next_parent is None: + break + current = next_parent + assert current is not None + return cast("FSRootDir", current)._current_plugin() + + +class FSPath(VirtualPathBase, ABC): + __slots__ = ( + "_basename", + "_parent_dir", + "_children", + "_path_cache", + "_parent_path_cache", + "_last_known_parent_path", + "_mode", + "_owner", + "_group", + "_mtime", + "_stat_cache", + "_metadata", + "__weakref__", + ) + + def __init__( + self, + basename: str, + parent: Optional["FSPath"], + children: Optional[Dict[str, "FSPath"]] = None, + initial_mode: Optional[int] = None, + mtime: Optional[float] = None, + stat_cache: Optional[os.stat_result] = None, + ) -> None: + self._basename = basename + self._path_cache: Optional[str] = None + self._parent_path_cache: Optional[str] = None + self._children = children + self._last_known_parent_path: Optional[str] = None + self._mode = initial_mode + self._mtime = mtime + self._stat_cache = stat_cache + self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {} + self._owner = ROOT_DEFINITION + self._group = ROOT_DEFINITION + + # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls + # is_orphaned, which assumes self._parent_dir is an attribute. + self._parent_dir: Optional[ReferenceType["FSPath"]] = None + if parent is not None: + self.parent_dir = parent + + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}({self._orphan_safe_path()!r}," + f" is_file={self.is_file}," + f" is_dir={self.is_dir}," + f" is_symlink={self.is_symlink}," + f" has_fs_path={self.has_fs_path}," + f" children_len={len(self._children) if self._children else 0})" + ) + + @property + def name(self) -> str: + return self._basename + + @name.setter + def name(self, new_name: str) -> None: + self._rw_check() + if new_name == self._basename: + return + if self.is_detached: + self._basename = new_name + return + self._rw_check() + parent = self.parent_dir + # This little parent_dir dance ensures the parent dir detects the rename properly + self.parent_dir = None + self._basename = new_name + self.parent_dir = parent + + @property + def iterdir(self) -> Iterable["FSPath"]: + if self._children is not None: + yield from self._children.values() + + def all_paths(self) -> Iterable["FSPath"]: + yield self + if not self.is_dir: + return + by_basename = BY_BASENAME + stack = sorted(self.iterdir, key=by_basename, reverse=True) + while stack: + current = stack.pop() + yield current + if current.is_dir and not current.is_detached: + stack.extend(sorted(current.iterdir, key=by_basename, reverse=True)) + + def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]: + # FIXME: can this be more "os.walk"-like without making it harder to implement? + if not self.is_dir: + yield self, [] + return + by_basename = BY_BASENAME + stack = [self] + while stack: + current = stack.pop() + children = sorted(current.iterdir, key=by_basename) + assert not children or current.is_dir + yield current, children + # Removing the directory counts as discarding the children. + if not current.is_detached: + stack.extend(reversed(children)) + + def _orphan_safe_path(self) -> str: + if not self.is_detached or self._last_known_parent_path is not None: + return self.path + return f"<orphaned>/{self.name}" + + @property + def is_detached(self) -> bool: + parent = self._parent_dir + if parent is None: + return True + resolved_parent = parent() + if resolved_parent is None: + return True + return resolved_parent.is_detached + + # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. + # However, that does not feel compatible, so lets force people to use .children instead for the Sequence + # behaviour to avoid surprises for now. + # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed + # to using it) + __iter__ = None + + def __getitem__(self, key) -> "FSPath": + if self._children is None: + raise KeyError( + f"{key} (note: {self._orphan_safe_path()!r} has no children)" + ) + if isinstance(key, FSPath): + key = key.name + return self._children[key] + + def __delitem__(self, key) -> None: + self._rw_check() + children = self._children + if children is None: + raise KeyError(key) + del children[key] + + def get(self, key: str) -> "Optional[FSPath]": + try: + return self[key] + except KeyError: + return None + + def __contains__(self, item: object) -> bool: + if isinstance(item, VirtualPath): + return item.parent_dir is self + if not isinstance(item, str): + return False + m = self.get(item) + return m is not None + + def _add_child(self, child: "FSPath") -> None: + self._rw_check() + if not self.is_dir: + raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") + if self._children is None: + self._children = {} + + conflict_child = self.get(child.name) + if conflict_child is not None: + conflict_child.unlink(recursive=True) + self._children[child.name] = child + + @property + def tar_path(self) -> str: + path = self.path + if self.is_dir: + return path + "/" + return path + + @property + def path(self) -> str: + parent_path = self.parent_dir_path + if ( + self._parent_path_cache is not None + and self._parent_path_cache == parent_path + ): + return assume_not_none(self._path_cache) + if parent_path is None: + raise ReferenceError( + f"The path {self.name} is detached! {self.__class__.__name__}" + ) + self._parent_path_cache = parent_path + ret = os.path.join(parent_path, self.name) + self._path_cache = ret + return ret + + @property + def parent_dir(self) -> Optional["FSPath"]: + p_ref = self._parent_dir + p = p_ref() if p_ref is not None else None + if p is None: + raise ReferenceError( + f"The path {self.name} is detached! {self.__class__.__name__}" + ) + return p + + @parent_dir.setter + def parent_dir(self, new_parent: Optional["FSPath"]) -> None: + self._rw_check() + if new_parent is not None: + if not new_parent.is_dir: + raise ValueError( + f"The parent {new_parent._orphan_safe_path()} must be a directory" + ) + new_parent._rw_check() + old_parent = None + self._last_known_parent_path = None + if not self.is_detached: + old_parent = self.parent_dir + old_parent_children = assume_not_none(assume_not_none(old_parent)._children) + del old_parent_children[self.name] + if new_parent is not None: + self._parent_dir = ref(new_parent) + new_parent._add_child(self) + else: + if old_parent is not None and not old_parent.is_detached: + self._last_known_parent_path = old_parent.path + self._parent_dir = None + self._parent_path_cache = None + + @property + def parent_dir_path(self) -> Optional[str]: + if self.is_detached: + return self._last_known_parent_path + return assume_not_none(self.parent_dir).path + + def chown( + self, + owner: Optional[StaticFileSystemOwner], + group: Optional[StaticFileSystemGroup], + ) -> None: + """Change the owner/group of this path + + :param owner: The desired owner definition for this path. If None, then no change of owner is performed. + :param group: The desired group definition for this path. If None, then no change of group is performed. + """ + self._rw_check() + + if owner is not None: + self._owner = owner.ownership_definition + if group is not None: + self._group = group.ownership_definition + + def stat(self) -> os.stat_result: + st = self._stat_cache + if st is None: + st = self._uncached_stat() + self._stat_cache = st + return st + + def _uncached_stat(self) -> os.stat_result: + return os.lstat(self.fs_path) + + @property + def mode(self) -> int: + current_mode = self._mode + if current_mode is None: + current_mode = stat.S_IMODE(self.stat().st_mode) + self._mode = current_mode + return current_mode + + @mode.setter + def mode(self, new_mode: int) -> None: + self._rw_check() + min_bit = 0o500 if self.is_dir else 0o400 + if (new_mode & min_bit) != min_bit: + omode = oct(new_mode)[2:] + omin = oct(min_bit)[2:] + raise ValueError( + f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;' + f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)." + " There are no paths that do not need these requirements met and they can cause" + " problems during build or on the final system." + ) + self._mode = new_mode + + @property + def mtime(self) -> float: + mtime = self._mtime + if mtime is None: + mtime = self.stat().st_mtime + self._mtime = mtime + return mtime + + @mtime.setter + def mtime(self, new_mtime: float) -> None: + self._rw_check() + self._mtime = new_mtime + + @property + def tar_owner_info(self) -> Tuple[str, int, str, int]: + owner = self._owner + group = self._group + return ( + owner.entity_name, + owner.entity_id, + group.entity_name, + group.entity_id, + ) + + @property + def _can_replace_inline(self) -> bool: + return False + + @contextlib.contextmanager + def add_file( + self, + name: str, + *, + unlink_if_exists: bool = True, + use_fs_path_mode: bool = False, + mode: int = 0o0644, + mtime: Optional[float] = None, + # Special-case parameters that are not exposed in the API + fs_basename_matters: bool = False, + subdir_key: Optional[str] = None, + ) -> Iterator["FSPath"]: + if "/" in name or name in {".", ".."}: + raise ValueError(f'Invalid file name: "{name}"') + if not self.is_dir: + raise TypeError( + f"Cannot create {self._orphan_safe_path()}/{name}:" + f" {self._orphan_safe_path()} is not a directory" + ) + self._rw_check() + existing = self.get(name) + if existing is not None: + if not unlink_if_exists: + raise ValueError( + f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' + f" and exist_ok was False" + ) + existing.unlink(recursive=False) + + if fs_basename_matters and subdir_key is None: + raise ValueError( + "When fs_basename_matters is True, a subdir_key must be provided" + ) + + directory = generated_content_dir(subdir_key=subdir_key) + + if fs_basename_matters: + fs_path = os.path.join(directory, name) + with open(fs_path, "xb") as _: + # Ensure that the fs_path exists + pass + child = FSBackedFilePath( + name, + self, + fs_path, + replaceable_inline=True, + mtime=mtime, + ) + yield child + else: + with tempfile.NamedTemporaryFile( + dir=directory, suffix=f"__{name}", delete=False + ) as fd: + fs_path = fd.name + child = FSBackedFilePath( + name, + self, + fs_path, + replaceable_inline=True, + mtime=mtime, + ) + fd.close() + yield child + + if use_fs_path_mode: + # Ensure the caller can see the current mode + os.chmod(fs_path, mode) + _check_fs_path_is_file(fs_path, unlink_on_error=child) + child._reset_caches() + if not use_fs_path_mode: + child.mode = mode + + def insert_file_from_fs_path( + self, + name: str, + fs_path: str, + *, + exist_ok: bool = True, + use_fs_path_mode: bool = False, + mode: int = 0o0644, + require_copy_on_write: bool = True, + follow_symlinks: bool = True, + reference_path: Optional[VirtualPath] = None, + ) -> "FSPath": + if "/" in name or name in {".", ".."}: + raise ValueError(f'Invalid file name: "{name}"') + if not self.is_dir: + raise TypeError( + f"Cannot create {self._orphan_safe_path()}/{name}:" + f" {self._orphan_safe_path()} is not a directory" + ) + self._rw_check() + if name in self and not exist_ok: + raise ValueError( + f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' + f" and exist_ok was False" + ) + new_fs_path = fs_path + if follow_symlinks: + if reference_path is not None: + raise ValueError( + "The reference_path cannot be used with follow_symlinks" + ) + new_fs_path = os.path.realpath(new_fs_path, strict=True) + + fmode: Optional[int] = mode + if use_fs_path_mode: + fmode = None + + st = None + if reference_path is None: + st = os.lstat(new_fs_path) + if stat.S_ISDIR(st.st_mode): + raise ValueError( + f'The provided path "{fs_path}" is a directory. However, this' + " method does not support directories" + ) + + if not stat.S_ISREG(st.st_mode): + if follow_symlinks: + raise ValueError( + f"The resolved fs_path ({new_fs_path}) was not a file." + ) + raise ValueError(f"The provided fs_path ({fs_path}) was not a file.") + return FSBackedFilePath( + name, + self, + new_fs_path, + initial_mode=fmode, + stat_cache=st, + replaceable_inline=not require_copy_on_write, + reference_path=reference_path, + ) + + def add_symlink( + self, + link_name: str, + link_target: str, + *, + reference_path: Optional[VirtualPath] = None, + ) -> "FSPath": + if "/" in link_name or link_name in {".", ".."}: + raise ValueError( + f'Invalid file name: "{link_name}" (it must be a valid basename)' + ) + if not self.is_dir: + raise TypeError( + f"Cannot create {self._orphan_safe_path()}/{link_name}:" + f" {self._orphan_safe_path()} is not a directory" + ) + self._rw_check() + + existing = self.get(link_name) + if existing: + # Emulate ln -sf with attempts a non-recursive unlink first. + existing.unlink(recursive=False) + + return SymlinkVirtualPath( + link_name, + self, + link_target, + reference_path=reference_path, + ) + + def mkdir( + self, + name: str, + *, + reference_path: Optional[VirtualPath] = None, + ) -> "FSPath": + if "/" in name or name in {".", ".."}: + raise ValueError( + f'Invalid file name: "{name}" (it must be a valid basename)' + ) + if not self.is_dir: + raise TypeError( + f"Cannot create {self._orphan_safe_path()}/{name}:" + f" {self._orphan_safe_path()} is not a directory" + ) + if reference_path is not None and not reference_path.is_dir: + raise ValueError( + f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!' + ) + self._rw_check() + + existing = self.get(name) + if existing: + raise ValueError(f"Path {existing.path} already exist") + return VirtualDirectoryFSPath(name, self, reference_path=reference_path) + + def mkdirs(self, path: str) -> "FSPath": + return cast("FSPath", super().mkdirs(path)) + + @property + def is_read_write(self) -> bool: + """When true, the file system entry may be mutated + + :return: Whether file system mutations are permitted. + """ + if self.is_detached: + return True + return assume_not_none(self.parent_dir).is_read_write + + def unlink(self, *, recursive: bool = False) -> None: + """Unlink a file or a directory + + This operation will detach the path from the file system (causing "is_detached" to return True). + + Note that the root directory cannot be deleted. + + :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them + as well. When False, an error is raised if the path is a non-empty directory + """ + if self.is_detached: + return + if not recursive and any(self.iterdir): + raise ValueError( + f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False' + ) + # The .parent_dir setter does a _rw_check() for us. + self.parent_dir = None + + def _reset_caches(self) -> None: + self._mtime = None + self._stat_cache = None + + def metadata( + self, + metadata_type: Type[PMT], + *, + owning_plugin: Optional[str] = None, + ) -> PathMetadataReference[PMT]: + current_plugin = self._current_plugin() + if owning_plugin is None: + owning_plugin = current_plugin + metadata_key = (owning_plugin, metadata_type) + metadata_value = self._metadata.get(metadata_key) + if metadata_value is None: + if self.is_detached: + raise TypeError( + f"Cannot access the metadata {metadata_type.__name__}: The path is detached." + ) + if not self.is_read_write: + return AlwaysEmptyReadOnlyMetadataReference( + owning_plugin, + current_plugin, + metadata_type, + ) + metadata_value = PathMetadataValue(owning_plugin, metadata_type) + self._metadata[metadata_key] = metadata_value + return PathMetadataReferenceImplementation( + self, + current_plugin, + metadata_value, + ) + + @contextlib.contextmanager + def replace_fs_path_content( + self, + *, + use_fs_path_mode: bool = False, + ) -> Iterator[str]: + if not self.is_file: + raise TypeError( + f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file' + ) + self._rw_check() + fs_path = self.fs_path + if not self._can_replace_inline: + fs_path = self.fs_path + directory = generated_content_dir() + with tempfile.NamedTemporaryFile( + dir=directory, suffix=f"__{self.name}", delete=False + ) as new_path_fd: + new_path_fd.close() + _cp_a(fs_path, new_path_fd.name) + fs_path = new_path_fd.name + self._replaced_path(fs_path) + assert self.fs_path == fs_path + + current_mtime = self._mtime + if current_mtime is not None: + os.utime(fs_path, (current_mtime, current_mtime)) + + current_mode = self.mode + yield fs_path + _check_fs_path_is_file(fs_path, unlink_on_error=self) + if not use_fs_path_mode: + os.chmod(fs_path, current_mode) + self._reset_caches() + + def _replaced_path(self, new_fs_path: str) -> None: + raise NotImplementedError + + +class VirtualFSPathBase(FSPath, ABC): + __slots__ = () + + def __init__( + self, + basename: str, + parent: Optional["FSPath"], + children: Optional[Dict[str, "FSPath"]] = None, + initial_mode: Optional[int] = None, + mtime: Optional[float] = None, + stat_cache: Optional[os.stat_result] = None, + ) -> None: + super().__init__( + basename, + parent, + children, + initial_mode=initial_mode, + mtime=mtime, + stat_cache=stat_cache, + ) + + @property + def mtime(self) -> float: + mtime = self._mtime + if mtime is None: + mtime = time.time() + self._mtime = mtime + return mtime + + @property + def has_fs_path(self) -> bool: + return False + + def stat(self) -> os.stat_result: + if not self.has_fs_path: + raise PureVirtualPathError( + "stat() is only applicable to paths backed by the file system. The path" + f" {self._orphan_safe_path()!r} is purely virtual" + ) + return super().stat() + + @property + def fs_path(self) -> str: + if not self.has_fs_path: + raise PureVirtualPathError( + "fs_path is only applicable to paths backed by the file system. The path" + f" {self._orphan_safe_path()!r} is purely virtual" + ) + return self.fs_path + + +class FSRootDir(FSPath): + __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context") + + def __init__(self, fs_path: Optional[str] = None) -> None: + self._fs_path = fs_path + self._fs_read_write = True + super().__init__( + ".", + None, + children={}, + initial_mode=0o755, + ) + self._plugin_context = CurrentPluginContextManager("debputy") + + @property + def is_detached(self) -> bool: + return False + + def _orphan_safe_path(self) -> str: + return self.name + + @property + def path(self) -> str: + return self.name + + @property + def parent_dir(self) -> Optional["FSPath"]: + return None + + @parent_dir.setter + def parent_dir(self, new_parent: Optional[FSPath]) -> None: + if new_parent is not None: + raise ValueError("The root directory cannot become a non-root directory") + + @property + def parent_dir_path(self) -> Optional[str]: + return None + + @property + def is_dir(self) -> bool: + return True + + @property + def is_file(self) -> bool: + return False + + @property + def is_symlink(self) -> bool: + return False + + def readlink(self) -> str: + raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink') + + @property + def has_fs_path(self) -> bool: + return self._fs_path is not None + + def stat(self) -> os.stat_result: + if not self.has_fs_path: + raise PureVirtualPathError( + "stat() is only applicable to paths backed by the file system. The path" + f" {self._orphan_safe_path()!r} is purely virtual" + ) + return os.stat(self.fs_path) + + @property + def fs_path(self) -> str: + if not self.has_fs_path: + raise PureVirtualPathError( + "fs_path is only applicable to paths backed by the file system. The path" + f" {self._orphan_safe_path()!r} is purely virtual" + ) + return assume_not_none(self._fs_path) + + @property + def is_read_write(self) -> bool: + return self._fs_read_write + + @is_read_write.setter + def is_read_write(self, new_value: bool) -> None: + self._fs_read_write = new_value + + def prune_if_empty_dir(self) -> None: + # No-op for the root directory. There is never a case where you want to delete this directory + # (and even if you could, debputy will need it for technical reasons, so the root dir stays) + return + + def unlink(self, *, recursive: bool = False) -> None: + # There is never a case where you want to delete this directory (and even if you could, + # debputy will need it for technical reasons, so the root dir stays) + raise TypeError("Cannot delete the root directory") + + def _current_plugin(self) -> str: + return self._plugin_context.current_plugin_name + + @contextlib.contextmanager + def change_plugin_context(self, new_plugin: str) -> Iterator[str]: + with self._plugin_context.change_plugin_context(new_plugin) as r: + yield r + + +class VirtualPathWithReference(VirtualFSPathBase, ABC): + __slots__ = ("_reference_path",) + + def __init__( + self, + basename: str, + parent: FSPath, + *, + default_mode: int, + reference_path: Optional[VirtualPath] = None, + ) -> None: + super().__init__( + basename, + parent=parent, + initial_mode=reference_path.mode if reference_path else default_mode, + ) + self._reference_path = reference_path + + @property + def has_fs_path(self) -> bool: + ref_path = self._reference_path + return ref_path is not None and ref_path.has_fs_path + + @property + def mtime(self) -> float: + mtime = self._mtime + if mtime is None: + ref_path = self._reference_path + if ref_path: + mtime = ref_path.mtime + else: + mtime = super().mtime + self._mtime = mtime + return mtime + + @mtime.setter + def mtime(self, new_mtime: float) -> None: + self._rw_check() + self._mtime = new_mtime + + @property + def fs_path(self) -> str: + ref_path = self._reference_path + if ref_path is not None and ( + not super().has_fs_path or super().fs_path == ref_path.fs_path + ): + return ref_path.fs_path + return super().fs_path + + def stat(self) -> os.stat_result: + ref_path = self._reference_path + if ref_path is not None and ( + not super().has_fs_path or super().fs_path == ref_path.fs_path + ): + return ref_path.stat() + return super().stat() + + def open( + self, + *, + byte_io: bool = False, + buffering: int = -1, + ) -> Union[TextIO, BinaryIO]: + reference_path = self._reference_path + if reference_path is not None and reference_path.fs_path == self.fs_path: + return reference_path.open(byte_io=byte_io, buffering=buffering) + return super().open(byte_io=byte_io, buffering=buffering) + + +class VirtualDirectoryFSPath(VirtualPathWithReference): + __slots__ = ("_reference_path",) + + def __init__( + self, + basename: str, + parent: FSPath, + *, + reference_path: Optional[VirtualPath] = None, + ) -> None: + super().__init__( + basename, + parent, + reference_path=reference_path, + default_mode=0o755, + ) + self._reference_path = reference_path + assert reference_path is None or reference_path.is_dir + + @property + def is_dir(self) -> bool: + return True + + @property + def is_file(self) -> bool: + return False + + @property + def is_symlink(self) -> bool: + return False + + def readlink(self) -> str: + raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink') + + +class SymlinkVirtualPath(VirtualPathWithReference): + __slots__ = ("_link_target",) + + def __init__( + self, + basename: str, + parent_dir: FSPath, + link_target: str, + *, + reference_path: Optional[VirtualPath] = None, + ) -> None: + super().__init__( + basename, + parent=parent_dir, + default_mode=_SYMLINK_MODE, + reference_path=reference_path, + ) + self._link_target = link_target + + @property + def is_dir(self) -> bool: + return False + + @property + def is_file(self) -> bool: + return False + + @property + def is_symlink(self) -> bool: + return True + + def readlink(self) -> str: + return self._link_target + + +class FSBackedFilePath(VirtualPathWithReference): + __slots__ = ("_fs_path", "_replaceable_inline") + + def __init__( + self, + basename: str, + parent_dir: FSPath, + fs_path: str, + *, + replaceable_inline: bool = False, + initial_mode: Optional[int] = None, + mtime: Optional[float] = None, + stat_cache: Optional[os.stat_result] = None, + reference_path: Optional[VirtualPath] = None, + ) -> None: + super().__init__( + basename, + parent_dir, + default_mode=0o644, + reference_path=reference_path, + ) + self._fs_path = fs_path + self._replaceable_inline = replaceable_inline + if initial_mode is not None: + self.mode = initial_mode + if mtime is not None: + self._mtime = mtime + self._stat_cache = stat_cache + assert ( + not replaceable_inline or "debputy/scratch-dir/" in fs_path + ), f"{fs_path} should not be inline-replaceable -- {self.path}" + + @property + def is_dir(self) -> bool: + return False + + @property + def is_file(self) -> bool: + return True + + @property + def is_symlink(self) -> bool: + return False + + def readlink(self) -> str: + raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink') + + @property + def has_fs_path(self) -> bool: + return True + + @property + def fs_path(self) -> str: + return self._fs_path + + @property + def _can_replace_inline(self) -> bool: + return self._replaceable_inline + + def _replaced_path(self, new_fs_path: str) -> None: + self._fs_path = new_fs_path + self._reference_path = None + self._replaceable_inline = True + + +_SYMLINK_MODE = 0o777 + + +class VirtualTestPath(FSPath): + __slots__ = ( + "_path_type", + "_has_fs_path", + "_fs_path", + "_link_target", + "_content", + "_materialized_content", + ) + + def __init__( + self, + basename: str, + parent_dir: Optional[FSPath], + mode: Optional[int] = None, + mtime: Optional[float] = None, + is_dir: bool = False, + has_fs_path: Optional[bool] = False, + fs_path: Optional[str] = None, + link_target: Optional[str] = None, + content: Optional[str] = None, + materialized_content: Optional[str] = None, + ) -> None: + if is_dir: + self._path_type = PathType.DIRECTORY + elif link_target is not None: + self._path_type = PathType.SYMLINK + if mode is not None and mode != _SYMLINK_MODE: + raise ValueError( + f'Please do not assign a mode to symlinks. Triggered for "{basename}".' + ) + assert mode is None or mode == _SYMLINK_MODE + else: + self._path_type = PathType.FILE + + if mode is not None: + initial_mode = mode + else: + initial_mode = 0o755 if is_dir else 0o644 + + self._link_target = link_target + if has_fs_path is None: + has_fs_path = bool(fs_path) + self._has_fs_path = has_fs_path + self._fs_path = fs_path + self._materialized_content = materialized_content + super().__init__( + basename, + parent=parent_dir, + initial_mode=initial_mode, + mtime=mtime, + ) + self._content = content + + @property + def is_dir(self) -> bool: + return self._path_type == PathType.DIRECTORY + + @property + def is_file(self) -> bool: + return self._path_type == PathType.FILE + + @property + def is_symlink(self) -> bool: + return self._path_type == PathType.SYMLINK + + def readlink(self) -> str: + if not self.is_symlink: + raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") + link_target = self._link_target + assert link_target is not None + return link_target + + @property + def mtime(self) -> float: + if self._mtime is None: + self._mtime = time.time() + return self._mtime + + @mtime.setter + def mtime(self, new_mtime: float) -> None: + self._rw_check() + self._mtime = new_mtime + + @property + def has_fs_path(self) -> bool: + return self._has_fs_path + + def stat(self) -> os.stat_result: + if self.has_fs_path: + path = self.fs_path + if path is None: + raise PureVirtualPathError( + f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" + " cannot provide!" + ) + try: + return os.stat(path) + except FileNotFoundError as e: + raise PureVirtualPathError( + f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" + " cannot provide! (An fs_path was provided, but it did not exist)" + ) from e + + raise PureVirtualPathError( + "stat() is only applicable to paths backed by the file system. The path" + f" {self._orphan_safe_path()!r} is purely virtual" + ) + + @property + def size(self) -> int: + if self._content is not None: + return len(self._content.encode("utf-8")) + if not self.has_fs_path or self.fs_path is None: + return 0 + return self.stat().st_size + + @property + def fs_path(self) -> str: + if self.has_fs_path: + if self._fs_path is None and self._materialized_content is not None: + with tempfile.NamedTemporaryFile( + mode="w+t", + encoding="utf-8", + suffix=f"__{self.name}", + delete=False, + ) as fd: + filepath = fd.name + fd.write(self._materialized_content) + self._fs_path = filepath + atexit.register(lambda: os.unlink(filepath)) + + path = self._fs_path + if path is None: + raise PureVirtualPathError( + f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this " + " mock path cannot provide!" + ) + return path + raise PureVirtualPathError( + "fs_path is only applicable to paths backed by the file system. The path" + f" {self._orphan_safe_path()!r} is purely virtual" + ) + + def replace_fs_path_content( + self, + *, + use_fs_path_mode: bool = False, + ) -> ContextManager[str]: + if self._content is not None: + raise TypeError( + f"The `replace_fs_path_content()` method was called on {self.path}. Said path was" + " created with `content` but for this method to work, the path should have been" + " created with `materialized_content`" + ) + return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode) + + def open( + self, + *, + byte_io: bool = False, + buffering: int = -1, + ) -> Union[TextIO, BinaryIO]: + if self._content is None: + try: + return super().open(byte_io=byte_io, buffering=buffering) + except FileNotFoundError as e: + raise TestPathWithNonExistentFSPathError( + "The test path {self.path} had an fs_path {self._fs_path}, which does not" + " exist. This exception can only occur in the testsuite. Either have the" + " test provide content for the path (`virtual_path_def(..., content=...) or," + " if that is too painful in general, have the code accept this error as a " + " test only-case and provide a default." + ) from e + + if byte_io: + return io.BytesIO(self._content.encode("utf-8")) + return io.StringIO(self._content) + + def _replaced_path(self, new_fs_path: str) -> None: + self._fs_path = new_fs_path + + +class FSROOverlay(VirtualPathBase): + __slots__ = ( + "_path", + "_fs_path", + "_parent", + "_stat_cache", + "_readlink_cache", + "_children", + "_stat_failed_cache", + "__weakref__", + ) + + def __init__( + self, + path: str, + fs_path: str, + parent: Optional["FSROOverlay"], + ) -> None: + self._path: str = path + self._fs_path: str = _normalize_path(fs_path, with_prefix=False) + self._parent: Optional[ReferenceType[FSROOverlay]] = ( + ref(parent) if parent is not None else None + ) + self._stat_cache: Optional[os.stat_result] = None + self._readlink_cache: Optional[str] = None + self._stat_failed_cache = False + self._children: Optional[Mapping[str, FSROOverlay]] = None + + @classmethod + def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay": + return FSROOverlay(path, fs_path, None) + + @property + def name(self) -> str: + return os.path.basename(self._path) + + @property + def iterdir(self) -> Iterable["FSROOverlay"]: + if not self.is_dir: + return + if self._children is None: + self._ensure_children_are_resolved() + yield from assume_not_none(self._children).values() + + def lookup(self, path: str) -> Optional["FSROOverlay"]: + if not self.is_dir: + return None + if self._children is None: + self._ensure_children_are_resolved() + + absolute, _, path_parts = _split_path(path) + current = cast("FSROOverlay", _root(self)) if absolute else self + for no, dir_part in enumerate(path_parts): + if dir_part == ".": + continue + if dir_part == "..": + p = current.parent_dir + if current is None: + raise ValueError(f'The path "{path}" escapes the root dir') + current = p + continue + try: + current = current[dir_part] + except KeyError: + return None + return current + + def all_paths(self) -> Iterable["FSROOverlay"]: + yield self + if not self.is_dir: + return + stack = list(self.iterdir) + stack.reverse() + while stack: + current = stack.pop() + yield current + if current.is_dir: + if current._children is None: + current._ensure_children_are_resolved() + stack.extend(reversed(current._children.values())) + + def _ensure_children_are_resolved(self) -> None: + if not self.is_dir or self._children: + return + dir_path = self.path + dir_fs_path = self.fs_path + children = {} + for name in sorted(os.listdir(dir_fs_path), key=os.path.basename): + child_path = os.path.join(dir_path, name) if dir_path != "." else name + child_fs_path = ( + os.path.join(dir_fs_path, name) if dir_fs_path != "." else name + ) + children[name] = FSROOverlay( + child_path, + child_fs_path, + self, + ) + self._children = children + + @property + def is_detached(self) -> bool: + return False + + def __getitem__(self, key) -> "VirtualPath": + if not self.is_dir: + raise KeyError(key) + if self._children is None: + self._ensure_children_are_resolved() + if isinstance(key, FSPath): + key = key.name + return self._children[key] + + def __delitem__(self, key) -> None: + self._error_ro_fs() + + @property + def is_read_write(self) -> bool: + return False + + def _rw_check(self) -> None: + self._error_ro_fs() + + def _error_ro_fs(self) -> NoReturn: + raise DebputyFSIsROError( + f'Attempt to write to "{self.path}" failed:' + " Debputy Virtual File system is R/O." + ) + + @property + def path(self) -> str: + return self._path + + @property + def parent_dir(self) -> Optional["FSROOverlay"]: + parent = self._parent + if parent is None: + return None + resolved = parent() + if resolved is None: + raise RuntimeError("Parent was garbage collected!") + return resolved + + def stat(self) -> os.stat_result: + if self._stat_failed_cache: + raise FileNotFoundError( + errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path + ) + + if self._stat_cache is None: + try: + self._stat_cache = os.lstat(self.fs_path) + except FileNotFoundError: + self._stat_failed_cache = True + raise + return self._stat_cache + + @property + def mode(self) -> int: + return stat.S_IMODE(self.stat().st_mode) + + @mode.setter + def mode(self, _unused: int) -> None: + self._error_ro_fs() + + @property + def mtime(self) -> float: + return self.stat().st_mtime + + @mtime.setter + def mtime(self, new_mtime: float) -> None: + self._error_ro_fs() + + def readlink(self) -> str: + if not self.is_symlink: + raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") + if self._readlink_cache is None: + self._readlink_cache = os.readlink(self.fs_path) + return self._readlink_cache + + @property + def fs_path(self) -> str: + return self._fs_path + + @property + def is_dir(self) -> bool: + # The root path can have a non-existent fs_path (such as d/tmp not always existing) + try: + return stat.S_ISDIR(self.stat().st_mode) + except FileNotFoundError: + return False + + @property + def is_file(self) -> bool: + # The root path can have a non-existent fs_path (such as d/tmp not always existing) + try: + return stat.S_ISREG(self.stat().st_mode) + except FileNotFoundError: + return False + + @property + def is_symlink(self) -> bool: + # The root path can have a non-existent fs_path (such as d/tmp not always existing) + try: + return stat.S_ISLNK(self.stat().st_mode) + except FileNotFoundError: + return False + + @property + def has_fs_path(self) -> bool: + return True + + def open( + self, + *, + byte_io: bool = False, + buffering: int = -1, + ) -> Union[TextIO, BinaryIO]: + # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this + # case. + if not self.is_file and not self.is_symlink: + raise TypeError( + f"Cannot open {self.path} for reading: It is not a file nor a symlink" + ) + + if byte_io: + return open(self.fs_path, "rb", buffering=buffering) + return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering) + + def chown( + self, + owner: Optional[StaticFileSystemOwner], + group: Optional[StaticFileSystemGroup], + ) -> None: + self._error_ro_fs() + + def mkdir(self, name: str) -> "VirtualPath": + self._error_ro_fs() + + def add_file( + self, + name: str, + *, + unlink_if_exists: bool = True, + use_fs_path_mode: bool = False, + mode: int = 0o0644, + mtime: Optional[float] = None, + ) -> ContextManager["VirtualPath"]: + self._error_ro_fs() + + def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath": + self._error_ro_fs() + + def unlink(self, *, recursive: bool = False) -> None: + self._error_ro_fs() + + def metadata( + self, + metadata_type: Type[PMT], + *, + owning_plugin: Optional[str] = None, + ) -> PathMetadataReference[PMT]: + current_plugin = self._current_plugin() + if owning_plugin is None: + owning_plugin = current_plugin + return AlwaysEmptyReadOnlyMetadataReference( + owning_plugin, + current_plugin, + metadata_type, + ) + + +class FSROOverlayRootDir(FSROOverlay): + __slots__ = ("_plugin_context",) + + def __init__(self, path: str, fs_path: str) -> None: + super().__init__(path, fs_path, None) + self._plugin_context = CurrentPluginContextManager("debputy") + + def _current_plugin(self) -> str: + return self._plugin_context.current_plugin_name + + @contextlib.contextmanager + def change_plugin_context(self, new_plugin: str) -> Iterator[str]: + with self._plugin_context.change_plugin_context(new_plugin) as r: + yield r + + +def as_path_def(pd: Union[str, PathDef]) -> PathDef: + return PathDef(pd) if isinstance(pd, str) else pd + + +def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]: + yield from (as_path_def(p) for p in paths) + + +def build_virtual_fs( + paths: Iterable[Union[str, PathDef]], + read_write_fs: bool = False, +) -> "FSPath": + root_dir: Optional[FSRootDir] = None + directories: Dict[str, FSPath] = {} + non_directories = set() + + def _ensure_parent_dirs(p: str) -> None: + current = p.rstrip("/") + missing_dirs = [] + while True: + current = os.path.dirname(current) + if current in directories: + break + if current in non_directories: + raise ValueError( + f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,' + ' but it is defined as a non-directory. (Ensure dirs end with "/")' + ) + missing_dirs.append(current) + for dir_path in reversed(missing_dirs): + parent_dir = directories[os.path.dirname(dir_path)] + d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True) + directories[dir_path] = d + + for path_def in as_path_defs(paths): + path = path_def.path_name + if path in directories or path in non_directories: + raise ValueError( + f'Duplicate definition of "{path}". Can be false positive if input is not in' + ' "correct order" (ensure directories occur before their children)' + ) + if root_dir is None: + root_fs_path = None + if path in (".", "./", "/"): + root_fs_path = path_def.fs_path + root_dir = FSRootDir(fs_path=root_fs_path) + directories["."] = root_dir + + if path not in (".", "./", "/") and not path.startswith("./"): + path = "./" + path + if path not in (".", "./", "/"): + _ensure_parent_dirs(path) + if path in (".", "./"): + assert "." in directories + continue + is_dir = False + if path.endswith("/"): + path = path[:-1] + is_dir = True + directory = directories[os.path.dirname(path)] + assert not is_dir or not bool( + path_def.link_target + ), f"is_dir={is_dir} vs. link_target={path_def.link_target}" + fs_path = VirtualTestPath( + os.path.basename(path), + directory, + is_dir=is_dir, + mode=path_def.mode, + mtime=path_def.mtime, + has_fs_path=path_def.has_fs_path, + fs_path=path_def.fs_path, + link_target=path_def.link_target, + content=path_def.content, + materialized_content=path_def.materialized_content, + ) + assert not fs_path.is_detached + if fs_path.is_dir: + directories[fs_path.path] = fs_path + else: + non_directories.add(fs_path.path) + + if root_dir is None: + root_dir = FSRootDir() + + root_dir.is_read_write = read_write_fs + return root_dir |