Coverage for src/debputy/transformation_rules.py: 73%

271 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import dataclasses 

2import os 

3from typing import ( 

4 NoReturn, 

5 Optional, 

6 Callable, 

7 Sequence, 

8 Tuple, 

9 List, 

10 Literal, 

11 Dict, 

12 TypeVar, 

13 cast, 

14) 

15 

16from debputy.exceptions import ( 

17 DebputyRuntimeError, 

18 PureVirtualPathError, 

19 TestPathWithNonExistentFSPathError, 

20) 

21from debputy.filesystem_scan import FSPath 

22from debputy.interpreter import ( 

23 extract_shebang_interpreter_from_file, 

24) 

25from debputy.manifest_conditions import ConditionContext, ManifestCondition 

26from debputy.manifest_parser.base_types import ( 

27 FileSystemMode, 

28 StaticFileSystemOwner, 

29 StaticFileSystemGroup, 

30 DebputyDispatchableType, 

31) 

32from debputy.manifest_parser.util import AttributePath 

33from debputy.path_matcher import MatchRule 

34from debputy.plugin.api import VirtualPath 

35from debputy.plugin.debputy.types import DebputyCapability 

36from debputy.util import _warn 

37 

38 

class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised when a transformation cannot be applied.

    Raised by ``TransformationRule._error`` and by ``_apply_owner_and_mode``
    in this module.
    """

41 

42 

# Policy names accepted by the create-symlink transformation for the case
# where something already exists at the symlink's location.
CreateSymlinkReplacementRule = Literal[
    # Never replace: any existing path is an error.
    "error-if-exists",
    # Replace existing non-directories; an existing directory is an error.
    "error-if-directory",
    # Replace non-directories and empty directories; a non-empty directory
    # is an error.
    "abort-on-non-empty-directory",
    # Always remove whatever exists (recursively) before creating the link.
    "discard-existing",
]

49 

50 

# Type variable for helpers that hand back the same kind of path object they
# were given (see ``TransformationRule._fs_path_as_dir``).
VP = TypeVar("VP", bound=VirtualPath)

52 

53 

@dataclasses.dataclass(frozen=True, slots=True)
class PreProvidedExclusion:
    """Immutable record bundling a tag, a description and a pruning callback.

    NOTE(review): nothing in this module uses the class, so the exact role of
    each field should be confirmed against its callers.
    """

    # Short identifier for the exclusion (presumably unique; verify with callers).
    tag: str
    # Text describing the exclusion.
    description: str
    # Callback invoked with an ``FSPath``; it returns nothing, so any effect
    # is a side effect on the file system tree.
    pruner: Callable[[FSPath], None]

59 

60 

class TransformationRule(DebputyDispatchableType):
    """Base class for rules that transform a package file system tree.

    Subclasses override :meth:`transform_file_system`. The remaining methods
    are shared helpers for evaluating the optional manifest condition,
    reporting errors and making sure a path is a directory.
    """

    __slots__ = ()

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Apply this rule to the tree rooted at ``fs_root``.

        :param fs_root: Root of the file system tree to transform.
        :param condition_context: Context used to evaluate manifest conditions.
        :raises NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: Optional[ManifestCondition],
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate, or ``None`` if the rule
          has no condition.
        :param condition_context: Context passed to ``condition.evaluate``.
        :param result_if_condition_is_missing: Value returned when
          ``condition`` is ``None``.
        :return: The evaluation result, or the fallback for a missing condition.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Abort the transformation with ``msg``.

        :param msg: Message for the raised error.
        :param caused_by: Optional exception to chain as the cause.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Report that ``match_rule`` did not match anything.

        :param match_rule: The rule that had no matches.
        :param definition_source: Where the transformation was defined; used
          in the error message.
        :raises TransformationRuntimeError: Always (via :meth:`_error`).
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return ``path`` unchanged if it is a directory; otherwise fail.

        :param path: The path that must be a directory.
        :param definition_source: Where the transformation was defined; used
          in the error message.
        :return: ``path`` itself.
        :raises TransformationRuntimeError: If ``path`` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source}"
            " to make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Look up ``path_to_directory`` below ``fs_root``, creating missing parts.

        :param fs_root: Root of the file system tree.
        :param path_to_directory: Path of the wanted directory.
        :param definition_source: Where the transformation was defined; used
          in error messages.
        :return: The result of ``mkdirs`` for the missing parts, or the
          existing directory when nothing was missing.
        :raises TransformationRuntimeError: If the deepest existing part of
          the path is not a directory.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        # The deepest existing component must itself be a directory before
        # anything can be created below it.
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current

122 

123 

class RemoveTransformationRule(TransformationRule):
    """Remove every path matched by a sequence of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """Store the rule configuration.

        :param match_rules: Rules selecting the paths to remove.
        :param keep_empty_parent_dirs: When false, the parent directory of a
          removed path is pruned if the removal left it empty.
        :param definition_source: Origin of the rule; only its ``path`` is kept.
        """
        self._definition_source = definition_source.path
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._match_rules = match_rules

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Unlink all matched paths (recursively) from ``fs_root``.

        :param fs_root: Root of the file system tree.
        :param condition_context: Unused by this rule.
        :raises TransformationRuntimeError: If a match is the root directory
          or if nothing has matched by the time a rule has been processed.
        """
        # NOTE(review): this flag is shared by all match rules and never
        # reset, so once any rule has matched, later rules without matches
        # are not reported.
        found_any = False
        for rule in self._match_rules:
            # Fully resolve the matches to avoid RuntimeError caused by collection changing size as a
            # consequence of the removal: https://salsa.debian.org/debian/debputy/-/issues/52
            snapshot = list(rule.finditer(fs_root))
            for victim in snapshot:
                found_any = True
                containing_dir = victim.parent_dir
                if containing_dir is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                victim.unlink(recursive=True)
                if self._keep_empty_parent_dirs:
                    continue
                containing_dir.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not found_any:
                self._match_rule_had_no_matches(rule, self._definition_source)

164 

165 

class MoveTransformationRule(TransformationRule):
    """Move or rename the paths selected by a match rule.

    If the destination is (or is declared to be) a directory, every match is
    moved into it under its own basename. Otherwise exactly one match is
    allowed and it is renamed to the destination path.
    """

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param match_rule: Rule selecting the source path(s).
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a
          directory (it is created if missing).
        :param definition_source: Origin of the rule; only its ``path`` is kept.
        :param condition: Optional condition guarding the rule.
        """
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Perform the move/rename on the tree rooted at ``fs_root``.

        :param fs_root: Root of the file system tree.
        :param condition_context: Context used to evaluate the rule's condition.
        :raises TransformationRuntimeError: If nothing matched, if several
          paths matched but the destination is not a directory, if the source
          equals the destination, if two matches share a basename, or if a
          match would replace an existing directory in the destination.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: Optional[VirtualPath]
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

        if target_dir is None or not target_dir.is_dir:
            # Rename case: the destination does not exist or is not a
            # directory, so it can only receive a single source path.
            if len(matches) > 1:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                    " destination was not a directory. Either correct the pattern to only match only source"
                    " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                    f' "{self._dest_path}/")'
                )
            p = matches[0]
            if p.path == self._dest_path:
                self._error(
                    f"Error in {self._definition_source}, the source"
                    f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                    " rename redundant!?"
                )
            p.parent_dir = target_parent_dir
            p.name = basename
            return

        assert target_dir is not None and target_dir.is_dir
        # Matches moved so far, keyed by basename, to detect collisions
        # between two sources.
        basenames: Dict[str, VirtualPath] = dict()
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # Look for a collision in the destination directory, not inside
            # the path being moved. A match that already lives in the
            # destination is not a collision with itself.
            existing = target_dir.get(m.name)
            if existing is not None and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir

267 

268 

269class CreateSymlinkPathTransformationRule(TransformationRule): 

270 __slots__ = ( 

271 "_link_dest", 

272 "_link_target", 

273 "_replacement_rule", 

274 "_definition_source", 

275 "_condition", 

276 ) 

277 

278 def __init__( 

279 self, 

280 link_target: str, 

281 link_dest: str, 

282 replacement_rule: CreateSymlinkReplacementRule, 

283 definition_source: AttributePath, 

284 condition: Optional[ManifestCondition], 

285 ) -> None: 

286 self._link_target = link_target 

287 self._link_dest = link_dest 

288 self._replacement_rule = replacement_rule 

289 self._definition_source = definition_source.path 

290 self._condition = condition 

291 

292 def transform_file_system( 

293 self, 

294 fs_root: FSPath, 

295 condition_context: ConditionContext, 

296 ) -> None: 

297 if not self._evaluate_condition(self._condition, condition_context): 297 ↛ 298line 297 didn't jump to line 298, because the condition on line 297 was never true

298 return 

299 dir_path_part, link_name = os.path.split(self._link_dest) 

300 dir_path = self._ensure_is_directory( 

301 fs_root, 

302 dir_path_part, 

303 self._definition_source, 

304 ) 

305 existing = dir_path.get(link_name) 

306 if existing: 

307 self._handle_existing_path(existing) 

308 dir_path.add_symlink(link_name, self._link_target) 

309 

310 def _handle_existing_path(self, existing: VirtualPath) -> None: 

311 replacement_rule = self._replacement_rule 

312 if replacement_rule == "abort-on-non-empty-directory": 

313 unlink = not existing.is_dir or not any(existing.iterdir) 

314 reason = "the path is a non-empty directory" 

315 elif replacement_rule == "discard-existing": 315 ↛ 316line 315 didn't jump to line 316, because the condition on line 315 was never true

316 unlink = True 

317 reason = "<<internal error: you should not see an error with this message>>" 

318 elif replacement_rule == "error-if-directory": 

319 unlink = not existing.is_dir 

320 reason = "the path is a directory" 

321 else: 

322 assert replacement_rule == "error-if-exists" 

323 unlink = False 

324 reason = "the path exists" 

325 

326 if unlink: 

327 existing.unlink(recursive=True) 

328 else: 

329 self._error( 

330 f"Refusing to replace {existing.path} with a symlink; {reason} and" 

331 f" the active replacement-rule was {self._replacement_rule}. You can" 

332 f' set the replacement-rule to "discard-existing", if you are not interested' 

333 f" in the contents of {existing.path}. This error was triggered by {self._definition_source}." 

334 ) 

335 

336 

class CreateDirectoryTransformationRule(TransformationRule):
    """Ensure a set of directories exists, then apply mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param directories: Paths of the directories to ensure.
        :param owner: Owner passed to ``chown`` (may be ``None``).
        :param group: Group passed to ``chown`` (may be ``None``).
        :param mode: Mode specification, or ``None`` to leave the mode alone.
        :param definition_source: Origin of the rule, used in error messages.
        :param condition: Optional condition guarding the rule.
        """
        super().__init__()
        self._directories = directories
        self._owner, self._group, self._mode = owner, group, mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create the directories below ``fs_root`` unless the condition is false.

        :param fs_root: Root of the file system tree.
        :param condition_context: Context used to evaluate the rule's condition.
        :raises TransformationRuntimeError: If a path cannot be a directory or
          the requested mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        wanted_owner, wanted_group, mode_spec = self._owner, self._group, self._mode
        for dir_name in self._directories:
            target = self._ensure_is_directory(
                fs_root,
                dir_name,
                self._definition_source,
            )

            if mode_spec is not None:
                try:
                    new_mode = mode_spec.compute_mode(target.mode, target.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {target.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                else:
                    target.mode = new_mode
            # Ownership is applied regardless of whether a mode was requested.
            target.chown(wanted_owner, wanted_group)

392 

393 

def _apply_owner_and_mode(
    path: VirtualPath,
    owner: Optional[StaticFileSystemOwner],
    group: Optional[StaticFileSystemGroup],
    mode: Optional[FileSystemMode],
    capabilities: Optional[str],
    capability_mode: Optional[FileSystemMode],
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files) capabilities to ``path``.

    :param path: The path to update.
    :param owner: New owner; ``chown`` is skipped when both ``owner`` and
      ``group`` are ``None``.
    :param group: New group (see ``owner``).
    :param mode: Mode specification, or ``None`` to leave the mode alone.
    :param capabilities: Capabilities to record; only applied to files.
    :param capability_mode: Mode stored with the capabilities; must not be
      ``None`` when capabilities are applied.
    :param definition_source: Origin of the request, used in messages and
      recorded with the capabilities.
    :raises TransformationRuntimeError: If the mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        else:
            path.mode = new_mode

    if not path.is_file or capabilities is None:
        return

    cap_slot = path.metadata(DebputyCapability)
    previous = cap_slot.value
    if previous is not None:
        # An earlier definition already set capabilities; tell the user
        # that they are being overwritten.
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_slot.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )

429 

430 

class PathMetadataTransformationRule(TransformationRule):
    """Apply owner, group, mode and/or capabilities to matched paths.

    Symlinks are always skipped. With ``recursive`` set, the attributes are
    also applied to everything below each matched directory.
    """

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        recursive: bool,
        capabilities: Optional[str],
        capability_mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param match_rules: Rules selecting the paths to update.
        :param owner: New owner, or ``None``.
        :param group: New group, or ``None``.
        :param mode: Mode specification, or ``None``.
        :param recursive: Whether to descend into matched directories.
        :param capabilities: Capabilities to set on files, or ``None``.
        :param capability_mode: Mode stored with the capabilities; must be
          given exactly when ``capabilities`` is given.
        :param definition_source: Origin of the rule, used in messages.
        :param condition: Optional condition guarding the rule.
        :raises ValueError: If only one of ``capabilities`` and
          ``capability_mode`` is provided.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        if self._capabilities is None and self._capability_mode is not None:
            raise ValueError("capability_mode without capabilities")
        if self._capabilities is not None and self._capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata to all matches below ``fs_root``.

        :param fs_root: Root of the file system tree.
        :param condition_context: Context used to evaluate the rule's condition.
        :raises TransformationRuntimeError: If a match rule had no usable
          match or a requested mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner = self._owner
        group = self._group
        mode = self._mode
        capabilities = self._capabilities
        capability_mode = self._capability_mode
        definition_source = self._definition_source
        # Matched directories to descend into afterwards (recursive mode only).
        d: Optional[List[FSPath]] = [] if self._recursive else None
        # Owner, group and mode can be applied to directories as well.
        # Capabilities are only ever applied to files, so a rule that sets
        # nothing but capabilities needs at least one file to be useful.
        needs_file_match = owner is None and group is None and mode is None

        for match_rule in self._match_rules:
            match_ok = False
            saw_symlink = False
            saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                if path.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and self._recursive:
                        # A directory still counts when the recursive pass
                        # will find a file somewhere below it.
                        match_ok = any(p.is_file for p in path.all_paths())
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
                if path.is_dir and d is not None:
                    d.append(path)

            if not match_ok:
                if needs_file_match and (saw_directory or saw_symlink):
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        " did not match any files, but given the attributes it can only apply to files."
                    )
                elif saw_symlink:
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                    )
                self._match_rule_had_no_matches(match_rule, self._definition_source)

        if not d:
            return
        for recurse_dir in d:
            for path in recurse_dir.all_paths():
                if path.is_symlink:
                    continue
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )

544 

545 

class ModeNormalizationTransformationRule(TransformationRule):
    """Apply built-in mode normalizations; the first rule to match a path wins."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[Tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """Store the ``(match rule, mode)`` pairs in the order they are applied."""
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Normalize the mode of each matched, non-symlink path at most once.

        :param fs_root: Root of the file system tree.
        :param condition_context: Unused by this rule.
        :raises AssertionError: If a mode cannot be computed; the rules are
          built-in, so this is treated as an internal error.
        """
        handled = set()

        def _already_handled(candidate) -> bool:
            # Lets the matcher skip paths an earlier rule already normalized.
            return candidate.path in handled

        for rule, mode_spec in self._normalizations:
            for entry in rule.finditer(fs_root, ignore_paths=_already_handled):
                if entry.is_symlink or entry.path in handled:
                    continue
                handled.add(entry.path)
                try:
                    new_mode = mode_spec.compute_mode(entry.mode, entry.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                else:
                    entry.mode = new_mode

575 

576 

class NormalizeShebangLineTransformation(TransformationRule):
    """Rewrite the shebang line of files whose interpreter needs a fixup."""

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every file below ``fs_root`` and fix its shebang if needed.

        :param fs_root: Root of the file system tree.
        :param condition_context: Unused by this rule.
        """
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    shebang = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            # ``None`` is what the extractor hands back when there is nothing
            # to inspect for this file.
            if shebang is not None and shebang.fixup_needed:
                shebang.replace_shebang_line(candidate)