diff options
Diffstat (limited to 'src/debputy/commands/deb_materialization.py')
-rw-r--r-- | src/debputy/commands/deb_materialization.py | 587 |
1 files changed, 587 insertions, 0 deletions
diff --git a/src/debputy/commands/deb_materialization.py b/src/debputy/commands/deb_materialization.py new file mode 100644 index 0000000..58764d0 --- /dev/null +++ b/src/debputy/commands/deb_materialization.py @@ -0,0 +1,587 @@ +#!/usr/bin/python3 -B +import argparse +import collections +import contextlib +import json +import os +import subprocess +import sys +import tempfile +import textwrap +from datetime import datetime +from typing import Optional, List, Iterator, Dict, Tuple + +from debputy import DEBPUTY_ROOT_DIR +from debputy.intermediate_manifest import ( + TarMember, + PathType, + output_intermediate_manifest, + output_intermediate_manifest_to_fd, +) +from debputy.util import ( + _error, + _info, + compute_output_filename, + resolve_source_date_epoch, + ColorizedArgumentParser, + setup_logging, + detect_fakeroot, + print_command, + program_name, +) +from debputy.version import __version__ + + +def parse_args() -> argparse.Namespace: + description = textwrap.dedent( + """\ + This is a low level tool for materializing deb packages from intermediate debputy manifests or assembling + the deb from a materialization. + + The tool is not intended to be run directly by end users. + """ + ) + + parser = ColorizedArgumentParser( + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + allow_abbrev=False, + prog=program_name(), + ) + + parser.add_argument("--version", action="version", version=__version__) + + subparsers = parser.add_subparsers(dest="command", required=True) + + materialize_deb_parser = subparsers.add_parser( + "materialize-deb", + allow_abbrev=False, + help="Generate .deb/.udebs structure from a root directory and" + " a *intermediate* debputy manifest", + ) + materialize_deb_parser.add_argument( + "control_root_dir", + metavar="control-root-dir", + help="A directory that contains the control files (usually debian/<pkg>/DEBIAN)", + ) + materialize_deb_parser.add_argument( + "materialization_output", + metavar="materialization_output", + help="Where to place the resulting structure should be placed. Should not exist", + ) + materialize_deb_parser.add_argument( + "--discard-existing-output", + dest="discard_existing_output", + default=False, + action="store_true", + help="If passed, then the output location may exist." + " If it does, it will be *deleted*.", + ) + materialize_deb_parser.add_argument( + "--source-date-epoch", + dest="source_date_epoch", + action="store", + type=int, + default=None, + help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ" + " variable", + ) + materialize_deb_parser.add_argument( + "--may-move-control-files", + dest="may_move_control_files", + action="store_true", + default=False, + help="Whether the command may optimize by moving (rather than copying) DEBIAN files", + ) + materialize_deb_parser.add_argument( + "--may-move-data-files", + dest="may_move_data_files", + action="store_true", + default=False, + help="Whether the command may optimize by moving (rather than copying) when materializing", + ) + + materialize_deb_parser.add_argument( + "--intermediate-package-manifest", + dest="package_manifest", + metavar="JSON_FILE", + action="store", + default=None, + help="INTERMEDIATE package manifest (JSON!)", + ) + + materialize_deb_parser.add_argument( + "--udeb", + dest="udeb", + default=False, + action="store_true", + help="Whether this is udeb package. Affects extension and default compression", + ) + + materialize_deb_parser.add_argument( + "--build-method", + dest="build_method", + choices=["debputy", "dpkg-deb"], + type=str, + default=None, + help="Immediately assemble the deb as well using the selected method", + ) + materialize_deb_parser.add_argument( + "--assembled-deb-output", + dest="assembled_deb_output", + type=str, + default=None, + help="Where to place the resulting deb. Only applicable with --build-method", + ) + + # Added for "help only" - you cannot trigger this option in practice + materialize_deb_parser.add_argument( + "--", + metavar="DPKG_DEB_ARGS", + action="extend", + nargs="+", + dest="unused", + help="Arguments to be passed to dpkg-deb" + " (same as you might pass to dh_builddeb).", + ) + + build_deb_structure = subparsers.add_parser( + "build-materialized-deb", + allow_abbrev=False, + help="Produce a .deb from a directory produced by the" + " materialize-deb-structure command", + ) + build_deb_structure.add_argument( + "materialized_deb_root_dir", + metavar="materialized-deb-root-dir", + help="The output directory of the materialize-deb-structure command", + ) + build_deb_structure.add_argument( + "build_method", + metavar="build-method", + choices=["debputy", "dpkg-deb"], + type=str, + default="dpkg-deb", + help="Which tool should assemble the deb", + ) + build_deb_structure.add_argument( + "--output", type=str, default=None, help="Where to place the resulting deb" + ) + + argv = sys.argv + try: + i = argv.index("--") + upstream_args = argv[i + 1 :] + argv = argv[:i] + except (IndexError, ValueError): + upstream_args = [] + parsed_args = parser.parse_args(argv[1:]) + setattr(parsed_args, "upstream_args", upstream_args) + + return parsed_args + + +def _run(cmd: List[str]) -> None: + print_command(*cmd) + subprocess.check_call(cmd) + + +def strip_path_prefix(member_path: str) -> str: + if not member_path.startswith("./"): + _error( + f'Invalid manifest: "{member_path}" does not start with "./", but all paths should' + ) + return member_path[2:] + + +def _perform_data_tar_materialization( + output_packaging_root: str, + intermediate_manifest: List[TarMember], + may_move_data_files: bool, +) -> List[Tuple[str, TarMember]]: + start_time = datetime.now() + replacement_manifest_paths = [] + _info("Materializing data.tar part of the deb:") + + directories = ["mkdir"] + symlinks = [] + bulk_copies: Dict[str, List[str]] = collections.defaultdict(list) + copies = [] + renames = [] + + for tar_member in intermediate_manifest: + member_path = strip_path_prefix(tar_member.member_path) + new_fs_path = ( + os.path.join("deb-root", member_path) if member_path else "deb-root" + ) + materialization_path = ( + f"{output_packaging_root}/{member_path}" + if member_path + else output_packaging_root + ) + replacement_tar_member = tar_member + materialization_parent_dir = os.path.dirname(materialization_path.rstrip("/")) + if tar_member.path_type == PathType.DIRECTORY: + directories.append(materialization_path) + elif tar_member.path_type == PathType.SYMLINK: + symlinks.append((tar_member.link_target, materialization_path)) + elif tar_member.fs_path is not None: + if tar_member.link_target: + # Not sure if hardlinks gets here yet as we do not support hardlinks + _error("Internal error; hardlink not supported") + + if may_move_data_files and tar_member.may_steal_fs_path: + renames.append((tar_member.fs_path, materialization_path)) + elif os.path.basename(tar_member.fs_path) == os.path.basename( + materialization_path + ): + bulk_copies[materialization_parent_dir].append(tar_member.fs_path) + else: + copies.append((tar_member.fs_path, materialization_path)) + else: + _error(f"Internal error; unsupported path type {tar_member.path_type}") + + if tar_member.fs_path is not None: + replacement_tar_member = tar_member.clone_and_replace( + fs_path=new_fs_path, may_steal_fs_path=False + ) + + replacement_manifest_paths.append( + (materialization_path, replacement_tar_member) + ) + + if len(directories) > 1: + _run(directories) + + for dest_dir, files in bulk_copies.items(): + cmd = ["cp", "--reflink=auto", "-t", dest_dir] + cmd.extend(files) + _run(cmd) + + for source, dest in copies: + _run(["cp", "--reflink=auto", source, dest]) + + for source, dest in renames: + print_command("mv", source, dest) + os.rename(source, dest) + + for link_target, link_path in symlinks: + print_command("ln", "-s", link_target, link_path) + os.symlink(link_target, link_path) + + end_time = datetime.now() + + _info(f"Materialization of data.tar finished, took: {end_time - start_time}") + + return replacement_manifest_paths + + +def materialize_deb( + control_root_dir: str, + intermediate_manifest_path: Optional[str], + source_date_epoch: int, + dpkg_deb_options: List[str], + is_udeb: bool, + output_dir: str, + may_move_control_files: bool, + may_move_data_files: bool, +) -> None: + if not os.path.isfile(f"{control_root_dir}/control"): + _error( + f'The directory "{control_root_dir}" does not look like a package root dir (there is no control file)' + ) + intermediate_manifest: List[TarMember] = parse_manifest(intermediate_manifest_path) + + output_packaging_root = os.path.join(output_dir, "deb-root") + os.mkdir(output_dir) + + replacement_manifest_paths = _perform_data_tar_materialization( + output_packaging_root, intermediate_manifest, may_move_data_files + ) + for materialization_path, tar_member in reversed(replacement_manifest_paths): + # TODO: Hardlinks should probably skip these commands + if tar_member.path_type != PathType.SYMLINK: + os.chmod(materialization_path, tar_member.mode, follow_symlinks=False) + os.utime( + materialization_path, + (tar_member.mtime, tar_member.mtime), + follow_symlinks=False, + ) + + materialized_ctrl_dir = f"{output_packaging_root}/DEBIAN" + if may_move_control_files: + print_command("mv", control_root_dir, materialized_ctrl_dir) + os.rename(control_root_dir, materialized_ctrl_dir) + else: + os.mkdir(materialized_ctrl_dir) + copy_cmd = ["cp", "-a"] + copy_cmd.extend( + os.path.join(control_root_dir, f) for f in os.listdir(control_root_dir) + ) + copy_cmd.append(materialized_ctrl_dir) + _run(copy_cmd) + + output_intermediate_manifest( + os.path.join(output_dir, "deb-structure-intermediate-manifest.json"), + [t[1] for t in replacement_manifest_paths], + ) + + with open(os.path.join(output_dir, "env-and-cli.json"), "w") as fd: + serial_format = { + "env": { + "SOURCE_DATE_EPOCH": str(source_date_epoch), + "DPKG_DEB_COMPRESSOR_LEVEL": os.environ.get( + "DPKG_DEB_COMPRESSOR_LEVEL" + ), + "DPKG_DEB_COMPRESSOR_TYPE": os.environ.get("DPKG_DEB_COMPRESSOR_TYPE"), + "DPKG_DEB_THREADS_MAX": os.environ.get("DPKG_DEB_THREADS_MAX"), + }, + "cli": {"dpkg-deb": dpkg_deb_options}, + "udeb": is_udeb, + } + json.dump(serial_format, fd) + + +def apply_fs_metadata( + materialized_path: str, + tar_member: TarMember, + apply_ownership: bool, + is_using_fakeroot: bool, +) -> None: + if apply_ownership: + os.chown( + materialized_path, tar_member.uid, tar_member.gid, follow_symlinks=False + ) + # To avoid surprises, align these with the manifest. Just in case the transport did not preserve the metadata. + # Also, unsure whether metadata changes cause directory mtimes to change, so resetting them unconditionally + # also prevents that problem. + if tar_member.path_type != PathType.SYMLINK: + os.chmod(materialized_path, tar_member.mode, follow_symlinks=False) + os.utime( + materialized_path, (tar_member.mtime, tar_member.mtime), follow_symlinks=False + ) + if is_using_fakeroot: + st = os.stat(materialized_path, follow_symlinks=False) + if st.st_uid != tar_member.uid or st.st_gid != tar_member.gid: + _error( + 'Change of ownership failed. The chown call "succeeded" but stat does not give the right result.' + " Most likely a fakeroot bug. Note, when verifying this, use os.chown + os.stat from python" + " (the chmod/stat shell commands might use a different syscall that fakeroot accurately emulates)" + ) + + +def _dpkg_deb_root_requirements( + intermediate_manifest: List[TarMember], +) -> Tuple[List[str], bool, bool]: + needs_root = any(tm.uid != 0 or tm.gid != 0 for tm in intermediate_manifest) + if needs_root: + if os.getuid() != 0: + _error( + 'Must be run as root/fakeroot when using the method "dpkg-deb" due to the contents' + ) + is_using_fakeroot = detect_fakeroot() + deb_cmd = ["dpkg-deb"] + _info("Applying ownership, mode, and utime from the intermediate manifest...") + else: + # fakeroot does not matter in this case + is_using_fakeroot = False + deb_cmd = ["dpkg-deb", "--root-owner-group"] + _info("Applying mode and utime from the intermediate manifest...") + return deb_cmd, needs_root, is_using_fakeroot + + +@contextlib.contextmanager +def maybe_with_materialized_manifest( + content: Optional[List[TarMember]], +) -> Iterator[Optional[str]]: + if content is not None: + with tempfile.NamedTemporaryFile( + prefix="debputy-mat-build", + mode="w+t", + suffix=".json", + encoding="utf-8", + ) as fd: + output_intermediate_manifest_to_fd(fd, content) + fd.flush() + yield fd.name + else: + yield None + + +def _prep_assembled_deb_output_path( + output_path: Optional[str], + materialized_deb_structure: str, + deb_root: str, + method: str, + is_udeb: bool, +) -> str: + if output_path is None: + ext = "udeb" if is_udeb else "deb" + output_dir = os.path.join(materialized_deb_structure, "output") + if not os.path.isdir(output_dir): + os.mkdir(output_dir) + output = os.path.join(output_dir, f"{method}.{ext}") + elif os.path.isdir(output_path): + output = os.path.join( + output_path, + compute_output_filename(os.path.join(deb_root, "DEBIAN"), is_udeb), + ) + else: + output = output_path + return output + + +def _apply_env(env: Dict[str, Optional[str]]) -> None: + for name, value in env.items(): + if value is not None: + os.environ[name] = value + else: + try: + del os.environ[name] + except KeyError: + pass + + +def assemble_deb( + materialized_deb_structure: str, + method: str, + output_path: Optional[str], + combined_materialization_and_assembly: bool, +) -> None: + deb_root = os.path.join(materialized_deb_structure, "deb-root") + + with open(os.path.join(materialized_deb_structure, "env-and-cli.json"), "r") as fd: + serial_format = json.load(fd) + + env = serial_format.get("env") or {} + cli = serial_format.get("cli") or {} + is_udeb = serial_format.get("udeb") + source_date_epoch = env.get("SOURCE_DATE_EPOCH") + dpkg_deb_options = cli.get("dpkg-deb") or [] + intermediate_manifest_path = os.path.join( + materialized_deb_structure, "deb-structure-intermediate-manifest.json" + ) + original_intermediate_manifest = TarMember.parse_intermediate_manifest( + intermediate_manifest_path + ) + _info( + "Rebasing relative paths in the intermediate manifest so they are relative to current working directory ..." + ) + intermediate_manifest = [ + ( + tar_member.clone_and_replace( + fs_path=os.path.join(materialized_deb_structure, tar_member.fs_path) + ) + if tar_member.fs_path is not None and not tar_member.fs_path.startswith("/") + else tar_member + ) + for tar_member in original_intermediate_manifest + ] + materialized_manifest = None + if method == "debputy": + materialized_manifest = intermediate_manifest + + if source_date_epoch is None: + _error( + "Cannot reproduce the deb. No source date epoch provided in the materialized deb root." + ) + _apply_env(env) + + output = _prep_assembled_deb_output_path( + output_path, + materialized_deb_structure, + deb_root, + method, + is_udeb, + ) + + with maybe_with_materialized_manifest(materialized_manifest) as tmp_file: + if method == "dpkg-deb": + deb_cmd, needs_root, is_using_fakeroot = _dpkg_deb_root_requirements( + intermediate_manifest + ) + if needs_root or not combined_materialization_and_assembly: + for tar_member in reversed(intermediate_manifest): + p = os.path.join( + deb_root, strip_path_prefix(tar_member.member_path) + ) + apply_fs_metadata(p, tar_member, needs_root, is_using_fakeroot) + elif method == "debputy": + deb_packer = os.path.join(DEBPUTY_ROOT_DIR, "deb_packer.py") + assert tmp_file is not None + deb_cmd = [ + deb_packer, + "--intermediate-package-manifest", + tmp_file, + "--source-date-epoch", + source_date_epoch, + ] + else: + _error(f"Internal error: Unsupported assembly method: {method}") + + if is_udeb: + deb_cmd.extend(["-z6", "-Zxz", "-Sextreme"]) + deb_cmd.extend(dpkg_deb_options) + deb_cmd.extend(["--build", deb_root, output]) + start_time = datetime.now() + _run(deb_cmd) + end_time = datetime.now() + _info(f" - assembly command took {end_time - start_time}") + + +def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]": + if manifest_path is None: + _error("--intermediate-package-manifest is mandatory for now") + return TarMember.parse_intermediate_manifest(manifest_path) + + +def main() -> None: + setup_logging() + parsed_args = parse_args() + if parsed_args.command == "materialize-deb": + mtime = resolve_source_date_epoch(parsed_args.source_date_epoch) + dpkg_deb_args = parsed_args.upstream_args or [] + output_dir = parsed_args.materialization_output + if os.path.exists(output_dir): + if not parsed_args.discard_existing_output: + _error( + "The output path already exists. Please either choose a non-existing path, delete the path" + " or use --discard-existing-output (to have this command remove it as necessary)." + ) + _info( + f'Removing existing path "{output_dir}" as requested by --discard-existing-output' + ) + _run(["rm", "-fr", output_dir]) + + materialize_deb( + parsed_args.control_root_dir, + parsed_args.package_manifest, + mtime, + dpkg_deb_args, + parsed_args.udeb, + output_dir, + parsed_args.may_move_control_files, + parsed_args.may_move_data_files, + ) + + if parsed_args.build_method is not None: + assemble_deb( + output_dir, + parsed_args.build_method, + parsed_args.assembled_deb_output, + True, + ) + + elif parsed_args.command == "build-materialized-deb": + assemble_deb( + parsed_args.materialized_deb_root_dir, + parsed_args.build_method, + parsed_args.output, + False, + ) + else: + _error(f'Internal error: Unimplemented command "{parsed_args.command}"') + + +if __name__ == "__main__": + main() |