Coverage for src/debputy/util.py: 65%

426 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import argparse 

2import collections 

3import functools 

4import glob 

5import logging 

6import os 

7import re 

8import shutil 

9import subprocess 

10import sys 

11import time 

12from itertools import zip_longest 

13from pathlib import Path 

14from typing import ( 

15 NoReturn, 

16 TYPE_CHECKING, 

17 Union, 

18 Set, 

19 FrozenSet, 

20 Optional, 

21 TypeVar, 

22 Dict, 

23 Iterator, 

24 Iterable, 

25 Literal, 

26 Tuple, 

27 Sequence, 

28 List, 

29 Mapping, 

30 Any, 

31) 

32 

33from debian.deb822 import Deb822 

34 

35from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

36from debputy.exceptions import DebputySubstitutionError 

37 

38if TYPE_CHECKING: 

39 from debputy.packages import BinaryPackage 

40 from debputy.substitution import Substitution 

41 

42 

# Generic type variable used by the typed helpers in this module.
T = TypeVar("T")


# Matches any run of two or more consecutive slashes.
SLASH_PRUNE = re.compile("//+")
# Package name pattern (ASCII only).
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
# Package version pattern; written in verbose mode, so whitespace and the
# trailing "#" remarks inside the pattern are ignored by the regex engine.
PKGVERSION_REGEX = re.compile(
    r"""
                 (?: \d+ : )?                # Optional epoch
                 \d[0-9A-Za-z.+:~]*          # Upstream version (with no hyphens)
                 (?: - [0-9A-Za-z.+:~]+ )*   # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)

# Package type identifiers.
DEFAULT_PACKAGE_TYPE = "deb"
DBGSYM_PACKAGE_TYPE = "deb"
UDEB_PACKAGE_TYPE = "udeb"

# Shell test that is true for the "configure" and the three "abort-*"
# values of "$1".
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


# Patterns used by the shell escaping helpers.
_SPACE_RE = re.compile(r"\s")
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
_REGULAR_ESCAPEES = re.compile(r'([\s!"$()*+#;<>?@\[\]\\`|~])')
# Separator between two "<...>" groups of a Build-Profiles value.
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Logging state; populated by setup_logging().
_DEFAULT_LOGGER: Optional[logging.Logger] = None
_STDOUT_HANDLER: Optional[logging.StreamHandler] = None
_STDERR_HANDLER: Optional[logging.StreamHandler] = None

75 

76 

def assume_not_none(x: Optional[T]) -> T:
    """Return ``x`` unchanged after asserting that it is not ``None``.

    :param x: The value the caller believes is set.
    :return: The very same value.
    :raises ValueError: If ``x`` is ``None``.
    """
    if x is not None:
        return x
    raise ValueError(  # pragma: no cover
        'Internal error: None was given, but the receiver assumed "not None" here'
    )

83 

84 

def _info(msg: str) -> None:
    """Log ``msg`` at info level on the default logger, if one is configured.

    :param msg: The message to log.
    """
    active_logger = _DEFAULT_LOGGER
    if not active_logger:
        # No fallback print for info
        return
    active_logger.info(msg)

91 

92 

def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
    """Report ``msg`` as an error and terminate the process with exit code 1.

    The message goes to the default logger when one is configured; otherwise
    it is printed to stderr prefixed with the program name.

    :param msg: The error message.
    :param prog: Program name for the fallback output; defaults to the
      basename of ``sys.argv[0]``.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.error(msg)
    else:
        if prog is None:
            me = os.path.basename(sys.argv[0])
        else:
            me = prog
        print(f"{me}: error: {msg}", file=sys.stderr)
    sys.exit(1)

105 

106 

def _warn(msg: str, *, prog: Optional[str] = None) -> None:
    """Report ``msg`` as a warning.

    The message goes to the default logger when one is configured; otherwise
    it is printed to stderr prefixed with the program name.

    :param msg: The warning message.
    :param prog: Program name for the fallback output; defaults to the
      basename of ``sys.argv[0]``.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.warning(msg)
        return
    me = prog if prog is not None else os.path.basename(sys.argv[0])
    print(f"{me}: warning: {msg}", file=sys.stderr)

119 

120 

class ColorizedArgumentParser(argparse.ArgumentParser):
    """Argument parser that routes its error reporting through ``_error``."""

    def error(self, message: str) -> NoReturn:
        """Print the usage text to stderr, then report ``message`` via ``_error``.

        :param message: The error message produced by argparse.
        """
        usage_stream = sys.stderr
        self.print_usage(usage_stream)
        _error(message, prog=self.prog)

125 

126 

def ensure_dir(path: str) -> None:
    """Create ``path`` (including missing parents) unless it already is a directory.

    :param path: The directory to create; new directories get mode ``0o755``.
    """
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)

130 

131 

def _clean_path(orig_p: str) -> str:
    """Collapse repeated slashes in ``orig_p`` and reject "." / ".." segments.

    :param orig_p: The path to clean.
    :return: The path with every run of slashes replaced by a single slash.
    :raises ValueError: If any segment (after an optional single leading
      "./") is "." or "..".
    """
    p = SLASH_PRUNE.sub("/", orig_p)
    if "." not in p:
        return p
    # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
    # of a normalized path to be a no-op.
    path_base = p[2:] if p.startswith("./") else p
    assert path_base
    if any(segment in (".", "..") for segment in path_base.split("/")):
        raise ValueError(
            'Please provide paths that are normalized (i.e., no ".." or ".").'
            f' Offending input "{orig_p}"'
        )
    return p

148 

149 

def _normalize_path(path: str, with_prefix: bool = True) -> str:
    """Normalize ``path`` to the "./"-prefixed (or prefix-free) form.

    Leading and trailing slashes are stripped first. Paths that contain
    "//" or "." are passed through ``_clean_path``.

    :param path: The path to normalize.
    :param with_prefix: Whether the result should start with "./".
    :return: The normalized path; "." when nothing remains after stripping.
    """
    stripped = path.strip("/")
    if stripped in ("", "."):
        return "."
    if "//" in stripped or "." in stripped:
        stripped = _clean_path(stripped)
    has_prefix = stripped.startswith("./")
    if with_prefix and not has_prefix:
        return "./" + stripped
    if has_prefix and not with_prefix:
        return stripped[2:]
    return stripped

162 

163 

def _normalize_link_target(link_target: str) -> str:
    """Resolve "." and ".." segments of ``link_target`` lexically.

    Leading slashes are dropped and repeated slashes are collapsed before
    the segments are processed.

    :param link_target: The link target to normalize.
    :return: The remaining segments joined by "/" (no leading slash).
    """
    collapsed = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    resolved: List[str] = []
    for part in collapsed.split("/"):
        if part == "..":
            # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
            if resolved:
                resolved.pop()
        elif part not in (".", ""):
            # "." and "" are ignored - the empty string is generally a trailing slash
            resolved.append(part)
    return "/".join(resolved)

178 

179 

def _backslash_escape(m: re.Match[str]) -> str:
    """Substitution callback that prefixes the matched text with a backslash.

    :param m: The regex match being replaced.
    :return: A backslash followed by the full matched text.
    """
    matched_text = m.group(0)
    return f"\\{matched_text}"

182 

183 

def _escape_shell_word(w: str) -> str:
    """Escape a single word for display as part of a shell command line.

    Words containing whitespace are wrapped in double quotes, with the
    characters matched by ``_DOUBLE_ESCAPEES`` backslash-escaped. All other
    words are left unquoted, with the characters matched by
    ``_REGULAR_ESCAPEES`` backslash-escaped.

    :param w: The word to escape.
    :return: The escaped word.
    """
    # `search` rather than `match`: whitespace anywhere in the word (not only
    # as its first character) must select the double-quoted form.
    if _SPACE_RE.search(w):
        w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
        return f'"{w}"'
    return _REGULAR_ESCAPEES.sub(_backslash_escape, w)

189 

190 

def escape_shell(*args: str) -> str:
    """Escape each word with ``_escape_shell_word`` and join them with spaces.

    :param args: The words of the command line.
    :return: The escaped words separated by single spaces.
    """
    escaped_words = [_escape_shell_word(word) for word in args]
    return " ".join(escaped_words)

193 

194 

def print_command(*args: str) -> None:
    """Print the shell-escaped form of the given command words to stdout.

    :param args: The words of the command line.
    """
    rendered = escape_shell(*args)
    print(f" {rendered}")

197 

198 

def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Compute the symlink target form used for a link at ``link_path``.

    The target becomes relative when the link and the target share their
    first path segment, and absolute otherwise.

    :param link_path: Path of the symlink itself. Must start with "./"
      unless ``normalize_link_path`` is set.
    :param link_target: The target as provided. A relative target is
      interpreted relative to the directory of ``link_path``.
    :param normalize_link_path: If True, ``link_path`` is first passed
      through ``_normalize_path``.
    :return: The normalized link target.
    :raises ValueError: If ``link_path`` does not start with "./" and
      ``normalize_link_path`` is False.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    # Drop the leading "./"
    link_path = link_path[2:]

    if not link_target.startswith("/"):
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if not link_target_parts or link_path_parts[0] != link_target_parts[0]:
        # Per Debian Policy, must be absolute
        return "/" + "/".join(link_target_parts)

    # Per Debian Policy, must be relative

    # First determine the length of the overlap
    common_segment_count = 0
    for path_segment, target_segment in zip(link_path_parts, link_target_parts):
        if path_segment != target_segment:
            break
        common_segment_count += 1

    shortest_path_length = min(len(link_target_parts), len(link_path_parts))
    if (
        common_segment_count == shortest_path_length
        and len(link_path_parts) - 1 == len(link_target_parts)
    ):
        return "."

    up_dir_count = len(link_path_parts) - 1 - common_segment_count
    normalized_link_target_parts = []
    if up_dir_count:
        # One ".." per directory level to climb, joined without a trailing slash
        normalized_link_target_parts.append("/".join([".."] * up_dir_count))
    # Add the relevant down parts
    normalized_link_target_parts.extend(link_target_parts[common_segment_count:])
    return "/".join(normalized_link_target_parts)

256 

257 

def has_glob_magic(pattern: str) -> bool:
    """Tell whether ``pattern`` contains glob metacharacters or a "{".

    :param pattern: The pattern to inspect.
    :return: True if ``glob.has_magic`` accepts the pattern or it contains "{".
    """
    if glob.has_magic(pattern):
        return True
    return "{" in pattern

260 

261 

# Characters that glob_escape() neutralizes.
_GLOB_ESCAPE_CHARS = re.compile(r"[\[\]*?{}]")


def glob_escape(replacement_value: str) -> str:
    """Escape ``replacement_value`` so a glob pattern matches it literally.

    Each of ``[``, ``]``, ``*``, ``?``, ``{`` and ``}`` is wrapped in a
    bracket expression (e.g. ``*`` becomes ``[*]``). Values without any of
    these characters are returned unchanged.

    :param replacement_value: The literal text to embed in a glob pattern.
    :return: The escaped text.
    """
    # A single regex pass is used on purpose: with chained str.replace calls,
    # the "]" inserted while escaping "[" would itself be escaped again.
    return _GLOB_ESCAPE_CHARS.sub(
        lambda m: f"[{m.group(0)}]",
        replacement_value,
    )

273 

274 

# TODO: This logic should probably be moved to `python-debian`
def active_profiles_match(
    profiles_raw: str,
    active_build_profiles: Union[Set[str], FrozenSet[str]],
) -> bool:
    """Evaluate a Build-Profiles value against the active build profiles.

    The value consists of one or more "<...>" groups. A group matches when
    every profile in it is active (or, for "!"-prefixed names, not active).
    The value as a whole matches when at least one group matches.

    :param profiles_raw: The raw Build-Profiles value, e.g. "<!nocheck> <cross>".
    :param active_build_profiles: The names of the active build profiles.
    :return: True if at least one group matches.
    :raises ValueError: If the value is empty, is not wrapped in "<" and ">",
      or is the literal "<>".
    """
    profiles_raw = profiles_raw.strip()
    # startswith/endswith (rather than indexing) so that empty input produces
    # the ValueError below instead of an IndexError.
    if (
        not profiles_raw.startswith("<")
        or not profiles_raw.endswith(">")
        or profiles_raw == "<>"
    ):
        raise ValueError(
            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
        )
    for profile_group_raw in _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]):
        group_matches = True
        for profile_name in profile_group_raw.split():
            negation = profile_name.startswith("!")
            if negation:
                profile_name = profile_name[1:]
            # A positive name must be active; a negated name must not be.
            if (profile_name in active_build_profiles) == negation:
                group_matches = False
                break
        if group_matches:
            return True

    return False

303 

304 

def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]:
    """Parse a Build-Profiles value into a set of profile groups.

    :param build_profiles_raw: The raw Build-Profiles value, e.g.
      "<!nocheck> <cross>".
    :return: One frozenset per "<...>" group, containing the whitespace
      separated names of that group (a "!" prefix is kept as part of the name).
    :raises ValueError: If the value is empty, is not wrapped in "<" and ">",
      or is the literal "<>".
    """
    profiles_raw = build_profiles_raw.strip()
    # startswith/endswith (rather than indexing) so that empty input produces
    # the ValueError below instead of an IndexError.
    if (
        not profiles_raw.startswith("<")
        or not profiles_raw.endswith(">")
        or profiles_raw == "<>"
    ):
        raise ValueError(
            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
        )
    profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    return frozenset(frozenset(g.split()) for g in profile_groups)

313 

314 

def resolve_source_date_epoch(
    command_line_value: Optional[int],
    *,
    substitution: Optional["Substitution"] = None,
) -> int:
    """Determine the SOURCE_DATE_EPOCH value and export it to the environment.

    Sources are consulted in this order: the command line value, the
    ``SOURCE_DATE_EPOCH`` environment variable, the ``{{SOURCE_DATE_EPOCH}}``
    substitution (if a substitution is provided), and finally the current time.
    The result is always written back to ``os.environ["SOURCE_DATE_EPOCH"]``.

    :param command_line_value: Value given on the command line, if any.
    :param substitution: Optional substitution used as a fallback source.
    :return: The resolved timestamp.
    """
    mtime = command_line_value
    if mtime is None:
        env_value = os.environ.get("SOURCE_DATE_EPOCH")
        if env_value is not None:
            if env_value == "":
                _error("SOURCE_DATE_EPOCH is set but empty.")
            mtime = int(env_value)
    if mtime is None and substitution is not None:
        try:
            mtime = int(
                substitution.substitute(
                    "{{SOURCE_DATE_EPOCH}}",
                    "Internal resolution",
                )
            )
        except (DebputySubstitutionError, ValueError):
            # Best effort only; the current time is used below instead.
            pass
    if mtime is None:
        mtime = int(time.time())
    os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
    return mtime

339 

340 

def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Derive the package file name from the ``control`` file in a directory.

    :param control_root_dir: Directory containing the ``control`` file.
    :param is_udeb: If True, the extension is forced to "udeb".
    :return: ``<package>_<version>_<architecture>.<extension>`` where the
      version has everything up to and including the first ":" removed and
      the extension defaults to the ``Package-Type`` field or "deb".
    """
    control_path = os.path.join(control_root_dir, "control")
    with open(control_path, "rt") as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    full_version = control_file["Version"]
    package_architecture = control_file["Architecture"]
    declared_type = control_file.get("Package-Type")

    # Strip everything up to (and including) the first ":" from the version.
    _, colon, after_colon = full_version.partition(":")
    package_version = after_colon if colon else full_version

    if is_udeb:
        extension = "udeb"
    else:
        extension = declared_type or "deb"

    return f"{package_name}_{package_version}_{package_architecture}.{extension}"

355 

356 

# Cached result of scratch_dir(); None until the first call.
_SCRATCH_DIR = None
# Set by integrated_with_debhelper(); consulted by scratch_dir().
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    """Turn on the module-wide debhelper integration flag."""
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True

364 

365 

def scratch_dir() -> str:
    """Return the scratch directory, creating it on first use.

    The location is decided once and then cached:

     * ``debian/.debhelper/_debputy/scratch-dir`` in debhelper integration
       mode, or when ``debian/.debhelper`` exists and ``debian/.debputy``
       does not.
     * ``debian/.debputy/scratch-dir`` otherwise. In this case a
       ``debian/.debputy/.gitignore`` containing ``*`` is written as well.

    :return: The path of the scratch directory.
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR

    use_debhelper_dir = _DH_INTEGRATION_MODE or (
        not os.path.isdir("debian/.debputy") and os.path.isdir("debian/.debhelper")
    )
    if use_debhelper_dir:
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
    else:
        _SCRATCH_DIR = "debian/.debputy/scratch-dir"

    ensure_dir(_SCRATCH_DIR)
    if not use_debhelper_dir:
        Path("debian/.debputy/.gitignore").write_text("*\n")
    return _SCRATCH_DIR

383 

384 

# Name of the per-process container directory; None until first needed.
_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None


def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: Optional[str] = None,
) -> str:
    """Return (and create) a directory for generated file system content.

    The directory lives below ``scratch_dir()`` in a container directory
    named after the current process id.

    :param package: If given, the directory is specific to this package
      (``pkg_<name>``); otherwise ``no-package`` is used.
    :param subdir_key: Optional extra sub-directory below the package level.
    :return: The path of the (now existing) directory.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    first_run = _RUNTIME_CONTAINER_DIR_KEY is None
    if first_run:
        _RUNTIME_CONTAINER_DIR_KEY = f"_pb-{os.getpid()}"

    container_root = os.path.join(scratch_dir(), _RUNTIME_CONTAINER_DIR_KEY)

    if first_run and os.path.isdir(container_root):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        #  reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(container_root)

    path_parts = [
        container_root,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    ]
    if subdir_key is not None:
        path_parts.append(subdir_key)
    directory = os.path.join(*path_parts)

    os.makedirs(directory, exist_ok=True)
    return directory

421 

422 

# The two directories reported by perl's $Config{vendorlib} / $Config{vendorarch}.
PerlIncDir = collections.namedtuple("PerlIncDir", "vendorlib vendorarch")
# The values reported by perl's $Config{version} / $Config{debian_abi}.
PerlConfigData = collections.namedtuple("PerlConfigData", "version debian_abi")
# Cache for perl_module_dirs().
_PERL_MODULE_DIRS: Dict[str, PerlIncDir] = {}

426 

427 

@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Query ``perl`` for ``$Config{version}`` and ``$Config{debian_abi}``.

    The result is cached, so ``perl`` is only run once.

    :return: The two values, in that order, as reported by ``perl``.
    """
    perl_cmd = [
        "perl",
        "-MConfig",
        "-e",
        'print "$Config{version}\n$Config{debian_abi}\n"',
    ]
    raw_output = subprocess.check_output(perl_cmd)
    # One value per output line
    fields = raw_output.decode("utf-8").splitlines()
    return PerlConfigData(*fields)

443 

444 

def _perl_version() -> str:
    """Return the ``version`` field of ``_perl_config_data()``."""
    config = _perl_config_data()
    return config.version

447 

448 

def perlxs_api_dependency() -> str:
    """Return the ``perlapi-*`` dependency name for the build version of perl.

    :return: ``perlapi-<debian_abi>`` when perl reports a non-empty
      ``debian_abi``, otherwise ``perlapi-<version>``.
    """
    # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic
    # assumes that the major version of build variant of Perl is the same as the host variant of Perl.
    config = _perl_config_data()
    abi = config.debian_abi
    suffix = config.version if abi is None or abi == "" else abi
    return f"perlapi-{suffix}"

456 

457 

def perl_module_dirs(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: "BinaryPackage",
) -> PerlIncDir:
    """Determine perl's vendorlib and vendorarch directories for a package.

    Results are cached per architecture (or under a shared key when not
    cross compiling), so ``perl`` is run at most once per key.

    :param dpkg_architecture_variables: Used to tell whether this is a cross build.
    :param dctrl_bin: The binary package the directories are needed for.
    :return: The two directories, each passed through ``_normalize_path``.
    :raises ValueError: If ``perl`` does not output exactly two lines.
    """
    cross_compiling = dpkg_architecture_variables.is_cross_compiling
    arch = dctrl_bin.resolved_architecture if cross_compiling else "_default_"
    cached = _PERL_MODULE_DIRS.get(arch)
    if cached is not None:
        return cached

    cmd = ["perl"]
    if cross_compiling:
        version = _perl_version()
        inc_dir = f"/usr/lib/{dctrl_bin.deb_multiarch}/perl/cross-config-{version}"
        # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
        #  for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
        #  cross builds... meh.
        if os.path.exists(os.path.join(inc_dir, "Config.pm")):
            cmd.append(f"-I{inc_dir}")
    cmd += ["-MConfig", "-e", 'print "$Config{vendorlib}\n$Config{vendorarch}\n"']

    output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
    if len(output) != 2:
        raise ValueError(
            "Internal error: Unable to determine the perl include directories:"
            f" Raw output from perl snippet: {output}"
        )
    vendorlib, vendorarch = output
    module_dir = PerlIncDir(
        vendorlib=_normalize_path(vendorlib),
        vendorarch=_normalize_path(vendorarch),
    )
    _PERL_MODULE_DIRS[arch] = module_dir
    return module_dir

494 

495 

@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Guess whether the process runs under fakeroot (result is cached).

    The check only applies when the process appears to be uid 0 and
    ``LD_PRELOAD`` is set: ``id -u`` is then run without ``LD_PRELOAD`` and
    fakeroot is assumed if it reports something other than 0.

    :return: True if fakeroot was detected, False otherwise (including when
      ``id -u`` fails).
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = {key: value for key, value in os.environ.items() if key != "LD_PRELOAD"}
    try:
        uid_output = subprocess.check_output(["id", "-u"], env=env)
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False
    return uid_output.strip() != b"0"

510 

511 

@functools.lru_cache(1)
def _sc_arg_max() -> Optional[int]:
    """Return the system's ``SC_ARG_MAX`` limit, or None if it is unknown.

    The result is cached. A warning is emitted (once, due to the caching)
    when the limit cannot be determined.

    :return: The limit in bytes, or None when callers should use their own
      hard-coded fallback.
    """
    try:
        arg_max = os.sysconf("SC_ARG_MAX")
    except (ValueError, OSError, AttributeError):
        # ValueError: unknown name; OSError: not supported by the host;
        # AttributeError: platform without os.sysconf.
        arg_max = -1
    # sysconf reports -1 when there is no definite limit; treat it as unknown
    # rather than handing a negative "limit" to the caller.
    if arg_max <= 0:
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None
    return arg_max

519 

520 

def _split_xargs_args(
    static_cmd: Sequence[str],
    max_args_byte_len: int,
    varargs: Iterable[str],
    reuse_list_ok: bool,
) -> Iterator[List[str]]:
    """Split ``varargs`` into command lines that respect a byte budget.

    Every yielded list starts with ``static_cmd`` followed by as many of the
    variable arguments as fit into ``max_args_byte_len`` bytes (each argument
    costs its UTF-8 length plus one byte for the separating space).

    :param static_cmd: The fixed leading part of every command line.
    :param max_args_byte_len: Byte budget for the variable arguments only.
    :param varargs: The variable arguments to distribute.
    :param reuse_list_ok: If True, the same list object is cleared and reused
      between yields; the consumer must then not keep a reference to it.
    :return: An iterator of complete command lines. Nothing is yielded when
      there are no variable arguments.
    :raises ValueError: If a single argument does not fit into the budget.
    """
    static_cmd_len = len(static_cmd)
    remaining_len = max_args_byte_len
    pending_args = list(static_cmd)
    for arg in varargs:
        arg_len = len(arg.encode("utf-8")) + 1  # +1 for leading space
        if arg_len > max_args_byte_len:
            raise ValueError(
                f"Could not fit a single argument into the command line !?"
                f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
            )
        if arg_len > remaining_len:
            # The argument does not fit into the current command line; emit
            # what we have and start a new one. The check above guarantees
            # that at least one variable argument is pending here.
            yield pending_args
            remaining_len = max_args_byte_len
            if reuse_list_ok:
                pending_args.clear()
                pending_args.extend(static_cmd)
            else:
                pending_args = list(static_cmd)
        remaining_len -= arg_len
        pending_args.append(arg)

    if len(pending_args) > static_cmd_len:
        yield pending_args

550 

551 

def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Optional[Mapping[str, str]] = None,
    reuse_list_ok: bool = False,
) -> Iterator[List[str]]:
    """Split ``varargs`` into command lines that fit the argument size limit.

    :param static_cmd: The fixed leading part of every command line.
    :param varargs: The variable arguments to distribute over command lines.
    :param env: The environment the command will run with; the current
      process environment is assumed when omitted.
    :param reuse_list_ok: Passed on to ``_split_xargs_args``; if True the
      yielded list may be reused between iterations.
    :return: An iterator of complete command lines.
    """
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    encoded_cmd_len = sum(len(a.encode("utf-8")) for a in static_cmd)
    static_byte_len = len(static_cmd) - 1 + encoded_cmd_len
    if max_args_bytes is None:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        max_args_bytes = 20_000
    else:
        # +2 for nul bytes after key and value
        if env is None:
            env_byte_len = sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            env_byte_len = sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += env_byte_len + 2 * 4096
    yield from _split_xargs_args(
        static_cmd,
        max_args_bytes - static_byte_len,
        varargs,
        reuse_list_ok,
    )

583 

584 

585# itertools recipe 

586def grouper( 

587 iterable: Iterable[T], 

588 n: int, 

589 *, 

590 incomplete: Literal["fill", "strict", "ignore"] = "fill", 

591 fillvalue: Optional[T] = None, 

592) -> Iterator[Tuple[T, ...]]: 

593 """Collect data into non-overlapping fixed-length chunks or blocks""" 

594 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx 

595 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError 

596 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF 

597 args = [iter(iterable)] * n 

598 if incomplete == "fill": 

599 return zip_longest(*args, fillvalue=fillvalue) 

600 if incomplete == "strict": 

601 return zip(*args, strict=True) 

602 if incomplete == "ignore": 

603 return zip(*args) 

604 else: 

605 raise ValueError("Expected fill, strict, or ignore") 

606 

607 

# Whether setup_logging() has completed at least once.
_LOGGING_SET_UP = False


def _check_color() -> Tuple[bool, bool, Optional[str]]:
    """Decide whether stdout and stderr output should be colored.

    The request is read from ``DEBPUTY_COLORS``, falling back to
    ``DPKG_COLORS``, falling back to "never" if ``NO_COLOR`` is set and
    "auto" otherwise. With "auto", each stream is colored if it is a tty.

    :return: A tuple of (color on stdout, color on stderr, rejected value).
      The last element is None unless the requested value was not one of
      "auto", "always" or "never", in which case "auto" was used instead.
    """
    dpkg_or_default = os.environ.get(
        "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
    )
    requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
    bad_request = None
    if requested_color not in {"auto", "always", "never"}:
        bad_request = requested_color
        requested_color = "auto"

    if requested_color == "auto":
        # Each stream is checked on its own; one of them may be redirected
        # while the other is still a terminal.
        stdout_color = sys.stdout.isatty()
        stderr_color = sys.stderr.isatty()
    else:
        enable = requested_color == "always"
        stdout_color = enable
        stderr_color = enable
    return stdout_color, stderr_color, bad_request

629 

630 

def program_name() -> str:
    """Derive a program name from ``sys.argv[0]``.

    A trailing ".py" is removed, ``__main__`` is replaced by the name of the
    directory containing it, and "debputy_cmd" is mapped to "debputy".

    :return: The derived program name.
    """
    script_path = sys.argv[0]
    name = os.path.basename(script_path)
    if name.endswith(".py"):
        name = name[:-3]
    if name == "__main__":
        name = os.path.basename(os.path.dirname(script_path))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    return "debputy" if name == "debputy_cmd" else name

641 

642 

def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> Tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then pkg_a may do content cross-checks
      that involve pkg_b. If the second is True, then pkg_b may do content cross-checks
      that involve pkg_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all != pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            # pkg_a is restricted to some architectures, so pkg_b cannot rely on it being there.
            b_may_see_a = False
        if pkg_b.declared_architecture != "any":
            # ... and the same in the other direction.
            a_may_see_b = False

    return a_may_see_b, b_may_see_a

695 

696 

def setup_logging(
    *, log_only_to_stderr: bool = False, reconfigure_logging: bool = False
) -> None:
    """Configure the root logger and the module's default logger.

    Two handlers are installed on the root logger: one for records below
    WARNING (written to stdout unless ``log_only_to_stderr`` is set) and one
    for records at WARNING and above (always written to stderr). Colored
    output is used per stream when ``_check_color`` asks for it and the
    ``colorlog`` module can be imported.

    :param log_only_to_stderr: Send the low-severity records to stderr as well.
    :param reconfigure_logging: Must be True when logging was already set up;
      the handlers installed by the previous call are then replaced.
    :raises RuntimeError: If logging is already set up and
      ``reconfigure_logging`` is False.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()

    colorlog = None
    if stdout_color or stderr_color:
        try:
            import colorlog
        except ImportError:
            # Colors are optional; silently fall back to plain output.
            stdout_color = False
            stderr_color = False

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        stdout = sys.stdout

    class LogLevelFilter(logging.Filter):
        """Pass records at/above (``above=True``) or below a level threshold."""

        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    def _build_handler(stream: Any, use_color: bool) -> logging.StreamHandler:
        # Create a formatted stream handler, colored only when requested.
        if use_color:
            handler = colorlog.StreamHandler(stream)
            handler.setFormatter(
                colorlog.ColoredFormatter(color_format, style="{", force_color=True)
            )
        else:
            handler = logging.StreamHandler(stream)
            handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        return handler

    root_logger = logging.getLogger()

    # Replace (rather than stack) the handlers of a previous setup_logging call.
    stdout_handler = _build_handler(stdout, stdout_color)
    if _STDOUT_HANDLER is not None:
        root_logger.removeHandler(_STDOUT_HANDLER)
    _STDOUT_HANDLER = stdout_handler
    root_logger.addHandler(stdout_handler)

    stderr_handler = _build_handler(sys.stderr, stderr_color)
    if _STDERR_HANDLER is not None:
        root_logger.removeHandler(_STDERR_HANDLER)
    _STDERR_HANDLER = stderr_handler
    root_logger.addHandler(stderr_handler)

    # Split the records by severity so that each one is emitted exactly once.
    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        # Provide the lower-cased level name used by the two format strings.
        record = old_factory(*args, **kwargs)
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    root_logger.setLevel(logging.INFO)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True