Coverage for src/debputy/highlevel_manifest.py: 67%

801 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5from contextlib import suppress 

6from dataclasses import dataclass, field 

7from typing import ( 

8 List, 

9 Dict, 

10 Iterable, 

11 Mapping, 

12 Any, 

13 Union, 

14 Optional, 

15 TypeVar, 

16 Generic, 

17 cast, 

18 Set, 

19 Tuple, 

20 Sequence, 

21 FrozenSet, 

22) 

23 

24from debian.debian_support import DpkgArchTable 

25from ._deb_options_profiles import DebBuildOptionsAndProfiles 

26from ._manifest_constants import * 

27from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

28from .builtin_manifest_rules import builtin_mode_normalization_rules 

29from .debhelper_emulation import ( 

30 dhe_dbgsym_root_dir, 

31 assert_no_dbgsym_migration, 

32 read_dbgsym_file, 

33) 

34from .exceptions import DebputySubstitutionError 

35from .filesystem_scan import FSPath, FSRootDir, FSROOverlay 

36from .installations import ( 

37 InstallRule, 

38 SourcePathMatcher, 

39 PathAlreadyInstalledOrDiscardedError, 

40 NoMatchForInstallPatternError, 

41 InstallRuleContext, 

42 BinaryPackageInstallRuleContext, 

43 InstallSearchDirContext, 

44 SearchDir, 

45) 

46from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

47from .maintscript_snippet import ( 

48 DpkgMaintscriptHelperCommand, 

49 MaintscriptSnippetContainer, 

50) 

51from .manifest_conditions import ConditionContext 

52from .manifest_parser.base_types import FileSystemMatchRule, FileSystemExactMatchRule 

53from .manifest_parser.util import AttributePath 

54from .packager_provided_files import PackagerProvidedFile 

55from .packages import BinaryPackage, SourcePackage 

56from .plugin.api.feature_set import PluginProvidedFeatureSet 

57from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

58from .plugin.api.impl_types import ( 

59 PackageProcessingContextProvider, 

60 PackageDataTable, 

61) 

62from .plugin.api.spec import FlushableSubstvars, VirtualPath 

63from .plugin.debputy.binary_package_rules import ServiceRule 

64from .substitution import Substitution 

65from .transformation_rules import ( 

66 TransformationRule, 

67 ModeNormalizationTransformationRule, 

68 NormalizeShebangLineTransformation, 

69) 

70from .util import ( 

71 _error, 

72 _warn, 

73 debian_policy_normalize_symlink_target, 

74 generated_content_dir, 

75 _info, 

76) 

77from .yaml import MANIFEST_YAML 

78from .yaml.compat import CommentedMap, CommentedSeq 

79 

80 

@dataclass(slots=True)
class DbgsymInfo:
    """Debug symbol (dbgsym) details attached to a binary package's data."""

    # File system root holding the dbgsym content.
    dbgsym_fs_root: FSPath
    # Identifiers of the debug symbols (presumably build-ids - TODO confirm).
    dbgsym_ids: List[str]

85 

86 

@dataclass(slots=True, frozen=True)
class BinaryPackageData:
    """Immutable bundle of the per-binary-package state.

    The instance only groups references; it performs no work on its own.
    """

    # The source package and the binary package this data belongs to.
    source_package: SourcePackage
    binary_package: BinaryPackage
    # Directory path (as a string) used as staging root for the package.
    binary_staging_root_dir: str
    # Output directory for control data; may be absent (None).
    control_output_dir: Optional[str]
    # Root of the file system tree for the package content.
    fs_root: FSPath
    substvars: FlushableSubstvars
    package_metadata_context: PackageProcessingContextProvider
    ctrl_creator: BinaryCtrlAccessorProviderCreator
    dbgsym_info: DbgsymInfo

98 

99 

@dataclass(slots=True)
class PackageTransformationDefinition:
    """Mutable collection of the manifest-derived rules for one binary package.

    Only the first three fields are required; every collection field starts
    out as a fresh, empty container per instance.
    """

    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool
    binary_version: Optional[str] = None
    search_dirs: Optional[List[FileSystemExactMatchRule]] = None
    dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field(default_factory=list)
    maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field(default_factory=dict)
    transformations: List[TransformationRule] = field(default_factory=list)
    reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field(default_factory=dict)
    install_rules: List[InstallRule] = field(default_factory=list)
    requested_service_rules: List[ServiceRule] = field(default_factory=list)

119 

120 

def _path_to_tar_member(
    path: FSPath,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert a single path of the package file system into a tar member.

    :param path: The path to convert; must be a directory, a file or a symlink.
    :param clamp_mtime_to: Upper bound for the mtime recorded on the member.
    :return: The tar member describing the path.
    :raises AssertionError: If the path is neither a file, a directory nor
      a symlink.
    """
    mtime = float(clamp_mtime_to)
    owner, uid, group, gid = path.tar_owner_info
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}
    mode = path.mode

    # Paths backed by a real file never get an mtime later than the clamp.
    if path.has_fs_path:
        mtime = min(mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    else:
        if not path.is_symlink:
            assert not path.is_symlink
            raise AssertionError(
                f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
            )
        # Symlinks are resolved immediately since the target must be normalized anyway.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
        # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
        return TarMember.virtual_path(
            path.tar_path,
            PathType.SYMLINK,
            mtime,
            link_target=normalized_target,
            mode=0o0777,
            **ownership,
        )

    if path.has_fs_path:
        return TarMember.from_file(
            path.tar_path,
            path.fs_path,
            mode=mode,
            path_type=path_type,
            path_mtime=mtime,
            clamp_mtime_to=clamp_mtime_to,
            may_steal_fs_path=path._can_replace_inline,
            **ownership,
        )

    # Without a backing file system path, the member is purely virtual (and
    # therefore cannot be a regular file).
    assert not path.is_file
    return TarMember.virtual_path(
        path.tar_path,
        path_type,
        mtime,
        mode=mode,
        **ownership,
    )

188 

189 

def _generate_intermediate_manifest(
    fs_root: FSPath,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a tar member for every path below ``fs_root``.

    Non-symlink members are yielded in the order ``fs_root.all_paths()``
    produces them; all symlink members are held back and yielded last.

    :param fs_root: Root of the file system tree to walk.
    :param clamp_mtime_to: Upper bound for the mtime of each member.
    """
    deferred_symlinks = []
    for fs_path in fs_root.all_paths():
        member = _path_to_tar_member(fs_path, clamp_mtime_to)
        if member.path_type == PathType.SYMLINK:
            deferred_symlinks.append(member)
        else:
            yield member
    for symlink_member in deferred_symlinks:
        yield symlink_member

202 

203 

ST = TypeVar("ST")
T = TypeVar("T")


class AbstractYAMLSubStore(Generic[ST]):
    """Base class for mutable views of a sub-structure in a YAML document.

    An instance wraps a container (``_store``) that either already lives
    inside a parent container under ``parent_key`` ("attached") or is kept
    on the side until :meth:`create_definition` inserts it ("detached").
    """

    def __init__(
        self,
        parent_store: Any,
        parent_key: Optional[Union[int, str]],
        store: Optional[ST] = None,
    ) -> None:
        """Create the view.

        :param parent_store: Container (mapping or list) that holds, or will
          hold, the wrapped store. May be None for a free-standing item.
        :param parent_key: Key (mapping) or index (list) in the parent.
        :param store: The wrapped store. When omitted, the entry found in the
          parent is used and otherwise a new empty store is created.
        :raises ValueError: If ``store`` is given but the parent already has
          a different object under ``parent_key``.
        """
        found_in_parent = False
        if parent_store is not None and parent_key is not None:
            try:
                from_parent_store = parent_store[parent_key]
            except (KeyError, IndexError):
                from_parent_store = None
            else:
                found_in_parent = True
            # The provided store must be the very object the parent already
            # holds (comparing against the parent container itself could
            # never succeed).
            if (
                store is not None
                and from_parent_store is not None
                and store is not from_parent_store
            ):
                raise ValueError(
                    "Store is provided but is not the one already in the parent store"
                )
            if store is None:
                store = from_parent_store
        self._parent_store = parent_store
        self._parent_key = parent_key
        # Attachment is decided by whether the lookup succeeded. A membership
        # test (`key in parent`) would check *values* for list parents and
        # wrongly flag every list-backed item as detached.
        self._is_detached = not found_in_parent
        assert self._is_detached or store is not None
        if store is None:
            store = self._create_new_instance()
        self._store: ST = store

    def _create_new_instance(self) -> ST:
        """Return a new, empty store; must be provided by subclasses."""
        raise NotImplementedError

    def create_definition_if_missing(self) -> None:
        """Insert the store into the parent unless it is already attached."""
        if self._is_detached:
            self.create_definition()

    def create_definition(self) -> None:
        """Insert the store into the parent container.

        For list parents the store is appended and its index recorded as the
        parent key; otherwise it is stored under the existing parent key.

        :raises RuntimeError: If already attached or if there is no parent.
        """
        if not self._is_detached:
            raise RuntimeError("Definition is already present")
        parent_store = self._parent_store
        if parent_store is None:
            raise RuntimeError(
                f"Definition is not attached to any parent!? ({self.__class__.__name__})"
            )
        if isinstance(parent_store, list):
            assert self._parent_key is None
            self._parent_key = len(parent_store)
            self._parent_store.append(self._store)
        else:
            parent_store[self._parent_key] = self._store
        self._is_detached = False

    def remove_definition(self) -> None:
        """Remove the store from the parent container and mark it detached.

        :raises RuntimeError: If the definition is not attached.
        """
        self._ensure_attached()
        del self._parent_store[self._parent_key]
        if isinstance(self._parent_store, list):
            # List indices are not stable identifiers once the item is gone.
            self._parent_key = None
        self._is_detached = True

    def _ensure_attached(self) -> None:
        """Raise RuntimeError when the definition is detached."""
        if self._is_detached:
            raise RuntimeError("The definition has been removed!")

273 

274 

class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]):
    """Sub-store whose wrapped container is a YAML sequence."""

    def _create_new_instance(self) -> List[T]:
        """Return a new, empty ``CommentedSeq``."""
        return CommentedSeq()

278 

279 

class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]):
    """Sub-store whose wrapped container is a YAML mapping."""

    def _create_new_instance(self) -> Dict[str, T]:
        """Return a new, empty ``CommentedMap``."""
        return CommentedMap()

283 

284 

class MutableCondition:
    """Factories for single-key condition mappings used in the manifest."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping of the arch-matches condition key to ``arch_filter``."""
        condition = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(condition)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping of the build-profiles-matches condition key to the given value."""
        condition = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(condition)

295 

296 

class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of a create-symlink transformation in the manifest."""

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Optional[Any]
    ) -> "MutableYAMLSymlink":
        """Create a detached symlink definition.

        :param link_path: Path of the symlink.
        :param link_target: Target the symlink points to.
        :param condition: Optional condition, stored under "when" when given.
        """
        definition = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            definition["when"] = condition
        wrapped = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: definition})
        return cls(None, None, store=wrapped)

    @property
    def _symlink_definition(self) -> Any:
        """The inner mapping holding the link path and link target."""
        return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]

    @property
    def symlink_path(self) -> str:
        """Path of the symlink."""
        return self._symlink_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        self._symlink_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> Optional[str]:
        """Target of the symlink."""
        return self._symlink_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        self._symlink_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target

334 

335 

class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile management rule (remove or rename).

    The wrapped store is a single-key mapping: the key is the command and the
    value is the mapping with the attributes of that command.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: Optional[str],
        owning_package: Optional[str],
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "remove" rule for ``conffile``."""
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: Optional[str],
        owning_package: Optional[str],
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "rename" rule from ``old_conffile`` to ``new_conffile``."""
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> Dict[str, Any]:
        """The attribute mapping of the (single) command in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    def _set_optional(self, key: str, value: Optional[str]) -> None:
        """Store ``value`` under ``key``; None removes the key if present."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    def _require_rename(self) -> None:
        """Raise TypeError unless this item is a rename command."""
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )

    @property
    def command(self) -> str:
        """The command keyword, i.e. the single key of the store."""
        assert len(self._store) == 1
        return next(iter(self._store))

    @property
    def obsolete_conffile(self) -> str:
        """The conffile being removed, or the old name for a rename."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    @property
    def new_conffile(self) -> str:
        """The new name of the conffile.

        :raises TypeError: If the item is not a rename command.
        """
        self._require_rename()
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        self._require_rename()
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> Optional[str]:
        """The prior-to-version attribute, or None when unset."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: Optional[str]) -> None:
        self._set_optional(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> Optional[str]:
        """The owning-package attribute, or None when unset."""
        # Read the same key the setter writes; the attribute is optional, so
        # a missing key yields None rather than a KeyError.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: Optional[str]) -> None:
        self._set_optional(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)

455 

456 

class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of the definition of a single package in the manifest."""

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> Optional[List[Dict[str, Any]]]:
        """Return the list stored under ``key`` in this package definition.

        :param key: Key of the list inside the package definition.
        :param create_if_absent: When the list (or the package definition
          itself) is missing, create it if True and return None if False.
        :return: The list, or None when it is absent and was not created.
        """
        if self._is_detached or key not in self._store:
            # Pure lookups must not modify the manifest.
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Attach the detached ``item`` to the list under ``key``.

        :raises RuntimeError: If the item is already attached or belongs to
          another container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Add a symlink definition to the transformations of the package."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a view for each create-symlink transformation of the package."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, definition in enumerate(store):
            # Match on the same keyword that MutableYAMLSymlink writes and
            # reads; only single-key mappings are symlink definitions.
            if (
                isinstance(definition, dict)
                and len(definition) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in definition
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a view for each conffile management rule of the package."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        yield from (
            MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
        )

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Add a conffile management rule to the package."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)

504 

505 

class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base class for mutable views of an installation rule.

    The wrapped store is a single-key mapping from the rule keyword to the
    attributes of the rule. The class methods are factories that build
    detached rules of the concrete subclasses.
    """

    @property
    def _container(self) -> Dict[str, Any]:
        """The attribute container of the (single) rule in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _sources_key(sources: Union[str, List[str]]) -> str:
        """Return the singular source keyword for a string, else the plural one."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @staticmethod
    def _rule_store(rule_keyword: str, attributes: Dict[str, Any]) -> CommentedMap:
        """Build the single-key store for a rule with the given attributes."""
        return CommentedMap({rule_keyword: CommentedMap(attributes)})

    @property
    def into(self) -> Optional[List[str]]:
        """The packages the rule installs into, or None when unset."""
        # The setter removes the key for None, so absence must read as None.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: Optional[Union[str, List[str]]]) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> Optional[Union[str, Mapping[str, Any]]]:
        """The condition of the rule, or None when unset."""
        # The setter removes the key for None, so absence must read as None.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        *,
        dest_dir: Optional[str] = None,
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install rule with an optional destination directory."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_INSTALL,
                {cls._sources_key(sources): sources},
            ),
        )
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: Union[str, List[str]],
        dest_dirs: Sequence[str],
        into: Optional[Union[str, List[str]]],
        *,
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a multi-destination install rule."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_MULTI_DEST_INSTALL,
                {
                    cls._sources_key(sources): sources,
                    "dest-dirs": dest_dirs,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: Optional[Union[str, List[str]]],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_INSTALL,
                {
                    MK_INSTALLATIONS_INSTALL_SOURCE: source,
                    MK_INSTALLATIONS_INSTALL_AS: install_as,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: Optional[Union[str, List[str]]],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install-docs rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_INSTALL_DOCS,
                {
                    MK_INSTALLATIONS_INSTALL_SOURCE: source,
                    MK_INSTALLATIONS_INSTALL_AS: install_as,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        *,
        dest_dir: Optional[str] = None,
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install-docs rule with an optional destination directory."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_INSTALL_DOCS,
                {cls._sources_key(sources): sources},
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Create an install-examples rule."""
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_INSTALL_EXAMPLES,
                {cls._sources_key(sources): sources},
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        language: Optional[str],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Create an install-man rule with an optional language."""
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=cls._rule_store(
                MK_INSTALLATIONS_INSTALL_MAN,
                {cls._sources_key(sources): sources},
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: Union[str, List[str]],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Create a discard rule; the sources are stored directly as its value."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )

764 

765 

class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Mutable view of an install-examples rule; adds nothing to the base class."""

768 

769 

class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Mutable view of an install-man rule."""

    @property
    def language(self) -> Optional[str]:
        """The language attribute of the rule, or None when unset."""
        # The setter removes the key for None, so absence must read as None
        # instead of raising KeyError.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: Optional[str]) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]

782 

783 

class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Mutable view of a discard rule; adds nothing to the base class."""

786 

787 

class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Mutable view of an install rule with sources and a destination.

    The sources live under the plural keyword (list) or the singular keyword
    (single string); the factories on the base class pick the keyword by the
    type of the value and the accessors here follow the same convention.
    """

    @property
    def sources(self) -> List[str]:
        """The sources of the rule, always as a list.

        :raises KeyError: If the rule has neither source keyword.
        """
        container = self._container
        if MK_INSTALLATIONS_INSTALL_SOURCES in container:
            v = container[MK_INSTALLATIONS_INSTALL_SOURCES]
        elif MK_INSTALLATIONS_INSTALL_SOURCE in container:
            # Rules created from a single string use the singular keyword.
            v = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        else:
            raise KeyError(MK_INSTALLATIONS_INSTALL_SOURCES)
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: Union[str, List[str]]) -> None:
        container = self._container
        if isinstance(new_value, str):
            key, stale_key, value = (
                MK_INSTALLATIONS_INSTALL_SOURCE,
                MK_INSTALLATIONS_INSTALL_SOURCES,
                new_value,
            )
        else:
            key, stale_key, value = (
                MK_INSTALLATIONS_INSTALL_SOURCES,
                MK_INSTALLATIONS_INSTALL_SOURCE,
                CommentedSeq(new_value),
            )
        # Never leave both keywords behind; the rule would be ambiguous.
        with suppress(KeyError):
            del container[stale_key]
        container[key] = value

    @property
    def dest_dir(self) -> Optional[str]:
        """The destination directory, or None when unset."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: Optional[str]) -> None:
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> Optional[str]:
        """The name the single source is installed as, or None when unset."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: Optional[str]) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
            return
        if self.dest_dir is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )

        # Goes through the property so rules using the singular source
        # keyword are supported as well.
        sources = self.sources
        if len(sources) != 1:
            raise ValueError(
                f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
            )
        if isinstance(self._container.get(MK_INSTALLATIONS_INSTALL_SOURCES), list):
            # Collapse the one-element list into a single string source.
            self.sources = sources[0]
        self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value

846 

847 

class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of installation rules in the manifest."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach a detached install rule to the end of the installations.

        :raises RuntimeError: If the rule is already attached or belongs to
          another container.
        """
        rules = self._store
        owned_elsewhere = (
            install_rule._parent_store is not None
            and install_rule._parent_store is not rules
        )
        if owned_elsewhere or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = rules
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach each of the given rules in order, exactly like ``append``.

        Rules attached before a failing rule stay attached.
        """
        for rule in install_rules:
            self.append(rule)

875 

876 

class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> Dict[str, Any]:
        """The underlying mapping of variable names to values."""
        return self._store

    def __setitem__(self, key: str, value: Any) -> None:
        """Set a variable and make sure the mapping is attached to its parent."""
        variables = self._store
        variables[key] = value
        self.create_definition_if_missing()

885 

886 

class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the definitions section of the manifest."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return the view of the manifest variables.

        :param create_if_absent: Attach an empty variables mapping when the
          section has none yet.
        """
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables

895 

896 

897class MutableYAMLManifest: 

898 def __init__(self, store: Any) -> None: 

899 self._store = store 

900 

901 @classmethod 

902 def empty_manifest(cls) -> "MutableYAMLManifest": 

903 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION})) 

904 

905 @property 

906 def manifest_version(self) -> str: 

907 return self._store[MK_MANIFEST_VERSION] 

908 

909 @manifest_version.setter 

910 def manifest_version(self, version: str) -> None: 

911 if version not in SUPPORTED_MANIFEST_VERSIONS: 

912 raise ValueError("Unsupported version") 

913 self._store[MK_MANIFEST_VERSION] = version 

914 

915 def installations( 

916 self, 

917 *, 

918 create_if_absent: bool = True, 

919 ) -> MutableYAMLInstallationsDefinition: 

920 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS) 

921 if create_if_absent: 921 ↛ 922line 921 didn't jump to line 922, because the condition on line 921 was never true

922 d.create_definition_if_missing() 

923 return d 

924 

925 def manifest_definitions( 

926 self, 

927 *, 

928 create_if_absent: bool = True, 

929 ) -> MutableYAMLManifestDefinitions: 

930 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS) 

931 if create_if_absent: 931 ↛ 932line 931 didn't jump to line 932, because the condition on line 931 was never true

932 d.create_definition_if_missing() 

933 return d 

934 

935 def package( 

936 self, name: str, *, create_if_absent: bool = True 

937 ) -> MutableYAMLPackageDefinition: 

938 if MK_PACKAGES not in self._store: 938 ↛ 940line 938 didn't jump to line 940, because the condition on line 938 was never false

939 self._store[MK_PACKAGES] = CommentedMap() 

940 packages_store = self._store[MK_PACKAGES] 

941 package = packages_store.get(name) 

942 if package is None: 942 ↛ 949line 942 didn't jump to line 949, because the condition on line 942 was never false

943 if not create_if_absent: 943 ↛ 944line 943 didn't jump to line 944, because the condition on line 943 was never true

944 raise KeyError(name) 

945 assert packages_store is not None 

946 d = MutableYAMLPackageDefinition(packages_store, name) 

947 d.create_definition() 

948 else: 

949 d = MutableYAMLPackageDefinition(packages_store, name) 

950 return d 

951 

def write_to(self, fd) -> None:
    """Dump the underlying YAML store to ``fd`` using ``MANIFEST_YAML``.

    :param fd: Output handle, passed through to ``MANIFEST_YAML.dump``.
    """
    store = self._store
    MANIFEST_YAML.dump(store, fd)

954 

955 

956def _describe_missing_path(entry: VirtualPath) -> str: 

957 if entry.is_dir: 

958 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

959 if entry.is_symlink: 

960 target = os.readlink(entry.fs_path) 

961 return f"{entry.fs_path} (symlink; links to {target})" 

962 if entry.is_file: 

963 return f"{entry.fs_path} (file)" 

964 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

965 

966 

def _detect_missing_installations(
    path_matcher: SourcePathMatcher,
    search_dir: VirtualPath,
) -> None:
    """Report paths in ``search_dir`` that were neither installed nor discarded.

    Nothing happens when ``search_dir`` does not exist on disk as a directory
    or when ``path_matcher.detect_missing`` yields no paths.  Otherwise every
    missing path is listed through ``_warn`` and the problem is then reported
    through ``_error`` together with a suggested catch-all discard rule.

    :param path_matcher: Matcher that tracked which source paths were used.
    :param search_dir: The search directory to check for unused paths.
    """
    if not os.path.isdir(search_dir.fs_path):
        return
    missing = list(path_matcher.detect_missing(search_dir))
    if not missing:
        return

    _warn(
        f"The following paths were present in {search_dir.fs_path}, but not installed (nor explicitly discarded)."
    )
    _warn("")
    for entry in missing:
        _warn(f" * {_describe_missing_path(entry)}")
    _warn("")

    # Manifest snippet the user can paste to discard everything left over.
    excl = textwrap.dedent(
        """\
        - discard: "*"
        """
    )

    # The keyword is quoted with matching backticks (the previous text opened
    # with a single quote and closed with a backtick).
    _error(
        "Please review the list and add either install rules or exclusions to `installations` in"
        " debian/debputy.manifest.  If you do not need any of these paths, add the following to the"
        f" end of your `installations`:\n\n{excl}\n"
    )

997 

998 

def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
    """Log the automatic discard rules that matched paths and how to overrule one.

    Reads ``path_matcher.used_auto_discard_rules`` (a mapping from rule to the
    paths it matched).  Nothing is logged when no rule has any matched path.
    Otherwise each ``rule -> path`` pair is listed in sorted order through
    ``_info``, followed by a manifest example that uses the first listed path.

    :param path_matcher: Matcher whose used automatic discard rules are listed.
    """
    used_discard_rules = path_matcher.used_auto_discard_rules
    # Discard rules can match and then be overridden.  In that case, they appear
    # but have 0 matches.
    if not any(len(fs_paths) > 0 for fs_paths in used_discard_rules.values()):
        return

    _info("The following automatic discard rules were triggered:")
    example_path: Optional[str] = None
    for rule in sorted(used_discard_rules):
        for fs_path in sorted(used_discard_rules[rule]):
            if example_path is None:
                # The first path listed doubles as the example further down.
                example_path = fs_path
            _info(f" * {rule} -> {fs_path}")
    # Guaranteed by the early return above: at least one rule had a match.
    assert example_path is not None
    _info("")
    _info(
        "Note that some of these may have been overruled. The overrule detection logic is not"
    )
    _info("100% reliable.")
    _info("")
    _info(
        "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
    )
    # The example must be nested like real YAML (`source` under `install`,
    # `install` as a list item under `installations`); with identical
    # indentation on all three lines the snippet is not valid to copy.
    _info("    installations:")
    _info("     - install:")
    _info(f"         source: {example_path}")

1025 

1026 

def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: Optional[VirtualPath] = None,
) -> None:
    """Run a built-in ``*`` install rule from ``source_dir`` into ``dctrl_bin``.

    The rule is performed against a copy of ``install_rule_context`` whose
    search dirs are replaced by ``source_dir`` alone (applicable only to
    ``dctrl_bin``).  ``NoMatchForInstallPatternError`` and
    ``PathAlreadyInstalledOrDiscardedError`` are deliberately ignored.

    :param dctrl_bin: The binary package receiving the content.
    :param substitution: Substitution used when building the ``*`` match rule.
    :param path_matcher: Matcher passed on to the install rule.
    :param install_rule_context: Base context; not modified, only copied via
      ``replace``.
    :param source_condition_context: Condition context passed on to the rule.
    :param source_dir: Directory to install everything from.
    :param into_dir: When given, the copied context uses this as ``fs_root``
      for ``dctrl_bin`` instead of the one in ``install_rule_context``.
    """
    source_fs_path = source_dir.fs_path
    only_this_package = frozenset([dctrl_bin])
    match_everything = FileSystemMatchRule.from_path_match(
        "*",
        AttributePath.builtin_path()[f"installing {source_fs_path}"],
        substitution,
    )
    install_rule = InstallRule.install_dest(
        [match_everything],
        None,
        only_this_package,
        f"Built-in; install everything from {source_fs_path} into {dctrl_bin.name}",
        None,
    )

    pkg_search_dir: Tuple[SearchDir] = (SearchDir(source_dir, only_this_package),)
    replacements = {"search_dirs": pkg_search_dir}
    if into_dir is not None:
        # Copy the mapping so the caller's context keeps its original fs_root.
        contexts_copy = dict(install_rule_context.binary_package_contexts)
        contexts_copy[dctrl_bin.name] = contexts_copy[dctrl_bin.name].replace(
            fs_root=into_dir
        )
        replacements["binary_package_contexts"] = contexts_copy

    fake_install_rule_context = install_rule_context.replace(**replacements)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(NoMatchForInstallPatternError, PathAlreadyInstalledOrDiscardedError):
        install_rule.perform_install(
            path_matcher,
            fake_install_rule_context,
            source_condition_context,
        )

1074 

1075 

class HighLevelManifest:
    """Combine install rules, per-package transformations and build context.

    The class drives installation of files into per-package file-system
    roots (``perform_installations``), applies the per-package file-system
    transformations, and produces the intermediate manifest used for the
    package's ``data.tar``.
    """

    def __init__(
        self,
        manifest_path: str,
        mutable_manifest: Optional[MutableYAMLManifest],
        install_rules: Optional[List[InstallRule]],
        source_package: SourcePackage,
        binary_packages: Mapping[str, BinaryPackage],
        substitution: Substitution,
        package_transformations: Mapping[str, PackageTransformationDefinition],
        dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
        dpkg_arch_query_table: DpkgArchTable,
        build_env: DebBuildOptionsAndProfiles,
        plugin_provided_feature_set: PluginProvidedFeatureSet,
        debian_dir: VirtualPath,
    ) -> None:
        """Store the parsed manifest parts; no processing happens here.

        :param manifest_path: Path of the manifest (used in error messages).
        :param mutable_manifest: Editable YAML view of the manifest, if any.
        :param install_rules: Install rules of the manifest; ``None`` is
          treated differently from an empty list by ``perform_installations``.
        :param source_package: The source package.
        :param binary_packages: Binary packages keyed by package name.
        :param substitution: Source-level substitution.
        :param package_transformations: Per-package definitions keyed by
          package name.
        :param dpkg_architecture_variables: dpkg architecture variables.
        :param dpkg_arch_query_table: dpkg architecture query table.
        :param build_env: Build options and profiles.
        :param plugin_provided_feature_set: Features provided by plugins.
        :param debian_dir: The ``debian`` directory as a virtual path.
        """
        self.manifest_path = manifest_path
        self.mutable_manifest = mutable_manifest
        self._install_rules = install_rules
        self._source_package = source_package
        self._binary_packages = binary_packages
        self.substitution = substitution
        self.package_transformations = package_transformations
        self._dpkg_architecture_variables = dpkg_architecture_variables
        self._dpkg_arch_query_table = dpkg_arch_query_table
        self._build_env = build_env
        # Names of packages whose data.tar contents have been finalized.
        self._used_for: Set[str] = set()
        self._plugin_provided_feature_set = plugin_provided_feature_set
        self._debian_dir = debian_dir

    def source_version(self, include_binnmu_version: bool = True) -> str:
        """Resolve the source version through the substitution engine.

        :param include_binnmu_version: When false, the variable
          ``_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE`` is resolved instead of
          ``DEB_VERSION``.
        :raises AssertionError: If the variable cannot be substituted.
        """
        # TODO: There should be an easier way to determine the source version; really.
        version_var = "{{DEB_VERSION}}"
        if not include_binnmu_version:
            version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
        try:
            return self.substitution.substitute(
                version_var, "internal (resolve version)"
            )
        except DebputySubstitutionError as e:
            raise AssertionError(f"Could not resolve {version_var}") from e

    @property
    def debian_dir(self) -> VirtualPath:
        """The ``debian`` directory passed to the constructor."""
        return self._debian_dir

    @property
    def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
        """The dpkg architecture variables passed to the constructor."""
        return self._dpkg_architecture_variables

    @property
    def build_env(self) -> DebBuildOptionsAndProfiles:
        """The build options/profiles passed to the constructor."""
        return self._build_env

    @property
    def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin provided feature set passed to the constructor."""
        return self._plugin_provided_feature_set

    @property
    def active_packages(self) -> Iterable[BinaryPackage]:
        """Iterate over the binary packages whose ``should_be_acted_on`` is true."""
        yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)

    @property
    def all_packages(self) -> Iterable[BinaryPackage]:
        """Iterate over every known binary package."""
        yield from self._binary_packages.values()

    def package_state_for(self, package: str) -> PackageTransformationDefinition:
        """Return the transformation definition for ``package``.

        :raises KeyError: If ``package`` has no transformation definition.
        """
        return self.package_transformations[package]

    def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
        """Pick the package under whose name docs for ``package`` are installed.

        For ``foo-doc`` this is ``foo`` when such a package exists, otherwise
        ``foo-dev`` when ``foo`` starts with ``lib`` and that package exists.
        In every other case it is ``package`` itself.
        """
        name = package.name
        # If it is not a -doc package, then docs should be installed
        # under its own package name.
        if not name.endswith("-doc"):
            return package
        name = name[:-4]
        main_package = self._binary_packages.get(name)
        if main_package:
            return main_package
        if name.startswith("lib"):
            dev_pkg = self._binary_packages.get(f"{name}-dev")
            if dev_pkg:
                return dev_pkg

        # If we found no better match; default to the doc package itself.
        return package

    def perform_installations(
        self,
        *,
        install_request_context: Optional[InstallSearchDirContext] = None,
        enable_manifest_installation_feature: bool = True,
    ) -> PackageDataTable:
        """Install content into every package's file-system root.

        Steps, in order: determine the search dirs, install the content of
        each active package's build-system staging directory (if any), run
        the manifest's install rules, check the search dirs for paths that
        were not installed, and finally apply per-package install rules and
        file-system transformations while assembling the per-package data.

        :param install_request_context: Pre-computed search dirs and staging
          directories.  When ``None``, they are derived from ``debian/tmp``,
          ``debian/<package>``, the per-package search dirs and the source
          root (all relative to the current working directory).
        :param enable_manifest_installation_feature: Whether the manifest's
          ``installations`` feature may be used.  When false, only the
          ``debian-dir`` auto discard rule is used, missing-installation
          detection is skipped and dbgsym content is picked up from the
          directory given by ``dhe_dbgsym_root_dir``.
        :return: Table with one ``BinaryPackageData`` per binary package.
        :raises ValueError: If ``install_request_context`` leaves a package
          without search dirs or references unknown packages.
        """
        package_data_dict = {}
        package_data_table = PackageDataTable(package_data_dict)
        if install_request_context is None:

            @functools.lru_cache(None)
            def _as_path(fs_path: str) -> VirtualPath:
                return FSROOverlay.create_root_dir(".", fs_path)

            dtmp_dir = _as_path("debian/tmp")
            source_root_dir = _as_path(".")
            into = frozenset(self._binary_packages.values())
            default_search_dirs = [dtmp_dir]
            per_package_search_dirs = {
                t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
                for t in self.package_transformations.values()
                if t.search_dirs is not None
            }
            search_dirs = _determine_search_dir_order(
                per_package_search_dirs,
                into,
                default_search_dirs,
                source_root_dir,
            )
            # The source root is searched, but not checked for leftovers.
            check_for_uninstalled_dirs = tuple(
                s.search_dir
                for s in search_dirs
                if s.search_dir.fs_path != source_root_dir.fs_path
            )
            _present_installation_dirs(search_dirs, check_for_uninstalled_dirs, into)
        else:
            dtmp_dir = None
            search_dirs = install_request_context.search_dirs
            into = frozenset(self._binary_packages.values())
            seen = set()
            for search_dir in search_dirs:
                seen.update(search_dir.applies_to)

            missing = into - seen
            if missing:
                names = ", ".join(p.name for p in missing)
                raise ValueError(
                    f"The following package(s) had no search dirs: {names}."
                    " (Generally, the source root would be applicable to all packages)"
                )
            extra_names = seen - into
            if extra_names:
                names = ", ".join(p.name for p in extra_names)
                raise ValueError(
                    f"The install_request_context referenced the following unknown package(s): {names}"
                )

            check_for_uninstalled_dirs = (
                install_request_context.check_for_uninstalled_dirs
            )

        install_rule_context = InstallRuleContext(search_dirs)

        if (
            enable_manifest_installation_feature
            and self._install_rules is None
            and dtmp_dir is not None
            and os.path.isdir(dtmp_dir.fs_path)
        ):
            msg = (
                "The build system appears to have provided the output of upstream build system's"
                " install in debian/tmp. However, there are no provisions for debputy to install"
                " any of that into any of the debian packages listed in debian/control."
                " To avoid accidentally creating empty packages, debputy will insist that you"
                " explicitly define an empty installation definition if you did not want to"
                " install any of those files even though they have been provided."
                ' Example: "installations: []"'
            )
            _error(msg)
        elif (
            not enable_manifest_installation_feature and self._install_rules is not None
        ):
            _error(
                f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
                " Please remove or comment out the `installations` keyword."
            )

        for dctrl_bin in self.all_packages:
            package = dctrl_bin.name
            doc_main_package = self._detect_doc_main_package_for(dctrl_bin)

            install_rule_context[package] = BinaryPackageInstallRuleContext(
                dctrl_bin,
                FSRootDir(),
                doc_main_package,
            )

        if enable_manifest_installation_feature:
            discard_rules = list(
                self.plugin_provided_feature_set.auto_discard_rules.values()
            )
        else:
            discard_rules = [
                self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
            ]
        path_matcher = SourcePathMatcher(discard_rules)

        source_condition_context = self.condition_context(None)

        for dctrl_bin in self.active_packages:
            package = dctrl_bin.name
            if install_request_context is not None:
                build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
                    package
                )
            else:
                build_system_staging_dir_fs_path = os.path.join("debian", package)
                if os.path.isdir(build_system_staging_dir_fs_path):
                    build_system_staging_dir = FSROOverlay.create_root_dir(
                        ".",
                        build_system_staging_dir_fs_path,
                    )
                else:
                    build_system_staging_dir = None

            if build_system_staging_dir is not None:
                _install_everything_from_source_dir_if_present(
                    dctrl_bin,
                    self.substitution,
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                    build_system_staging_dir,
                )

        if self._install_rules:
            # FIXME: Check that every install rule remains used after transformations have run.
            # What we want to check is transformations do not exclude everything from an install
            # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
            # match.
            for install_rule in self._install_rules:
                install_rule.perform_install(
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                )

        if enable_manifest_installation_feature:
            for search_dir in check_for_uninstalled_dirs:
                _detect_missing_installations(path_matcher, search_dir)

        for dctrl_bin in self.all_packages:
            package = dctrl_bin.name
            binary_install_rule_context = install_rule_context[package]
            build_system_pkg_staging_dir = os.path.join("debian", package)
            fs_root = binary_install_rule_context.fs_root

            context = self.package_transformations[package]
            if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
                for special_install_rule in context.install_rules:
                    special_install_rule.perform_install(
                        path_matcher,
                        install_rule_context,
                        source_condition_context,
                    )

            if dctrl_bin.should_be_acted_on:
                self.apply_fs_transformations(package, fs_root)
                substvars_file = f"debian/{package}.substvars"
                substvars = FlushableSubstvars.load_from_path(
                    substvars_file, missing_ok=True
                )
                # We do not want to touch the substvars file (non-clean rebuild contamination)
                substvars.substvars_path = None
                control_output_dir = generated_content_dir(
                    package=dctrl_bin, subdir_key="DEBIAN"
                )
            else:
                substvars = FlushableSubstvars()
                control_output_dir = None

            # Only a real udeb counts as the "-udeb" companion of this package.
            udeb_package = self._binary_packages.get(f"{package}-udeb")
            if udeb_package and not udeb_package.is_udeb:
                udeb_package = None

            package_metadata_context = PackageProcessingContextProvider(
                self,
                dctrl_bin,
                udeb_package,
                package_data_table,
                # FIXME: source_package
            )

            ctrl_creator = BinaryCtrlAccessorProviderCreator(
                package_metadata_context,
                substvars,
                context.maintscript_snippets,
                context.substitution,
            )

            if not enable_manifest_installation_feature:
                assert_no_dbgsym_migration(dctrl_bin)
                dh_dbgsym_root_fs = FSROOverlay.create_root_dir(
                    "", dhe_dbgsym_root_dir(dctrl_bin)
                )
                dbgsym_root_fs = FSRootDir()
                _install_everything_from_source_dir_if_present(
                    dctrl_bin,
                    self.substitution,
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                    dh_dbgsym_root_fs,
                    into_dir=dbgsym_root_fs,
                )
                dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
                dbgsym_info = DbgsymInfo(
                    dbgsym_root_fs,
                    dbgsym_build_ids,
                )
            else:
                dbgsym_info = DbgsymInfo(
                    FSRootDir(),
                    [],
                )

            package_data_dict[package] = BinaryPackageData(
                self._source_package,
                dctrl_bin,
                build_system_pkg_staging_dir,
                control_output_dir,
                fs_root,
                substvars,
                package_metadata_context,
                ctrl_creator,
                dbgsym_info,
            )

        _list_automatic_discard_rules(path_matcher)

        return package_data_table

    def condition_context(
        self, binary_package: Optional[Union[BinaryPackage, str]]
    ) -> ConditionContext:
        """Build the ``ConditionContext`` for a package or for the source.

        :param binary_package: A binary package (or its name), or ``None``
          for the source-level context, which has no binary package and uses
          the source-level substitution.
        :raises KeyError: If the package has no transformation definition.
        """
        if binary_package is None:
            return ConditionContext(
                binary_package=None,
                substitution=self.substitution,
                build_env=self._build_env,
                dpkg_architecture_variables=self._dpkg_architecture_variables,
                dpkg_arch_query_table=self._dpkg_arch_query_table,
            )
        if not isinstance(binary_package, str):
            binary_package = binary_package.name

        package_transformation = self.package_transformations[binary_package]
        return ConditionContext(
            binary_package=package_transformation.binary_package,
            substitution=package_transformation.substitution,
            build_env=self._build_env,
            dpkg_architecture_variables=self._dpkg_architecture_variables,
            dpkg_arch_query_table=self._dpkg_arch_query_table,
        )

    def _ensure_package_can_be_processed(self, package: str) -> None:
        """Reject packages that are already finalized or unknown to the manifest.

        :raises ValueError: In either of those two cases.
        """
        if package in self._used_for:
            raise ValueError(
                f"data.tar contents for {package} has already been finalized!?"
            )
        if package not in self.package_transformations:
            raise ValueError(
                f'The package "{package}" was not relevant for the manifest!?'
            )

    def apply_fs_transformations(
        self,
        package: str,
        fs_root: FSPath,
    ) -> None:
        """Apply all file-system transformations for ``package`` to ``fs_root``.

        The order is: built-in mode normalization, the package's own
        transformations, then shebang line normalization.

        :param package: Name of the binary package.
        :param fs_root: Root of the package's file system; mutated in place.
        :raises ValueError: If the package was already finalized or has no
          transformation definition.
        """
        self._ensure_package_can_be_processed(package)
        package_transformation = self.package_transformations[package]
        condition_context = self.condition_context(package)
        norm_rules = list(
            builtin_mode_normalization_rules(
                self._dpkg_architecture_variables,
                package_transformation.binary_package,
                package_transformation.substitution,
            )
        )
        norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
        norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
        for transformation in package_transformation.transformations:
            transformation.transform_file_system(fs_root, condition_context)
        interpreter_normalization = NormalizeShebangLineTransformation()
        interpreter_normalization.transform_file_system(fs_root, condition_context)

    def finalize_data_tar_contents(
        self,
        package: str,
        fs_root: FSPath,
        clamp_mtime_to: int,
    ) -> IntermediateManifest:
        """Freeze ``fs_root`` and generate the intermediate manifest for it.

        The package is recorded as finalized, so a second call (or a later
        ``apply_fs_transformations``) for the same package raises.

        :param package: Name of the binary package.
        :param fs_root: Root of the package's file system; it is marked as
          not read-write.
        :param clamp_mtime_to: Passed on to the manifest generation.
        :raises ValueError: If the package was already finalized or has no
          transformation definition.
        """
        self._ensure_package_can_be_processed(package)
        self._used_for.add(package)

        # At this point, there should be no further mutations to the file system
        # (because they will not be present in the intermediate manifest)
        cast("FSRootDir", fs_root).is_read_write = False

        intermediate_manifest = list(
            _generate_intermediate_manifest(
                fs_root,
                clamp_mtime_to,
            )
        )
        return intermediate_manifest

    def apply_to_binary_staging_directory(
        self,
        package: str,
        fs_root: FSPath,
        clamp_mtime_to: int,
    ) -> IntermediateManifest:
        """Apply the transformations and then finalize ``fs_root`` in one step.

        Equivalent to ``apply_fs_transformations`` followed by
        ``finalize_data_tar_contents``.
        """
        self.apply_fs_transformations(package, fs_root)
        return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)

1505 

1506 

1507@dataclasses.dataclass(slots=True) 

1508class SearchDirOrderState: 

1509 search_dir: VirtualPath 

1510 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field( 

1511 default_factory=set 

1512 ) 

1513 after: Set[str] = dataclasses.field(default_factory=set) 

1514 

1515 

def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: FrozenSet[BinaryPackage],
) -> None:
    """Log the search dirs (in order) and the dirs checked for leftover paths.

    Directories that do not exist on disk are marked as skipped.  A search
    dir that applies to a proper subset of ``all_pkgs`` is annotated with the
    names of the packages it applies to.

    :param search_dirs: The search directories, in search order.
    :param checked_missing_dirs: Directories that are checked for paths that
      were not installed; nothing is logged for these when it is empty.
    :param all_pkgs: All binary packages.
    """

    def _absent_remark(fs_path: str) -> str:
        return "" if os.path.isdir(fs_path) else " (skipped; absent)"

    _info("The following directories are considered search dirs (in order):")
    # Pad every path to the longest search dir path so the remarks line up.
    max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for candidate in search_dirs:
        fs_path = candidate.search_dir.fs_path
        if candidate.applies_to < all_pkgs:
            names = ", ".join(p.name for p in candidate.applies_to)
            applies_to = f" [only applicable to: {names}]"
        else:
            applies_to = ""
        remark = _absent_remark(fs_path)
        _info(f" * {fs_path:{max_len}}{applies_to}{remark}")

    if not checked_missing_dirs:
        return
    _info('The following directories are considered for "not-installed" paths;')
    for checked_dir in checked_missing_dirs:
        remark = _absent_remark(checked_dir.fs_path)
        _info(f" * {checked_dir.fs_path:{max_len}}{remark}")

1540 

1541 

def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, List[VirtualPath]],
    all_pkgs: FrozenSet[BinaryPackage],
    default_search_dirs: List[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one global, ordered sequence.

    Each package contributes its requested list of search dirs (or
    ``default_search_dirs`` when it requested none).  Within a package's
    list, every directory must come after the one listed before it; the
    result is an order that satisfies these constraints for all packages at
    once.  ``source_root`` is always appended last and applies to all
    packages.

    The result is deterministic for a given input: packages are visited in
    name order and directories that are not constrained relative to each
    other keep the order in which they were first seen.

    :param requested: Search dirs explicitly requested, per package.  Its
      keys must be a subset of ``all_pkgs``.
    :param all_pkgs: All binary packages.
    :param default_search_dirs: Search dirs for packages without a request.
    :param source_root: The source root directory.
    :return: The search dirs in search order, each with the packages it
      applies to.  A circular ordering constraint is reported via ``_error``.
    """
    search_dir_table: Dict[str, SearchDirOrderState] = {}
    assert requested.keys() <= all_pkgs
    # Iterating the frozenset directly would make the table's insertion order
    # (and thereby the final order) depend on hash values.
    for pkg in sorted(all_pkgs, key=lambda p: p.name):
        previous_fs_path: Optional[str] = None
        for path in requested.get(pkg, default_search_dirs):
            search_dir_state = search_dir_table.get(path.fs_path)
            if search_dir_state is None:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_fs_path is not None:
                search_dir_state.after.add(previous_fs_path)
            previous_fs_path = path.fs_path

    search_dirs_in_order: List[SearchDirOrderState] = []
    released: Set[str] = set()
    # Repeatedly release every directory whose predecessors have all been
    # released.  A list (rather than a set) keeps the passes in table order.
    pending = list(search_dir_table.values())
    while pending:
        still_pending = []
        for search_dir_state in pending:
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(search_dir_state.search_dir.fs_path)
            else:
                still_pending.append(search_dir_state)

        if len(still_pending) == len(pending):
            # A full pass without progress: the remaining dirs wait on each other.
            names = ", ".join(s.search_dir.fs_path for s in still_pending)
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        pending = still_pending

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            all_pkgs,
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )