Coverage for src/debputy/highlevel_manifest.py: 67%
801 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import dataclasses
2import functools
3import os
4import textwrap
5from contextlib import suppress
6from dataclasses import dataclass, field
7from typing import (
8 List,
9 Dict,
10 Iterable,
11 Mapping,
12 Any,
13 Union,
14 Optional,
15 TypeVar,
16 Generic,
17 cast,
18 Set,
19 Tuple,
20 Sequence,
21 FrozenSet,
22)
24from debian.debian_support import DpkgArchTable
25from ._deb_options_profiles import DebBuildOptionsAndProfiles
26from ._manifest_constants import *
27from .architecture_support import DpkgArchitectureBuildProcessValuesTable
28from .builtin_manifest_rules import builtin_mode_normalization_rules
29from .debhelper_emulation import (
30 dhe_dbgsym_root_dir,
31 assert_no_dbgsym_migration,
32 read_dbgsym_file,
33)
34from .exceptions import DebputySubstitutionError
35from .filesystem_scan import FSPath, FSRootDir, FSROOverlay
36from .installations import (
37 InstallRule,
38 SourcePathMatcher,
39 PathAlreadyInstalledOrDiscardedError,
40 NoMatchForInstallPatternError,
41 InstallRuleContext,
42 BinaryPackageInstallRuleContext,
43 InstallSearchDirContext,
44 SearchDir,
45)
46from .intermediate_manifest import TarMember, PathType, IntermediateManifest
47from .maintscript_snippet import (
48 DpkgMaintscriptHelperCommand,
49 MaintscriptSnippetContainer,
50)
51from .manifest_conditions import ConditionContext
52from .manifest_parser.base_types import FileSystemMatchRule, FileSystemExactMatchRule
53from .manifest_parser.util import AttributePath
54from .packager_provided_files import PackagerProvidedFile
55from .packages import BinaryPackage, SourcePackage
56from .plugin.api.feature_set import PluginProvidedFeatureSet
57from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
58from .plugin.api.impl_types import (
59 PackageProcessingContextProvider,
60 PackageDataTable,
61)
62from .plugin.api.spec import FlushableSubstvars, VirtualPath
63from .plugin.debputy.binary_package_rules import ServiceRule
64from .substitution import Substitution
65from .transformation_rules import (
66 TransformationRule,
67 ModeNormalizationTransformationRule,
68 NormalizeShebangLineTransformation,
69)
70from .util import (
71 _error,
72 _warn,
73 debian_policy_normalize_symlink_target,
74 generated_content_dir,
75 _info,
76)
77from .yaml import MANIFEST_YAML
78from .yaml.compat import CommentedMap, CommentedSeq
81@dataclass(slots=True)
82class DbgsymInfo:
83 dbgsym_fs_root: FSPath
84 dbgsym_ids: List[str]
87@dataclass(slots=True, frozen=True)
88class BinaryPackageData:
89 source_package: SourcePackage
90 binary_package: BinaryPackage
91 binary_staging_root_dir: str
92 control_output_dir: Optional[str]
93 fs_root: FSPath
94 substvars: FlushableSubstvars
95 package_metadata_context: PackageProcessingContextProvider
96 ctrl_creator: BinaryCtrlAccessorProviderCreator
97 dbgsym_info: DbgsymInfo
100@dataclass(slots=True)
101class PackageTransformationDefinition:
102 binary_package: BinaryPackage
103 substitution: Substitution
104 is_auto_generated_package: bool
105 binary_version: Optional[str] = None
106 search_dirs: Optional[List[FileSystemExactMatchRule]] = None
107 dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field(
108 default_factory=list
109 )
110 maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field(
111 default_factory=dict
112 )
113 transformations: List[TransformationRule] = field(default_factory=list)
114 reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field(
115 default_factory=dict
116 )
117 install_rules: List[InstallRule] = field(default_factory=list)
118 requested_service_rules: List[ServiceRule] = field(default_factory=list)
121def _path_to_tar_member(
122 path: FSPath,
123 clamp_mtime_to: int,
124) -> TarMember:
125 mtime = float(clamp_mtime_to)
126 owner, uid, group, gid = path.tar_owner_info
127 mode = path.mode
129 if path.has_fs_path:
130 mtime = min(mtime, path.mtime)
132 if path.is_dir:
133 path_type = PathType.DIRECTORY
134 elif path.is_file:
135 # TODO: someday we will need to deal with hardlinks and it might appear here.
136 path_type = PathType.FILE
137 elif path.is_symlink: 137 ↛ 157line 137 didn't jump to line 157, because the condition on line 137 was never false
138 # Special-case that we resolve immediately (since we need to normalize the target anyway)
139 link_target = debian_policy_normalize_symlink_target(
140 path.path,
141 path.readlink(),
142 )
143 return TarMember.virtual_path(
144 path.tar_path,
145 PathType.SYMLINK,
146 mtime,
147 link_target=link_target,
148 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
149 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
150 mode=0o0777,
151 owner=owner,
152 uid=uid,
153 group=group,
154 gid=gid,
155 )
156 else:
157 assert not path.is_symlink
158 raise AssertionError(
159 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
160 )
162 if not path.has_fs_path:
163 assert not path.is_file
164 return TarMember.virtual_path(
165 path.tar_path,
166 path_type,
167 mtime,
168 mode=mode,
169 owner=owner,
170 uid=uid,
171 group=group,
172 gid=gid,
173 )
174 may_steal_fs_path = path._can_replace_inline
175 return TarMember.from_file(
176 path.tar_path,
177 path.fs_path,
178 mode=mode,
179 uid=uid,
180 owner=owner,
181 gid=gid,
182 group=group,
183 path_type=path_type,
184 path_mtime=mtime,
185 clamp_mtime_to=clamp_mtime_to,
186 may_steal_fs_path=may_steal_fs_path,
187 )
190def _generate_intermediate_manifest(
191 fs_root: FSPath,
192 clamp_mtime_to: int,
193) -> Iterable[TarMember]:
194 symlinks = []
195 for path in fs_root.all_paths():
196 tar_member = _path_to_tar_member(path, clamp_mtime_to)
197 if tar_member.path_type == PathType.SYMLINK:
198 symlinks.append(tar_member)
199 continue
200 yield tar_member
201 yield from symlinks
204ST = TypeVar("ST")
205T = TypeVar("T")
208class AbstractYAMLSubStore(Generic[ST]):
209 def __init__(
210 self,
211 parent_store: Any,
212 parent_key: Optional[Union[int, str]],
213 store: Optional[ST] = None,
214 ) -> None:
215 if parent_store is not None and parent_key is not None:
216 try:
217 from_parent_store = parent_store[parent_key]
218 except (KeyError, IndexError):
219 from_parent_store = None
220 if ( 220 ↛ 225line 220 didn't jump to line 225
221 store is not None
222 and from_parent_store is not None
223 and store is not parent_store
224 ):
225 raise ValueError(
226 "Store is provided but is not the one already in the parent store"
227 )
228 if store is None: 228 ↛ 230line 228 didn't jump to line 230, because the condition on line 228 was never false
229 store = from_parent_store
230 self._parent_store = parent_store
231 self._parent_key = parent_key
232 self._is_detached = (
233 parent_key is None or parent_store is None or parent_key not in parent_store
234 )
235 assert self._is_detached or store is not None
236 if store is None:
237 store = self._create_new_instance()
238 self._store: ST = store
240 def _create_new_instance(self) -> ST:
241 raise NotImplementedError
243 def create_definition_if_missing(self) -> None:
244 if self._is_detached:
245 self.create_definition()
247 def create_definition(self) -> None:
248 if not self._is_detached: 248 ↛ 249line 248 didn't jump to line 249, because the condition on line 248 was never true
249 raise RuntimeError("Definition is already present")
250 parent_store = self._parent_store
251 if parent_store is None: 251 ↛ 252line 251 didn't jump to line 252, because the condition on line 251 was never true
252 raise RuntimeError(
253 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
254 )
255 if isinstance(parent_store, list):
256 assert self._parent_key is None
257 self._parent_key = len(parent_store)
258 self._parent_store.append(self._store)
259 else:
260 parent_store[self._parent_key] = self._store
261 self._is_detached = False
263 def remove_definition(self) -> None:
264 self._ensure_attached()
265 del self._parent_store[self._parent_key]
266 if isinstance(self._parent_store, list):
267 self._parent_key = None
268 self._is_detached = True
270 def _ensure_attached(self) -> None:
271 if self._is_detached:
272 raise RuntimeError("The definition has been removed!")
275class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]):
276 def _create_new_instance(self) -> List[T]:
277 return CommentedSeq()
280class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]):
281 def _create_new_instance(self) -> Dict[str, T]:
282 return CommentedMap()
285class MutableCondition:
286 @classmethod
287 def arch_matches(cls, arch_filter: str) -> CommentedMap:
288 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter})
290 @classmethod
291 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
292 return CommentedMap(
293 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
294 )
297class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
298 @classmethod
299 def new_symlink(
300 cls, link_path: str, link_target: str, condition: Optional[Any]
301 ) -> "MutableYAMLSymlink":
302 inner = {
303 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
304 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
305 }
306 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner}
307 if condition is not None: 307 ↛ 308line 307 didn't jump to line 308, because the condition on line 307 was never true
308 inner["when"] = condition
309 return cls(None, None, store=CommentedMap(content))
311 @property
312 def symlink_path(self) -> str:
313 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
314 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
315 ]
317 @symlink_path.setter
318 def symlink_path(self, path: str) -> None:
319 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
320 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
321 ] = path
323 @property
324 def symlink_target(self) -> Optional[str]:
325 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
326 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
327 ]
329 @symlink_target.setter
330 def symlink_target(self, target: str) -> None:
331 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
332 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
333 ] = target
336class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
337 @classmethod
338 def rm_conffile(
339 cls,
340 conffile: str,
341 prior_to_version: Optional[str],
342 owning_package: Optional[str],
343 ) -> "MutableYAMLConffileManagementItem":
344 r = cls(
345 None,
346 None,
347 store=CommentedMap(
348 {
349 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
350 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
351 )
352 }
353 ),
354 )
355 r.prior_to_version = prior_to_version
356 r.owning_package = owning_package
357 return r
359 @classmethod
360 def mv_conffile(
361 cls,
362 old_conffile: str,
363 new_conffile: str,
364 prior_to_version: Optional[str],
365 owning_package: Optional[str],
366 ) -> "MutableYAMLConffileManagementItem":
367 r = cls(
368 None,
369 None,
370 store=CommentedMap(
371 {
372 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
373 {
374 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
375 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
376 }
377 )
378 }
379 ),
380 )
381 r.prior_to_version = prior_to_version
382 r.owning_package = owning_package
383 return r
385 @property
386 def _container(self) -> Dict[str, Any]:
387 assert len(self._store) == 1
388 return next(iter(self._store.values()))
390 @property
391 def command(self) -> str:
392 assert len(self._store) == 1
393 return next(iter(self._store))
395 @property
396 def obsolete_conffile(self) -> str:
397 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
398 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
399 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
400 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]
402 @obsolete_conffile.setter
403 def obsolete_conffile(self, value: str) -> None:
404 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
405 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
406 else:
407 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
408 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value
410 @property
411 def new_conffile(self) -> str:
412 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
413 raise TypeError(
414 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
415 f" This is a {self.command}"
416 )
417 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]
419 @new_conffile.setter
420 def new_conffile(self, value: str) -> None:
421 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
422 raise TypeError(
423 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
424 f" This is a {self.command}"
425 )
426 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value
428 @property
429 def prior_to_version(self) -> Optional[str]:
430 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)
432 @prior_to_version.setter
433 def prior_to_version(self, value: Optional[str]) -> None:
434 if value is None:
435 try:
436 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
437 except KeyError:
438 pass
439 else:
440 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value
442 @property
443 def owning_package(self) -> Optional[str]:
444 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
446 @owning_package.setter
447 def owning_package(self, value: Optional[str]) -> None:
448 if value is None:
449 try:
450 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE]
451 except KeyError:
452 pass
453 else:
454 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value
457class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
458 def _list_store(
459 self, key, *, create_if_absent: bool = False
460 ) -> Optional[List[Dict[str, Any]]]:
461 if self._is_detached or key not in self._store:
462 if create_if_absent: 462 ↛ 463line 462 didn't jump to line 463, because the condition on line 462 was never true
463 return None
464 self.create_definition_if_missing()
465 self._store[key] = []
466 return self._store[key]
468 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
469 parent_store = self._list_store(key, create_if_absent=True)
470 assert parent_store is not None
471 if not item._is_detached or ( 471 ↛ 474line 471 didn't jump to line 474, because the condition on line 471 was never true
472 item._parent_store is not None and item._parent_store is not parent_store
473 ):
474 raise RuntimeError(
475 "Item is already attached or associated with a different container"
476 )
477 item._parent_store = parent_store
478 item.create_definition()
480 def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
481 self._insert_item(MK_TRANSFORMATIONS, symlink)
483 def symlinks(self) -> Iterable[MutableYAMLSymlink]:
484 store = self._list_store(MK_TRANSFORMATIONS)
485 if store is None: 485 ↛ 486line 485 didn't jump to line 486, because the condition on line 485 was never true
486 return
487 for i in range(len(store)): 487 ↛ 488line 487 didn't jump to line 488, because the loop on line 487 never started
488 d = store[i]
489 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d:
490 yield MutableYAMLSymlink(store, i)
492 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
493 store = self._list_store(MK_CONFFILE_MANAGEMENT)
494 if store is None: 494 ↛ 495line 494 didn't jump to line 495, because the condition on line 494 was never true
495 return
496 yield from (
497 MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
498 )
500 def add_conffile_management(
501 self, conffile_management_item: MutableYAMLConffileManagementItem
502 ) -> None:
503 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
506class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
507 @property
508 def _container(self) -> Dict[str, Any]:
509 assert len(self._store) == 1
510 return next(iter(self._store.values()))
512 @property
513 def into(self) -> Optional[List[str]]:
514 v = self._container[MK_INSTALLATIONS_INSTALL_INTO]
515 if v is None:
516 return None
517 if isinstance(v, str):
518 return [v]
519 return v
521 @into.setter
522 def into(self, new_value: Optional[Union[str, List[str]]]) -> None:
523 if new_value is None: 523 ↛ 527line 523 didn't jump to line 527, because the condition on line 523 was never false
524 with suppress(KeyError):
525 del self._container[MK_INSTALLATIONS_INSTALL_INTO]
526 return
527 if isinstance(new_value, str):
528 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
529 return
530 new_list = CommentedSeq(new_value)
531 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list
533 @property
534 def when(self) -> Optional[Union[str, Mapping[str, Any]]]:
535 return self._container[MK_CONDITION_WHEN]
537 @when.setter
538 def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None:
539 if new_value is None: 539 ↛ 540line 539 didn't jump to line 540, because the condition on line 539 was never true
540 with suppress(KeyError):
541 del self._container[MK_CONDITION_WHEN]
542 return
543 if isinstance(new_value, str): 543 ↛ 544line 543 didn't jump to line 544, because the condition on line 543 was never true
544 self._container[MK_CONDITION_WHEN] = new_value
545 return
546 new_map = CommentedMap(new_value)
547 self._container[MK_CONDITION_WHEN] = new_map
549 @classmethod
550 def install_dest(
551 cls,
552 sources: Union[str, List[str]],
553 into: Optional[Union[str, List[str]]],
554 *,
555 dest_dir: Optional[str] = None,
556 when: Optional[Union[str, Mapping[str, Any]]] = None,
557 ) -> "MutableYAMLInstallRuleInstall":
558 k = MK_INSTALLATIONS_INSTALL_SOURCES
559 if isinstance(sources, str):
560 k = MK_INSTALLATIONS_INSTALL_SOURCE
561 r = MutableYAMLInstallRuleInstall(
562 None,
563 None,
564 store=CommentedMap(
565 {
566 MK_INSTALLATIONS_INSTALL: CommentedMap(
567 {
568 k: sources,
569 }
570 )
571 }
572 ),
573 )
574 r.dest_dir = dest_dir
575 r.into = into
576 if when is not None:
577 r.when = when
578 return r
580 @classmethod
581 def multi_dest_install(
582 cls,
583 sources: Union[str, List[str]],
584 dest_dirs: Sequence[str],
585 into: Optional[Union[str, List[str]]],
586 *,
587 when: Optional[Union[str, Mapping[str, Any]]] = None,
588 ) -> "MutableYAMLInstallRuleInstall":
589 k = MK_INSTALLATIONS_INSTALL_SOURCES
590 if isinstance(sources, str): 590 ↛ 592line 590 didn't jump to line 592, because the condition on line 590 was never false
591 k = MK_INSTALLATIONS_INSTALL_SOURCE
592 r = MutableYAMLInstallRuleInstall(
593 None,
594 None,
595 store=CommentedMap(
596 {
597 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
598 {
599 k: sources,
600 "dest-dirs": dest_dirs,
601 }
602 )
603 }
604 ),
605 )
606 r.into = into
607 if when is not None: 607 ↛ 608line 607 didn't jump to line 608, because the condition on line 607 was never true
608 r.when = when
609 return r
611 @classmethod
612 def install_as(
613 cls,
614 source: str,
615 install_as: str,
616 into: Optional[Union[str, List[str]]],
617 when: Optional[Union[str, Mapping[str, Any]]] = None,
618 ) -> "MutableYAMLInstallRuleInstall":
619 r = MutableYAMLInstallRuleInstall(
620 None,
621 None,
622 store=CommentedMap(
623 {
624 MK_INSTALLATIONS_INSTALL: CommentedMap(
625 {
626 MK_INSTALLATIONS_INSTALL_SOURCE: source,
627 MK_INSTALLATIONS_INSTALL_AS: install_as,
628 }
629 )
630 }
631 ),
632 )
633 r.into = into
634 if when is not None: 634 ↛ 635line 634 didn't jump to line 635, because the condition on line 634 was never true
635 r.when = when
636 return r
638 @classmethod
639 def install_doc_as(
640 cls,
641 source: str,
642 install_as: str,
643 into: Optional[Union[str, List[str]]],
644 when: Optional[Union[str, Mapping[str, Any]]] = None,
645 ) -> "MutableYAMLInstallRuleInstall":
646 r = MutableYAMLInstallRuleInstall(
647 None,
648 None,
649 store=CommentedMap(
650 {
651 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
652 {
653 MK_INSTALLATIONS_INSTALL_SOURCE: source,
654 MK_INSTALLATIONS_INSTALL_AS: install_as,
655 }
656 )
657 }
658 ),
659 )
660 r.into = into
661 if when is not None:
662 r.when = when
663 return r
665 @classmethod
666 def install_docs(
667 cls,
668 sources: Union[str, List[str]],
669 into: Optional[Union[str, List[str]]],
670 *,
671 dest_dir: Optional[str] = None,
672 when: Optional[Union[str, Mapping[str, Any]]] = None,
673 ) -> "MutableYAMLInstallRuleInstall":
674 k = MK_INSTALLATIONS_INSTALL_SOURCES
675 if isinstance(sources, str):
676 k = MK_INSTALLATIONS_INSTALL_SOURCE
677 r = MutableYAMLInstallRuleInstall(
678 None,
679 None,
680 store=CommentedMap(
681 {
682 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
683 {
684 k: sources,
685 }
686 )
687 }
688 ),
689 )
690 r.into = into
691 r.dest_dir = dest_dir
692 if when is not None:
693 r.when = when
694 return r
696 @classmethod
697 def install_examples(
698 cls,
699 sources: Union[str, List[str]],
700 into: Optional[Union[str, List[str]]],
701 when: Optional[Union[str, Mapping[str, Any]]] = None,
702 ) -> "MutableYAMLInstallRuleInstallExamples":
703 k = MK_INSTALLATIONS_INSTALL_SOURCES
704 if isinstance(sources, str):
705 k = MK_INSTALLATIONS_INSTALL_SOURCE
706 r = MutableYAMLInstallRuleInstallExamples(
707 None,
708 None,
709 store=CommentedMap(
710 {
711 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
712 {
713 k: sources,
714 }
715 )
716 }
717 ),
718 )
719 r.into = into
720 if when is not None: 720 ↛ 721line 720 didn't jump to line 721, because the condition on line 720 was never true
721 r.when = when
722 return r
724 @classmethod
725 def install_man(
726 cls,
727 sources: Union[str, List[str]],
728 into: Optional[Union[str, List[str]]],
729 language: Optional[str],
730 when: Optional[Union[str, Mapping[str, Any]]] = None,
731 ) -> "MutableYAMLInstallRuleMan":
732 k = MK_INSTALLATIONS_INSTALL_SOURCES
733 if isinstance(sources, str): 733 ↛ 734line 733 didn't jump to line 734, because the condition on line 733 was never true
734 k = MK_INSTALLATIONS_INSTALL_SOURCE
735 r = MutableYAMLInstallRuleMan(
736 None,
737 None,
738 store=CommentedMap(
739 {
740 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
741 {
742 k: sources,
743 }
744 )
745 }
746 ),
747 )
748 r.language = language
749 r.into = into
750 if when is not None: 750 ↛ 751line 750 didn't jump to line 751, because the condition on line 750 was never true
751 r.when = when
752 return r
754 @classmethod
755 def discard(
756 cls,
757 sources: Union[str, List[str]],
758 ) -> "MutableYAMLInstallRuleDiscard":
759 return MutableYAMLInstallRuleDiscard(
760 None,
761 None,
762 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
763 )
766class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
767 pass
770class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
771 @property
772 def language(self) -> Optional[str]:
773 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
775 @language.setter
776 def language(self, new_value: Optional[str]) -> None:
777 if new_value is not None:
778 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
779 return
780 with suppress(KeyError):
781 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
784class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
785 pass
788class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
789 @property
790 def sources(self) -> List[str]:
791 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
792 if isinstance(v, str):
793 return [v]
794 return v
796 @sources.setter
797 def sources(self, new_value: Union[str, List[str]]) -> None:
798 if isinstance(new_value, str):
799 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
800 return
801 new_list = CommentedSeq(new_value)
802 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list
804 @property
805 def dest_dir(self) -> Optional[str]:
806 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)
808 @dest_dir.setter
809 def dest_dir(self, new_value: Optional[str]) -> None:
810 if new_value is not None and self.dest_as is not None: 810 ↛ 811line 810 didn't jump to line 811, because the condition on line 810 was never true
811 raise ValueError(
812 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
813 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
814 )
815 if new_value is not None:
816 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
817 else:
818 with suppress(KeyError):
819 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]
821 @property
822 def dest_as(self) -> Optional[str]:
823 return self._container.get(MK_INSTALLATIONS_INSTALL_AS)
825 @dest_as.setter
826 def dest_as(self, new_value: Optional[str]) -> None:
827 if new_value is not None:
828 if self.dest_dir is not None:
829 raise ValueError(
830 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
831 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
832 )
834 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
835 if isinstance(sources, list):
836 if len(sources) != 1:
837 raise ValueError(
838 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
839 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
840 )
841 self.sources = sources[0]
842 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
843 else:
844 with suppress(KeyError):
845 del self._container[MK_INSTALLATIONS_INSTALL_AS]
848class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
849 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
850 parent_store = self._store
851 if not install_rule._is_detached or ( 851 ↛ 855line 851 didn't jump to line 855, because the condition on line 851 was never true
852 install_rule._parent_store is not None
853 and install_rule._parent_store is not parent_store
854 ):
855 raise RuntimeError(
856 "Item is already attached or associated with a different container"
857 )
858 self.create_definition_if_missing()
859 install_rule._parent_store = parent_store
860 install_rule.create_definition()
862 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
863 parent_store = self._store
864 for install_rule in install_rules:
865 if not install_rule._is_detached or ( 865 ↛ 869line 865 didn't jump to line 869, because the condition on line 865 was never true
866 install_rule._parent_store is not None
867 and install_rule._parent_store is not parent_store
868 ):
869 raise RuntimeError(
870 "Item is already attached or associated with a different container"
871 )
872 self.create_definition_if_missing()
873 install_rule._parent_store = parent_store
874 install_rule.create_definition()
877class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
878 @property
879 def variables(self) -> Dict[str, Any]:
880 return self._store
882 def __setitem__(self, key: str, value: Any) -> None:
883 self._store[key] = value
884 self.create_definition_if_missing()
887class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
888 def manifest_variables(
889 self, *, create_if_absent: bool = True
890 ) -> MutableYAMLManifestVariables:
891 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
892 if create_if_absent: 892 ↛ 893line 892 didn't jump to line 893, because the condition on line 892 was never true
893 d.create_definition_if_missing()
894 return d
897class MutableYAMLManifest:
898 def __init__(self, store: Any) -> None:
899 self._store = store
901 @classmethod
902 def empty_manifest(cls) -> "MutableYAMLManifest":
903 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
905 @property
906 def manifest_version(self) -> str:
907 return self._store[MK_MANIFEST_VERSION]
909 @manifest_version.setter
910 def manifest_version(self, version: str) -> None:
911 if version not in SUPPORTED_MANIFEST_VERSIONS:
912 raise ValueError("Unsupported version")
913 self._store[MK_MANIFEST_VERSION] = version
915 def installations(
916 self,
917 *,
918 create_if_absent: bool = True,
919 ) -> MutableYAMLInstallationsDefinition:
920 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
921 if create_if_absent: 921 ↛ 922line 921 didn't jump to line 922, because the condition on line 921 was never true
922 d.create_definition_if_missing()
923 return d
925 def manifest_definitions(
926 self,
927 *,
928 create_if_absent: bool = True,
929 ) -> MutableYAMLManifestDefinitions:
930 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
931 if create_if_absent: 931 ↛ 932line 931 didn't jump to line 932, because the condition on line 931 was never true
932 d.create_definition_if_missing()
933 return d
935 def package(
936 self, name: str, *, create_if_absent: bool = True
937 ) -> MutableYAMLPackageDefinition:
938 if MK_PACKAGES not in self._store: 938 ↛ 940line 938 didn't jump to line 940, because the condition on line 938 was never false
939 self._store[MK_PACKAGES] = CommentedMap()
940 packages_store = self._store[MK_PACKAGES]
941 package = packages_store.get(name)
942 if package is None: 942 ↛ 949line 942 didn't jump to line 949, because the condition on line 942 was never false
943 if not create_if_absent: 943 ↛ 944line 943 didn't jump to line 944, because the condition on line 943 was never true
944 raise KeyError(name)
945 assert packages_store is not None
946 d = MutableYAMLPackageDefinition(packages_store, name)
947 d.create_definition()
948 else:
949 d = MutableYAMLPackageDefinition(packages_store, name)
950 return d
952 def write_to(self, fd) -> None:
953 MANIFEST_YAML.dump(self._store, fd)
956def _describe_missing_path(entry: VirtualPath) -> str:
957 if entry.is_dir:
958 return f"{entry.fs_path}/ (empty directory; possible integration point)"
959 if entry.is_symlink:
960 target = os.readlink(entry.fs_path)
961 return f"{entry.fs_path} (symlink; links to {target})"
962 if entry.is_file:
963 return f"{entry.fs_path} (file)"
964 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)"
967def _detect_missing_installations(
968 path_matcher: SourcePathMatcher,
969 search_dir: VirtualPath,
970) -> None:
971 if not os.path.isdir(search_dir.fs_path): 971 ↛ 973line 971 didn't jump to line 973, because the condition on line 971 was never false
972 return
973 missing = list(path_matcher.detect_missing(search_dir))
974 if not missing:
975 return
977 _warn(
978 f"The following paths were present in {search_dir.fs_path}, but not installed (nor explicitly discarded)."
979 )
980 _warn("")
981 for entry in missing:
982 desc = _describe_missing_path(entry)
983 _warn(f" * {desc}")
984 _warn("")
986 excl = textwrap.dedent(
987 """\
988 - discard: "*"
989 """
990 )
992 _error(
993 "Please review the list and add either install rules or exclusions to `installations` in"
994 " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
995 f" end of your 'installations`:\n\n{excl}\n"
996 )
999def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
1000 used_discard_rules = path_matcher.used_auto_discard_rules
1001 # Discard rules can match and then be overridden. In that case, they appear
1002 # but have 0 matches.
1003 if not sum((len(v) for v in used_discard_rules.values()), 0):
1004 return
1005 _info("The following automatic discard rules were triggered:")
1006 example_path: Optional[str] = None
1007 for rule in sorted(used_discard_rules):
1008 for fs_path in sorted(used_discard_rules[rule]):
1009 if example_path is None: 1009 ↛ 1011line 1009 didn't jump to line 1011, because the condition on line 1009 was never false
1010 example_path = fs_path
1011 _info(f" * {rule} -> {fs_path}")
1012 assert example_path is not None
1013 _info("")
1014 _info(
1015 "Note that some of these may have been overruled. The overrule detection logic is not"
1016 )
1017 _info("100% reliable.")
1018 _info("")
1019 _info(
1020 "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
1021 )
1022 _info(" installations:")
1023 _info(" - install:")
1024 _info(f" source: {example_path}")
1027def _install_everything_from_source_dir_if_present(
1028 dctrl_bin: BinaryPackage,
1029 substitution: Substitution,
1030 path_matcher: SourcePathMatcher,
1031 install_rule_context: InstallRuleContext,
1032 source_condition_context: ConditionContext,
1033 source_dir: VirtualPath,
1034 *,
1035 into_dir: Optional[VirtualPath] = None,
1036) -> None:
1037 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"]
1038 pkg_set = frozenset([dctrl_bin])
1039 install_rule = InstallRule.install_dest(
1040 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)],
1041 None,
1042 pkg_set,
1043 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
1044 None,
1045 )
1046 pkg_search_dir: Tuple[SearchDir] = (
1047 SearchDir(
1048 source_dir,
1049 pkg_set,
1050 ),
1051 )
1052 replacements = {
1053 "search_dirs": pkg_search_dir,
1054 }
1055 if into_dir is not None: 1055 ↛ 1056line 1055 didn't jump to line 1056, because the condition on line 1055 was never true
1056 binary_package_contexts = dict(install_rule_context.binary_package_contexts)
1057 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir)
1058 binary_package_contexts[dctrl_bin.name] = updated
1059 replacements["binary_package_contexts"] = binary_package_contexts
1061 fake_install_rule_context = install_rule_context.replace(**replacements)
1062 try:
1063 install_rule.perform_install(
1064 path_matcher,
1065 fake_install_rule_context,
1066 source_condition_context,
1067 )
1068 except (
1069 NoMatchForInstallPatternError,
1070 PathAlreadyInstalledOrDiscardedError,
1071 ):
1072 # Empty directory or everything excluded by default; ignore the error
1073 pass
1076class HighLevelManifest:
1077 def __init__(
1078 self,
1079 manifest_path: str,
1080 mutable_manifest: Optional[MutableYAMLManifest],
1081 install_rules: Optional[List[InstallRule]],
1082 source_package: SourcePackage,
1083 binary_packages: Mapping[str, BinaryPackage],
1084 substitution: Substitution,
1085 package_transformations: Mapping[str, PackageTransformationDefinition],
1086 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
1087 dpkg_arch_query_table: DpkgArchTable,
1088 build_env: DebBuildOptionsAndProfiles,
1089 plugin_provided_feature_set: PluginProvidedFeatureSet,
1090 debian_dir: VirtualPath,
1091 ) -> None:
1092 self.manifest_path = manifest_path
1093 self.mutable_manifest = mutable_manifest
1094 self._install_rules = install_rules
1095 self._source_package = source_package
1096 self._binary_packages = binary_packages
1097 self.substitution = substitution
1098 self.package_transformations = package_transformations
1099 self._dpkg_architecture_variables = dpkg_architecture_variables
1100 self._dpkg_arch_query_table = dpkg_arch_query_table
1101 self._build_env = build_env
1102 self._used_for: Set[str] = set()
1103 self._plugin_provided_feature_set = plugin_provided_feature_set
1104 self._debian_dir = debian_dir
1106 def source_version(self, include_binnmu_version: bool = True) -> str:
1107 # TODO: There should an easier way to determine the source version; really.
1108 version_var = "{{DEB_VERSION}}"
1109 if not include_binnmu_version:
1110 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
1111 try:
1112 return self.substitution.substitute(
1113 version_var, "internal (resolve version)"
1114 )
1115 except DebputySubstitutionError as e:
1116 raise AssertionError(f"Could not resolve {version_var}") from e
1118 @property
1119 def debian_dir(self) -> VirtualPath:
1120 return self._debian_dir
1122 @property
1123 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
1124 return self._dpkg_architecture_variables
1126 @property
1127 def build_env(self) -> DebBuildOptionsAndProfiles:
1128 return self._build_env
1130 @property
1131 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
1132 return self._plugin_provided_feature_set
1134 @property
1135 def active_packages(self) -> Iterable[BinaryPackage]:
1136 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)
1138 @property
1139 def all_packages(self) -> Iterable[BinaryPackage]:
1140 yield from self._binary_packages.values()
1142 def package_state_for(self, package: str) -> PackageTransformationDefinition:
1143 return self.package_transformations[package]
1145 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
1146 name = package.name
1147 # If it is not a -doc package, then docs should be installed
1148 # under its own package name.
1149 if not name.endswith("-doc"): 1149 ↛ 1151line 1149 didn't jump to line 1151, because the condition on line 1149 was never false
1150 return package
1151 name = name[:-4]
1152 main_package = self._binary_packages.get(name)
1153 if main_package:
1154 return main_package
1155 if name.startswith("lib"):
1156 dev_pkg = self._binary_packages.get(f"{name}-dev")
1157 if dev_pkg:
1158 return dev_pkg
1160 # If we found no better match; default to the doc package itself.
1161 return package
1163 def perform_installations(
1164 self,
1165 *,
1166 install_request_context: Optional[InstallSearchDirContext] = None,
1167 enable_manifest_installation_feature: bool = True,
1168 ) -> PackageDataTable:
1169 package_data_dict = {}
1170 package_data_table = PackageDataTable(package_data_dict)
1171 if install_request_context is None: 1171 ↛ 1173line 1171 didn't jump to line 1173, because the condition on line 1171 was never true
1173 @functools.lru_cache(None)
1174 def _as_path(fs_path: str) -> VirtualPath:
1175 return FSROOverlay.create_root_dir(".", fs_path)
1177 dtmp_dir = _as_path("debian/tmp")
1178 source_root_dir = _as_path(".")
1179 into = frozenset(self._binary_packages.values())
1180 default_search_dirs = [dtmp_dir]
1181 per_package_search_dirs = {
1182 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
1183 for t in self.package_transformations.values()
1184 if t.search_dirs is not None
1185 }
1186 search_dirs = _determine_search_dir_order(
1187 per_package_search_dirs,
1188 into,
1189 default_search_dirs,
1190 source_root_dir,
1191 )
1192 check_for_uninstalled_dirs = tuple(
1193 s.search_dir
1194 for s in search_dirs
1195 if s.search_dir.fs_path != source_root_dir.fs_path
1196 )
1197 _present_installation_dirs(search_dirs, check_for_uninstalled_dirs, into)
1198 else:
1199 dtmp_dir = None
1200 search_dirs = install_request_context.search_dirs
1201 into = frozenset(self._binary_packages.values())
1202 seen = set()
1203 for search_dir in search_dirs:
1204 seen.update(search_dir.applies_to)
1206 missing = into - seen
1207 if missing: 1207 ↛ 1208line 1207 didn't jump to line 1208, because the condition on line 1207 was never true
1208 names = ", ".join(p.name for p in missing)
1209 raise ValueError(
1210 f"The following package(s) had no search dirs: {names}."
1211 " (Generally, the source root would be applicable to all packages)"
1212 )
1213 extra_names = seen - into
1214 if extra_names: 1214 ↛ 1215line 1214 didn't jump to line 1215, because the condition on line 1214 was never true
1215 names = ", ".join(p.name for p in extra_names)
1216 raise ValueError(
1217 f"The install_request_context referenced the following unknown package(s): {names}"
1218 )
1220 check_for_uninstalled_dirs = (
1221 install_request_context.check_for_uninstalled_dirs
1222 )
1224 install_rule_context = InstallRuleContext(search_dirs)
1226 if ( 1226 ↛ 1232line 1226 didn't jump to line 1232
1227 enable_manifest_installation_feature
1228 and self._install_rules is None
1229 and dtmp_dir is not None
1230 and os.path.isdir(dtmp_dir.fs_path)
1231 ):
1232 msg = (
1233 "The build system appears to have provided the output of upstream build system's"
1234 " install in debian/tmp. However, these are no provisions for debputy to install"
1235 " any of that into any of the debian packages listed in debian/control."
1236 " To avoid accidentally creating empty packages, debputy will insist that you "
1237 " explicitly define an empty installation definition if you did not want to "
1238 " install any of those files even though they have been provided."
1239 ' Example: "installations: []"'
1240 )
1241 _error(msg)
1242 elif ( 1242 ↛ 1245line 1242 didn't jump to line 1245
1243 not enable_manifest_installation_feature and self._install_rules is not None
1244 ):
1245 _error(
1246 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
1247 f" Please remove or comment out the `installations` keyword."
1248 )
1250 for dctrl_bin in self.all_packages:
1251 package = dctrl_bin.name
1252 doc_main_package = self._detect_doc_main_package_for(dctrl_bin)
1254 install_rule_context[package] = BinaryPackageInstallRuleContext(
1255 dctrl_bin,
1256 FSRootDir(),
1257 doc_main_package,
1258 )
1260 if enable_manifest_installation_feature: 1260 ↛ 1265line 1260 didn't jump to line 1265
1261 discard_rules = list(
1262 self.plugin_provided_feature_set.auto_discard_rules.values()
1263 )
1264 else:
1265 discard_rules = [
1266 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
1267 ]
1268 path_matcher = SourcePathMatcher(discard_rules)
1270 source_condition_context = ConditionContext(
1271 binary_package=None,
1272 substitution=self.substitution,
1273 build_env=self._build_env,
1274 dpkg_architecture_variables=self._dpkg_architecture_variables,
1275 dpkg_arch_query_table=self._dpkg_arch_query_table,
1276 )
1278 for dctrl_bin in self.active_packages:
1279 package = dctrl_bin.name
1280 if install_request_context: 1280 ↛ 1285line 1280 didn't jump to line 1285, because the condition on line 1280 was never false
1281 build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
1282 package
1283 )
1284 else:
1285 build_system_staging_dir_fs_path = os.path.join("debian", package)
1286 if os.path.isdir(build_system_staging_dir_fs_path):
1287 build_system_staging_dir = FSROOverlay.create_root_dir(
1288 ".",
1289 build_system_staging_dir_fs_path,
1290 )
1291 else:
1292 build_system_staging_dir = None
1294 if build_system_staging_dir is not None:
1295 _install_everything_from_source_dir_if_present(
1296 dctrl_bin,
1297 self.substitution,
1298 path_matcher,
1299 install_rule_context,
1300 source_condition_context,
1301 build_system_staging_dir,
1302 )
1304 if self._install_rules:
1305 # FIXME: Check that every install rule remains used after transformations have run.
1306 # What we want to check is transformations do not exclude everything from an install
1307 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
1308 # match.
1309 for install_rule in self._install_rules:
1310 install_rule.perform_install(
1311 path_matcher,
1312 install_rule_context,
1313 source_condition_context,
1314 )
1316 if enable_manifest_installation_feature: 1316 ↛ 1320line 1316 didn't jump to line 1320, because the condition on line 1316 was never false
1317 for search_dir in check_for_uninstalled_dirs:
1318 _detect_missing_installations(path_matcher, search_dir)
1320 for dctrl_bin in self.all_packages:
1321 package = dctrl_bin.name
1322 binary_install_rule_context = install_rule_context[package]
1323 build_system_pkg_staging_dir = os.path.join("debian", package)
1324 fs_root = binary_install_rule_context.fs_root
1326 context = self.package_transformations[package]
1327 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 1327 ↛ 1335line 1327 didn't jump to line 1335, because the condition on line 1327 was never false
1328 for special_install_rule in context.install_rules: 1328 ↛ 1329line 1328 didn't jump to line 1329, because the loop on line 1328 never started
1329 special_install_rule.perform_install(
1330 path_matcher,
1331 install_rule_context,
1332 source_condition_context,
1333 )
1335 if dctrl_bin.should_be_acted_on: 1335 ↛ 1347line 1335 didn't jump to line 1347, because the condition on line 1335 was never false
1336 self.apply_fs_transformations(package, fs_root)
1337 substvars_file = f"debian/{package}.substvars"
1338 substvars = FlushableSubstvars.load_from_path(
1339 substvars_file, missing_ok=True
1340 )
1341 # We do not want to touch the substvars file (non-clean rebuild contamination)
1342 substvars.substvars_path = None
1343 control_output_dir = generated_content_dir(
1344 package=dctrl_bin, subdir_key="DEBIAN"
1345 )
1346 else:
1347 substvars = FlushableSubstvars()
1348 control_output_dir = None
1350 udeb_package = self._binary_packages.get(f"{package}-udeb")
1351 if udeb_package and not udeb_package.is_udeb: 1351 ↛ 1352line 1351 didn't jump to line 1352, because the condition on line 1351 was never true
1352 udeb_package = None
1354 package_metadata_context = PackageProcessingContextProvider(
1355 self,
1356 dctrl_bin,
1357 udeb_package,
1358 package_data_table,
1359 # FIXME: source_package
1360 )
1362 ctrl_creator = BinaryCtrlAccessorProviderCreator(
1363 package_metadata_context,
1364 substvars,
1365 context.maintscript_snippets,
1366 context.substitution,
1367 )
1369 if not enable_manifest_installation_feature: 1369 ↛ 1370line 1369 didn't jump to line 1370, because the condition on line 1369 was never true
1370 assert_no_dbgsym_migration(dctrl_bin)
1371 dh_dbgsym_root_fs = FSROOverlay.create_root_dir(
1372 "", dhe_dbgsym_root_dir(dctrl_bin)
1373 )
1374 dbgsym_root_fs = FSRootDir()
1375 _install_everything_from_source_dir_if_present(
1376 dctrl_bin,
1377 self.substitution,
1378 path_matcher,
1379 install_rule_context,
1380 source_condition_context,
1381 dh_dbgsym_root_fs,
1382 into_dir=dbgsym_root_fs,
1383 )
1384 dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
1385 dbgsym_info = DbgsymInfo(
1386 dbgsym_root_fs,
1387 dbgsym_build_ids,
1388 )
1389 else:
1390 dbgsym_info = DbgsymInfo(
1391 FSRootDir(),
1392 [],
1393 )
1395 package_data_dict[package] = BinaryPackageData(
1396 self._source_package,
1397 dctrl_bin,
1398 build_system_pkg_staging_dir,
1399 control_output_dir,
1400 fs_root,
1401 substvars,
1402 package_metadata_context,
1403 ctrl_creator,
1404 dbgsym_info,
1405 )
1407 _list_automatic_discard_rules(path_matcher)
1409 return package_data_table
1411 def condition_context(
1412 self, binary_package: Optional[Union[BinaryPackage, str]]
1413 ) -> ConditionContext:
1414 if binary_package is None: 1414 ↛ 1415line 1414 didn't jump to line 1415, because the condition on line 1414 was never true
1415 return ConditionContext(
1416 binary_package=None,
1417 substitution=self.substitution,
1418 build_env=self._build_env,
1419 dpkg_architecture_variables=self._dpkg_architecture_variables,
1420 dpkg_arch_query_table=self._dpkg_arch_query_table,
1421 )
1422 if not isinstance(binary_package, str): 1422 ↛ 1423line 1422 didn't jump to line 1423, because the condition on line 1422 was never true
1423 binary_package = binary_package.name
1425 package_transformation = self.package_transformations[binary_package]
1426 return ConditionContext(
1427 binary_package=package_transformation.binary_package,
1428 substitution=package_transformation.substitution,
1429 build_env=self._build_env,
1430 dpkg_architecture_variables=self._dpkg_architecture_variables,
1431 dpkg_arch_query_table=self._dpkg_arch_query_table,
1432 )
1434 def apply_fs_transformations(
1435 self,
1436 package: str,
1437 fs_root: FSPath,
1438 ) -> None:
1439 if package in self._used_for: 1439 ↛ 1440line 1439 didn't jump to line 1440, because the condition on line 1439 was never true
1440 raise ValueError(
1441 f"data.tar contents for {package} has already been finalized!?"
1442 )
1443 if package not in self.package_transformations: 1443 ↛ 1444line 1443 didn't jump to line 1444, because the condition on line 1443 was never true
1444 raise ValueError(
1445 f'The package "{package}" was not relevant for the manifest!?'
1446 )
1447 package_transformation = self.package_transformations[package]
1448 condition_context = ConditionContext(
1449 binary_package=package_transformation.binary_package,
1450 substitution=package_transformation.substitution,
1451 build_env=self._build_env,
1452 dpkg_architecture_variables=self._dpkg_architecture_variables,
1453 dpkg_arch_query_table=self._dpkg_arch_query_table,
1454 )
1455 norm_rules = list(
1456 builtin_mode_normalization_rules(
1457 self._dpkg_architecture_variables,
1458 package_transformation.binary_package,
1459 package_transformation.substitution,
1460 )
1461 )
1462 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
1463 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
1464 for transformation in package_transformation.transformations:
1465 transformation.transform_file_system(fs_root, condition_context)
1466 interpreter_normalization = NormalizeShebangLineTransformation()
1467 interpreter_normalization.transform_file_system(fs_root, condition_context)
1469 def finalize_data_tar_contents(
1470 self,
1471 package: str,
1472 fs_root: FSPath,
1473 clamp_mtime_to: int,
1474 ) -> IntermediateManifest:
1475 if package in self._used_for: 1475 ↛ 1476line 1475 didn't jump to line 1476, because the condition on line 1475 was never true
1476 raise ValueError(
1477 f"data.tar contents for {package} has already been finalized!?"
1478 )
1479 if package not in self.package_transformations: 1479 ↛ 1480line 1479 didn't jump to line 1480, because the condition on line 1479 was never true
1480 raise ValueError(
1481 f'The package "{package}" was not relevant for the manifest!?'
1482 )
1483 self._used_for.add(package)
1485 # At this point, there so be no further mutations to the file system (because the will not
1486 # be present in the intermediate manifest)
1487 cast("FSRootDir", fs_root).is_read_write = False
1489 intermediate_manifest = list(
1490 _generate_intermediate_manifest(
1491 fs_root,
1492 clamp_mtime_to,
1493 )
1494 )
1495 return intermediate_manifest
1497 def apply_to_binary_staging_directory(
1498 self,
1499 package: str,
1500 fs_root: FSPath,
1501 clamp_mtime_to: int,
1502 ) -> IntermediateManifest:
1503 self.apply_fs_transformations(package, fs_root)
1504 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)
1507@dataclasses.dataclass(slots=True)
1508class SearchDirOrderState:
1509 search_dir: VirtualPath
1510 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field(
1511 default_factory=set
1512 )
1513 after: Set[str] = dataclasses.field(default_factory=set)
1516def _present_installation_dirs(
1517 search_dirs: Sequence[SearchDir],
1518 checked_missing_dirs: Sequence[VirtualPath],
1519 all_pkgs: FrozenSet[BinaryPackage],
1520) -> None:
1521 _info("The following directories are considered search dirs (in order):")
1522 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
1523 for search_dir in search_dirs:
1524 applies_to = ""
1525 if search_dir.applies_to < all_pkgs:
1526 names = ", ".join(p.name for p in search_dir.applies_to)
1527 applies_to = f" [only applicable to: {names}]"
1528 remark = ""
1529 if not os.path.isdir(search_dir.search_dir.fs_path):
1530 remark = " (skipped; absent)"
1531 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}")
1533 if checked_missing_dirs:
1534 _info('The following directories are considered for "not-installed" paths;')
1535 for d in checked_missing_dirs:
1536 remark = ""
1537 if not os.path.isdir(d.fs_path):
1538 remark = " (skipped; absent)"
1539 _info(f" * {d.fs_path:{max_len}}{remark}")
1542def _determine_search_dir_order(
1543 requested: Mapping[BinaryPackage, List[VirtualPath]],
1544 all_pkgs: FrozenSet[BinaryPackage],
1545 default_search_dirs: List[VirtualPath],
1546 source_root: VirtualPath,
1547) -> Sequence[SearchDir]:
1548 search_dir_table = {}
1549 assert requested.keys() <= all_pkgs
1550 for pkg in all_pkgs:
1551 paths = requested.get(pkg, default_search_dirs)
1552 previous_search_dir: Optional[SearchDirOrderState] = None
1553 for path in paths:
1554 try:
1555 search_dir_state = search_dir_table[path.fs_path]
1556 except KeyError:
1557 search_dir_state = SearchDirOrderState(path)
1558 search_dir_table[path.fs_path] = search_dir_state
1559 search_dir_state.applies_to.add(pkg)
1560 if previous_search_dir is not None:
1561 search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
1562 previous_search_dir = search_dir_state
1564 search_dirs_in_order = []
1565 released = set()
1566 remaining = set()
1567 for search_dir_state in search_dir_table.values():
1568 if not (search_dir_state.after <= released):
1569 remaining.add(search_dir_state.search_dir.fs_path)
1570 continue
1571 search_dirs_in_order.append(search_dir_state)
1572 released.add(search_dir_state.search_dir.fs_path)
1574 while remaining:
1575 current_released = len(released)
1576 for fs_path in remaining:
1577 search_dir_state = search_dir_table[fs_path]
1578 if not search_dir_state.after.issubset(released):
1579 remaining.add(search_dir_state.search_dir.fs_path)
1580 continue
1581 search_dirs_in_order.append(search_dir_state)
1582 released.add(search_dir_state.search_dir.fs_path)
1584 if current_released == len(released):
1585 names = ", ".join(remaining)
1586 _error(
1587 f"There is a circular dependency (somewhere) between the search dirs: {names}."
1588 " Note that the search directories across all packages have to be ordered (and the"
1589 " source root should generally be last)"
1590 )
1591 remaining -= released
1593 search_dirs_in_order.append(
1594 SearchDirOrderState(
1595 source_root,
1596 all_pkgs,
1597 )
1598 )
1600 return tuple(
1601 # Avoid duplicating all_pkgs
1602 SearchDir(
1603 s.search_dir,
1604 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
1605 )
1606 for s in search_dirs_in_order
1607 )