Coverage for src/debputy/commands/deb_materialization.py: 9%

236 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1#!/usr/bin/python3 -B 

2import argparse 

3import collections 

4import contextlib 

5import json 

6import os 

7import subprocess 

8import sys 

9import tempfile 

10import textwrap 

11from datetime import datetime 

12from typing import Optional, List, Iterator, Dict, Tuple 

13 

14from debputy import DEBPUTY_ROOT_DIR 

15from debputy.intermediate_manifest import ( 

16 TarMember, 

17 PathType, 

18 output_intermediate_manifest, 

19 output_intermediate_manifest_to_fd, 

20) 

21from debputy.util import ( 

22 _error, 

23 _info, 

24 compute_output_filename, 

25 resolve_source_date_epoch, 

26 ColorizedArgumentParser, 

27 setup_logging, 

28 detect_fakeroot, 

29 print_command, 

30 program_name, 

31) 

32from debputy.version import __version__ 

33 

34 

def parse_args() -> argparse.Namespace:
    """Parse the command line of the deb materialization tool.

    Two sub-commands are defined: ``materialize-deb`` and
    ``build-materialized-deb``.  Everything following a literal ``--`` in
    ``sys.argv`` is cut off *before* argparse sees the command line and is
    stored verbatim on the result as ``upstream_args`` (an empty list when
    there is no ``--``).

    :return: The parsed arguments with the additional ``upstream_args``
      attribute set.
    """
    description = textwrap.dedent(
        """\
        This is a low level tool for materializing deb packages from intermediate debputy manifests or assembling
        the deb from a materialization.

        The tool is not intended to be run directly by end users.
        """
    )

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )

    parser.add_argument("--version", action="version", version=__version__)

    subparsers = parser.add_subparsers(dest="command", required=True)

    materialize_deb_parser = subparsers.add_parser(
        "materialize-deb",
        allow_abbrev=False,
        help="Generate .deb/.udebs structure from a root directory and"
        " an *intermediate* debputy manifest",
    )
    materialize_deb_parser.add_argument(
        "control_root_dir",
        metavar="control-root-dir",
        help="A directory that contains the control files (usually debian/<pkg>/DEBIAN)",
    )
    materialize_deb_parser.add_argument(
        "materialization_output",
        metavar="materialization_output",
        help="Where the resulting structure should be placed. Should not exist",
    )
    materialize_deb_parser.add_argument(
        "--discard-existing-output",
        dest="discard_existing_output",
        default=False,
        action="store_true",
        help="If passed, then the output location may exist."
        " If it does, it will be *deleted*.",
    )
    materialize_deb_parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ"
        " variable)",
    )
    materialize_deb_parser.add_argument(
        "--may-move-control-files",
        dest="may_move_control_files",
        action="store_true",
        default=False,
        help="Whether the command may optimize by moving (rather than copying) DEBIAN files",
    )
    materialize_deb_parser.add_argument(
        "--may-move-data-files",
        dest="may_move_data_files",
        action="store_true",
        default=False,
        help="Whether the command may optimize by moving (rather than copying) when materializing",
    )

    materialize_deb_parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )

    materialize_deb_parser.add_argument(
        "--udeb",
        dest="udeb",
        default=False,
        action="store_true",
        help="Whether this is a udeb package. Affects extension and default compression",
    )

    materialize_deb_parser.add_argument(
        "--build-method",
        dest="build_method",
        choices=["debputy", "dpkg-deb"],
        type=str,
        default=None,
        help="Immediately assemble the deb as well using the selected method",
    )
    materialize_deb_parser.add_argument(
        "--assembled-deb-output",
        dest="assembled_deb_output",
        type=str,
        default=None,
        help="Where to place the resulting deb. Only applicable with --build-method",
    )

    # Added for "help only" - you cannot trigger this option in practice
    # (the "--" separator and everything after it is removed from argv below
    # before argparse gets to parse it).
    materialize_deb_parser.add_argument(
        "--",
        metavar="DPKG_DEB_ARGS",
        action="extend",
        nargs="+",
        dest="unused",
        help="Arguments to be passed to dpkg-deb"
        " (same as you might pass to dh_builddeb).",
    )

    build_deb_structure = subparsers.add_parser(
        "build-materialized-deb",
        allow_abbrev=False,
        help="Produce a .deb from a directory produced by the"
        " materialize-deb command",
    )
    build_deb_structure.add_argument(
        "materialized_deb_root_dir",
        metavar="materialized-deb-root-dir",
        help="The output directory of the materialize-deb command",
    )
    # NOTE(review): this is a plain positional (no nargs), which argparse
    # treats as required - so the default below is presumably never used.
    build_deb_structure.add_argument(
        "build_method",
        metavar="build-method",
        choices=["debputy", "dpkg-deb"],
        type=str,
        default="dpkg-deb",
        help="Which tool should assemble the deb",
    )
    build_deb_structure.add_argument(
        "--output", type=str, default=None, help="Where to place the resulting deb"
    )

    argv = sys.argv
    try:
        # list.index raises ValueError (only) when there is no "--" separator.
        separator_index = argv.index("--")
    except ValueError:
        upstream_args = []
    else:
        upstream_args = argv[separator_index + 1 :]
        argv = argv[:separator_index]
    parsed_args = parser.parse_args(argv[1:])
    parsed_args.upstream_args = upstream_args

    return parsed_args

182 

183 

def _run(cmd: List[str]) -> None:
    """Echo a command line and then execute it.

    The command is first passed to ``print_command`` (one argument per
    element) and then run via ``subprocess.check_call``, which raises
    ``subprocess.CalledProcessError`` on a non-zero exit status.

    :param cmd: The command and its arguments as a list of strings.
    """
    print_command(*cmd)
    subprocess.check_call(cmd)

187 

188 

def strip_path_prefix(member_path: str) -> str:
    """Return *member_path* without its leading ``./``.

    :param member_path: A path from the manifest; expected to start with ``./``.
    :return: The path with the first two characters removed (an empty string
      for ``./`` itself).

    A path without the ``./`` prefix is reported through ``_error`` as an
    invalid manifest.
    """
    prefix = "./"
    has_expected_prefix = member_path.startswith(prefix)
    if not has_expected_prefix:
        _error(
            f'Invalid manifest: "{member_path}" does not start with "./", but all paths should'
        )
    return member_path[len(prefix) :]

195 

196 

def _perform_data_tar_materialization(
    output_packaging_root: str,
    intermediate_manifest: List[TarMember],
    may_move_data_files: bool,
) -> List[Tuple[str, TarMember]]:
    """Create the data.tar content of the deb below *output_packaging_root*.

    The manifest is first sorted into batches (directories, symlinks, bulk
    copies, single copies and renames).  The batches are then executed in
    this order: ``mkdir``, bulk ``cp -t``, individual ``cp``, renames and
    finally symlink creation.

    :param output_packaging_root: Directory that will hold the materialized
      paths (the manifest entry ``./`` maps to this directory itself).
    :param intermediate_manifest: The members to materialize.
    :param may_move_data_files: If true, members flagged with
      ``may_steal_fs_path`` are renamed into place rather than copied.
    :return: One ``(materialized path, tar member)`` pair per manifest entry,
      in manifest order.  Members with an ``fs_path`` are replaced by a clone
      whose ``fs_path`` is relative (``deb-root/...``) and which can no longer
      be "stolen".
    """
    started_at = datetime.now()
    _info("Materializing data.tar part of the deb:")

    # All directories are created by a single "mkdir" invocation.
    mkdir_cmd = ["mkdir"]
    pending_symlinks = []
    pending_renames = []
    pending_copies = []
    # Files keeping their basename are grouped per destination directory, so
    # that one "cp -t <dir>" call handles all of them.
    copies_by_dest_dir: Dict[str, List[str]] = collections.defaultdict(list)
    materialized_members = []

    for tar_member in intermediate_manifest:
        relative_path = strip_path_prefix(tar_member.member_path)
        if relative_path:
            rebased_fs_path = os.path.join("deb-root", relative_path)
            target_path = f"{output_packaging_root}/{relative_path}"
        else:
            rebased_fs_path = "deb-root"
            target_path = output_packaging_root
        source_path = tar_member.fs_path

        if tar_member.path_type == PathType.DIRECTORY:
            mkdir_cmd.append(target_path)
        elif tar_member.path_type == PathType.SYMLINK:
            pending_symlinks.append((tar_member.link_target, target_path))
        elif source_path is not None:
            if tar_member.link_target:
                # Not sure if hardlinks gets here yet as we do not support hardlinks
                _error("Internal error; hardlink not supported")

            if may_move_data_files and tar_member.may_steal_fs_path:
                pending_renames.append((source_path, target_path))
            elif os.path.basename(source_path) == os.path.basename(target_path):
                target_parent_dir = os.path.dirname(target_path.rstrip("/"))
                copies_by_dest_dir[target_parent_dir].append(source_path)
            else:
                pending_copies.append((source_path, target_path))
        else:
            _error(f"Internal error; unsupported path type {tar_member.path_type}")

        if source_path is None:
            materialized_member = tar_member
        else:
            materialized_member = tar_member.clone_and_replace(
                fs_path=rebased_fs_path, may_steal_fs_path=False
            )
        materialized_members.append((target_path, materialized_member))

    # Only invoke mkdir when there is at least one directory to create.
    if len(mkdir_cmd) > 1:
        _run(mkdir_cmd)

    for dest_dir, sources in copies_by_dest_dir.items():
        _run(["cp", "--reflink=auto", "-t", dest_dir, *sources])

    for source, dest in pending_copies:
        _run(["cp", "--reflink=auto", source, dest])

    for source, dest in pending_renames:
        print_command("mv", source, dest)
        os.rename(source, dest)

    for link_target, link_path in pending_symlinks:
        print_command("ln", "-s", link_target, link_path)
        os.symlink(link_target, link_path)

    elapsed = datetime.now() - started_at
    _info(f"Materialization of data.tar finished, took: {elapsed}")

    return materialized_members

277 

278 

def materialize_deb(
    control_root_dir: str,
    intermediate_manifest_path: Optional[str],
    source_date_epoch: int,
    dpkg_deb_options: List[str],
    is_udeb: bool,
    output_dir: str,
    may_move_control_files: bool,
    may_move_data_files: bool,
) -> None:
    """Materialize a deb structure into the (new) directory *output_dir*.

    The resulting directory contains:

     * ``deb-root``: the data.tar content plus the control files in ``DEBIAN``
     * ``deb-structure-intermediate-manifest.json``: the rewritten manifest
     * ``env-and-cli.json``: the recorded environment, the dpkg-deb options
       and the udeb flag

    :param control_root_dir: Directory with the control files; it must contain
      a file named ``control``.
    :param intermediate_manifest_path: Path to the intermediate manifest.
    :param source_date_epoch: Recorded (as a string) as ``SOURCE_DATE_EPOCH``.
    :param dpkg_deb_options: Recorded as the ``dpkg-deb`` command line options.
    :param is_udeb: Recorded as the ``udeb`` flag.
    :param output_dir: Directory to create; ``os.mkdir`` fails if it exists.
    :param may_move_control_files: Rename the control directory into place
      rather than copying its content.
    :param may_move_data_files: Passed on to the data.tar materialization.
    """
    control_file = f"{control_root_dir}/control"
    if not os.path.isfile(control_file):
        _error(
            f'The directory "{control_root_dir}" does not look like a package root dir (there is no control file)'
        )
    intermediate_manifest: List[TarMember] = parse_manifest(intermediate_manifest_path)

    output_packaging_root = os.path.join(output_dir, "deb-root")
    os.mkdir(output_dir)

    replacement_manifest_paths = _perform_data_tar_materialization(
        output_packaging_root, intermediate_manifest, may_move_data_files
    )
    # Walk the manifest backwards when applying mode and mtime.
    for target_path, member in replacement_manifest_paths[::-1]:
        # TODO: Hardlinks should probably skip these commands
        if member.path_type != PathType.SYMLINK:
            os.chmod(target_path, member.mode, follow_symlinks=False)
        timestamps = (member.mtime, member.mtime)
        os.utime(target_path, timestamps, follow_symlinks=False)

    materialized_ctrl_dir = f"{output_packaging_root}/DEBIAN"
    if may_move_control_files:
        print_command("mv", control_root_dir, materialized_ctrl_dir)
        os.rename(control_root_dir, materialized_ctrl_dir)
    else:
        os.mkdir(materialized_ctrl_dir)
        control_files = [
            os.path.join(control_root_dir, name)
            for name in os.listdir(control_root_dir)
        ]
        _run(["cp", "-a", *control_files, materialized_ctrl_dir])

    manifest_output_path = os.path.join(
        output_dir, "deb-structure-intermediate-manifest.json"
    )
    output_intermediate_manifest(
        manifest_output_path,
        [member for _, member in replacement_manifest_paths],
    )

    # Variables that are unset in the current environment are recorded as null.
    recorded_env = {"SOURCE_DATE_EPOCH": str(source_date_epoch)}
    for env_name in (
        "DPKG_DEB_COMPRESSOR_LEVEL",
        "DPKG_DEB_COMPRESSOR_TYPE",
        "DPKG_DEB_THREADS_MAX",
    ):
        recorded_env[env_name] = os.environ.get(env_name)
    serial_format = {
        "env": recorded_env,
        "cli": {"dpkg-deb": dpkg_deb_options},
        "udeb": is_udeb,
    }
    with open(os.path.join(output_dir, "env-and-cli.json"), "w") as fd:
        json.dump(serial_format, fd)

343 

344 

def apply_fs_metadata(
    materialized_path: str,
    tar_member: TarMember,
    apply_ownership: bool,
    is_using_fakeroot: bool,
) -> None:
    """Align the file system metadata of a path with its manifest entry.

    None of the calls follow symlinks.

    :param materialized_path: The path on the file system to update.
    :param tar_member: The manifest entry providing uid, gid, mode and mtime.
    :param apply_ownership: If true, the path is chown'ed to the uid/gid of
      the manifest entry.
    :param is_using_fakeroot: If true, the ownership is read back via
      ``os.stat`` afterwards and a mismatch is reported through ``_error``.
    """
    if apply_ownership:
        os.chown(
            materialized_path,
            tar_member.uid,
            tar_member.gid,
            follow_symlinks=False,
        )

    # To avoid surprises, align these with the manifest. Just in case the transport did not preserve the metadata.
    # Also, unsure whether metadata changes cause directory mtimes to change, so resetting them unconditionally
    # also prevents that problem.
    is_symlink = tar_member.path_type == PathType.SYMLINK
    if not is_symlink:
        os.chmod(materialized_path, tar_member.mode, follow_symlinks=False)
    timestamps = (tar_member.mtime, tar_member.mtime)
    os.utime(materialized_path, timestamps, follow_symlinks=False)

    if not is_using_fakeroot:
        return

    st = os.stat(materialized_path, follow_symlinks=False)
    if (st.st_uid, st.st_gid) != (tar_member.uid, tar_member.gid):
        _error(
            'Change of ownership failed. The chown call "succeeded" but stat does not give the right result.'
            " Most likely a fakeroot bug. Note, when verifying this, use os.chown + os.stat from python"
            " (the chmod/stat shell commands might use a different syscall that fakeroot accurately emulates)"
        )

371 

372 

def _dpkg_deb_root_requirements(
    intermediate_manifest: List[TarMember],
) -> Tuple[List[str], bool, bool]:
    """Determine how dpkg-deb must be invoked for the given manifest.

    Root is needed as soon as any member has a uid or gid other than 0.  In
    that case the current uid must be 0 (otherwise ``_error`` is called).
    When root is not needed, dpkg-deb gets ``--root-owner-group`` instead.

    :param intermediate_manifest: The members that will go into the deb.
    :return: A tuple of the initial dpkg-deb command line, whether root is
      needed and whether fakeroot was detected (always ``False`` when root
      is not needed).
    """
    everything_owned_by_root = all(
        tm.uid == 0 and tm.gid == 0 for tm in intermediate_manifest
    )
    if everything_owned_by_root:
        # fakeroot does not matter in this case
        _info("Applying mode and utime from the intermediate manifest...")
        return ["dpkg-deb", "--root-owner-group"], False, False

    if os.getuid() != 0:
        _error(
            'Must be run as root/fakeroot when using the method "dpkg-deb" due to the contents'
        )
    is_using_fakeroot = detect_fakeroot()
    _info("Applying ownership, mode, and utime from the intermediate manifest...")
    return ["dpkg-deb"], True, is_using_fakeroot

391 

392 

@contextlib.contextmanager
def maybe_with_materialized_manifest(
    content: Optional[List[TarMember]],
) -> Iterator[Optional[str]]:
    """Context manager providing *content* as a temporary JSON manifest file.

    :param content: The manifest to write, or ``None`` for "no manifest".
    :return: Yields the name of a temporary file holding the manifest (the
      file is managed by ``tempfile.NamedTemporaryFile`` and tied to the
      ``with`` block), or ``None`` when *content* is ``None``.
    """
    if content is None:
        yield None
        return

    with tempfile.NamedTemporaryFile(
        mode="w+t",
        encoding="utf-8",
        prefix="debputy-mat-build",
        suffix=".json",
    ) as manifest_fd:
        output_intermediate_manifest_to_fd(manifest_fd, content)
        # Flush so that the content can be read via the file name.
        manifest_fd.flush()
        yield manifest_fd.name

409 

410 

def _prep_assembled_deb_output_path(
    output_path: Optional[str],
    materialized_deb_structure: str,
    deb_root: str,
    method: str,
    is_udeb: bool,
) -> str:
    """Work out where the assembled deb should be written.

    :param output_path: The requested output location (file or existing
      directory), or ``None`` to use the default location.
    :param materialized_deb_structure: The materialization directory; the
      default output goes into its ``output`` sub-directory, which is created
      when missing.
    :param deb_root: The ``deb-root`` directory; its ``DEBIAN`` directory is
      passed to ``compute_output_filename`` when *output_path* is a directory.
    :param method: The assembly method; used as the default file name.
    :param is_udeb: Selects the ``udeb`` rather than the ``deb`` extension.
    :return: The path of the deb file to produce.
    """
    if output_path is not None:
        if not os.path.isdir(output_path):
            return output_path
        deb_filename = compute_output_filename(
            os.path.join(deb_root, "DEBIAN"), is_udeb
        )
        return os.path.join(output_path, deb_filename)

    extension = "udeb" if is_udeb else "deb"
    default_output_dir = os.path.join(materialized_deb_structure, "output")
    if not os.path.isdir(default_output_dir):
        os.mkdir(default_output_dir)
    return os.path.join(default_output_dir, f"{method}.{extension}")

432 

433 

def _apply_env(env: Dict[str, Optional[str]]) -> None:
    """Make ``os.environ`` match the given variables.

    :param env: Maps variable names to their desired value.  A value of
      ``None`` means the variable is removed from the environment (it is not
      an error if the variable is not set).
    """
    for name, value in env.items():
        if value is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = value

443 

444 

def assemble_deb(
    materialized_deb_structure: str,
    method: str,
    output_path: Optional[str],
    combined_materialization_and_assembly: bool,
) -> None:
    """Assemble a deb from a previously materialized deb structure.

    The environment, the dpkg-deb options and the udeb flag are read from
    ``env-and-cli.json`` inside the materialization, and the recorded
    environment is applied to ``os.environ`` before the assembly command runs.

    :param materialized_deb_structure: The materialization directory.
    :param method: Either ``dpkg-deb`` or ``debputy``.
    :param output_path: Where to place the deb (``None`` selects the default
      location inside the materialization).
    :param combined_materialization_and_assembly: Whether the materialization
      was done by this very invocation.  If so, the ``dpkg-deb`` method only
      re-applies the file system metadata when ownership must be applied.
    """
    deb_root = os.path.join(materialized_deb_structure, "deb-root")

    env_and_cli_path = os.path.join(materialized_deb_structure, "env-and-cli.json")
    with open(env_and_cli_path, "r") as fd:
        serial_format = json.load(fd)

    env = serial_format.get("env") or {}
    cli = serial_format.get("cli") or {}
    is_udeb = serial_format.get("udeb")
    source_date_epoch = env.get("SOURCE_DATE_EPOCH")
    dpkg_deb_options = cli.get("dpkg-deb") or []

    intermediate_manifest_path = os.path.join(
        materialized_deb_structure, "deb-structure-intermediate-manifest.json"
    )
    original_intermediate_manifest = TarMember.parse_intermediate_manifest(
        intermediate_manifest_path
    )
    _info(
        "Rebasing relative paths in the intermediate manifest so they are relative to current working directory ..."
    )
    intermediate_manifest = []
    for tar_member in original_intermediate_manifest:
        fs_path = tar_member.fs_path
        if fs_path is not None and not fs_path.startswith("/"):
            tar_member = tar_member.clone_and_replace(
                fs_path=os.path.join(materialized_deb_structure, fs_path)
            )
        intermediate_manifest.append(tar_member)

    # Only the "debputy" method needs the manifest written out as a file.
    materialized_manifest = intermediate_manifest if method == "debputy" else None

    if source_date_epoch is None:
        _error(
            "Cannot reproduce the deb. No source date epoch provided in the materialized deb root."
        )
    _apply_env(env)

    output = _prep_assembled_deb_output_path(
        output_path,
        materialized_deb_structure,
        deb_root,
        method,
        is_udeb,
    )

    with maybe_with_materialized_manifest(materialized_manifest) as tmp_file:
        if method == "dpkg-deb":
            deb_cmd, needs_root, is_using_fakeroot = _dpkg_deb_root_requirements(
                intermediate_manifest
            )
            must_apply_metadata = (
                needs_root or not combined_materialization_and_assembly
            )
            if must_apply_metadata:
                for tar_member in reversed(intermediate_manifest):
                    member_fs_path = os.path.join(
                        deb_root, strip_path_prefix(tar_member.member_path)
                    )
                    apply_fs_metadata(
                        member_fs_path, tar_member, needs_root, is_using_fakeroot
                    )
        elif method == "debputy":
            deb_packer = os.path.join(DEBPUTY_ROOT_DIR, "deb_packer.py")
            assert tmp_file is not None
            deb_cmd = [
                deb_packer,
                "--intermediate-package-manifest",
                tmp_file,
                "--source-date-epoch",
                source_date_epoch,
            ]
        else:
            _error(f"Internal error: Unsupported assembly method: {method}")

        if is_udeb:
            deb_cmd.extend(["-z6", "-Zxz", "-Sextreme"])
        deb_cmd.extend(dpkg_deb_options)
        deb_cmd.extend(["--build", deb_root, output])

        started_at = datetime.now()
        _run(deb_cmd)
        elapsed = datetime.now() - started_at
        _info(f" - assembly command took {elapsed}")

530 

531 

def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
    """Load the intermediate manifest stored at *manifest_path*.

    :param manifest_path: Path of the intermediate manifest.  ``None`` is
      reported through ``_error``, as the manifest is mandatory for now.
    :return: The result of ``TarMember.parse_intermediate_manifest``.
    """
    manifest_was_provided = manifest_path is not None
    if not manifest_was_provided:
        _error("--intermediate-package-manifest is mandatory for now")
    return TarMember.parse_intermediate_manifest(manifest_path)

536 

537 

def main() -> None:
    """Entry point: set up logging, parse the command line and dispatch.

    ``materialize-deb`` materializes the deb structure and, when a build
    method was given, also assembles the deb right away.
    ``build-materialized-deb`` only assembles a deb from an existing
    materialization.
    """
    setup_logging()
    parsed_args = parse_args()
    command = parsed_args.command

    if command == "build-materialized-deb":
        assemble_deb(
            parsed_args.materialized_deb_root_dir,
            parsed_args.build_method,
            parsed_args.output,
            False,
        )
    elif command == "materialize-deb":
        source_date_epoch = resolve_source_date_epoch(parsed_args.source_date_epoch)
        dpkg_deb_args = parsed_args.upstream_args or []
        output_dir = parsed_args.materialization_output

        if os.path.exists(output_dir):
            may_discard = parsed_args.discard_existing_output
            if not may_discard:
                _error(
                    "The output path already exists. Please either choose a non-existing path, delete the path"
                    " or use --discard-existing-output (to have this command remove it as necessary)."
                )
            _info(
                f'Removing existing path "{output_dir}" as requested by --discard-existing-output'
            )
            _run(["rm", "-fr", output_dir])

        materialize_deb(
            parsed_args.control_root_dir,
            parsed_args.package_manifest,
            source_date_epoch,
            dpkg_deb_args,
            parsed_args.udeb,
            output_dir,
            parsed_args.may_move_control_files,
            parsed_args.may_move_data_files,
        )

        build_method = parsed_args.build_method
        if build_method is not None:
            # Materialization and assembly happen in the same invocation here.
            assemble_deb(
                output_dir,
                build_method,
                parsed_args.assembled_deb_output,
                True,
            )
    else:
        _error(f'Internal error: Unimplemented command "{parsed_args.command}"')

584 

585 

# Only run the command line interface when executed as a script (not on import).
if __name__ == "__main__":
    main()