summaryrefslogtreecommitdiffstats
path: root/src/debputy/commands/deb_packer.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/commands/deb_packer.py')
-rw-r--r--src/debputy/commands/deb_packer.py557
1 files changed, 557 insertions, 0 deletions
diff --git a/src/debputy/commands/deb_packer.py b/src/debputy/commands/deb_packer.py
new file mode 100644
index 0000000..8c61099
--- /dev/null
+++ b/src/debputy/commands/deb_packer.py
@@ -0,0 +1,557 @@
+#!/usr/bin/python3 -B
+import argparse
+import errno
+import operator
+import os
+import stat
+import subprocess
+import tarfile
+import textwrap
+from typing import Optional, List, FrozenSet, Iterable, Callable, BinaryIO, cast
+
+from debputy.intermediate_manifest import TarMember, PathType
+from debputy.util import (
+ _error,
+ compute_output_filename,
+ resolve_source_date_epoch,
+ ColorizedArgumentParser,
+ setup_logging,
+ program_name,
+ assume_not_none,
+)
+from debputy.version import __version__
+
+
+# AR header / start of a deb file for reference
+# 00000000 21 3c 61 72 63 68 3e 0a 64 65 62 69 61 6e 2d 62 |!<arch>.debian-b|
+# 00000010 69 6e 61 72 79 20 20 20 31 36 36 38 39 37 33 36 |inary 16689736|
+# 00000020 39 35 20 20 30 20 20 20 20 20 30 20 20 20 20 20 |95 0 0 |
+# 00000030 31 30 30 36 34 34 20 20 34 20 20 20 20 20 20 20 |100644 4 |
+# 00000040 20 20 60 0a 32 2e 30 0a 63 6f 6e 74 72 6f 6c 2e | `.2.0.control.|
+# 00000050 74 61 72 2e 78 7a 20 20 31 36 36 38 39 37 33 36 |tar.xz 16689736|
+# 00000060 39 35 20 20 30 20 20 20 20 20 30 20 20 20 20 20 |95 0 0 |
+# 00000070 31 30 30 36 34 34 20 20 39 33 36 38 20 20 20 20 |100644 9368 |
+# 00000080 20 20 60 0a fd 37 7a 58 5a 00 00 04 e6 d6 b4 46 | `..7zXZ......F|
+
+
+class ArMember:
+ def __init__(
+ self,
+ name: str,
+ mtime: int,
+ fixed_binary: Optional[bytes] = None,
+ write_to_impl: Optional[Callable[[BinaryIO], None]] = None,
+ ) -> None:
+ self.name = name
+ self._mtime = mtime
+ self._write_to_impl = write_to_impl
+ self.fixed_binary = fixed_binary
+
+ @property
+ def is_fixed_binary(self) -> bool:
+ return self.fixed_binary is not None
+
+ @property
+ def mtime(self) -> int:
+ return self.mtime
+
+ def write_to(self, fd: BinaryIO) -> None:
+ writer = self._write_to_impl
+ assert writer is not None
+ writer(fd)
+
+
+AR_HEADER_LEN = 60
+AR_HEADER = b" " * AR_HEADER_LEN
+
+
+def write_header(
+ fd: BinaryIO,
+ member: ArMember,
+ member_len: int,
+ mtime: int,
+) -> None:
+ header = b"%-16s%-12d0 0 100644 %-10d\x60\n" % (
+ member.name.encode("ascii"),
+ mtime,
+ member_len,
+ )
+ fd.write(header)
+
+
+def generate_ar_archive(
+ output_filename: str,
+ mtime: int,
+ members: Iterable[ArMember],
+ prefer_raw_exceptions: bool,
+) -> None:
+ try:
+ with open(output_filename, "wb", buffering=0) as fd:
+ fd.write(b"!<arch>\n")
+ for member in members:
+ if member.is_fixed_binary:
+ fixed_binary = assume_not_none(member.fixed_binary)
+ write_header(fd, member, len(fixed_binary), mtime)
+ fd.write(fixed_binary)
+ else:
+ header_pos = fd.tell()
+ fd.write(AR_HEADER)
+ member.write_to(fd)
+ current_pos = fd.tell()
+ fd.seek(header_pos, os.SEEK_SET)
+ content_len = current_pos - header_pos - AR_HEADER_LEN
+ assert content_len >= 0
+ write_header(fd, member, content_len, mtime)
+ fd.seek(current_pos, os.SEEK_SET)
+ except OSError as e:
+ if prefer_raw_exceptions:
+ raise
+ if e.errno == errno.ENOSPC:
+ _error(
+ f"Unable to write {output_filename}. The file system device reported disk full: {str(e)}"
+ )
+ elif e.errno == errno.EIO:
+ _error(
+ f"Unable to write {output_filename}. The file system reported a generic I/O error: {str(e)}"
+ )
+ elif e.errno == errno.EROFS:
+ _error(
+ f"Unable to write {output_filename}. The file system is read-only: {str(e)}"
+ )
+ raise
+ print(f"Generated {output_filename}")
+
+
+def _generate_tar_file(
+ tar_members: Iterable[TarMember],
+ compression_cmd: List[str],
+ write_to: BinaryIO,
+) -> None:
+ with (
+ subprocess.Popen(
+ compression_cmd, stdin=subprocess.PIPE, stdout=write_to
+ ) as compress_proc,
+ tarfile.open(
+ mode="w|",
+ fileobj=compress_proc.stdin,
+ format=tarfile.GNU_FORMAT,
+ errorlevel=1,
+ ) as tar_fd,
+ ):
+ for tar_member in tar_members:
+ tar_info: tarfile.TarInfo = tar_member.create_tar_info(tar_fd)
+ if tar_member.path_type == PathType.FILE:
+ with open(assume_not_none(tar_member.fs_path), "rb") as mfd:
+ tar_fd.addfile(tar_info, fileobj=mfd)
+ else:
+ tar_fd.addfile(tar_info)
+ compress_proc.wait()
+ if compress_proc.returncode != 0:
+ _error(
+ f"Compression command {compression_cmd} failed with code {compress_proc.returncode}"
+ )
+
+
+def generate_tar_file_member(
+ tar_members: Iterable[TarMember],
+ compression_cmd: List[str],
+) -> Callable[[BinaryIO], None]:
+ def _impl(fd: BinaryIO) -> None:
+ _generate_tar_file(
+ tar_members,
+ compression_cmd,
+ fd,
+ )
+
+ return _impl
+
+
+def _xz_cmdline(
+ compression_rule: "Compression",
+ parsed_args: Optional[argparse.Namespace],
+) -> List[str]:
+ compression_level = compression_rule.effective_compression_level(parsed_args)
+ cmdline = ["xz", "-T2", "-" + str(compression_level)]
+ strategy = None if parsed_args is None else parsed_args.compression_strategy
+ if strategy is None:
+ strategy = "none"
+ if strategy != "none":
+ cmdline.append("--" + strategy)
+ cmdline.append("--no-adjust")
+ return cmdline
+
+
+def _gzip_cmdline(
+ compression_rule: "Compression",
+ parsed_args: Optional[argparse.Namespace],
+) -> List[str]:
+ compression_level = compression_rule.effective_compression_level(parsed_args)
+ cmdline = ["gzip", "-n" + str(compression_level)]
+ strategy = None if parsed_args is None else parsed_args.compression_strategy
+ if strategy is not None and strategy != "none":
+ raise ValueError(
+ f"Not implemented: Compression strategy {strategy}"
+ " for gzip is currently unsupported (but dpkg-deb does)"
+ )
+ return cmdline
+
+
+def _uncompressed_cmdline(
+ _unused_a: "Compression",
+ _unused_b: Optional[argparse.Namespace],
+) -> List[str]:
+ return ["cat"]
+
+
+class Compression:
+ def __init__(
+ self,
+ default_compression_level: int,
+ extension: str,
+ allowed_strategies: FrozenSet[str],
+ cmdline_builder: Callable[
+ ["Compression", Optional[argparse.Namespace]], List[str]
+ ],
+ ) -> None:
+ self.default_compression_level = default_compression_level
+ self.extension = extension
+ self.allowed_strategies = allowed_strategies
+ self.cmdline_builder = cmdline_builder
+
+ def __repr__(self) -> str:
+ return f"<{self.__class__.__name__} {self.extension}>"
+
+ def effective_compression_level(
+ self, parsed_args: Optional[argparse.Namespace]
+ ) -> int:
+ if parsed_args and parsed_args.compression_level is not None:
+ return cast("int", parsed_args.compression_level)
+ return self.default_compression_level
+
+ def as_cmdline(self, parsed_args: Optional[argparse.Namespace]) -> List[str]:
+ return self.cmdline_builder(self, parsed_args)
+
+ def with_extension(self, filename: str) -> str:
+ return filename + self.extension
+
+
+COMPRESSIONS = {
+ "xz": Compression(6, ".xz", frozenset({"none", "extreme"}), _xz_cmdline),
+ "gzip": Compression(
+ 9,
+ ".gz",
+ frozenset({"none", "filtered", "huffman", "rle", "fixed"}),
+ _gzip_cmdline,
+ ),
+ "none": Compression(0, "", frozenset({"none"}), _uncompressed_cmdline),
+}
+
+
+def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
+ if (
+ parsed_args.compression_level == 0
+ and parsed_args.compression_algorithm == "gzip"
+ ):
+ print(
+ "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
+ )
+ setattr(parsed_args, "compression_algorithm", "none")
+
+ compression = COMPRESSIONS[parsed_args.compression_algorithm]
+ strategy = parsed_args.compression_strategy
+ if strategy is not None and strategy not in compression.allowed_strategies:
+ _error(
+ f'Compression algorithm "{parsed_args.compression_algorithm}" does not support compression strategy'
+ f' "{strategy}". Allowed values: {", ".join(sorted(compression.allowed_strategies))}'
+ )
+ return parsed_args
+
+
+def parse_args() -> argparse.Namespace:
+ try:
+ compression_level_default = int(os.environ["DPKG_DEB_COMPRESSOR_LEVEL"])
+ except (KeyError, ValueError):
+ compression_level_default = None
+
+ try:
+ compression_type = os.environ["DPKG_DEB_COMPRESSOR_TYPE"]
+ except (KeyError, ValueError):
+ compression_type = "xz"
+
+ try:
+ threads_max = int(os.environ["DPKG_DEB_THREADS_MAX"])
+ except (KeyError, ValueError):
+ threads_max = None
+
+ description = textwrap.dedent(
+ """\
+ THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support
+
+ DO NOT USE THIS TOOL DIRECTLY. It has not stability guarantees and will be removed as
+ soon as "dpkg-deb -b" grows support for the relevant features.
+
+ This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
+ without requiring root even for static ownership. It is a temporary stand-in for
+ "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.
+
+ The tool operates on an internal JSON based manifest for now, because it was faster
+ than building an mtree parser (which is the format that dpkg will likely end up
+ using).
+
+ As the tool is not meant to be used directly, it is full of annoying paper cuts that
+ I refuse to fix or maintain. Use the high level tool instead.
+
+ """
+ )
+
+ parser = ColorizedArgumentParser(
+ description=description,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ allow_abbrev=False,
+ prog=program_name(),
+ )
+ parser.add_argument("--version", action="version", version=__version__)
+ parser.add_argument(
+ "package_root_dir",
+ metavar="PACKAGE_ROOT_DIR",
+ help="Root directory of the package. Must contain a DEBIAN directory",
+ )
+ parser.add_argument(
+ "package_output_path",
+ metavar="PATH",
+ help="Path where the package should be placed. If it is directory,"
+ " the base name will be determined from the package metadata",
+ )
+
+ parser.add_argument(
+ "--intermediate-package-manifest",
+ dest="package_manifest",
+ metavar="JSON_FILE",
+ action="store",
+ default=None,
+ help="INTERMEDIATE package manifest (JSON!)",
+ )
+ parser.add_argument(
+ "--root-owner-group",
+ dest="root_owner_group",
+ action="store_true",
+ help="Ignored. Accepted for compatibility with dpkg-deb -b",
+ )
+ parser.add_argument(
+ "-b",
+ "--build",
+ dest="build_param",
+ action="store_true",
+ help="Ignored. Accepted for compatibility with dpkg-deb",
+ )
+ parser.add_argument(
+ "--source-date-epoch",
+ dest="source_date_epoch",
+ action="store",
+ type=int,
+ default=None,
+ help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ variable",
+ )
+ parser.add_argument(
+ "-Z",
+ dest="compression_algorithm",
+ choices=COMPRESSIONS,
+ default=compression_type,
+ help="The compression algorithm to be used",
+ )
+ parser.add_argument(
+ "-z",
+ dest="compression_level",
+ metavar="{0-9}",
+ choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+ default=compression_level_default,
+ type=int,
+ help="The compression level to be used",
+ )
+ parser.add_argument(
+ "-S",
+ dest="compression_strategy",
+ # We have a different default for xz when strategy is unset and we are building a udeb
+ action="store",
+ default=None,
+ help="The compression algorithm to be used. Concrete values depend on the compression"
+ ' algorithm, but the value "none" is always allowed',
+ )
+ parser.add_argument(
+ "--uniform-compression",
+ dest="uniform_compression",
+ action="store_true",
+ default=True,
+ help="Whether to use the same compression for the control.tar and the data.tar."
+ " The default is to use uniform compression.",
+ )
+ parser.add_argument(
+ "--no-uniform-compression",
+ dest="uniform_compression",
+ action="store_false",
+ default=True,
+ help="Disable uniform compression (see --uniform-compression)",
+ )
+ parser.add_argument(
+ "--threads-max",
+ dest="threads_max",
+ default=threads_max,
+ # TODO: Support this properly
+ type=int,
+ help="Ignored; accepted for compatibility",
+ )
+ parser.add_argument(
+ "-d",
+ "--debug",
+ dest="debug_mode",
+ action="store_true",
+ default=False,
+ help="Enable debug logging and raw stack traces on errors",
+ )
+
+ parsed_args = parser.parse_args()
+ parsed_args = _normalize_compression_args(parsed_args)
+
+ return parsed_args
+
+
+def _ctrl_member(
+ member_path: str,
+ fs_path: Optional[str] = None,
+ path_type: PathType = PathType.FILE,
+ mode: int = 0o644,
+ mtime: int = 0,
+) -> TarMember:
+ if fs_path is None:
+ assert member_path.startswith("./")
+ fs_path = "DEBIAN" + member_path[1:]
+ return TarMember(
+ member_path=member_path,
+ path_type=path_type,
+ fs_path=fs_path,
+ mode=mode,
+ owner="root",
+ uid=0,
+ group="root",
+ gid=0,
+ mtime=mtime,
+ )
+
+
+CTRL_MEMBER_SCRIPTS = {
+ "postinst",
+ "preinst",
+ "postrm",
+ "prerm",
+ "config",
+ "isinstallable",
+}
+
+
+def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
+ debian_root = os.path.join(package_root_dir, "DEBIAN")
+ dir_st = os.stat(debian_root)
+ dir_mtime = int(dir_st.st_mtime)
+ yield _ctrl_member(
+ "./",
+ debian_root,
+ path_type=PathType.DIRECTORY,
+ mode=0o0755,
+ mtime=min(mtime, dir_mtime),
+ )
+ with os.scandir(debian_root) as dir_iter:
+ for ctrl_member in sorted(dir_iter, key=operator.attrgetter("name")):
+ st = os.stat(ctrl_member)
+ if not stat.S_ISREG(st.st_mode):
+ _error(
+ f"{ctrl_member.path} is not a file and all control.tar members ought to be files!"
+ )
+ file_mtime = int(st.st_mtime)
+ yield _ctrl_member(
+ f"./{ctrl_member.name}",
+ path_type=PathType.FILE,
+ fs_path=ctrl_member.path,
+ mode=0o0755 if ctrl_member.name in CTRL_MEMBER_SCRIPTS else 0o0644,
+ mtime=min(mtime, file_mtime),
+ )
+
+
+def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
+ if manifest_path is None:
+ _error(f"--intermediate-package-manifest is mandatory for now")
+ return TarMember.parse_intermediate_manifest(manifest_path)
+
+
+def main() -> None:
+ setup_logging()
+ parsed_args = parse_args()
+ root_dir: str = parsed_args.package_root_dir
+ output_path: str = parsed_args.package_output_path
+ mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)
+
+ data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
+ data_compression_cmd = data_compression.as_cmdline(parsed_args)
+ if parsed_args.uniform_compression:
+ ctrl_compression = data_compression
+ ctrl_compression_cmd = data_compression_cmd
+ else:
+ ctrl_compression = COMPRESSIONS["gzip"]
+ ctrl_compression_cmd = COMPRESSIONS["gzip"].as_cmdline(None)
+
+ if output_path.endswith("/") or os.path.isdir(output_path):
+ deb_file = os.path.join(
+ output_path,
+ compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
+ )
+ else:
+ deb_file = output_path
+
+ pack(
+ deb_file,
+ ctrl_compression,
+ data_compression,
+ root_dir,
+ parsed_args.package_manifest,
+ mtime,
+ ctrl_compression_cmd,
+ data_compression_cmd,
+ prefer_raw_exceptions=not parsed_args.debug_mode,
+ )
+
+
+def pack(
+ deb_file: str,
+ ctrl_compression: Compression,
+ data_compression: Compression,
+ root_dir: str,
+ package_manifest: "Optional[str]",
+ mtime: int,
+ ctrl_compression_cmd: List[str],
+ data_compression_cmd: List[str],
+ prefer_raw_exceptions: bool = False,
+) -> None:
+ data_tar_members = parse_manifest(package_manifest)
+ members = [
+ ArMember("debian-binary", mtime, fixed_binary=b"2.0\n"),
+ ArMember(
+ ctrl_compression.with_extension("control.tar"),
+ mtime,
+ write_to_impl=generate_tar_file_member(
+ _ctrl_tar_members(root_dir, mtime),
+ ctrl_compression_cmd,
+ ),
+ ),
+ ArMember(
+ data_compression.with_extension("data.tar"),
+ mtime,
+ write_to_impl=generate_tar_file_member(
+ data_tar_members,
+ data_compression_cmd,
+ ),
+ ),
+ ]
+ generate_ar_archive(deb_file, mtime, members, prefer_raw_exceptions)
+
+
+if __name__ == "__main__":
+ main()