From 11180d7be49c1912b7bdcba23d932747041e5b5d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 14 Apr 2024 22:18:23 +0200 Subject: Merging upstream version 0.1.28. Signed-off-by: Daniel Baumann --- coverage-report/d_267b6307937f1878_util_py.html | 903 ++++++++++++++++++++++++ 1 file changed, 903 insertions(+) create mode 100644 coverage-report/d_267b6307937f1878_util_py.html (limited to 'coverage-report/d_267b6307937f1878_util_py.html') diff --git a/coverage-report/d_267b6307937f1878_util_py.html b/coverage-report/d_267b6307937f1878_util_py.html new file mode 100644 index 0000000..5381dcb --- /dev/null +++ b/coverage-report/d_267b6307937f1878_util_py.html @@ -0,0 +1,903 @@ + + + + + Coverage for src/debputy/util.py: 65% + + + + + +
+
+

+ Coverage for src/debputy/util.py: + 65% +

+ +

+ 426 statements   + + + + +

+

+ « prev     + ^ index     + » next +       + coverage.py v7.2.7, + created at 2024-04-07 12:14 +0200 +

+ +
+
+
+

1import argparse 

+

2import collections 

+

3import functools 

+

4import glob 

+

5import logging 

+

6import os 

+

7import re 

+

8import shutil 

+

9import subprocess 

+

10import sys 

+

11import time 

+

12from itertools import zip_longest 

+

13from pathlib import Path 

+

14from typing import ( 

+

15 NoReturn, 

+

16 TYPE_CHECKING, 

+

17 Union, 

+

18 Set, 

+

19 FrozenSet, 

+

20 Optional, 

+

21 TypeVar, 

+

22 Dict, 

+

23 Iterator, 

+

24 Iterable, 

+

25 Literal, 

+

26 Tuple, 

+

27 Sequence, 

+

28 List, 

+

29 Mapping, 

+

30 Any, 

+

31) 

+

32 

+

33from debian.deb822 import Deb822 

+

34 

+

35from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

+

36from debputy.exceptions import DebputySubstitutionError 

+

37 

+

38if TYPE_CHECKING: 

+

39 from debputy.packages import BinaryPackage 

+

40 from debputy.substitution import Substitution 

+

41 

+

42 

+

43T = TypeVar("T") 

+

44 

+

45 

+

46SLASH_PRUNE = re.compile("//+") 

+

47PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII) 

+

48PKGVERSION_REGEX = re.compile( 

+

49 r""" 

+

50 (?: \d+ : )? # Optional epoch 

+

51 \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens) 

+

52 (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstreams versions with hyphens) 

+

53""", 

+

54 re.VERBOSE | re.ASCII, 

+

55) 

+

56DEFAULT_PACKAGE_TYPE = "deb" 

+

57DBGSYM_PACKAGE_TYPE = "deb" 

+

58UDEB_PACKAGE_TYPE = "udeb" 

+

59 

+

60POSTINST_DEFAULT_CONDITION = ( 

+

61 '[ "$1" = "configure" ]' 

+

62 ' || [ "$1" = "abort-upgrade" ]' 

+

63 ' || [ "$1" = "abort-deconfigure" ]' 

+

64 ' || [ "$1" = "abort-remove" ]' 

+

65) 

+

66 

+

67 

+

68_SPACE_RE = re.compile(r"\s") 

+

69_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])') 

+

70_REGULAR_ESCAPEES = re.compile(r'([\s!"$()*+#;<>?@\[\]\\`|~])') 

+

71_PROFILE_GROUP_SPLIT = re.compile(r">\s+<") 

+

72_DEFAULT_LOGGER: Optional[logging.Logger] = None 

+

73_STDOUT_HANDLER: Optional[logging.StreamHandler] = None 

+

74_STDERR_HANDLER: Optional[logging.StreamHandler] = None 

+

75 

+

76 

+

77def assume_not_none(x: Optional[T]) -> T: 

+

78 if x is None: # pragma: no cover 

+

79 raise ValueError( 

+

80 'Internal error: None was given, but the receiver assumed "not None" here' 

+

81 ) 

+

82 return x 

+

83 

+

84 

+

85def _info(msg: str) -> None: 

+

86 global _DEFAULT_LOGGER 

+

87 logger = _DEFAULT_LOGGER 

+

88 if logger: 

+

89 logger.info(msg) 

+

90 # No fallback print for info 

+

91 

+

92 

+

93def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn": 

+

94 global _DEFAULT_LOGGER 

+

95 logger = _DEFAULT_LOGGER 

+

96 if logger: 

+

97 logger.error(msg) 

+

98 else: 

+

99 me = os.path.basename(sys.argv[0]) if prog is None else prog 

+

100 print( 

+

101 f"{me}: error: {msg}", 

+

102 file=sys.stderr, 

+

103 ) 

+

104 sys.exit(1) 

+

105 

+

106 

+

107def _warn(msg: str, *, prog: Optional[str] = None) -> None: 

+

108 global _DEFAULT_LOGGER 

+

109 logger = _DEFAULT_LOGGER 

+

110 if logger: 110 ↛ 111line 110 didn't jump to line 111, because the condition on line 110 was never true

+

111 logger.warning(msg) 

+

112 else: 

+

113 me = os.path.basename(sys.argv[0]) if prog is None else prog 

+

114 

+

115 print( 

+

116 f"{me}: warning: {msg}", 

+

117 file=sys.stderr, 

+

118 ) 

+

119 

+

120 

+

121class ColorizedArgumentParser(argparse.ArgumentParser): 

+

122 def error(self, message: str) -> NoReturn: 

+

123 self.print_usage(sys.stderr) 

+

124 _error(message, prog=self.prog) 

+

125 

+

126 

+

127def ensure_dir(path: str) -> None: 

+

128 if not os.path.isdir(path): 128 ↛ 129line 128 didn't jump to line 129, because the condition on line 128 was never true

+

129 os.makedirs(path, mode=0o755, exist_ok=True) 

+

130 

+

131 

+

132def _clean_path(orig_p: str) -> str: 

+

133 p = SLASH_PRUNE.sub("/", orig_p) 

+

134 if "." in p: 134 ↛ 147line 134 didn't jump to line 147, because the condition on line 134 was never false

+

135 path_base = p 

+

136 # We permit a single leading "./" because we add that when we normalize a path, and we want normalization 

+

137 # of a normalized path to be a no-op. 

+

138 if path_base.startswith("./"): 

+

139 path_base = path_base[2:] 

+

140 assert path_base 

+

141 for segment in path_base.split("/"): 

+

142 if segment in (".", ".."): 

+

143 raise ValueError( 

+

144 'Please provide paths that are normalized (i.e., no ".." or ".").' 

+

145 f' Offending input "{orig_p}"' 

+

146 ) 

+

147 return p 

+

148 

+

149 

+

150def _normalize_path(path: str, with_prefix: bool = True) -> str: 

+

151 path = path.strip("/") 

+

152 if not path or path == ".": 152 ↛ 153line 152 didn't jump to line 153, because the condition on line 152 was never true

+

153 return "." 

+

154 if "//" in path or "." in path: 

+

155 path = _clean_path(path) 

+

156 if with_prefix ^ path.startswith("./"): 

+

157 if with_prefix: 157 ↛ 160line 157 didn't jump to line 160, because the condition on line 157 was never false

+

158 path = "./" + path 

+

159 else: 

+

160 path = path[2:] 

+

161 return path 

+

162 

+

163 

+

164def _normalize_link_target(link_target: str) -> str: 

+

165 link_target = SLASH_PRUNE.sub("/", link_target.lstrip("/")) 

+

166 result: List[str] = [] 

+

167 for segment in link_target.split("/"): 

+

168 if segment in (".", ""): 

+

169 # Ignore these - the empty string is generally a trailing slash 

+

170 continue 

+

171 if segment == "..": 

+

172 # We ignore "root escape attempts" like the OS would (mapping /.. -> /) 

+

173 if result: 173 ↛ 167line 173 didn't jump to line 167, because the condition on line 173 was never false

+

174 result.pop() 

+

175 else: 

+

176 result.append(segment) 

+

177 return "/".join(result) 

+

178 

+

179 

+

180def _backslash_escape(m: re.Match[str]) -> str: 

+

181 return "\\" + m.group(0) 

+

182 

+

183 

+

184def _escape_shell_word(w: str) -> str: 

+

185 if _SPACE_RE.match(w): 185 ↛ 186line 185 didn't jump to line 186, because the condition on line 185 was never true

+

186 w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w) 

+

187 return f'"{w}"' 

+

188 return _REGULAR_ESCAPEES.sub(_backslash_escape, w) 

+

189 

+

190 

+

191def escape_shell(*args: str) -> str: 

+

192 return " ".join(_escape_shell_word(w) for w in args) 

+

193 

+

194 

+

195def print_command(*args: str) -> None: 

+

196 print(f" {escape_shell(*args)}") 

+

197 

+

198 

+

199def debian_policy_normalize_symlink_target( 

+

200 link_path: str, 

+

201 link_target: str, 

+

202 normalize_link_path: bool = False, 

+

203) -> str: 

+

204 if normalize_link_path: 

+

205 link_path = _normalize_path(link_path) 

+

206 elif not link_path.startswith("./"): 206 ↛ 207line 206 didn't jump to line 207, because the condition on line 206 was never true

+

207 raise ValueError("Link part was not normalized") 

+

208 

+

209 link_path = link_path[2:] 

+

210 

+

211 if not link_target.startswith("/"): 

+

212 link_target = "/" + os.path.dirname(link_path) + "/" + link_target 

+

213 

+

214 link_path_parts = link_path.split("/") 

+

215 link_target_parts = [ 

+

216 s for s in _normalize_link_target(link_target).split("/") if s != "." 

+

217 ] 

+

218 

+

219 assert link_path_parts 

+

220 

+

221 if link_target_parts and link_path_parts[0] == link_target_parts[0]: 

+

222 # Per Debian Policy, must be relative 

+

223 

+

224 # First determine the length of the overlap 

+

225 common_segment_count = 1 

+

226 shortest_path_length = min(len(link_target_parts), len(link_path_parts)) 

+

227 while ( 

+

228 common_segment_count < shortest_path_length 

+

229 and link_target_parts[common_segment_count] 

+

230 == link_path_parts[common_segment_count] 

+

231 ): 

+

232 common_segment_count += 1 

+

233 

+

234 if common_segment_count == shortest_path_length and len( 

+

235 link_path_parts 

+

236 ) - 1 == len(link_target_parts): 

+

237 normalized_link_target = "." 

+

238 else: 

+

239 up_dir_count = len(link_path_parts) - 1 - common_segment_count 

+

240 normalized_link_target_parts = [] 

+

241 if up_dir_count: 

+

242 up_dir_part = "../" * up_dir_count 

+

243 # We overshoot with a single '/', so rstrip it away 

+

244 normalized_link_target_parts.append(up_dir_part.rstrip("/")) 

+

245 # Add the relevant down parts 

+

246 normalized_link_target_parts.extend( 

+

247 link_target_parts[common_segment_count:] 

+

248 ) 

+

249 

+

250 normalized_link_target = "/".join(normalized_link_target_parts) 

+

251 else: 

+

252 # Per Debian Policy, must be absolute 

+

253 normalized_link_target = "/" + "/".join(link_target_parts) 

+

254 

+

255 return normalized_link_target 

+

256 

+

257 

+

258def has_glob_magic(pattern: str) -> bool: 

+

259 return glob.has_magic(pattern) or "{" in pattern 

+

260 

+

261 

+

262def glob_escape(replacement_value: str) -> str: 

+

263 if not glob.has_magic(replacement_value) or "{" not in replacement_value: 

+

264 return replacement_value 

+

265 return ( 

+

266 replacement_value.replace("[", "[[]") 

+

267 .replace("]", "[]]") 

+

268 .replace("*", "[*]") 

+

269 .replace("?", "[?]") 

+

270 .replace("{", "[{]") 

+

271 .replace("}", "[}]") 

+

272 ) 

+

273 

+

274 

+

275# TODO: This logic should probably be moved to `python-debian` 

+

276def active_profiles_match( 

+

277 profiles_raw: str, 

+

278 active_build_profiles: Union[Set[str], FrozenSet[str]], 

+

279) -> bool: 

+

280 profiles_raw = profiles_raw.strip() 

+

281 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 281 ↛ 282line 281 didn't jump to line 282, because the condition on line 281 was never true

+

282 raise ValueError( 

+

283 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

+

284 ) 

+

285 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

+

286 for profile_group_raw in profile_groups: 286 ↛ 302line 286 didn't jump to line 302, because the loop on line 286 didn't complete

+

287 should_process_package = True 

+

288 for profile_name in profile_group_raw.split(): 

+

289 negation = False 

+

290 if profile_name[0] == "!": 290 ↛ 294line 290 didn't jump to line 294, because the condition on line 290 was never false

+

291 negation = True 

+

292 profile_name = profile_name[1:] 

+

293 

+

294 matched_profile = profile_name in active_build_profiles 

+

295 if matched_profile == negation: 295 ↛ 296line 295 didn't jump to line 296, because the condition on line 295 was never true

+

296 should_process_package = False 

+

297 break 

+

298 

+

299 if should_process_package: 299 ↛ 286line 299 didn't jump to line 286, because the condition on line 299 was never false

+

300 return True 

+

301 

+

302 return False 

+

303 

+

304 

+

305def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]: 

+

306 profiles_raw = build_profiles_raw.strip() 

+

307 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 307 ↛ 308line 307 didn't jump to line 308, because the condition on line 307 was never true

+

308 raise ValueError( 

+

309 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

+

310 ) 

+

311 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

+

312 return frozenset(frozenset(g.split()) for g in profile_groups) 

+

313 

+

314 

+

315def resolve_source_date_epoch( 

+

316 command_line_value: Optional[int], 

+

317 *, 

+

318 substitution: Optional["Substitution"] = None, 

+

319) -> int: 

+

320 mtime = command_line_value 

+

321 if mtime is None and "SOURCE_DATE_EPOCH" in os.environ: 

+

322 sde_raw = os.environ["SOURCE_DATE_EPOCH"] 

+

323 if sde_raw == "": 

+

324 _error("SOURCE_DATE_EPOCH is set but empty.") 

+

325 mtime = int(sde_raw) 

+

326 if mtime is None and substitution is not None: 

+

327 try: 

+

328 sde_raw = substitution.substitute( 

+

329 "{{SOURCE_DATE_EPOCH}}", 

+

330 "Internal resolution", 

+

331 ) 

+

332 mtime = int(sde_raw) 

+

333 except (DebputySubstitutionError, ValueError): 

+

334 pass 

+

335 if mtime is None: 

+

336 mtime = int(time.time()) 

+

337 os.environ["SOURCE_DATE_EPOCH"] = str(mtime) 

+

338 return mtime 

+

339 

+

340 

+

341def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str: 

+

342 with open(os.path.join(control_root_dir, "control"), "rt") as fd: 

+

343 control_file = Deb822(fd) 

+

344 

+

345 package_name = control_file["Package"] 

+

346 package_version = control_file["Version"] 

+

347 package_architecture = control_file["Architecture"] 

+

348 extension = control_file.get("Package-Type") or "deb" 

+

349 if ":" in package_version: 

+

350 package_version = package_version.split(":", 1)[1] 

+

351 if is_udeb: 

+

352 extension = "udeb" 

+

353 

+

354 return f"{package_name}_{package_version}_{package_architecture}.{extension}" 

+

355 

+

356 

+

357_SCRATCH_DIR = None 

+

358_DH_INTEGRATION_MODE = False 

+

359 

+

360 

+

361def integrated_with_debhelper() -> None: 

+

362 global _DH_INTEGRATION_MODE 

+

363 _DH_INTEGRATION_MODE = True 

+

364 

+

365 

+

366def scratch_dir() -> str: 

+

367 global _SCRATCH_DIR 

+

368 if _SCRATCH_DIR is not None: 

+

369 return _SCRATCH_DIR 

+

370 debputy_scratch_dir = "debian/.debputy/scratch-dir" 

+

371 is_debputy_dir = True 

+

372 if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE: 372 ↛ 374line 372 didn't jump to line 374, because the condition on line 372 was never false

+

373 _SCRATCH_DIR = debputy_scratch_dir 

+

374 elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE: 

+

375 _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir" 

+

376 is_debputy_dir = False 

+

377 else: 

+

378 _SCRATCH_DIR = debputy_scratch_dir 

+

379 ensure_dir(_SCRATCH_DIR) 

+

380 if is_debputy_dir: 380 ↛ 382line 380 didn't jump to line 382, because the condition on line 380 was never false

+

381 Path("debian/.debputy/.gitignore").write_text("*\n") 

+

382 return _SCRATCH_DIR 

+

383 

+

384 

+

385_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None 

+

386 

+

387 

+

388def generated_content_dir( 

+

389 *, 

+

390 package: Optional["BinaryPackage"] = None, 

+

391 subdir_key: Optional[str] = None, 

+

392) -> str: 

+

393 global _RUNTIME_CONTAINER_DIR_KEY 

+

394 container_dir = _RUNTIME_CONTAINER_DIR_KEY 

+

395 first_run = False 

+

396 

+

397 if container_dir is None: 

+

398 first_run = True 

+

399 container_dir = f"_pb-{os.getpid()}" 

+

400 _RUNTIME_CONTAINER_DIR_KEY = container_dir 

+

401 

+

402 directory = os.path.join(scratch_dir(), container_dir) 

+

403 

+

404 if first_run and os.path.isdir(directory): 404 ↛ 409line 404 didn't jump to line 409, because the condition on line 404 was never true

+

405 # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not 

+

406 # see "stale" data. 

+

407 # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not 

+

408 # reliable enough for that and we do not have an obvious hook for it. 

+

409 shutil.rmtree(directory) 

+

410 

+

411 directory = os.path.join( 

+

412 directory, 

+

413 "generated-fs-content", 

+

414 f"pkg_{package.name}" if package else "no-package", 

+

415 ) 

+

416 if subdir_key is not None: 

+

417 directory = os.path.join(directory, subdir_key) 

+

418 

+

419 os.makedirs(directory, exist_ok=True) 

+

420 return directory 

+

421 

+

422 

+

423PerlIncDir = collections.namedtuple("PerlIncDir", ["vendorlib", "vendorarch"]) 

+

424PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"]) 

+

425_PERL_MODULE_DIRS: Dict[str, PerlIncDir] = {} 

+

426 

+

427 

+

428@functools.lru_cache(1) 

+

429def _perl_config_data() -> PerlConfigData: 

+

430 d = ( 

+

431 subprocess.check_output( 

+

432 [ 

+

433 "perl", 

+

434 "-MConfig", 

+

435 "-e", 

+

436 'print "$Config{version}\n$Config{debian_abi}\n"', 

+

437 ] 

+

438 ) 

+

439 .decode("utf-8") 

+

440 .splitlines() 

+

441 ) 

+

442 return PerlConfigData(*d) 

+

443 

+

444 

+

445def _perl_version() -> str: 

+

446 return _perl_config_data().version 

+

447 

+

448 

+

449def perlxs_api_dependency() -> str: 

+

450 # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic 

+

451 # assumes that the major version of build variant of Perl is the same as the host variant of Perl. 

+

452 config = _perl_config_data() 

+

453 if config.debian_abi is not None and config.debian_abi != "": 

+

454 return f"perlapi-{config.debian_abi}" 

+

455 return f"perlapi-{config.version}" 

+

456 

+

457 

+

458def perl_module_dirs( 

+

459 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

+

460 dctrl_bin: "BinaryPackage", 

+

461) -> PerlIncDir: 

+

462 global _PERL_MODULE_DIRS 

+

463 arch = ( 

+

464 dctrl_bin.resolved_architecture 

+

465 if dpkg_architecture_variables.is_cross_compiling 

+

466 else "_default_" 

+

467 ) 

+

468 module_dir = _PERL_MODULE_DIRS.get(arch) 

+

469 if module_dir is None: 

+

470 cmd = ["perl"] 

+

471 if dpkg_architecture_variables.is_cross_compiling: 471 ↛ 472line 471 didn't jump to line 472, because the condition on line 471 was never true

+

472 version = _perl_version() 

+

473 inc_dir = f"/usr/lib/{dctrl_bin.deb_multiarch}/perl/cross-config-{version}" 

+

474 # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs 

+

475 # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in 

+

476 # cross builds... meh. 

+

477 if os.path.exists(os.path.join(inc_dir, "Config.pm")): 

+

478 cmd.append(f"-I{inc_dir}") 

+

479 cmd.extend( 

+

480 ["-MConfig", "-e", 'print "$Config{vendorlib}\n$Config{vendorarch}\n"'] 

+

481 ) 

+

482 output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False) 

+

483 if len(output) != 2: 483 ↛ 484line 483 didn't jump to line 484, because the condition on line 483 was never true

+

484 raise ValueError( 

+

485 "Internal error: Unable to determine the perl include directories:" 

+

486 f" Raw output from perl snippet: {output}" 

+

487 ) 

+

488 module_dir = PerlIncDir( 

+

489 vendorlib=_normalize_path(output[0]), 

+

490 vendorarch=_normalize_path(output[1]), 

+

491 ) 

+

492 _PERL_MODULE_DIRS[arch] = module_dir 

+

493 return module_dir 

+

494 

+

495 

+

496@functools.lru_cache(1) 

+

497def detect_fakeroot() -> bool: 

+

498 if os.getuid() != 0 or "LD_PRELOAD" not in os.environ: 

+

499 return False 

+

500 env = dict(os.environ) 

+

501 del env["LD_PRELOAD"] 

+

502 try: 

+

503 return subprocess.check_output(["id", "-u"], env=env).strip() != b"0" 

+

504 except subprocess.CalledProcessError: 

+

505 print( 

+

506 'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot', 

+

507 file=sys.stderr, 

+

508 ) 

+

509 return False 

+

510 

+

511 

+

512@functools.lru_cache(1) 

+

513def _sc_arg_max() -> Optional[int]: 

+

514 try: 

+

515 return os.sysconf("SC_ARG_MAX") 

+

516 except RuntimeError: 

+

517 _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit") 

+

518 return None 

+

519 

+

520 

+

521def _split_xargs_args( 

+

522 static_cmd: Sequence[str], 

+

523 max_args_byte_len: int, 

+

524 varargs: Iterable[str], 

+

525 reuse_list_ok: bool, 

+

526) -> Iterator[List[str]]: 

+

527 static_cmd_len = len(static_cmd) 

+

528 remaining_len = max_args_byte_len 

+

529 pending_args = list(static_cmd) 

+

530 for arg in varargs: 

+

531 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space 

+

532 remaining_len -= arg_len 

+

533 if not remaining_len: 

+

534 if len(pending_args) <= static_cmd_len: 

+

535 raise ValueError( 

+

536 f"Could not fit a single argument into the command line !?" 

+

537 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)" 

+

538 ) 

+

539 yield pending_args 

+

540 remaining_len = max_args_byte_len - arg_len 

+

541 if reuse_list_ok: 

+

542 pending_args.clear() 

+

543 pending_args.extend(static_cmd) 

+

544 else: 

+

545 pending_args = list(static_cmd) 

+

546 pending_args.append(arg) 

+

547 

+

548 if len(pending_args) > static_cmd_len: 

+

549 yield pending_args 

+

550 

+

551 

+

552def xargs( 

+

553 static_cmd: Sequence[str], 

+

554 varargs: Iterable[str], 

+

555 *, 

+

556 env: Optional[Mapping[str, str]] = None, 

+

557 reuse_list_ok: bool = False, 

+

558) -> Iterator[List[str]]: 

+

559 max_args_bytes = _sc_arg_max() 

+

560 # len overshoots with one space explaining the -1. The _split_xargs_args 

+

561 # will account for the space for the first argument 

+

562 static_byte_len = ( 

+

563 len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd) 

+

564 ) 

+

565 if max_args_bytes is not None: 

+

566 if env is None: 

+

567 # +2 for nul bytes after key and value 

+

568 static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items()) 

+

569 else: 

+

570 # +2 for nul bytes after key and value 

+

571 static_byte_len += sum( 

+

572 len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2 

+

573 for k, v in env.items() 

+

574 ) 

+

575 # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like 

+

576 # that) 

+

577 static_byte_len += 2 * 4096 

+

578 else: 

+

579 # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here. 

+

580 max_args_bytes = 20_000 

+

581 remain_len = max_args_bytes - static_byte_len 

+

582 yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok) 

+

583 

+

584 

+

585# itertools recipe 

+

586def grouper( 

+

587 iterable: Iterable[T], 

+

588 n: int, 

+

589 *, 

+

590 incomplete: Literal["fill", "strict", "ignore"] = "fill", 

+

591 fillvalue: Optional[T] = None, 

+

592) -> Iterator[Tuple[T, ...]]: 

+

593 """Collect data into non-overlapping fixed-length chunks or blocks""" 

+

594 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx 

+

595 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError 

+

596 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF 

+

597 args = [iter(iterable)] * n 

+

598 if incomplete == "fill": 

+

599 return zip_longest(*args, fillvalue=fillvalue) 

+

600 if incomplete == "strict": 

+

601 return zip(*args, strict=True) 

+

602 if incomplete == "ignore": 

+

603 return zip(*args) 

+

604 else: 

+

605 raise ValueError("Expected fill, strict, or ignore") 

+

606 

+

607 

+

608_LOGGING_SET_UP = False 

+

609 

+

610 

+

611def _check_color() -> Tuple[bool, bool, Optional[str]]: 

+

612 dpkg_or_default = os.environ.get( 

+

613 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto" 

+

614 ) 

+

615 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default) 

+

616 bad_request = None 

+

617 if requested_color not in {"auto", "always", "never"}: 617 ↛ 618line 617 didn't jump to line 618, because the condition on line 617 was never true

+

618 bad_request = requested_color 

+

619 requested_color = "auto" 

+

620 

+

621 if requested_color == "auto": 621 ↛ 625line 621 didn't jump to line 625, because the condition on line 621 was never false

+

622 stdout_color = sys.stdout.isatty() 

+

623 stderr_color = sys.stdout.isatty() 

+

624 else: 

+

625 enable = requested_color == "always" 

+

626 stdout_color = enable 

+

627 stderr_color = enable 

+

628 return stdout_color, stderr_color, bad_request 

+

629 

+

630 

+

631def program_name() -> str: 

+

632 name = os.path.basename(sys.argv[0]) 

+

633 if name.endswith(".py"): 633 ↛ 634line 633 didn't jump to line 634, because the condition on line 633 was never true

+

634 name = name[:-3] 

+

635 if name == "__main__": 635 ↛ 636line 635 didn't jump to line 636, because the condition on line 635 was never true

+

636 name = os.path.basename(os.path.dirname(sys.argv[0])) 

+

637 # FIXME: Not optimal that we have to hardcode these kind of things here 

+

638 if name == "debputy_cmd": 638 ↛ 639line 638 didn't jump to line 639, because the condition on line 638 was never true

+

639 name = "debputy" 

+

640 return name 

+

641 

+

642 

+

643def package_cross_check_precheck( 

+

644 pkg_a: "BinaryPackage", 

+

645 pkg_b: "BinaryPackage", 

+

646) -> Tuple[bool, bool]: 

+

647 """Whether these two packages can do content cross-checks 

+

648 

+

649 :param pkg_a: The first package 

+

650 :param pkg_b: The second package 

+

651 :return: A tuple if two booleans. If the first is True, then binary_package_a may do content cross-checks 

+

652 that invoĺves binary_package_b. If the second is True, then binary_package_b may do content cross-checks 

+

653 that involves binary_package_a. Both can be True and both can be False at the same time, which 

+

654 happens in common cases (arch:all + arch:any cases both to be False as a common example). 

+

655 """ 

+

656 

+

657 # Handle the two most obvious base-cases 

+

658 if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on: 

+

659 return False, False 

+

660 if pkg_a.is_arch_all ^ pkg_b.is_arch_all: 

+

661 return False, False 

+

662 

+

663 a_may_see_b = True 

+

664 b_may_see_a = True 

+

665 

+

666 a_bp = pkg_a.fields.get("Build-Profiles", "") 

+

667 b_bp = pkg_b.fields.get("Build-Profiles", "") 

+

668 

+

669 if a_bp != b_bp: 

+

670 a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset() 

+

671 b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset() 

+

672 

+

673 # Check for build profiles being identically but just ordered differently. 

+

674 if a_bp_set != b_bp_set: 

+

675 # For simplicity, we let groups cancel each other out. If one side has no clauses 

+

676 # left, then it will always be built when the other is built. 

+

677 # 

+

678 # Eventually, someone will be here with a special case where more complex logic is 

+

679 # required. Good luck to you! Remember to add test cases for it (the existing logic 

+

680 # has some for a reason and if the logic is going to be more complex, it will need 

+

681 # tests cases to assert it fixes the problem and does not regress) 

+

682 if a_bp_set - b_bp_set: 

+

683 a_may_see_b = False 

+

684 if b_bp_set - a_bp_set: 

+

685 b_may_see_a = False 

+

686 

+

687 if pkg_a.declared_architecture != pkg_b.declared_architecture: 

+

688 # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain 

+

689 if pkg_a.declared_architecture != "any": 689 ↛ 691line 689 didn't jump to line 691, because the condition on line 689 was never false

+

690 b_may_see_a = False 

+

691 if pkg_a.declared_architecture != "any": 691 ↛ 694line 691 didn't jump to line 694, because the condition on line 691 was never false

+

692 a_may_see_b = False 

+

693 

+

694 return a_may_see_b, b_may_see_a 

+

695 

+

696 

+

697def setup_logging( 

+

698 *, log_only_to_stderr: bool = False, reconfigure_logging: bool = False 

+

699) -> None: 

+

700 global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER 

+

701 if _LOGGING_SET_UP and not reconfigure_logging: 701 ↛ 702line 701 didn't jump to line 702, because the condition on line 701 was never true

+

702 raise RuntimeError( 

+

703 "Logging has already been configured." 

+

704 " Use reconfigure_logging=True if you need to reconfigure it" 

+

705 ) 

+

706 stdout_color, stderr_color, bad_request = _check_color() 

+

707 

+

708 if stdout_color or stderr_color: 708 ↛ 709line 708 didn't jump to line 709, because the condition on line 708 was never true

+

709 try: 

+

710 import colorlog 

+

711 except ImportError: 

+

712 stdout_color = False 

+

713 stderr_color = False 

+

714 

+

715 if log_only_to_stderr: 

+

716 stdout = sys.stderr 

+

717 stdout_color = stderr_color 

+

718 else: 

+

719 stdout = sys.stderr 

+

720 

+

721 class LogLevelFilter(logging.Filter): 

+

722 def __init__(self, threshold: int, above: bool): 

+

723 super().__init__() 

+

724 self.threshold = threshold 

+

725 self.above = above 

+

726 

+

727 def filter(self, record: logging.LogRecord) -> bool: 

+

728 if self.above: 

+

729 return record.levelno >= self.threshold 

+

730 else: 

+

731 return record.levelno < self.threshold 

+

732 

+

733 color_format = ( 

+

734 "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}" 

+

735 ) 

+

736 colorless_format = "{name}: {levelnamelower}: {message}" 

+

737 

+

738 existing_stdout_handler = _STDOUT_HANDLER 

+

739 existing_stderr_handler = _STDERR_HANDLER 

+

740 

+

741 if stdout_color: 741 ↛ 742line 741 didn't jump to line 742, because the condition on line 741 was never true

+

742 stdout_handler = colorlog.StreamHandler(stdout) 

+

743 stdout_handler.setFormatter( 

+

744 colorlog.ColoredFormatter(color_format, style="{", force_color=True) 

+

745 ) 

+

746 logger = colorlog.getLogger() 

+

747 if existing_stdout_handler is not None: 

+

748 logger.removeHandler(existing_stdout_handler) 

+

749 _STDOUT_HANDLER = stdout_handler 

+

750 logger.addHandler(stdout_handler) 

+

751 else: 

+

752 stdout_handler = logging.StreamHandler(stdout) 

+

753 stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{")) 

+

754 logger = logging.getLogger() 

+

755 if existing_stdout_handler is not None: 

+

756 logger.removeHandler(existing_stdout_handler) 

+

757 _STDOUT_HANDLER = stdout_handler 

+

758 logger.addHandler(stdout_handler) 

+

759 

+

760 if stderr_color: 760 ↛ 761line 760 didn't jump to line 761, because the condition on line 760 was never true

+

761 stderr_handler = colorlog.StreamHandler(sys.stderr) 

+

762 stderr_handler.setFormatter( 

+

763 colorlog.ColoredFormatter(color_format, style="{", force_color=True) 

+

764 ) 

+

765 logger = logging.getLogger() 

+

766 if existing_stdout_handler is not None: 

+

767 logger.removeHandler(existing_stderr_handler) 

+

768 _STDERR_HANDLER = stderr_handler 

+

769 logger.addHandler(stderr_handler) 

+

770 else: 

+

771 stderr_handler = logging.StreamHandler(sys.stderr) 

+

772 stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{")) 

+

773 logger = logging.getLogger() 

+

774 if existing_stdout_handler is not None: 

+

775 logger.removeHandler(existing_stderr_handler) 

+

776 _STDERR_HANDLER = stderr_handler 

+

777 logger.addHandler(stderr_handler) 

+

778 

+

779 stdout_handler.addFilter(LogLevelFilter(logging.WARN, False)) 

+

780 stderr_handler.addFilter(LogLevelFilter(logging.WARN, True)) 

+

781 

+

782 name = program_name() 

+

783 

+

784 old_factory = logging.getLogRecordFactory() 

+

785 

+

786 def record_factory( 

+

787 *args: Any, **kwargs: Any 

+

788 ) -> logging.LogRecord: # pragma: no cover 

+

789 record = old_factory(*args, **kwargs) 

+

790 record.levelnamelower = record.levelname.lower() 

+

791 return record 

+

792 

+

793 logging.setLogRecordFactory(record_factory) 

+

794 

+

795 logging.getLogger().setLevel(logging.INFO) 

+

796 _DEFAULT_LOGGER = logging.getLogger(name) 

+

797 

+

798 if bad_request: 798 ↛ 799line 798 didn't jump to line 799, because the condition on line 798 was never true

+

799 _DEFAULT_LOGGER.warning( 

+

800 f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.' 

+

801 ' Resetting to "auto".' 

+

802 ) 

+

803 

+

804 _LOGGING_SET_UP = True 

+
+ + + -- cgit v1.2.3