summaryrefslogtreecommitdiffstats
path: root/src/debputy/intermediate_manifest.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/intermediate_manifest.py')
-rw-r--r--src/debputy/intermediate_manifest.py333
1 files changed, 333 insertions, 0 deletions
diff --git a/src/debputy/intermediate_manifest.py b/src/debputy/intermediate_manifest.py
new file mode 100644
index 0000000..7d8dd63
--- /dev/null
+++ b/src/debputy/intermediate_manifest.py
@@ -0,0 +1,333 @@
+import dataclasses
+import json
+import os
+import stat
+import sys
+import tarfile
+from enum import Enum
+
+
+from typing import Optional, List, Dict, Any, Iterable, Union, Self, Mapping, IO
+
+IntermediateManifest = List["TarMember"]
+
+
+class PathType(Enum):
+ FILE = ("file", tarfile.REGTYPE)
+ DIRECTORY = ("directory", tarfile.DIRTYPE)
+ SYMLINK = ("symlink", tarfile.SYMTYPE)
+ # TODO: Add hardlink, FIFO, Char device, BLK device, etc.
+
+ @property
+ def manifest_key(self) -> str:
+ return self.value[0]
+
+ @property
+ def tarinfo_type(self) -> bytes:
+ return self.value[1]
+
+ @property
+ def can_be_virtual(self) -> bool:
+ return self in (PathType.DIRECTORY, PathType.SYMLINK)
+
+
+KEY2PATH_TYPE = {pt.manifest_key: pt for pt in PathType}
+
+
+def _dirname(path: str) -> str:
+ path = path.rstrip("/")
+ if path == ".":
+ return path
+ return os.path.dirname(path)
+
+
+def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType:
+ if stat.S_ISREG(st_mode):
+ path_type = PathType.FILE
+ elif stat.S_ISDIR(st_mode):
+ path_type = PathType.DIRECTORY
+ # elif stat.S_ISFIFO(st_result):
+ # type = FIFOTYPE
+ elif stat.S_ISLNK(st_mode):
+ raise ValueError(
+ "Symlinks should have been rewritten to use the virtual rule."
+ " Otherwise, the link would not be normalized according to Debian Policy."
+ )
+ # elif stat.S_ISCHR(st_result):
+ # type = CHRTYPE
+ # elif stat.S_ISBLK(st_result):
+ # type = BLKTYPE
+ else:
+ raise ValueError(
+ f"The path {fs_path} had an unsupported/unknown file type."
+ f" Probably a bug in the tool"
+ )
+ return path_type
+
+
+@dataclasses.dataclass(slots=True)
+class TarMember:
+ member_path: str
+ path_type: PathType
+ fs_path: Optional[str]
+ mode: int
+ owner: str
+ uid: int
+ group: str
+ gid: int
+ mtime: float
+ link_target: str = ""
+ is_virtual_entry: bool = False
+ may_steal_fs_path: bool = False
+
+ def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo:
+ tar_info: tarfile.TarInfo
+ if self.is_virtual_entry:
+ assert self.path_type.can_be_virtual
+ tar_info = tar_fd.tarinfo(self.member_path)
+ tar_info.size = 0
+ tar_info.type = self.path_type.tarinfo_type
+ tar_info.linkpath = self.link_target
+ else:
+ try:
+ tar_info = tar_fd.gettarinfo(
+ name=self.fs_path, arcname=self.member_path
+ )
+ except (TypeError, ValueError) as e:
+ raise ValueError(
+ f"Unable to prepare tar info for {self.member_path}"
+ ) from e
+ # TODO: Eventually, we should be able to unconditionally rely on link_target. However,
+ # until we got symlinks and hardlinks correctly done in the JSON generator, it will be
+ # conditional for now.
+ if self.link_target != "":
+ tar_info.linkpath = self.link_target
+ tar_info.mode = self.mode
+ tar_info.uname = self.owner
+ tar_info.uid = self.uid
+ tar_info.gname = self.group
+ tar_info.gid = self.gid
+ tar_info.mode = self.mode
+ tar_info.mtime = int(self.mtime)
+
+ return tar_info
+
+ @classmethod
+ def from_file(
+ cls,
+ member_path: str,
+ fs_path: str,
+ mode: Optional[int] = None,
+ owner: str = "root",
+ uid: int = 0,
+ group: str = "root",
+ gid: int = 0,
+ path_mtime: Optional[Union[float, int]] = None,
+ clamp_mtime_to: Optional[int] = None,
+ path_type: Optional[PathType] = None,
+ may_steal_fs_path: bool = False,
+ ) -> "TarMember":
+ # Avoid lstat'ing if we can as it makes it easier to do tests of the code
+ # (as we do not need an existing physical fs path)
+ if path_type is None or path_mtime is None or mode is None:
+ st_result = os.lstat(fs_path)
+ st_mode = st_result.st_mode
+ if mode is None:
+ mode = st_mode
+ if path_mtime is None:
+ path_mtime = st_result.st_mtime
+ if path_type is None:
+ path_type = _fs_type_from_st_mode(fs_path, st_mode)
+
+ if clamp_mtime_to is not None and path_mtime > clamp_mtime_to:
+ path_mtime = clamp_mtime_to
+
+ if may_steal_fs_path:
+ assert (
+ "debputy/scratch-dir/" in fs_path
+ ), f"{fs_path} should not have been stealable"
+
+ return cls(
+ member_path=member_path,
+ path_type=path_type,
+ fs_path=fs_path,
+ mode=mode,
+ owner=owner,
+ uid=uid,
+ group=group,
+ gid=gid,
+ mtime=float(path_mtime),
+ is_virtual_entry=False,
+ may_steal_fs_path=may_steal_fs_path,
+ )
+
+ @classmethod
+ def virtual_path(
+ cls,
+ member_path: str,
+ path_type: PathType,
+ mtime: float,
+ mode: int,
+ link_target: str = "",
+ owner: str = "root",
+ uid: int = 0,
+ group: str = "root",
+ gid: int = 0,
+ ) -> Self:
+ if not path_type.can_be_virtual:
+ raise ValueError(f"The path type {path_type.name} cannot be virtual")
+ if (path_type == PathType.SYMLINK) ^ bool(link_target):
+ if not link_target:
+ raise ValueError("Symlinks must have a link target")
+ # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this
+ # code then!
+ raise ValueError("Non-symlinks must not have a link target")
+ return cls(
+ member_path=member_path,
+ path_type=path_type,
+ fs_path=None,
+ link_target=link_target,
+ mode=mode,
+ owner=owner,
+ uid=uid,
+ group=group,
+ gid=gid,
+ mtime=mtime,
+ is_virtual_entry=True,
+ )
+
+ def clone_and_replace(self, /, **changes: Any) -> "TarMember":
+ return dataclasses.replace(self, **changes)
+
+ def to_manifest(self) -> Dict[str, Any]:
+ d = dataclasses.asdict(self)
+ try:
+ d["mode"] = oct(self.mode)
+ except (TypeError, ValueError) as e:
+ raise TypeError(f"Bad mode in TarMember {self.member_path}") from e
+ d["path_type"] = self.path_type.manifest_key
+ # "compress" the output by removing redundant fields
+ if self.link_target is None or self.link_target == "":
+ del d["link_target"]
+ if self.is_virtual_entry:
+ assert self.fs_path is None
+ del d["fs_path"]
+ else:
+ del d["is_virtual_entry"]
+ return d
+
+ @classmethod
+ def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest:
+ directories = {"."}
+ if manifest_path == "-":
+ with sys.stdin as fd:
+ data = json.load(fd)
+ contents = [TarMember.from_dict(m) for m in data]
+ else:
+ with open(manifest_path) as fd:
+ data = json.load(fd)
+ contents = [TarMember.from_dict(m) for m in data]
+ if not contents:
+ raise ValueError(
+ "Empty manifest (note that the root directory should always be present"
+ )
+ if contents[0].member_path != "./":
+ raise ValueError('The first member must always be the root directory "./"')
+ for tar_member in contents:
+ directory = _dirname(tar_member.member_path)
+ if directory not in directories:
+ raise ValueError(
+ f'The path "{tar_member.member_path}" came before the directory it is in (or the path'
+ f" is not a directory). Either way leads to a broken deb."
+ )
+ if tar_member.path_type == PathType.DIRECTORY:
+ directories.add(tar_member.member_path.rstrip("/"))
+ return contents
+
+ @classmethod
+ def from_dict(cls, d: Any) -> "TarMember":
+ member_path = d["member_path"]
+ raw_mode = d["mode"]
+ if not raw_mode.startswith("0o"):
+ raise ValueError(f"Bad mode for {member_path}")
+ is_virtual_entry = d.get("is_virtual_entry") or False
+ path_type = KEY2PATH_TYPE[d["path_type"]]
+ fs_path = d.get("fs_path")
+ mode = int(raw_mode[2:], 8)
+ if is_virtual_entry:
+ if not path_type.can_be_virtual:
+ raise ValueError(
+ f"Bad file type or is_virtual_entry for {d['member_path']}."
+ " The file type cannot be virtual"
+ )
+ if fs_path is not None:
+ raise ValueError(
+ f'Invalid declaration for "{member_path}".'
+ " The path is listed as a virtual entry but has a file system path"
+ )
+ elif fs_path is None:
+ raise ValueError(
+ f'Invalid declaration for "{member_path}".'
+ " The path is neither a virtual path nor does it have a file system path!"
+ )
+ if path_type == PathType.DIRECTORY and not member_path.endswith("/"):
+ raise ValueError(
+ f'Invalid declaration for "{member_path}".'
+ " The path is listed as a directory but does not end with a slash"
+ )
+
+ link_target = d.get("link_target")
+ if path_type == PathType.SYMLINK:
+ if mode != 0o777:
+ raise ValueError(
+ f'Invalid declaration for "{member_path}".'
+ f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}."
+ )
+ if not link_target:
+ raise ValueError(
+ f'Invalid declaration for "{member_path}".'
+ " Symlinks must have a link_target"
+ )
+ elif link_target is not None and link_target != "":
+ # TODO: Eventually hardlinks should have them too. But that is a problem for a future programmer
+ raise ValueError(
+ f'Invalid declaration for "{member_path}".'
+ " Only symlinks can have a link_target"
+ )
+ else:
+ link_target = ""
+ may_steal_fs_path = d.get("may_steal_fs_path") or False
+
+ if may_steal_fs_path:
+ assert (
+ "debputy/scratch-dir/" in fs_path
+ ), f"{fs_path} should not have been stealable"
+ return cls(
+ member_path=member_path,
+ path_type=path_type,
+ fs_path=fs_path,
+ mode=mode,
+ owner=d["owner"],
+ uid=d["uid"],
+ group=d["group"],
+ gid=d["gid"],
+ mtime=float(d["mtime"]),
+ link_target=link_target,
+ is_virtual_entry=is_virtual_entry,
+ may_steal_fs_path=may_steal_fs_path,
+ )
+
+
+def output_intermediate_manifest(
+ manifest_output_file: str,
+ members: Iterable[TarMember],
+) -> None:
+ with open(manifest_output_file, "w") as fd:
+ output_intermediate_manifest_to_fd(fd, members)
+
+
+def output_intermediate_manifest_to_fd(
+ fd: IO[str], members: Iterable[TarMember]
+) -> None:
+ serial_format = [m.to_manifest() for m in members]
+ json.dump(serial_format, fd)