Coverage for src/debputy/commands/deb_packer.py: 58%

197 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1#!/usr/bin/python3 -B 

2import argparse 

3import errno 

4import operator 

5import os 

6import stat 

7import subprocess 

8import tarfile 

9import textwrap 

10from typing import Optional, List, FrozenSet, Iterable, Callable, BinaryIO, cast 

11 

12from debputy.intermediate_manifest import TarMember, PathType 

13from debputy.util import ( 

14 _error, 

15 compute_output_filename, 

16 resolve_source_date_epoch, 

17 ColorizedArgumentParser, 

18 setup_logging, 

19 program_name, 

20 assume_not_none, 

21) 

22from debputy.version import __version__ 

23 

24 

# AR header / start of a deb file for reference
# 00000000  21 3c 61 72 63 68 3e 0a  64 65 62 69 61 6e 2d 62  |!<arch>.debian-b|
# 00000010  69 6e 61 72 79 20 20 20  31 36 36 38 39 37 33 36  |inary   16689736|
# 00000020  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000030  31 30 30 36 34 34 20 20  34 20 20 20 20 20 20 20  |100644  4       |
# 00000040  20 20 60 0a 32 2e 30 0a  63 6f 6e 74 72 6f 6c 2e  |  `.2.0.control.|
# 00000050  74 61 72 2e 78 7a 20 20  31 36 36 38 39 37 33 36  |tar.xz  16689736|
# 00000060  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000070  31 30 30 36 34 34 20 20  39 33 36 38 20 20 20 20  |100644  9368    |
# 00000080  20 20 60 0a fd 37 7a 58  5a 00 00 04 e6 d6 b4 46  |  `..7zXZ......F|

35 

36 

class ArMember:
    """A single member (entry) of an ar archive.

    A member carries its content in one of two ways: as a fixed ``bytes``
    payload (``fixed_binary``) or via a callback (``write_to_impl``) that
    writes the content to a file object on demand.
    """

    def __init__(
        self,
        name: str,
        mtime: int,
        fixed_binary: Optional[bytes] = None,
        write_to_impl: Optional[Callable[[BinaryIO], None]] = None,
    ) -> None:
        """Store the member name, timestamp and content source.

        :param name: Member name as it should appear in the archive.
        :param mtime: Modification time associated with the member.
        :param fixed_binary: Complete content of the member, if known up front.
        :param write_to_impl: Callback that writes the content to a file
          object; used by :meth:`write_to`.
        """
        self.name = name
        self._mtime = mtime
        self._write_to_impl = write_to_impl
        self.fixed_binary = fixed_binary

    @property
    def is_fixed_binary(self) -> bool:
        """Whether the content is available as ``fixed_binary``."""
        return self.fixed_binary is not None

    @property
    def mtime(self) -> int:
        """The modification time passed to the constructor."""
        # Must read the private attribute; reading `self.mtime` here would
        # re-enter this property and recurse until RecursionError.
        return self._mtime

    def write_to(self, fd: BinaryIO) -> None:
        """Write the member content to ``fd`` using the writer callback.

        Only valid for members constructed with ``write_to_impl``.
        """
        writer = self._write_to_impl
        assert writer is not None
        writer(fd)

62 

63 

# Size in bytes of the header that precedes each ar member.
AR_HEADER_LEN = 60
# Blank (all spaces) header of that size; generate_ar_archive writes it as a
# placeholder and overwrites it once the member size is known.
AR_HEADER = b"".ljust(AR_HEADER_LEN)

66 

67 

def write_header(
    fd: BinaryIO,
    member: "ArMember",
    member_len: int,
    mtime: int,
) -> None:
    """Write the 60-byte ar header for ``member`` to ``fd``.

    The header consists of fixed-width, space-padded ASCII fields: name (16),
    mtime (12), uid (6), gid (6), mode (8), size (10), followed by the
    two-byte terminator ``"`\\n"``. The uid and gid are always 0 and the mode
    is always 100644.

    :param fd: Binary file object positioned where the header belongs.
    :param member: The member being described; only its ``name`` is used.
    :param member_len: Size of the member content in bytes (excluding padding).
    :param mtime: Timestamp to record in the header.
    :raises ValueError: If a field overflows its column (e.g. a name longer
      than 16 bytes), since that would shift every following field.
    """
    header_len = 60  # 16 + 12 + 6 + 6 + 8 + 10 + 2
    header = b"%-16s%-12d%-6d%-6d%-8s%-10d\x60\n" % (
        member.name.encode("ascii"),
        mtime,
        0,  # uid
        0,  # gid
        b"100644",
        member_len,
    )
    if len(header) != header_len:
        raise ValueError(
            f"ar header for {member.name!r} is {len(header)} bytes; expected {header_len}"
        )
    fd.write(header)

80 

81 

def generate_ar_archive(
    output_filename: str,
    mtime: int,
    members: Iterable[ArMember],
    prefer_raw_exceptions: bool,
) -> None:
    """Write an ar archive containing ``members`` to ``output_filename``.

    Fixed-binary members are written directly. For other members the size is
    not known up front, so a blank placeholder header is written first, the
    member writes its content, and the real header is then written over the
    placeholder.

    :param output_filename: Path of the archive to create (overwritten).
    :param mtime: Timestamp recorded in every member header.
    :param members: Members to write, in order.
    :param prefer_raw_exceptions: When true, any ``OSError`` is re-raised
      unchanged instead of being reported via ``_error``.
    :raises OSError: When writing fails and the error is not reported via
      ``_error`` (which presumably does not return -- confirm in debputy.util).
    """
    # Human-readable explanation for the errno values that get a custom message.
    known_failures = {
        errno.ENOSPC: "The file system device reported disk full",
        errno.EIO: "The file system reported a generic I/O error",
        errno.EROFS: "The file system is read-only",
    }
    try:
        # Unbuffered, so fd.tell() reflects what member writers have written.
        with open(output_filename, "wb", buffering=0) as fd:
            fd.write(b"!<arch>\n")
            for member in members:
                if member.is_fixed_binary:
                    fixed_binary = assume_not_none(member.fixed_binary)
                    content_len = len(fixed_binary)
                    write_header(fd, member, content_len, mtime)
                    fd.write(fixed_binary)
                else:
                    header_pos = fd.tell()
                    fd.write(AR_HEADER)
                    member.write_to(fd)
                    current_pos = fd.tell()
                    content_len = current_pos - header_pos - AR_HEADER_LEN
                    assert content_len >= 0
                    # Go back and replace the placeholder with the real header.
                    fd.seek(header_pos, os.SEEK_SET)
                    write_header(fd, member, content_len, mtime)
                    fd.seek(current_pos, os.SEEK_SET)
                if content_len % 2 == 1:
                    # ar members start on even offsets; the padding byte is
                    # not counted in the size recorded in the header.
                    fd.write(b"\n")
    except OSError as e:
        if prefer_raw_exceptions:
            raise
        reason = known_failures.get(e.errno)
        if reason is not None:
            _error(f"Unable to write {output_filename}. {reason}: {str(e)}")
        raise
    print(f"Generated {output_filename}")

123 

124 

def _generate_tar_file(
    tar_members: Iterable[TarMember],
    compression_cmd: List[str],
    write_to: BinaryIO,
) -> None:
    """Stream a GNU-format tar of ``tar_members`` through ``compression_cmd``.

    The tar stream is fed to the stdin of the compression command, whose
    stdout is connected to ``write_to``. A non-zero exit code from the command
    is reported via ``_error``.
    """
    with subprocess.Popen(
        compression_cmd,
        stdin=subprocess.PIPE,
        stdout=write_to,
    ) as compressor:
        with tarfile.open(
            mode="w|",
            fileobj=compressor.stdin,
            format=tarfile.GNU_FORMAT,
            errorlevel=1,
        ) as archive:
            for entry in tar_members:
                info: tarfile.TarInfo = entry.create_tar_info(archive)
                if entry.path_type != PathType.FILE:
                    # Non-file entries have no content to copy.
                    archive.addfile(info)
                    continue
                with open(assume_not_none(entry.fs_path), "rb") as content:
                    archive.addfile(info, fileobj=content)
    compressor.wait()
    exit_code = compressor.returncode
    if exit_code != 0:
        _error(f"Compression command {compression_cmd} failed with code {exit_code}")

153 

154 

def generate_tar_file_member(
    tar_members: "Iterable[TarMember]",
    compression_cmd: List[str],
) -> Callable[[BinaryIO], None]:
    """Return a writer callback that emits the compressed tar to a file object.

    Nothing is generated until the returned callable is invoked; it then
    delegates to ``_generate_tar_file`` with the given members and command.
    """

    def write_tar(fd: BinaryIO) -> None:
        _generate_tar_file(tar_members, compression_cmd, fd)

    return write_tar

167 

168 

def _xz_cmdline(
    compression_rule: "Compression",
    parsed_args: Optional[argparse.Namespace],
) -> List[str]:
    """Build the ``xz`` command line for the given rule and parsed arguments.

    The level comes from ``compression_rule``; a strategy other than "none"
    (taken from ``parsed_args``, when given) is passed as a ``--<strategy>``
    option. ``--no-adjust`` is always appended.
    """
    level = compression_rule.effective_compression_level(parsed_args)
    xz_args = ["xz", "-T2", "-" + str(level)]
    if parsed_args is None:
        strategy = None
    else:
        strategy = parsed_args.compression_strategy
    # An unset strategy is treated like "none": no extra option is added.
    if strategy is not None and strategy != "none":
        xz_args.append("--" + strategy)
    xz_args.append("--no-adjust")
    return xz_args

182 

183 

def _gzip_cmdline(
    compression_rule: "Compression",
    parsed_args: Optional[argparse.Namespace],
) -> List[str]:
    """Build the ``gzip`` command line for the given rule and parsed arguments.

    :raises ValueError: If a compression strategy other than "none" was
      requested; strategies are not implemented for gzip here.
    """
    level = compression_rule.effective_compression_level(parsed_args)
    gzip_args = ["gzip", "-n" + str(level)]
    if parsed_args is None:
        strategy = None
    else:
        strategy = parsed_args.compression_strategy
    if strategy is None or strategy == "none":
        return gzip_args
    raise ValueError(
        f"Not implemented: Compression strategy {strategy}"
        " for gzip is currently unsupported (but dpkg-deb does)"
    )

197 

198 

def _uncompressed_cmdline(
    _unused_a: "Compression",
    _unused_b: Optional[argparse.Namespace],
) -> List[str]:
    """Return the pass-through command used when no compression is applied.

    Both parameters are ignored; they exist so the function has the same
    signature as the other command-line builders.
    """
    passthrough: List[str] = ["cat"]
    return passthrough

204 

205 

class Compression:
    """Description of one compression algorithm and how to invoke it."""

    def __init__(
        self,
        default_compression_level: int,
        extension: str,
        allowed_strategies: FrozenSet[str],
        cmdline_builder: Callable[
            ["Compression", Optional[argparse.Namespace]], List[str]
        ],
    ) -> None:
        """Record the algorithm's defaults and its command-line builder.

        :param default_compression_level: Level used when none was requested.
        :param extension: Suffix appended to file names (may be empty).
        :param allowed_strategies: Strategy names accepted for this algorithm.
        :param cmdline_builder: Callable producing the command line from this
          object and the (optional) parsed arguments.
        """
        self.default_compression_level = default_compression_level
        self.extension = extension
        self.allowed_strategies = allowed_strategies
        self.cmdline_builder = cmdline_builder

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"<{cls_name} {self.extension}>"

    def effective_compression_level(
        self, parsed_args: Optional[argparse.Namespace]
    ) -> int:
        """Return the requested compression level, or the default if unset."""
        if not parsed_args:
            return self.default_compression_level
        requested = parsed_args.compression_level
        if requested is None:
            return self.default_compression_level
        return cast("int", requested)

    def as_cmdline(self, parsed_args: Optional[argparse.Namespace]) -> List[str]:
        """Return the command line produced by the configured builder."""
        builder = self.cmdline_builder
        return builder(self, parsed_args)

    def with_extension(self, filename: str) -> str:
        """Return ``filename`` with this algorithm's extension appended."""
        return f"{filename}{self.extension}"

236 

237 

# Supported compression algorithms, keyed by the name accepted by -Z.
# Each entry: default level, file extension, allowed strategies, cmdline builder.
COMPRESSIONS = dict(
    xz=Compression(
        6,
        ".xz",
        frozenset({"none", "extreme"}),
        _xz_cmdline,
    ),
    gzip=Compression(
        9,
        ".gz",
        frozenset({"none", "filtered", "huffman", "rle", "fixed"}),
        _gzip_cmdline,
    ),
    none=Compression(
        0,
        "",
        frozenset({"none"}),
        _uncompressed_cmdline,
    ),
)

248 

249 

def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
    """Validate and normalize the compression options in ``parsed_args``.

    * ``gzip`` at level 0 is mapped to the ``none`` algorithm (with a note on
      stdout), for compatibility with dpkg-deb.
    * An algorithm that is not in ``COMPRESSIONS`` is reported via ``_error``.
      This can happen because the -Z default is taken from the environment
      and is therefore not restricted to the listed choices.
    * A strategy the algorithm does not allow is reported via ``_error``.

    :param parsed_args: The parsed arguments; updated in place.
    :returns: The same ``parsed_args`` object.
    """
    algorithm = parsed_args.compression_algorithm
    if parsed_args.compression_level == 0 and algorithm == "gzip":
        print(
            "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
        )
        algorithm = "none"
        parsed_args.compression_algorithm = algorithm

    compression = COMPRESSIONS.get(algorithm)
    if compression is None:
        # NOTE(review): _error presumably does not return -- confirm in debputy.util.
        _error(
            f'Unsupported compression algorithm "{algorithm}".'
            f' Allowed values: {", ".join(sorted(COMPRESSIONS))}'
        )
    strategy = parsed_args.compression_strategy
    if strategy is not None and strategy not in compression.allowed_strategies:
        _error(
            f'Compression algorithm "{parsed_args.compression_algorithm}" does not support compression strategy'
            f' "{strategy}". Allowed values: {", ".join(sorted(compression.allowed_strategies))}'
        )
    return parsed_args

268 

269 

def _int_from_env(var_name: str) -> Optional[int]:
    """Return environment variable ``var_name`` as an int, or None if unset/invalid."""
    try:
        return int(os.environ[var_name])
    except (KeyError, ValueError):
        return None


def parse_args() -> argparse.Namespace:
    """Parse the command line and return the normalized arguments.

    Defaults for the compression level, compression algorithm and maximum
    thread count are read from the ``DPKG_DEB_COMPRESSOR_LEVEL``,
    ``DPKG_DEB_COMPRESSOR_TYPE`` and ``DPKG_DEB_THREADS_MAX`` environment
    variables. The parsed result is passed through
    ``_normalize_compression_args`` before being returned.
    """
    compression_level_default = _int_from_env("DPKG_DEB_COMPRESSOR_LEVEL")
    compression_type = os.environ.get("DPKG_DEB_COMPRESSOR_TYPE", "xz")
    threads_max = _int_from_env("DPKG_DEB_THREADS_MAX")

    description = textwrap.dedent(
        """\
        THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support

        DO NOT USE THIS TOOL DIRECTLY. It has no stability guarantees and will be removed as
        soon as "dpkg-deb -b" grows support for the relevant features.

        This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
        without requiring root even for static ownership. It is a temporary stand-in for
        "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.

        The tool operates on an internal JSON based manifest for now, because it was faster
        than building an mtree parser (which is the format that dpkg will likely end up
        using).

        As the tool is not meant to be used directly, it is full of annoying paper cuts that
        I refuse to fix or maintain. Use the high level tool instead.

        """
    )

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )
    parser.add_argument("--version", action="version", version=__version__)
    parser.add_argument(
        "package_root_dir",
        metavar="PACKAGE_ROOT_DIR",
        help="Root directory of the package. Must contain a DEBIAN directory",
    )
    parser.add_argument(
        "package_output_path",
        metavar="PATH",
        help="Path where the package should be placed. If it is directory,"
        " the base name will be determined from the package metadata",
    )

    parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )
    parser.add_argument(
        "--root-owner-group",
        dest="root_owner_group",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb -b",
    )
    parser.add_argument(
        "-b",
        "--build",
        dest="build_param",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb",
    )
    parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environment variable)",
    )
    parser.add_argument(
        "-Z",
        dest="compression_algorithm",
        choices=COMPRESSIONS,
        default=compression_type,
        help="The compression algorithm to be used",
    )
    parser.add_argument(
        "-z",
        dest="compression_level",
        metavar="{0-9}",
        choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        default=compression_level_default,
        type=int,
        help="The compression level to be used",
    )
    parser.add_argument(
        "-S",
        dest="compression_strategy",
        # We have a different default for xz when strategy is unset and we are building a udeb
        action="store",
        default=None,
        help="The compression strategy to be used. Concrete values depend on the compression"
        ' algorithm, but the value "none" is always allowed',
    )
    parser.add_argument(
        "--uniform-compression",
        dest="uniform_compression",
        action="store_true",
        default=True,
        help="Whether to use the same compression for the control.tar and the data.tar."
        " The default is to use uniform compression.",
    )
    parser.add_argument(
        "--no-uniform-compression",
        dest="uniform_compression",
        action="store_false",
        default=True,
        help="Disable uniform compression (see --uniform-compression)",
    )
    parser.add_argument(
        "--threads-max",
        dest="threads_max",
        default=threads_max,
        # TODO: Support this properly
        type=int,
        help="Ignored; accepted for compatibility",
    )
    parser.add_argument(
        "-d",
        "--debug",
        dest="debug_mode",
        action="store_true",
        default=False,
        help="Enable debug logging and raw stack traces on errors",
    )

    parsed_args = parser.parse_args()
    parsed_args = _normalize_compression_args(parsed_args)

    return parsed_args

416 

417 

def _ctrl_member(
    member_path: str,
    fs_path: Optional[str] = None,
    path_type: PathType = PathType.FILE,
    mode: int = 0o644,
    mtime: int = 0,
) -> TarMember:
    """Create a root-owned (root:root, uid/gid 0) ``TarMember`` for control.tar.

    When ``fs_path`` is omitted it is derived from ``member_path`` by
    replacing the leading "." with "DEBIAN"; ``member_path`` must then start
    with "./".
    """
    source_path = fs_path
    if source_path is None:
        assert member_path.startswith("./")
        source_path = "DEBIAN" + member_path[1:]
    return TarMember(
        member_path=member_path,
        path_type=path_type,
        fs_path=source_path,
        mode=mode,
        owner="root",
        uid=0,
        group="root",
        gid=0,
        mtime=mtime,
    )

439 

440 

# Control members that _ctrl_tar_members stores with mode 0755 rather than 0644.
CTRL_MEMBER_SCRIPTS = {
    "preinst",
    "postinst",
    "prerm",
    "postrm",
    "config",
    "isinstallable",
}

449 

450 

def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
    """Yield the control.tar members for the DEBIAN dir under ``package_root_dir``.

    The first member is the "./" directory entry, followed by one entry per
    directory entry in DEBIAN, sorted by name. Each member's mtime is clamped
    to at most ``mtime``. Anything that is not a regular file is reported via
    ``_error``.
    """
    debian_root = os.path.join(package_root_dir, "DEBIAN")
    root_mtime = int(os.stat(debian_root).st_mtime)
    yield _ctrl_member(
        "./",
        debian_root,
        path_type=PathType.DIRECTORY,
        mode=0o0755,
        mtime=min(mtime, root_mtime),
    )
    with os.scandir(debian_root) as entries:
        for entry in sorted(entries, key=operator.attrgetter("name")):
            entry_stat = os.stat(entry)
            if not stat.S_ISREG(entry_stat.st_mode):
                _error(
                    f"{entry.path} is not a file and all control.tar members ought to be files!"
                )
            # Names listed in CTRL_MEMBER_SCRIPTS are stored executable.
            if entry.name in CTRL_MEMBER_SCRIPTS:
                entry_mode = 0o0755
            else:
                entry_mode = 0o0644
            yield _ctrl_member(
                f"./{entry.name}",
                path_type=PathType.FILE,
                fs_path=entry.path,
                mode=entry_mode,
                mtime=min(mtime, int(entry_stat.st_mtime)),
            )

477 

478 

def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
    """Parse the intermediate manifest at ``manifest_path`` into tar members.

    A missing path (``None``) is reported via ``_error``, as the manifest is
    currently mandatory.
    """
    manifest_missing = manifest_path is None
    if manifest_missing:
        _error("--intermediate-package-manifest is mandatory for now")
    return TarMember.parse_intermediate_manifest(manifest_path)

483 

484 

def main() -> None:
    """Entry point: parse the command line and build the .deb.

    With uniform compression (the default) control.tar uses the same
    compression as data.tar; otherwise control.tar is compressed with gzip
    using gzip's defaults. If the output path ends with "/" or is an existing
    directory, the file name is computed from the package's DEBIAN directory.
    """
    setup_logging()
    parsed_args = parse_args()
    root_dir: str = parsed_args.package_root_dir
    output_path: str = parsed_args.package_output_path
    mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)

    data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
    data_compression_cmd = data_compression.as_cmdline(parsed_args)
    if parsed_args.uniform_compression:
        ctrl_compression = data_compression
        ctrl_compression_cmd = data_compression_cmd
    else:
        ctrl_compression = COMPRESSIONS["gzip"]
        ctrl_compression_cmd = ctrl_compression.as_cmdline(None)

    if output_path.endswith("/") or os.path.isdir(output_path):
        deb_file = os.path.join(
            output_path,
            compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
        )
    else:
        deb_file = output_path

    pack(
        deb_file,
        ctrl_compression,
        data_compression,
        root_dir,
        parsed_args.package_manifest,
        mtime,
        ctrl_compression_cmd,
        data_compression_cmd,
        # --debug is documented as enabling raw stack traces, so raw
        # exceptions are preferred exactly when debug mode is on.
        prefer_raw_exceptions=parsed_args.debug_mode,
    )

520 

521 

def pack(
    deb_file: str,
    ctrl_compression: Compression,
    data_compression: Compression,
    root_dir: str,
    package_manifest: "Optional[str]",
    mtime: int,
    ctrl_compression_cmd: List[str],
    data_compression_cmd: List[str],
    prefer_raw_exceptions: bool = False,
) -> None:
    """Assemble ``deb_file`` from the package root and the manifest.

    The archive holds three members, in order: ``debian-binary`` (fixed
    content "2.0\\n"), the control tarball built from ``root_dir`` and the data
    tarball built from the parsed manifest. Tarball names carry the extension
    of their respective compression.
    """
    # Parse the manifest first so problems surface before any output is written.
    data_tar_members = parse_manifest(package_manifest)

    version_member = ArMember("debian-binary", mtime, fixed_binary=b"2.0\n")
    control_member = ArMember(
        ctrl_compression.with_extension("control.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            _ctrl_tar_members(root_dir, mtime),
            ctrl_compression_cmd,
        ),
    )
    data_member = ArMember(
        data_compression.with_extension("data.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            data_tar_members,
            data_compression_cmd,
        ),
    )
    members = [version_member, control_member, data_member]
    generate_ar_archive(deb_file, mtime, members, prefer_raw_exceptions)

554 

555 

# Run the packer only when executed as a script, not when imported.
if "__main__" == __name__:
    main()