Coverage for src/debputy/commands/deb_packer.py: 58%
197 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1#!/usr/bin/python3 -B
2import argparse
3import errno
4import operator
5import os
6import stat
7import subprocess
8import tarfile
9import textwrap
10from typing import Optional, List, FrozenSet, Iterable, Callable, BinaryIO, cast
12from debputy.intermediate_manifest import TarMember, PathType
13from debputy.util import (
14 _error,
15 compute_output_filename,
16 resolve_source_date_epoch,
17 ColorizedArgumentParser,
18 setup_logging,
19 program_name,
20 assume_not_none,
21)
22from debputy.version import __version__
# AR header / start of a deb file for reference
# 00000000  21 3c 61 72 63 68 3e 0a  64 65 62 69 61 6e 2d 62  |!<arch>.debian-b|
# 00000010  69 6e 61 72 79 20 20 20  31 36 36 38 39 37 33 36  |inary   16689736|
# 00000020  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000030  31 30 30 36 34 34 20 20  34 20 20 20 20 20 20 20  |100644  4       |
# 00000040  20 20 60 0a 32 2e 30 0a  63 6f 6e 74 72 6f 6c 2e  |  `.2.0.control.|
# 00000050  74 61 72 2e 78 7a 20 20  31 36 36 38 39 37 33 36  |tar.xz  16689736|
# 00000060  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000070  31 30 30 36 34 34 20 20  39 33 36 38 20 20 20 20  |100644  9368    |
# 00000080  20 20 60 0a fd 37 7a 58  5a 00 00 04 e6 d6 b4 46  |  `..7zXZ......F|
class ArMember:
    """One member (entry) of the ar archive that is being generated.

    A member either carries its complete payload in memory (``fixed_binary``)
    or produces the payload on demand by writing it to a file object
    (``write_to_impl``).
    """

    def __init__(
        self,
        name: str,
        mtime: int,
        fixed_binary: Optional[bytes] = None,
        write_to_impl: Optional[Callable[[BinaryIO], None]] = None,
    ) -> None:
        """Create a member.

        :param name: Name of the member inside the archive.
        :param mtime: Modification time associated with the member.
        :param fixed_binary: Complete payload, when it is known up front.
        :param write_to_impl: Callback that writes the payload to the file
            object it is given; used when ``fixed_binary`` is not provided.
        """
        self.name = name
        self._mtime = mtime
        self._write_to_impl = write_to_impl
        self.fixed_binary = fixed_binary

    @property
    def is_fixed_binary(self) -> bool:
        """Whether the payload is available in memory as ``fixed_binary``."""
        return self.fixed_binary is not None

    @property
    def mtime(self) -> int:
        """Modification time passed to the constructor."""
        # Must read the private attribute; returning ``self.mtime`` would call
        # this property again and recurse until RecursionError.
        return self._mtime

    def write_to(self, fd: BinaryIO) -> None:
        """Write the payload to ``fd`` using the ``write_to_impl`` callback."""
        writer = self._write_to_impl
        assert writer is not None
        writer(fd)
# Size in bytes of one ar member header: name (16), mtime (12), uid (6),
# gid (6), mode (8), size (10) and the two byte terminator "`\n".
AR_HEADER_LEN = 60
# Blank placeholder of header size; written first when the member size is not
# yet known and overwritten with the real header afterwards.
AR_HEADER = b" " * AR_HEADER_LEN


def write_header(
    fd: BinaryIO,
    member: "ArMember",
    member_len: int,
    mtime: int,
) -> None:
    """Write the ar header for ``member`` to ``fd`` at its current position.

    The uid and gid fields are always ``0`` and the mode is always ``100644``.

    :param fd: Binary file object to write the header to.
    :param member: The member being described; only its ``name`` is used.
    :param member_len: Size of the member payload in bytes.
    :param mtime: Modification time to record in the header.
    :raises ValueError: If the rendered header is not exactly
        ``AR_HEADER_LEN`` bytes (for example a name longer than 16 bytes),
        since writing it would corrupt the archive layout.
    """
    # Field widths matter: uid and gid are 6 bytes each, the mode is 8 bytes.
    header = b"%-16s%-12d0     0     100644  %-10d\x60\n" % (
        member.name.encode("ascii"),
        mtime,
        member_len,
    )
    if len(header) != AR_HEADER_LEN:
        raise ValueError(
            f"Cannot encode ar header for {member.name!r}: header would be"
            f" {len(header)} bytes instead of {AR_HEADER_LEN}"
        )
    fd.write(header)
def generate_ar_archive(
    output_filename: str,
    mtime: int,
    members: Iterable[ArMember],
    prefer_raw_exceptions: bool,
) -> None:
    """Write ``members`` as an ar archive to ``output_filename``.

    Members with a fixed payload are written directly. For the other members
    a blank header is written first, the member writes its own payload and
    the header is then filled in with the size derived from the file offsets.

    :param output_filename: Path of the archive to create (overwritten).
    :param mtime: Modification time recorded in every member header.
    :param members: The members to write, in archive order.
    :param prefer_raw_exceptions: When true, any ``OSError`` is re-raised as
        is. Otherwise selected errno values (ENOSPC, EIO, EROFS) are reported
        via ``_error`` and anything else is re-raised.
    """
    try:
        # Unbuffered, so fd.tell() reflects what has actually been written to
        # the file, including output written by others via the same fd.
        with open(output_filename, "wb", buffering=0) as fd:
            fd.write(b"!<arch>\n")
            for member in members:
                if member.is_fixed_binary:
                    fixed_binary = assume_not_none(member.fixed_binary)
                    content_len = len(fixed_binary)
                    write_header(fd, member, content_len, mtime)
                    fd.write(fixed_binary)
                else:
                    # The size is unknown until the member has been written;
                    # reserve room for the header and back-fill it afterwards.
                    header_pos = fd.tell()
                    fd.write(AR_HEADER)
                    member.write_to(fd)
                    current_pos = fd.tell()
                    fd.seek(header_pos, os.SEEK_SET)
                    content_len = current_pos - header_pos - AR_HEADER_LEN
                    assert content_len >= 0
                    write_header(fd, member, content_len, mtime)
                    fd.seek(current_pos, os.SEEK_SET)
                if content_len % 2:
                    # ar members start on even offsets; odd-sized data is
                    # followed by a newline that is not counted in the size.
                    fd.write(b"\n")
    except OSError as e:
        if prefer_raw_exceptions:
            raise
        if e.errno == errno.ENOSPC:
            _error(
                f"Unable to write {output_filename}. The file system device reported disk full: {str(e)}"
            )
        elif e.errno == errno.EIO:
            _error(
                f"Unable to write {output_filename}. The file system reported a generic I/O error: {str(e)}"
            )
        elif e.errno == errno.EROFS:
            _error(
                f"Unable to write {output_filename}. The file system is read-only: {str(e)}"
            )
        raise
    print(f"Generated {output_filename}")
def _generate_tar_file(
    tar_members: Iterable[TarMember],
    compression_cmd: List[str],
    write_to: BinaryIO,
) -> None:
    """Stream a GNU format tar archive through a compressor into ``write_to``.

    ``compression_cmd`` is started as a subprocess with its stdout attached
    to ``write_to``; the tar stream is fed to its stdin. A non-zero exit code
    from the subprocess is reported via ``_error``.
    """
    with (
        subprocess.Popen(
            compression_cmd,
            stdin=subprocess.PIPE,
            stdout=write_to,
        ) as compress_proc,
        tarfile.open(
            fileobj=compress_proc.stdin,
            mode="w|",
            format=tarfile.GNU_FORMAT,
            errorlevel=1,
        ) as tar_fd,
    ):
        for entry in tar_members:
            info: tarfile.TarInfo = entry.create_tar_info(tar_fd)
            if entry.path_type != PathType.FILE:
                # Only regular files have content to copy into the archive.
                tar_fd.addfile(info)
                continue
            with open(assume_not_none(entry.fs_path), "rb") as content:
                tar_fd.addfile(info, fileobj=content)
    exit_code = compress_proc.wait()
    if exit_code != 0:
        _error(
            f"Compression command {compression_cmd} failed with code {exit_code}"
        )
def generate_tar_file_member(
    tar_members: Iterable[TarMember],
    compression_cmd: List[str],
) -> Callable[[BinaryIO], None]:
    """Return a callback that writes the compressed tar to a given file.

    Nothing is generated until the returned callback is invoked; it then
    passes ``tar_members`` and ``compression_cmd`` on to
    ``_generate_tar_file`` together with the file object it received.
    """

    def _write_member(target: BinaryIO) -> None:
        _generate_tar_file(tar_members, compression_cmd, target)

    return _write_member
def _xz_cmdline(
    compression_rule: "Compression",
    parsed_args: Optional[argparse.Namespace],
) -> List[str]:
    """Build the ``xz`` command line for the given rule and parsed options.

    The level comes from ``compression_rule.effective_compression_level``.
    A strategy other than ``"none"`` (or unset) is passed on as a long option.
    """
    level = compression_rule.effective_compression_level(parsed_args)
    cmdline = ["xz", "-T2", f"-{level}"]
    strategy = parsed_args.compression_strategy if parsed_args is not None else None
    if strategy not in (None, "none"):
        cmdline.append("--" + strategy)
    cmdline.append("--no-adjust")
    return cmdline
def _gzip_cmdline(
    compression_rule: "Compression",
    parsed_args: Optional[argparse.Namespace],
) -> List[str]:
    """Build the ``gzip`` command line for the given rule and parsed options.

    :raises ValueError: If a compression strategy other than ``"none"`` was
        requested, as no strategy is implemented for gzip here.
    """
    level = compression_rule.effective_compression_level(parsed_args)
    strategy = parsed_args.compression_strategy if parsed_args is not None else None
    if strategy not in (None, "none"):
        raise ValueError(
            f"Not implemented: Compression strategy {strategy}"
            " for gzip is currently unsupported (but dpkg-deb does)"
        )
    return ["gzip", f"-n{level}"]
def _uncompressed_cmdline(
    _unused_a: "Compression",
    _unused_b: Optional[argparse.Namespace],
) -> List[str]:
    """Return the pass-through command used when no compression is applied.

    Both parameters are ignored; they exist so the function has the same
    signature as the other command line builders.
    """
    passthrough: List[str] = ["cat"]
    return passthrough
class Compression:
    """Description of one supported compression format.

    Bundles the default level, the file name extension, the set of accepted
    strategies and the function that builds the compressor command line.
    """

    def __init__(
        self,
        default_compression_level: int,
        extension: str,
        allowed_strategies: FrozenSet[str],
        cmdline_builder: Callable[
            ["Compression", Optional[argparse.Namespace]], List[str]
        ],
    ) -> None:
        self.default_compression_level = default_compression_level
        self.extension = extension
        self.allowed_strategies = allowed_strategies
        self.cmdline_builder = cmdline_builder

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} {self.extension}>"

    def effective_compression_level(
        self, parsed_args: Optional[argparse.Namespace]
    ) -> int:
        """Return the explicitly requested level, else the default level."""
        requested = parsed_args.compression_level if parsed_args else None
        if requested is None:
            return self.default_compression_level
        return cast("int", requested)

    def as_cmdline(self, parsed_args: Optional[argparse.Namespace]) -> List[str]:
        """Build the compressor command line via ``cmdline_builder``."""
        builder = self.cmdline_builder
        return builder(self, parsed_args)

    def with_extension(self, filename: str) -> str:
        """Return ``filename`` with this format's extension appended."""
        return f"{filename}{self.extension}"
# Supported compression formats keyed by the name accepted for -Z.
COMPRESSIONS = {
    "xz": Compression(
        default_compression_level=6,
        extension=".xz",
        allowed_strategies=frozenset(("none", "extreme")),
        cmdline_builder=_xz_cmdline,
    ),
    "gzip": Compression(
        default_compression_level=9,
        extension=".gz",
        allowed_strategies=frozenset(
            ("none", "filtered", "huffman", "rle", "fixed")
        ),
        cmdline_builder=_gzip_cmdline,
    ),
    "none": Compression(
        default_compression_level=0,
        extension="",
        allowed_strategies=frozenset(("none",)),
        cmdline_builder=_uncompressed_cmdline,
    ),
}
def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
    """Validate and normalize the compression related options in place.

    gzip at level 0 is rewritten to the "none" algorithm, and a strategy the
    selected algorithm does not allow is reported via ``_error``. The same
    namespace object is returned.
    """
    wants_gzip_level_zero = (
        parsed_args.compression_level == 0
        and parsed_args.compression_algorithm == "gzip"
    )
    if wants_gzip_level_zero:
        print(
            "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
        )
        parsed_args.compression_algorithm = "none"

    algorithm = COMPRESSIONS[parsed_args.compression_algorithm]
    requested_strategy = parsed_args.compression_strategy
    if (
        requested_strategy is not None
        and requested_strategy not in algorithm.allowed_strategies
    ):
        _error(
            f'Compression algorithm "{parsed_args.compression_algorithm}" does not support compression strategy'
            f' "{requested_strategy}". Allowed values: {", ".join(sorted(algorithm.allowed_strategies))}'
        )
    return parsed_args
def parse_args() -> argparse.Namespace:
    """Parse the command line into a namespace.

    Defaults for the compression level, the compression algorithm and the
    thread limit are read from the ``DPKG_DEB_COMPRESSOR_LEVEL``,
    ``DPKG_DEB_COMPRESSOR_TYPE`` and ``DPKG_DEB_THREADS_MAX`` environment
    variables; unset or non-integer values fall back to the built-in default.

    :return: The parsed arguments after ``_normalize_compression_args``.
    """
    try:
        compression_level_default = int(os.environ["DPKG_DEB_COMPRESSOR_LEVEL"])
    except (KeyError, ValueError):
        compression_level_default = None

    compression_type = os.environ.get("DPKG_DEB_COMPRESSOR_TYPE", "xz")

    try:
        threads_max = int(os.environ["DPKG_DEB_THREADS_MAX"])
    except (KeyError, ValueError):
        threads_max = None

    description = textwrap.dedent(
        """\
        THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support

        DO NOT USE THIS TOOL DIRECTLY. It has no stability guarantees and will be removed as
        soon as "dpkg-deb -b" grows support for the relevant features.

        This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
        without requiring root even for static ownership. It is a temporary stand-in for
        "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.

        The tool operates on an internal JSON based manifest for now, because it was faster
        than building an mtree parser (which is the format that dpkg will likely end up
        using).

        As the tool is not meant to be used directly, it is full of annoying paper cuts that
        I refuse to fix or maintain. Use the high level tool instead.

        """
    )

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )
    parser.add_argument("--version", action="version", version=__version__)
    parser.add_argument(
        "package_root_dir",
        metavar="PACKAGE_ROOT_DIR",
        help="Root directory of the package. Must contain a DEBIAN directory",
    )
    parser.add_argument(
        "package_output_path",
        metavar="PATH",
        help="Path where the package should be placed. If it is directory,"
        " the base name will be determined from the package metadata",
    )

    parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )
    parser.add_argument(
        "--root-owner-group",
        dest="root_owner_group",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb -b",
    )
    parser.add_argument(
        "-b",
        "--build",
        dest="build_param",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb",
    )
    parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environment variable)",
    )
    parser.add_argument(
        "-Z",
        dest="compression_algorithm",
        choices=COMPRESSIONS,
        default=compression_type,
        help="The compression algorithm to be used",
    )
    parser.add_argument(
        "-z",
        dest="compression_level",
        metavar="{0-9}",
        choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        default=compression_level_default,
        type=int,
        help="The compression level to be used",
    )
    parser.add_argument(
        "-S",
        dest="compression_strategy",
        # We have a different default for xz when strategy is unset and we are building a udeb
        action="store",
        default=None,
        help="The compression strategy to be used. Concrete values depend on the compression"
        ' algorithm, but the value "none" is always allowed',
    )
    parser.add_argument(
        "--uniform-compression",
        dest="uniform_compression",
        action="store_true",
        default=True,
        help="Whether to use the same compression for the control.tar and the data.tar."
        " The default is to use uniform compression.",
    )
    parser.add_argument(
        "--no-uniform-compression",
        dest="uniform_compression",
        action="store_false",
        default=True,
        help="Disable uniform compression (see --uniform-compression)",
    )
    parser.add_argument(
        "--threads-max",
        dest="threads_max",
        default=threads_max,
        # TODO: Support this properly
        type=int,
        help="Ignored; accepted for compatibility",
    )
    parser.add_argument(
        "-d",
        "--debug",
        dest="debug_mode",
        action="store_true",
        default=False,
        help="Enable debug logging and raw stack traces on errors",
    )

    parsed_args = parser.parse_args()
    if parsed_args.compression_algorithm not in COMPRESSIONS:
        # argparse validates an explicit -Z against "choices" but not the
        # default, so an unsupported value can only come from the environment.
        _error(
            f'Unsupported compression algorithm "{parsed_args.compression_algorithm}"'
            " (from the DPKG_DEB_COMPRESSOR_TYPE environment variable)."
            f' Allowed values: {", ".join(COMPRESSIONS)}'
        )
    parsed_args = _normalize_compression_args(parsed_args)

    return parsed_args
def _ctrl_member(
    member_path: str,
    fs_path: Optional[str] = None,
    path_type: PathType = PathType.FILE,
    mode: int = 0o644,
    mtime: int = 0,
) -> TarMember:
    """Create a root-owned (uid/gid 0) ``TarMember`` for the control tarball.

    When ``fs_path`` is omitted it is derived from ``member_path`` by
    replacing the leading ``.`` with ``DEBIAN``.
    """
    if fs_path is None:
        assert member_path.startswith("./")
        fs_path = f"DEBIAN{member_path[1:]}"
    root_ownership = {"owner": "root", "uid": 0, "group": "root", "gid": 0}
    return TarMember(
        member_path=member_path,
        path_type=path_type,
        fs_path=fs_path,
        mode=mode,
        mtime=mtime,
        **root_ownership,
    )
# Names of control members that are packed with mode 0755 rather than 0644.
CTRL_MEMBER_SCRIPTS = {
    "config",
    "isinstallable",
    "postinst",
    "postrm",
    "preinst",
    "prerm",
}
def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
    """Yield the members of the control tarball for ``package_root_dir``.

    The ``DEBIAN`` directory itself comes first (as ``./``), followed by its
    entries sorted by name. Every mtime is capped at ``mtime``. An entry that
    is not a regular file is reported via ``_error``.
    """
    control_dir = os.path.join(package_root_dir, "DEBIAN")
    control_dir_mtime = int(os.stat(control_dir).st_mtime)
    yield _ctrl_member(
        "./",
        control_dir,
        path_type=PathType.DIRECTORY,
        mode=0o0755,
        mtime=min(mtime, control_dir_mtime),
    )
    with os.scandir(control_dir) as entries:
        for entry in sorted(entries, key=lambda e: e.name):
            entry_st = os.stat(entry)
            if not stat.S_ISREG(entry_st.st_mode):
                _error(
                    f"{entry.path} is not a file and all control.tar members ought to be files!"
                )
            # Maintainer scripts are executable; everything else is plain data.
            entry_mode = 0o0755 if entry.name in CTRL_MEMBER_SCRIPTS else 0o0644
            yield _ctrl_member(
                f"./{entry.name}",
                path_type=PathType.FILE,
                fs_path=entry.path,
                mode=entry_mode,
                mtime=min(mtime, int(entry_st.st_mtime)),
            )
def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
    """Load the data.tar members from the intermediate manifest.

    A missing manifest path is reported via ``_error``; the actual parsing is
    delegated to ``TarMember.parse_intermediate_manifest``.
    """
    manifest_missing = manifest_path is None
    if manifest_missing:
        _error("--intermediate-package-manifest is mandatory for now")
    return TarMember.parse_intermediate_manifest(manifest_path)
def main() -> None:
    """Command line entry point: parse the arguments and build the deb.

    The data.tar compression follows the parsed options. The control.tar uses
    the same compression unless uniform compression was disabled, in which
    case it uses gzip with default settings. When the output path is a
    directory (or ends with ``/``), the file name is computed from the
    package's ``DEBIAN`` directory.
    """
    setup_logging()
    parsed_args = parse_args()
    root_dir: str = parsed_args.package_root_dir
    output_path: str = parsed_args.package_output_path
    mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)

    data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
    data_compression_cmd = data_compression.as_cmdline(parsed_args)
    if parsed_args.uniform_compression:
        ctrl_compression = data_compression
        ctrl_compression_cmd = data_compression_cmd
    else:
        ctrl_compression = COMPRESSIONS["gzip"]
        ctrl_compression_cmd = COMPRESSIONS["gzip"].as_cmdline(None)

    if output_path.endswith("/") or os.path.isdir(output_path):
        deb_file = os.path.join(
            output_path,
            compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
        )
    else:
        deb_file = output_path

    pack(
        deb_file,
        ctrl_compression,
        data_compression,
        root_dir,
        parsed_args.package_manifest,
        mtime,
        ctrl_compression_cmd,
        data_compression_cmd,
        # --debug promises raw stack traces, so raw exceptions are preferred
        # exactly when debug mode is on (this was previously negated).
        prefer_raw_exceptions=parsed_args.debug_mode,
    )
def pack(
    deb_file: str,
    ctrl_compression: Compression,
    data_compression: Compression,
    root_dir: str,
    package_manifest: "Optional[str]",
    mtime: int,
    ctrl_compression_cmd: List[str],
    data_compression_cmd: List[str],
    prefer_raw_exceptions: bool = False,
) -> None:
    """Assemble ``deb_file`` from the control directory and the manifest.

    The archive consists of three members, in this order: ``debian-binary``,
    the control tarball and the data tarball. The tarball names carry the
    extension of their respective compression.
    """
    data_tar_members = parse_manifest(package_manifest)

    format_version = ArMember("debian-binary", mtime, fixed_binary=b"2.0\n")
    control_tar = ArMember(
        ctrl_compression.with_extension("control.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            _ctrl_tar_members(root_dir, mtime),
            ctrl_compression_cmd,
        ),
    )
    data_tar = ArMember(
        data_compression.with_extension("data.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            data_tar_members,
            data_compression_cmd,
        ),
    )
    generate_ar_archive(
        deb_file,
        mtime,
        [format_version, control_tar, data_tar],
        prefer_raw_exceptions,
    )
# Run the packer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()