diff options
Diffstat (limited to 'src/debputy/intermediate_manifest.py')
-rw-r--r-- | src/debputy/intermediate_manifest.py | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/src/debputy/intermediate_manifest.py b/src/debputy/intermediate_manifest.py new file mode 100644 index 0000000..7d8dd63 --- /dev/null +++ b/src/debputy/intermediate_manifest.py @@ -0,0 +1,333 @@ +import dataclasses +import json +import os +import stat +import sys +import tarfile +from enum import Enum + + +from typing import Optional, List, Dict, Any, Iterable, Union, Self, Mapping, IO + +IntermediateManifest = List["TarMember"] + + +class PathType(Enum): + FILE = ("file", tarfile.REGTYPE) + DIRECTORY = ("directory", tarfile.DIRTYPE) + SYMLINK = ("symlink", tarfile.SYMTYPE) + # TODO: Add hardlink, FIFO, Char device, BLK device, etc. + + @property + def manifest_key(self) -> str: + return self.value[0] + + @property + def tarinfo_type(self) -> bytes: + return self.value[1] + + @property + def can_be_virtual(self) -> bool: + return self in (PathType.DIRECTORY, PathType.SYMLINK) + + +KEY2PATH_TYPE = {pt.manifest_key: pt for pt in PathType} + + +def _dirname(path: str) -> str: + path = path.rstrip("/") + if path == ".": + return path + return os.path.dirname(path) + + +def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType: + if stat.S_ISREG(st_mode): + path_type = PathType.FILE + elif stat.S_ISDIR(st_mode): + path_type = PathType.DIRECTORY + # elif stat.S_ISFIFO(st_result): + # type = FIFOTYPE + elif stat.S_ISLNK(st_mode): + raise ValueError( + "Symlinks should have been rewritten to use the virtual rule." + " Otherwise, the link would not be normalized according to Debian Policy." + ) + # elif stat.S_ISCHR(st_result): + # type = CHRTYPE + # elif stat.S_ISBLK(st_result): + # type = BLKTYPE + else: + raise ValueError( + f"The path {fs_path} had an unsupported/unknown file type." + f" Probably a bug in the tool" + ) + return path_type + + +@dataclasses.dataclass(slots=True) +class TarMember: + member_path: str + path_type: PathType + fs_path: Optional[str] + mode: int + owner: str + uid: int + group: str + gid: int + mtime: float + link_target: str = "" + is_virtual_entry: bool = False + may_steal_fs_path: bool = False + + def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo: + tar_info: tarfile.TarInfo + if self.is_virtual_entry: + assert self.path_type.can_be_virtual + tar_info = tar_fd.tarinfo(self.member_path) + tar_info.size = 0 + tar_info.type = self.path_type.tarinfo_type + tar_info.linkpath = self.link_target + else: + try: + tar_info = tar_fd.gettarinfo( + name=self.fs_path, arcname=self.member_path + ) + except (TypeError, ValueError) as e: + raise ValueError( + f"Unable to prepare tar info for {self.member_path}" + ) from e + # TODO: Eventually, we should be able to unconditionally rely on link_target. However, + # until we got symlinks and hardlinks correctly done in the JSON generator, it will be + # conditional for now. + if self.link_target != "": + tar_info.linkpath = self.link_target + tar_info.mode = self.mode + tar_info.uname = self.owner + tar_info.uid = self.uid + tar_info.gname = self.group + tar_info.gid = self.gid + tar_info.mode = self.mode + tar_info.mtime = int(self.mtime) + + return tar_info + + @classmethod + def from_file( + cls, + member_path: str, + fs_path: str, + mode: Optional[int] = None, + owner: str = "root", + uid: int = 0, + group: str = "root", + gid: int = 0, + path_mtime: Optional[Union[float, int]] = None, + clamp_mtime_to: Optional[int] = None, + path_type: Optional[PathType] = None, + may_steal_fs_path: bool = False, + ) -> "TarMember": + # Avoid lstat'ing if we can as it makes it easier to do tests of the code + # (as we do not need an existing physical fs path) + if path_type is None or path_mtime is None or mode is None: + st_result = os.lstat(fs_path) + st_mode = st_result.st_mode + if mode is None: + mode = st_mode + if path_mtime is None: + path_mtime = st_result.st_mtime + if path_type is None: + path_type = _fs_type_from_st_mode(fs_path, st_mode) + + if clamp_mtime_to is not None and path_mtime > clamp_mtime_to: + path_mtime = clamp_mtime_to + + if may_steal_fs_path: + assert ( + "debputy/scratch-dir/" in fs_path + ), f"{fs_path} should not have been stealable" + + return cls( + member_path=member_path, + path_type=path_type, + fs_path=fs_path, + mode=mode, + owner=owner, + uid=uid, + group=group, + gid=gid, + mtime=float(path_mtime), + is_virtual_entry=False, + may_steal_fs_path=may_steal_fs_path, + ) + + @classmethod + def virtual_path( + cls, + member_path: str, + path_type: PathType, + mtime: float, + mode: int, + link_target: str = "", + owner: str = "root", + uid: int = 0, + group: str = "root", + gid: int = 0, + ) -> Self: + if not path_type.can_be_virtual: + raise ValueError(f"The path type {path_type.name} cannot be virtual") + if (path_type == PathType.SYMLINK) ^ bool(link_target): + if not link_target: + raise ValueError("Symlinks must have a link target") + # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this + # code then! + raise ValueError("Non-symlinks must not have a link target") + return cls( + member_path=member_path, + path_type=path_type, + fs_path=None, + link_target=link_target, + mode=mode, + owner=owner, + uid=uid, + group=group, + gid=gid, + mtime=mtime, + is_virtual_entry=True, + ) + + def clone_and_replace(self, /, **changes: Any) -> "TarMember": + return dataclasses.replace(self, **changes) + + def to_manifest(self) -> Dict[str, Any]: + d = dataclasses.asdict(self) + try: + d["mode"] = oct(self.mode) + except (TypeError, ValueError) as e: + raise TypeError(f"Bad mode in TarMember {self.member_path}") from e + d["path_type"] = self.path_type.manifest_key + # "compress" the output by removing redundant fields + if self.link_target is None or self.link_target == "": + del d["link_target"] + if self.is_virtual_entry: + assert self.fs_path is None + del d["fs_path"] + else: + del d["is_virtual_entry"] + return d + + @classmethod + def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest: + directories = {"."} + if manifest_path == "-": + with sys.stdin as fd: + data = json.load(fd) + contents = [TarMember.from_dict(m) for m in data] + else: + with open(manifest_path) as fd: + data = json.load(fd) + contents = [TarMember.from_dict(m) for m in data] + if not contents: + raise ValueError( + "Empty manifest (note that the root directory should always be present" + ) + if contents[0].member_path != "./": + raise ValueError('The first member must always be the root directory "./"') + for tar_member in contents: + directory = _dirname(tar_member.member_path) + if directory not in directories: + raise ValueError( + f'The path "{tar_member.member_path}" came before the directory it is in (or the path' + f" is not a directory). Either way leads to a broken deb." + ) + if tar_member.path_type == PathType.DIRECTORY: + directories.add(tar_member.member_path.rstrip("/")) + return contents + + @classmethod + def from_dict(cls, d: Any) -> "TarMember": + member_path = d["member_path"] + raw_mode = d["mode"] + if not raw_mode.startswith("0o"): + raise ValueError(f"Bad mode for {member_path}") + is_virtual_entry = d.get("is_virtual_entry") or False + path_type = KEY2PATH_TYPE[d["path_type"]] + fs_path = d.get("fs_path") + mode = int(raw_mode[2:], 8) + if is_virtual_entry: + if not path_type.can_be_virtual: + raise ValueError( + f"Bad file type or is_virtual_entry for {d['member_path']}." + " The file type cannot be virtual" + ) + if fs_path is not None: + raise ValueError( + f'Invalid declaration for "{member_path}".' + " The path is listed as a virtual entry but has a file system path" + ) + elif fs_path is None: + raise ValueError( + f'Invalid declaration for "{member_path}".' + " The path is neither a virtual path nor does it have a file system path!" + ) + if path_type == PathType.DIRECTORY and not member_path.endswith("/"): + raise ValueError( + f'Invalid declaration for "{member_path}".' + " The path is listed as a directory but does not end with a slash" + ) + + link_target = d.get("link_target") + if path_type == PathType.SYMLINK: + if mode != 0o777: + raise ValueError( + f'Invalid declaration for "{member_path}".' + f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}." + ) + if not link_target: + raise ValueError( + f'Invalid declaration for "{member_path}".' + " Symlinks must have a link_target" + ) + elif link_target is not None and link_target != "": + # TODO: Eventually hardlinks should have them too. But that is a problem for a future programmer + raise ValueError( + f'Invalid declaration for "{member_path}".' + " Only symlinks can have a link_target" + ) + else: + link_target = "" + may_steal_fs_path = d.get("may_steal_fs_path") or False + + if may_steal_fs_path: + assert ( + "debputy/scratch-dir/" in fs_path + ), f"{fs_path} should not have been stealable" + return cls( + member_path=member_path, + path_type=path_type, + fs_path=fs_path, + mode=mode, + owner=d["owner"], + uid=d["uid"], + group=d["group"], + gid=d["gid"], + mtime=float(d["mtime"]), + link_target=link_target, + is_virtual_entry=is_virtual_entry, + may_steal_fs_path=may_steal_fs_path, + ) + + +def output_intermediate_manifest( + manifest_output_file: str, + members: Iterable[TarMember], +) -> None: + with open(manifest_output_file, "w") as fd: + output_intermediate_manifest_to_fd(fd, members) + + +def output_intermediate_manifest_to_fd( + fd: IO[str], members: Iterable[TarMember] +) -> None: + serial_format = [m.to_manifest() for m in members] + json.dump(serial_format, fd) |