Coverage for src/debputy/util.py: 65%
426 statements
coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import argparse
2import collections
3import functools
4import glob
5import logging
6import os
7import re
8import shutil
9import subprocess
10import sys
11import time
12from itertools import zip_longest
13from pathlib import Path
14from typing import (
15 NoReturn,
16 TYPE_CHECKING,
17 Union,
18 Set,
19 FrozenSet,
20 Optional,
21 TypeVar,
22 Dict,
23 Iterator,
24 Iterable,
25 Literal,
26 Tuple,
27 Sequence,
28 List,
29 Mapping,
30 Any,
31)
33from debian.deb822 import Deb822
35from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
36from debputy.exceptions import DebputySubstitutionError
38if TYPE_CHECKING:
39 from debputy.packages import BinaryPackage
40 from debputy.substitution import Substitution
43T = TypeVar("T")
46SLASH_PRUNE = re.compile("//+")
47PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
48PKGVERSION_REGEX = re.compile(
49 r"""
50 (?: \d+ : )? # Optional epoch
51 \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens)
52 (?: - [0-9A-Za-z.+:~]+ )* # Optional Debian revision (+ upstream versions with hyphens)
53""",
54 re.VERBOSE | re.ASCII,
55)
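A minimal sketch of how these patterns are intended to be used (the use of re.fullmatch here is an assumption on my part; util.py only defines the patterns at this point):

# Illustrative sketch, not part of util.py
from debputy.util import PKGNAME_REGEX, PKGVERSION_REGEX
assert PKGNAME_REGEX.fullmatch("libfoo2") is not None
assert PKGVERSION_REGEX.fullmatch("1:2.0.1-3") is not None   # epoch + upstream + Debian revision
assert PKGVERSION_REGEX.fullmatch("_bad_") is None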
56DEFAULT_PACKAGE_TYPE = "deb"
57DBGSYM_PACKAGE_TYPE = "deb"
58UDEB_PACKAGE_TYPE = "udeb"
60POSTINST_DEFAULT_CONDITION = (
61 '[ "$1" = "configure" ]'
62 ' || [ "$1" = "abort-upgrade" ]'
63 ' || [ "$1" = "abort-deconfigure" ]'
64 ' || [ "$1" = "abort-remove" ]'
65)
68_SPACE_RE = re.compile(r"\s")
69_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
70_REGULAR_ESCAPEES = re.compile(r'([\s!"$()*+#;<>?@\[\]\\`|~])')
71_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
72_DEFAULT_LOGGER: Optional[logging.Logger] = None
73_STDOUT_HANDLER: Optional[logging.StreamHandler] = None
74_STDERR_HANDLER: Optional[logging.StreamHandler] = None
77def assume_not_none(x: Optional[T]) -> T:
78 if x is None: # pragma: no cover
79 raise ValueError(
80 'Internal error: None was given, but the receiver assumed "not None" here'
81 )
82 return x
85def _info(msg: str) -> None:
86 global _DEFAULT_LOGGER
87 logger = _DEFAULT_LOGGER
88 if logger:
89 logger.info(msg)
90 # No fallback print for info
93def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
94 global _DEFAULT_LOGGER
95 logger = _DEFAULT_LOGGER
96 if logger:
97 logger.error(msg)
98 else:
99 me = os.path.basename(sys.argv[0]) if prog is None else prog
100 print(
101 f"{me}: error: {msg}",
102 file=sys.stderr,
103 )
104 sys.exit(1)
107def _warn(msg: str, *, prog: Optional[str] = None) -> None:
108 global _DEFAULT_LOGGER
109 logger = _DEFAULT_LOGGER
110 if logger:  [branch 110 ↛ 111 not taken: condition was never true]
111 logger.warning(msg)
112 else:
113 me = os.path.basename(sys.argv[0]) if prog is None else prog
115 print(
116 f"{me}: warning: {msg}",
117 file=sys.stderr,
118 )
121class ColorizedArgumentParser(argparse.ArgumentParser):
122 def error(self, message: str) -> NoReturn:
123 self.print_usage(sys.stderr)
124 _error(message, prog=self.prog)
127def ensure_dir(path: str) -> None:
128 if not os.path.isdir(path):  [branch 128 ↛ 129 not taken: condition was never true]
129 os.makedirs(path, mode=0o755, exist_ok=True)
132def _clean_path(orig_p: str) -> str:
133 p = SLASH_PRUNE.sub("/", orig_p)
134 if "." in p: 134 ↛ 147line 134 didn't jump to line 147, because the condition on line 134 was never false
135 path_base = p
136 # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
137 # of a normalized path to be a no-op.
138 if path_base.startswith("./"):
139 path_base = path_base[2:]
140 assert path_base
141 for segment in path_base.split("/"):
142 if segment in (".", ".."):
143 raise ValueError(
144 'Please provide paths that are normalized (i.e., no ".." or ".").'
145 f' Offending input "{orig_p}"'
146 )
147 return p
150def _normalize_path(path: str, with_prefix: bool = True) -> str:
151 path = path.strip("/")
152 if not path or path == ".":  [branch 152 ↛ 153 not taken: condition was never true]
153 return "."
154 if "//" in path or "." in path:
155 path = _clean_path(path)
156 if with_prefix ^ path.startswith("./"):
157 if with_prefix:  [branch 157 ↛ 160 not taken: condition was never false]
158 path = "./" + path
159 else:
160 path = path[2:]
161 return path
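A minimal usage sketch for the path normalization above (assuming debputy is importable; _normalize_path is an internal helper):

# Illustrative sketch: _normalize_path prefixes "./" by default and is a no-op on already-normalized paths.
from debputy.util import _normalize_path
assert _normalize_path("/usr/share/doc/") == "./usr/share/doc"
assert _normalize_path("./usr//bin") == "./usr/bin"
assert _normalize_path("usr/bin", with_prefix=False) == "usr/bin"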
164def _normalize_link_target(link_target: str) -> str:
165 link_target = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
166 result: List[str] = []
167 for segment in link_target.split("/"):
168 if segment in (".", ""):
169 # Ignore these - an empty segment generally comes from a trailing slash
170 continue
171 if segment == "..":
172 # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
173 if result:  [branch 173 ↛ 167 not taken: condition was never false]
174 result.pop()
175 else:
176 result.append(segment)
177 return "/".join(result)
180def _backslash_escape(m: re.Match[str]) -> str:
181 return "\\" + m.group(0)
184def _escape_shell_word(w: str) -> str:
185 if _SPACE_RE.match(w):  [branch 185 ↛ 186 not taken: condition was never true]
186 w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
187 return f'"{w}"'
188 return _REGULAR_ESCAPEES.sub(_backslash_escape, w)
191def escape_shell(*args: str) -> str:
192 return " ".join(_escape_shell_word(w) for w in args)
195def print_command(*args: str) -> None:
196 print(f" {escape_shell(*args)}")
199def debian_policy_normalize_symlink_target(
200 link_path: str,
201 link_target: str,
202 normalize_link_path: bool = False,
203) -> str:
204 if normalize_link_path:
205 link_path = _normalize_path(link_path)
206 elif not link_path.startswith("./"):  [branch 206 ↛ 207 not taken: condition was never true]
207 raise ValueError("Link part was not normalized")
209 link_path = link_path[2:]
211 if not link_target.startswith("/"):
212 link_target = "/" + os.path.dirname(link_path) + "/" + link_target
214 link_path_parts = link_path.split("/")
215 link_target_parts = [
216 s for s in _normalize_link_target(link_target).split("/") if s != "."
217 ]
219 assert link_path_parts
221 if link_target_parts and link_path_parts[0] == link_target_parts[0]:
222 # Per Debian Policy, must be relative
224 # First determine the length of the overlap
225 common_segment_count = 1
226 shortest_path_length = min(len(link_target_parts), len(link_path_parts))
227 while (
228 common_segment_count < shortest_path_length
229 and link_target_parts[common_segment_count]
230 == link_path_parts[common_segment_count]
231 ):
232 common_segment_count += 1
234 if common_segment_count == shortest_path_length and len(
235 link_path_parts
236 ) - 1 == len(link_target_parts):
237 normalized_link_target = "."
238 else:
239 up_dir_count = len(link_path_parts) - 1 - common_segment_count
240 normalized_link_target_parts = []
241 if up_dir_count:
242 up_dir_part = "../" * up_dir_count
243 # We overshoot with a single '/', so rstrip it away
244 normalized_link_target_parts.append(up_dir_part.rstrip("/"))
245 # Add the relevant down parts
246 normalized_link_target_parts.extend(
247 link_target_parts[common_segment_count:]
248 )
250 normalized_link_target = "/".join(normalized_link_target_parts)
251 else:
252 # Per Debian Policy, must be absolute
253 normalized_link_target = "/" + "/".join(link_target_parts)
255 return normalized_link_target
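A minimal sketch of the Debian Policy normalization above: a target under the same top-level directory becomes relative, everything else stays absolute:

# Illustrative sketch (assuming debputy is importable)
from debputy.util import debian_policy_normalize_symlink_target
assert debian_policy_normalize_symlink_target(
    "./usr/share/doc/pkg-a/changelog.gz", "/usr/share/doc/pkg-b/changelog.gz"
) == "../pkg-b/changelog.gz"
assert debian_policy_normalize_symlink_target("./usr/bin/foo", "/etc/foo.conf") == "/etc/foo.conf"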
258def has_glob_magic(pattern: str) -> bool:
259 return glob.has_magic(pattern) or "{" in pattern
262def glob_escape(replacement_value: str) -> str:
263 if not glob.has_magic(replacement_value) and "{" not in replacement_value:
264 return replacement_value
265 return (
266 replacement_value.replace("[", "[[]")
267 .replace("]", "[]]")
268 .replace("*", "[*]")
269 .replace("?", "[?]")
270 .replace("{", "[{]")
271 .replace("}", "[}]")
272 )
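A minimal sketch of glob escaping (this assumes the corrected "and" in the early-return condition above):

# Illustrative sketch: glob magic characters are wrapped in character classes so the value matches itself literally.
from debputy.util import glob_escape
assert glob_escape("usr/share/doc") == "usr/share/doc"   # nothing to escape
assert glob_escape("foo*.txt") == "foo[*].txt"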
275# TODO: This logic should probably be moved to `python-debian`
276def active_profiles_match(
277 profiles_raw: str,
278 active_build_profiles: Union[Set[str], FrozenSet[str]],
279) -> bool:
280 profiles_raw = profiles_raw.strip()
281 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":  [branch 281 ↛ 282 not taken: condition was never true]
282 raise ValueError(
283 'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
284 )
285 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
286 for profile_group_raw in profile_groups:  [branch 286 ↛ 302 not taken: the loop never ran to completion]
287 should_process_package = True
288 for profile_name in profile_group_raw.split():
289 negation = False
290 if profile_name[0] == "!":  [branch 290 ↛ 294 not taken: condition was never false]
291 negation = True
292 profile_name = profile_name[1:]
294 matched_profile = profile_name in active_build_profiles
295 if matched_profile == negation:  [branch 295 ↛ 296 not taken: condition was never true]
296 should_process_package = False
297 break
299 if should_process_package:  [branch 299 ↛ 286 not taken: condition was never false]
300 return True
302 return False
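A minimal sketch of Build-Profiles matching: groups are OR'ed, entries within a group are AND'ed, and "!" negates:

# Illustrative sketch (assuming debputy is importable)
from debputy.util import active_profiles_match
assert active_profiles_match("<!nocheck> <pkg.foo.extra>", frozenset())
assert not active_profiles_match("<nocheck>", frozenset())
assert active_profiles_match("<nocheck>", frozenset({"nocheck"}))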
305def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]:
306 profiles_raw = build_profiles_raw.strip()
307 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":  [branch 307 ↛ 308 not taken: condition was never true]
308 raise ValueError(
309 'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
310 )
311 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
312 return frozenset(frozenset(g.split()) for g in profile_groups)
315def resolve_source_date_epoch(
316 command_line_value: Optional[int],
317 *,
318 substitution: Optional["Substitution"] = None,
319) -> int:
320 mtime = command_line_value
321 if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
322 sde_raw = os.environ["SOURCE_DATE_EPOCH"]
323 if sde_raw == "":
324 _error("SOURCE_DATE_EPOCH is set but empty.")
325 mtime = int(sde_raw)
326 if mtime is None and substitution is not None:
327 try:
328 sde_raw = substitution.substitute(
329 "{{SOURCE_DATE_EPOCH}}",
330 "Internal resolution",
331 )
332 mtime = int(sde_raw)
333 except (DebputySubstitutionError, ValueError):
334 pass
335 if mtime is None:
336 mtime = int(time.time())
337 os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
338 return mtime
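A minimal sketch of SOURCE_DATE_EPOCH resolution (note that the function exports the resolved value back into the environment):

# Illustrative sketch: an explicit value wins over the environment variable.
import os
from debputy.util import resolve_source_date_epoch
os.environ["SOURCE_DATE_EPOCH"] = "1700000000"
assert resolve_source_date_epoch(None) == 1700000000
assert resolve_source_date_epoch(1600000000) == 1600000000
assert os.environ["SOURCE_DATE_EPOCH"] == "1600000000"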
341def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
342 with open(os.path.join(control_root_dir, "control"), "rt") as fd:
343 control_file = Deb822(fd)
345 package_name = control_file["Package"]
346 package_version = control_file["Version"]
347 package_architecture = control_file["Architecture"]
348 extension = control_file.get("Package-Type") or "deb"
349 if ":" in package_version:
350 package_version = package_version.split(":", 1)[1]
351 if is_udeb:
352 extension = "udeb"
354 return f"{package_name}_{package_version}_{package_architecture}.{extension}"
357_SCRATCH_DIR = None
358_DH_INTEGRATION_MODE = False
361def integrated_with_debhelper() -> None:
362 global _DH_INTEGRATION_MODE
363 _DH_INTEGRATION_MODE = True
366def scratch_dir() -> str:
367 global _SCRATCH_DIR
368 if _SCRATCH_DIR is not None:
369 return _SCRATCH_DIR
370 debputy_scratch_dir = "debian/.debputy/scratch-dir"
371 is_debputy_dir = True
372 if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:  [branch 372 ↛ 374 not taken: condition was never false]
373 _SCRATCH_DIR = debputy_scratch_dir
374 elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
375 _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
376 is_debputy_dir = False
377 else:
378 _SCRATCH_DIR = debputy_scratch_dir
379 ensure_dir(_SCRATCH_DIR)
380 if is_debputy_dir:  [branch 380 ↛ 382 not taken: condition was never false]
381 Path("debian/.debputy/.gitignore").write_text("*\n")
382 return _SCRATCH_DIR
385_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None
388def generated_content_dir(
389 *,
390 package: Optional["BinaryPackage"] = None,
391 subdir_key: Optional[str] = None,
392) -> str:
393 global _RUNTIME_CONTAINER_DIR_KEY
394 container_dir = _RUNTIME_CONTAINER_DIR_KEY
395 first_run = False
397 if container_dir is None:
398 first_run = True
399 container_dir = f"_pb-{os.getpid()}"
400 _RUNTIME_CONTAINER_DIR_KEY = container_dir
402 directory = os.path.join(scratch_dir(), container_dir)
404 if first_run and os.path.isdir(directory):  [branch 404 ↛ 409 not taken: condition was never true]
405 # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
406 # see "stale" data.
407 # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
408 # reliable enough for that and we do not have an obvious hook for it.
409 shutil.rmtree(directory)
411 directory = os.path.join(
412 directory,
413 "generated-fs-content",
414 f"pkg_{package.name}" if package else "no-package",
415 )
416 if subdir_key is not None:
417 directory = os.path.join(directory, subdir_key)
419 os.makedirs(directory, exist_ok=True)
420 return directory
423PerlIncDir = collections.namedtuple("PerlIncDir", ["vendorlib", "vendorarch"])
424PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
425_PERL_MODULE_DIRS: Dict[str, PerlIncDir] = {}
428@functools.lru_cache(1)
429def _perl_config_data() -> PerlConfigData:
430 d = (
431 subprocess.check_output(
432 [
433 "perl",
434 "-MConfig",
435 "-e",
436 'print "$Config{version}\n$Config{debian_abi}\n"',
437 ]
438 )
439 .decode("utf-8")
440 .splitlines()
441 )
442 return PerlConfigData(*d)
445def _perl_version() -> str:
446 return _perl_config_data().version
449def perlxs_api_dependency() -> str:
450 # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic
451 # assumes that the major version of the build variant of Perl is the same as that of the host variant.
452 config = _perl_config_data()
453 if config.debian_abi is not None and config.debian_abi != "":
454 return f"perlapi-{config.debian_abi}"
455 return f"perlapi-{config.version}"
458def perl_module_dirs(
459 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
460 dctrl_bin: "BinaryPackage",
461) -> PerlIncDir:
462 global _PERL_MODULE_DIRS
463 arch = (
464 dctrl_bin.resolved_architecture
465 if dpkg_architecture_variables.is_cross_compiling
466 else "_default_"
467 )
468 module_dir = _PERL_MODULE_DIRS.get(arch)
469 if module_dir is None:
470 cmd = ["perl"]
471 if dpkg_architecture_variables.is_cross_compiling:  [branch 471 ↛ 472 not taken: condition was never true]
472 version = _perl_version()
473 inc_dir = f"/usr/lib/{dctrl_bin.deb_multiarch}/perl/cross-config-{version}"
474 # FIXME: This should not fall back to "build-arch", but on the other hand, we use the perl module dirs
475 # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
476 # cross builds... meh.
477 if os.path.exists(os.path.join(inc_dir, "Config.pm")):
478 cmd.append(f"-I{inc_dir}")
479 cmd.extend(
480 ["-MConfig", "-e", 'print "$Config{vendorlib}\n$Config{vendorarch}\n"']
481 )
482 output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
483 if len(output) != 2:  [branch 483 ↛ 484 not taken: condition was never true]
484 raise ValueError(
485 "Internal error: Unable to determine the perl include directories:"
486 f" Raw output from perl snippet: {output}"
487 )
488 module_dir = PerlIncDir(
489 vendorlib=_normalize_path(output[0]),
490 vendorarch=_normalize_path(output[1]),
491 )
492 _PERL_MODULE_DIRS[arch] = module_dir
493 return module_dir
496@functools.lru_cache(1)
497def detect_fakeroot() -> bool:
498 if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
499 return False
500 env = dict(os.environ)
501 del env["LD_PRELOAD"]
502 try:
503 return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
504 except subprocess.CalledProcessError:
505 print(
506 'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
507 file=sys.stderr,
508 )
509 return False
512@functools.lru_cache(1)
513def _sc_arg_max() -> Optional[int]:
514 try:
515 return os.sysconf("SC_ARG_MAX")
516 except RuntimeError:
517 _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
518 return None
521def _split_xargs_args(
522 static_cmd: Sequence[str],
523 max_args_byte_len: int,
524 varargs: Iterable[str],
525 reuse_list_ok: bool,
526) -> Iterator[List[str]]:
527 static_cmd_len = len(static_cmd)
528 remaining_len = max_args_byte_len
529 pending_args = list(static_cmd)
530 for arg in varargs:
531 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space
532 remaining_len -= arg_len
533 if not remaining_len:
534 if len(pending_args) <= static_cmd_len:
535 raise ValueError(
536 f"Could not fit a single argument into the command line !?"
537 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
538 )
539 yield pending_args
540 remaining_len = max_args_byte_len - arg_len
541 if reuse_list_ok:
542 pending_args.clear()
543 pending_args.extend(static_cmd)
544 else:
545 pending_args = list(static_cmd)
546 pending_args.append(arg)
548 if len(pending_args) > static_cmd_len:
549 yield pending_args
552def xargs(
553 static_cmd: Sequence[str],
554 varargs: Iterable[str],
555 *,
556 env: Optional[Mapping[str, str]] = None,
557 reuse_list_ok: bool = False,
558) -> Iterator[List[str]]:
559 max_args_bytes = _sc_arg_max()
560 # Counting len(static_cmd) separator spaces would be one too many, hence the -1.
561 # _split_xargs_args accounts for the space before each variable argument (including the first).
562 static_byte_len = (
563 len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
564 )
565 if max_args_bytes is not None:
566 if env is None:
567 # +2 for nul bytes after key and value
568 static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
569 else:
570 # +2 for nul bytes after key and value
571 static_byte_len += sum(
572 len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
573 for k, v in env.items()
574 )
575 # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
576 # that)
577 static_byte_len += 2 * 4096
578 else:
579 # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
580 max_args_bytes = 20_000
581 remain_len = max_args_bytes - static_byte_len
582 yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok)
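A minimal usage sketch for xargs(); the file list is hypothetical:

# Illustrative sketch: each yielded command is the static command plus as many variable arguments as fit the byte budget.
from debputy.util import xargs
paths = [f"debian/pkg/usr/share/foo/file-{i}" for i in range(10_000)]  # hypothetical paths
for cmd in xargs(["chmod", "0644"], paths):
    # run each chunk, e.g. via subprocess.check_call(cmd)
    assert cmd[:2] == ["chmod", "0644"]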
585# itertools recipe
586def grouper(
587 iterable: Iterable[T],
588 n: int,
589 *,
590 incomplete: Literal["fill", "strict", "ignore"] = "fill",
591 fillvalue: Optional[T] = None,
592) -> Iterator[Tuple[T, ...]]:
593 """Collect data into non-overlapping fixed-length chunks or blocks"""
594 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
595 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
596 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
597 args = [iter(iterable)] * n
598 if incomplete == "fill":
599 return zip_longest(*args, fillvalue=fillvalue)
600 if incomplete == "strict":
601 return zip(*args, strict=True)
602 if incomplete == "ignore":
603 return zip(*args)
604 else:
605 raise ValueError("Expected fill, strict, or ignore")
608_LOGGING_SET_UP = False
611def _check_color() -> Tuple[bool, bool, Optional[str]]:
612 dpkg_or_default = os.environ.get(
613 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
614 )
615 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
616 bad_request = None
617 if requested_color not in {"auto", "always", "never"}:  [branch 617 ↛ 618 not taken: condition was never true]
618 bad_request = requested_color
619 requested_color = "auto"
621 if requested_color == "auto":  [branch 621 ↛ 625 not taken: condition was never false]
622 stdout_color = sys.stdout.isatty()
623 stderr_color = sys.stderr.isatty()
624 else:
625 enable = requested_color == "always"
626 stdout_color = enable
627 stderr_color = enable
628 return stdout_color, stderr_color, bad_request
631def program_name() -> str:
632 name = os.path.basename(sys.argv[0])
633 if name.endswith(".py"):  [branch 633 ↛ 634 not taken: condition was never true]
634 name = name[:-3]
635 if name == "__main__":  [branch 635 ↛ 636 not taken: condition was never true]
636 name = os.path.basename(os.path.dirname(sys.argv[0]))
637 # FIXME: Not optimal that we have to hardcode these kinds of things here
638 if name == "debputy_cmd":  [branch 638 ↛ 639 not taken: condition was never true]
639 name = "debputy"
640 return name
643def package_cross_check_precheck(
644 pkg_a: "BinaryPackage",
645 pkg_b: "BinaryPackage",
646) -> Tuple[bool, bool]:
647 """Whether these two packages can do content cross-checks
649 :param pkg_a: The first package
650 :param pkg_b: The second package
651 :return: A tuple of two booleans. If the first is True, then binary_package_a may do content cross-checks
652 that involve binary_package_b. If the second is True, then binary_package_b may do content cross-checks
653 that involve binary_package_a. Both can be True and both can be False at the same time, which
654 happens in common cases (an arch:all + arch:any pair having both be False is a common example).
655 """
657 # Handle the two most obvious base-cases
658 if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
659 return False, False
660 if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
661 return False, False
663 a_may_see_b = True
664 b_may_see_a = True
666 a_bp = pkg_a.fields.get("Build-Profiles", "")
667 b_bp = pkg_b.fields.get("Build-Profiles", "")
669 if a_bp != b_bp:
670 a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
671 b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()
673 # Check for the build profiles being identical but just ordered differently.
674 if a_bp_set != b_bp_set:
675 # For simplicity, we let groups cancel each other out. If one side has no clauses
676 # left, then it will always be built when the other is built.
677 #
678 # Eventually, someone will be here with a special case where more complex logic is
679 # required. Good luck to you! Remember to add test cases for it (the existing logic
680 # has some for a reason and if the logic is going to be more complex, it will need
681 # test cases to assert it fixes the problem and does not regress)
682 if a_bp_set - b_bp_set:
683 a_may_see_b = False
684 if b_bp_set - a_bp_set:
685 b_may_see_a = False
687 if pkg_a.declared_architecture != pkg_b.declared_architecture:
688 # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
689 if pkg_a.declared_architecture != "any":  [branch 689 ↛ 691 not taken: condition was never false]
690 b_may_see_a = False
691 if pkg_b.declared_architecture != "any":  [branch 691 ↛ 694 not taken: condition was never false]
692 a_may_see_b = False
694 return a_may_see_b, b_may_see_a
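A minimal sketch of the Build-Profiles cancellation reasoning used above (using the internal _parse_build_profiles helper):

# Illustrative sketch: identical groups in a different order cancel out and do not block cross-checks.
from debputy.util import _parse_build_profiles
a = _parse_build_profiles("<!nocheck> <pkg.foo.extra>")
b = _parse_build_profiles("<pkg.foo.extra> <!nocheck>")
assert a == b                         # same set of groups, just reordered
assert not (a - b) and not (b - a)    # nothing left over, so neither side loses visibility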
697def setup_logging(
698 *, log_only_to_stderr: bool = False, reconfigure_logging: bool = False
699) -> None:
700 global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
701 if _LOGGING_SET_UP and not reconfigure_logging:  [branch 701 ↛ 702 not taken: condition was never true]
702 raise RuntimeError(
703 "Logging has already been configured."
704 " Use reconfigure_logging=True if you need to reconfigure it"
705 )
706 stdout_color, stderr_color, bad_request = _check_color()
708 if stdout_color or stderr_color:  [branch 708 ↛ 709 not taken: condition was never true]
709 try:
710 import colorlog
711 except ImportError:
712 stdout_color = False
713 stderr_color = False
715 if log_only_to_stderr:
716 stdout = sys.stderr
717 stdout_color = stderr_color
718 else:
719 stdout = sys.stdout
721 class LogLevelFilter(logging.Filter):
722 def __init__(self, threshold: int, above: bool):
723 super().__init__()
724 self.threshold = threshold
725 self.above = above
727 def filter(self, record: logging.LogRecord) -> bool:
728 if self.above:
729 return record.levelno >= self.threshold
730 else:
731 return record.levelno < self.threshold
733 color_format = (
734 "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
735 )
736 colorless_format = "{name}: {levelnamelower}: {message}"
738 existing_stdout_handler = _STDOUT_HANDLER
739 existing_stderr_handler = _STDERR_HANDLER
741 if stdout_color:  [branch 741 ↛ 742 not taken: condition was never true]
742 stdout_handler = colorlog.StreamHandler(stdout)
743 stdout_handler.setFormatter(
744 colorlog.ColoredFormatter(color_format, style="{", force_color=True)
745 )
746 logger = colorlog.getLogger()
747 if existing_stdout_handler is not None:
748 logger.removeHandler(existing_stdout_handler)
749 _STDOUT_HANDLER = stdout_handler
750 logger.addHandler(stdout_handler)
751 else:
752 stdout_handler = logging.StreamHandler(stdout)
753 stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
754 logger = logging.getLogger()
755 if existing_stdout_handler is not None:
756 logger.removeHandler(existing_stdout_handler)
757 _STDOUT_HANDLER = stdout_handler
758 logger.addHandler(stdout_handler)
760 if stderr_color:  [branch 760 ↛ 761 not taken: condition was never true]
761 stderr_handler = colorlog.StreamHandler(sys.stderr)
762 stderr_handler.setFormatter(
763 colorlog.ColoredFormatter(color_format, style="{", force_color=True)
764 )
765 logger = logging.getLogger()
766 if existing_stderr_handler is not None:
767 logger.removeHandler(existing_stderr_handler)
768 _STDERR_HANDLER = stderr_handler
769 logger.addHandler(stderr_handler)
770 else:
771 stderr_handler = logging.StreamHandler(sys.stderr)
772 stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
773 logger = logging.getLogger()
774 if existing_stderr_handler is not None:
775 logger.removeHandler(existing_stderr_handler)
776 _STDERR_HANDLER = stderr_handler
777 logger.addHandler(stderr_handler)
779 stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
780 stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))
782 name = program_name()
784 old_factory = logging.getLogRecordFactory()
786 def record_factory(
787 *args: Any, **kwargs: Any
788 ) -> logging.LogRecord: # pragma: no cover
789 record = old_factory(*args, **kwargs)
790 record.levelnamelower = record.levelname.lower()
791 return record
793 logging.setLogRecordFactory(record_factory)
795 logging.getLogger().setLevel(logging.INFO)
796 _DEFAULT_LOGGER = logging.getLogger(name)
798 if bad_request:  [branch 798 ↛ 799 not taken: condition was never true]
799 _DEFAULT_LOGGER.warning(
800 f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
801 ' Resetting to "auto".'
802 )
804 _LOGGING_SET_UP = True