Coverage for src/debputy/filesystem_scan.py: 74%

1104 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import stat 

9import subprocess 

10import tempfile 

11import time 

12from abc import ABC 

13from contextlib import suppress 

14from typing import ( 

15 List, 

16 Iterable, 

17 Dict, 

18 Optional, 

19 Tuple, 

20 Union, 

21 Iterator, 

22 Mapping, 

23 cast, 

24 Any, 

25 ContextManager, 

26 TextIO, 

27 BinaryIO, 

28 NoReturn, 

29 Type, 

30 Generic, 

31) 

32from weakref import ref, ReferenceType 

33 

34from debputy.exceptions import ( 

35 PureVirtualPathError, 

36 DebputyFSIsROError, 

37 DebputyMetadataAccessError, 

38 TestPathWithNonExistentFSPathError, 

39 SymlinkLoopError, 

40) 

41from debputy.intermediate_manifest import PathType 

42from debputy.manifest_parser.base_types import ( 

43 ROOT_DEFINITION, 

44 StaticFileSystemOwner, 

45 StaticFileSystemGroup, 

46) 

47from debputy.plugin.api.spec import ( 

48 VirtualPath, 

49 PathDef, 

50 PathMetadataReference, 

51 PMT, 

52) 

53from debputy.types import VP 

54from debputy.util import ( 

55 generated_content_dir, 

56 _error, 

57 escape_shell, 

58 assume_not_none, 

59 _normalize_path, 

60) 

61 

62BY_BASENAME = operator.attrgetter("name") 

63 

64 

class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and rejects every write.

    Reading yields ``None`` when the current plugin is the owning plugin and
    raises ``DebputyMetadataAccessError`` otherwise.  Writing always raises.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: Type[PMT],
    ) -> None:
        self._metadata_type = metadata_type
        self._owning_plugin = owning_plugin
        self._current_plugin = current_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin that owns the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always ``False``: this reference never holds a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Only the owning plugin may read."""
        return self._owning_plugin == self._current_plugin

    @property
    def can_write(self) -> bool:
        """Always ``False``: this reference is read-only."""
        return False

    @property
    def value(self) -> Optional[PMT]:
        """Return ``None`` for readers with access.

        :raises DebputyMetadataAccessError: if the current plugin cannot read.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Reject the write; the error type depends on who is asking.

        :raises DebputyFSIsROError: if the current plugin owns the metadata.
        :raises DebputyMetadataAccessError: for any other plugin.
        """
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )

115 

116 

@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of plugin-owned metadata attached to a path."""

    # Name of the plugin that owns this metadata.
    owning_plugin: str
    # The type under which the metadata is registered.
    metadata_type: Type[PMT]
    # The stored value; None means no value is set.
    value: Optional[PMT] = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return True when ``current_plugin`` is the owning plugin."""
        return current_plugin == self.owning_plugin

    def can_write_value(self, current_plugin: str) -> bool:
        """Return True when ``current_plugin`` is the owning plugin."""
        return current_plugin == self.owning_plugin

128 

129 

class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a ``PathMetadataValue`` stored on a path.

    Access is checked against the current plugin; writes additionally require
    the owning path to be read-write and attached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: VirtualPath,
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """True when the current plugin can read and a value is set."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the stored value."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether a write would succeed right now.

        Requires plugin write access and a read-write, attached owning path.
        """
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> Optional[PMT]:
        """Return the stored value.

        :raises DebputyMetadataAccessError: if the current plugin cannot read.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store ``new_value`` (``None`` deletes the value).

        :raises DebputyMetadataAccessError: if the current plugin lacks write access.
        :raises DebputyFSIsROError: if the owning path is read-only.
        :raises TypeError: if the owning path is detached.
        """
        # Check only the plugin permission here.  ``can_write`` also folds in the
        # read-only/detached state of the path, which previously made the two more
        # specific errors below unreachable and produced a misleading access error.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin that owns the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the underlying metadata value."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """Class name of the metadata type, used in error messages."""
        return self._path_metadata_value.metadata_type.__name__

201 

202 

def _cp_a(source: str, dest: str) -> None:
    """Copy ``source`` to ``dest`` by running ``cp -a``.

    If ``cp`` exits with a failure, the problem is reported through ``_error``
    together with the shell-escaped command line.
    """
    argv = ["cp", "-a", source, dest]
    try:
        subprocess.check_call(argv)
    except subprocess.CalledProcessError:
        rendered_command = escape_shell(*argv)
        _error(
            f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
            f" above to understand what went wrong. The full command was: {rendered_command}"
        )

213 

214 

def _split_path(path: str) -> Tuple[bool, bool, List[str]]:
    """Split ``path`` into its ``/``-separated segments.

    :return: ``(absolute, must_be_dir, parts)``.  ``absolute`` is True when the
      path starts with ``/`` (the parts then begin with ``"."``); ``must_be_dir``
      is True when the path ends with ``/`` (the parts then end with ``"."``).
    """
    must_be_dir = path.endswith("/")
    absolute = path.startswith("/")
    if absolute:
        # Anchor the path with "." so the leading slash does not yield an empty segment.
        path = "." + path
    segments = path.rstrip("/").split("/")
    if must_be_dir:
        segments.append(".")
    return absolute, must_be_dir, segments

225 

226 

def _root(path: VP) -> VP:
    """Follow ``parent_dir`` links upwards and return the topmost path."""
    node = path
    while (parent := node.parent_dir) is not None:
        node = parent
    return node

234 

235 

def _check_fs_path_is_file(
    fs_path: str,
    unlink_on_error: Optional["FSPath"] = None,
) -> None:
    """Verify that ``fs_path`` is a regular file with at most one hard link.

    :param fs_path: File system path to inspect (not following symlinks).
    :param unlink_on_error: When truthy, ``fs_path`` is removed from the file
      system before the error is raised.
    :raises TypeError: if the path is missing, is not a regular file, or has
      more than one hard link.
    """
    try:
        # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
        st = os.lstat(fs_path)
    except FileNotFoundError:
        acceptable = False
    else:
        acceptable = stat.S_ISREG(st.st_mode) and not st.st_nlink > 1
    if acceptable:
        return

    if unlink_on_error:
        with suppress(FileNotFoundError):
            os.unlink(fs_path)
    raise TypeError(
        "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
        " linked to another file. The entry has been disconnected."
    )

259 

260 

class CurrentPluginContextManager:
    """Track which plugin is "current" using a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """Name of the plugin at the top of the stack."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make ``new_plugin_name`` current for the duration of the ``with`` block.

        The previous plugin becomes current again when the block exits, including
        when it exits via an exception.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Without the finally, an exception in the with-body would leave the
            # pushed name on the stack and corrupt the context for later callers.
            self._plugin_names.pop()

276 

277 

class VirtualPathBase(VirtualPath, ABC):
    """Shared lookup, mkdirs and pruning logic for virtual path implementations."""

    __slots__ = ()

    def _orphan_safe_path(self) -> str:
        """Return a path string suitable for use in error messages."""
        return self.path

    def _rw_check(self) -> None:
        """Raise ``DebputyFSIsROError`` unless this path is read-write."""
        if self.is_read_write:
            return
        raise DebputyFSIsROError(
            f'Attempt to write to "{self._orphan_safe_path()}" failed:'
            " Debputy Virtual File system is R/O."
        )

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve ``path``; return ``None`` if any segment could not be found."""
        found, unresolved = self.attempt_lookup(path)
        return None if unresolved else found

    def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
        """Resolve as much of ``path`` as exists.

        :return: The deepest path that could be resolved and the list of
          segments (in path order) that could not be resolved.  The list is
          empty when the whole path resolved.
        :raises ValueError: if this path is detached or ``..`` steps above a
          path without a parent.
        :raises SymlinkLoopError: if the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, segments = _split_path(path)
        node = _root(self) if absolute else self
        # Used as a stack: the next segment to resolve is at the end of the list.
        pending = segments[::-1]
        seen_links = set()
        while pending:
            segment = pending.pop()
            if segment == ".":
                continue
            if segment == "..":
                parent = node.parent_dir
                if parent is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                node = parent
                continue
            try:
                node = node[segment]
            except KeyError:
                # Put the failed segment back and restore path order for the caller.
                pending.append(segment)
                pending.reverse()
                if must_be_dir:
                    # Drop the trailing "." marker added for the trailing slash.
                    pending.pop()
                return node, pending
            if not (node.is_symlink and pending):
                continue
            if node.path in seen_links:
                # This is our loop detection for now. It might have some false positives where you
                # could safely resolve the same symlink twice. However, given that this use-case is
                # basically non-existent in practice for packaging, we just stop here for now.
                raise SymlinkLoopError(
                    f'The path "{path}" traversed the symlink "{node.path}" multiple'
                    " times. Currently, traversing the same symlink twice is considered"
                    " a loop by `debputy` even if the path would eventually resolve."
                    " Consider filing a feature request if you have a benign case that"
                    " triggers this error."
                )
            seen_links.add(node.path)
            target = node.readlink()
            target_is_absolute, _, target_segments = _split_path(target)
            if target_is_absolute:
                node = _root(node)
            else:
                node = assume_not_none(node.parent_dir)
            # Push the link target's segments so they are resolved next.
            target_segments.reverse()
            pending.extend(target_segments)
        return node, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure ``path`` exists as a directory, creating missing segments.

        :raises ValueError: if an existing segment of ``path`` is not a directory.
        """
        lookup_path = path if path.endswith("/") else f"{path}/"
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(lookup_path)
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for part in missing_parts:
            assert part not in (".", "..")
            current = current.mkdir(part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        if any(self.iterdir):
            return
        # Grab the parent before unlinking; afterwards this path is detached.
        parent = assume_not_none(self.parent_dir)

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        # Note: The root dir must never be deleted. This works because when delegating it to the root
        # directory, its implementation of this method is a no-op. If this is later rewritten to an
        # inline loop (rather than recursion), be sure to preserve this feature.
        parent.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin name as reported by the topmost parent.

        :raises TypeError: if this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        topmost = self
        while True:
            parent = topmost.parent_dir
            if parent is None:
                break
            topmost = parent
        assert topmost is not None
        return cast("FSRootDir", topmost)._current_plugin()

397 

398 

399class FSPath(VirtualPathBase, ABC): 

400 __slots__ = ( 

401 "_basename", 

402 "_parent_dir", 

403 "_children", 

404 "_path_cache", 

405 "_parent_path_cache", 

406 "_last_known_parent_path", 

407 "_mode", 

408 "_owner", 

409 "_group", 

410 "_mtime", 

411 "_stat_cache", 

412 "_metadata", 

413 "__weakref__", 

414 ) 

415 

416 def __init__( 

417 self, 

418 basename: str, 

419 parent: Optional["FSPath"], 

420 children: Optional[Dict[str, "FSPath"]] = None, 

421 initial_mode: Optional[int] = None, 

422 mtime: Optional[float] = None, 

423 stat_cache: Optional[os.stat_result] = None, 

424 ) -> None: 

425 self._basename = basename 

426 self._path_cache: Optional[str] = None 

427 self._parent_path_cache: Optional[str] = None 

428 self._children = children 

429 self._last_known_parent_path: Optional[str] = None 

430 self._mode = initial_mode 

431 self._mtime = mtime 

432 self._stat_cache = stat_cache 

433 self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {} 

434 self._owner = ROOT_DEFINITION 

435 self._group = ROOT_DEFINITION 

436 

437 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls 

438 # is_orphaned, which assumes self._parent_dir is an attribute. 

439 self._parent_dir: Optional[ReferenceType["FSPath"]] = None 

440 if parent is not None: 

441 self.parent_dir = parent 

442 

443 def __repr__(self) -> str: 

444 return ( 

445 f"{self.__class__.__name__}({self._orphan_safe_path()!r}," 

446 f" is_file={self.is_file}," 

447 f" is_dir={self.is_dir}," 

448 f" is_symlink={self.is_symlink}," 

449 f" has_fs_path={self.has_fs_path}," 

450 f" children_len={len(self._children) if self._children else 0})" 

451 ) 

452 

453 @property 

454 def name(self) -> str: 

455 return self._basename 

456 

457 @name.setter 

458 def name(self, new_name: str) -> None: 

459 self._rw_check() 

460 if new_name == self._basename: 460 ↛ 461line 460 didn't jump to line 461, because the condition on line 460 was never true

461 return 

462 if self.is_detached: 462 ↛ 463line 462 didn't jump to line 463, because the condition on line 462 was never true

463 self._basename = new_name 

464 return 

465 self._rw_check() 

466 parent = self.parent_dir 

467 # This little parent_dir dance ensures the parent dir detects the rename properly 

468 self.parent_dir = None 

469 self._basename = new_name 

470 self.parent_dir = parent 

471 

472 @property 

473 def iterdir(self) -> Iterable["FSPath"]: 

474 if self._children is not None: 

475 yield from self._children.values() 

476 

477 def all_paths(self) -> Iterable["FSPath"]: 

478 yield self 

479 if not self.is_dir: 

480 return 

481 by_basename = BY_BASENAME 

482 stack = sorted(self.iterdir, key=by_basename, reverse=True) 

483 while stack: 

484 current = stack.pop() 

485 yield current 

486 if current.is_dir and not current.is_detached: 

487 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True)) 

488 

489 def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]: 

490 # FIXME: can this be more "os.walk"-like without making it harder to implement? 

491 if not self.is_dir: 491 ↛ 492line 491 didn't jump to line 492, because the condition on line 491 was never true

492 yield self, [] 

493 return 

494 by_basename = BY_BASENAME 

495 stack = [self] 

496 while stack: 

497 current = stack.pop() 

498 children = sorted(current.iterdir, key=by_basename) 

499 assert not children or current.is_dir 

500 yield current, children 

501 # Removing the directory counts as discarding the children. 

502 if not current.is_detached: 502 ↛ 496line 502 didn't jump to line 496, because the condition on line 502 was never false

503 stack.extend(reversed(children)) 

504 

505 def _orphan_safe_path(self) -> str: 

506 if not self.is_detached or self._last_known_parent_path is not None: 506 ↛ 508line 506 didn't jump to line 508, because the condition on line 506 was never false

507 return self.path 

508 return f"<orphaned>/{self.name}" 

509 

510 @property 

511 def is_detached(self) -> bool: 

512 parent = self._parent_dir 

513 if parent is None: 

514 return True 

515 resolved_parent = parent() 

516 if resolved_parent is None: 516 ↛ 517line 516 didn't jump to line 517, because the condition on line 516 was never true

517 return True 

518 return resolved_parent.is_detached 

519 

520 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

521 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

522 # behaviour to avoid surprises for now. 

523 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

524 # to using it) 

525 __iter__ = None 

526 

527 def __getitem__(self, key) -> "FSPath": 

528 if self._children is None: 

529 raise KeyError( 

530 f"{key} (note: {self._orphan_safe_path()!r} has no children)" 

531 ) 

532 if isinstance(key, FSPath): 532 ↛ 533line 532 didn't jump to line 533, because the condition on line 532 was never true

533 key = key.name 

534 return self._children[key] 

535 

536 def __delitem__(self, key) -> None: 

537 self._rw_check() 

538 children = self._children 

539 if children is None: 539 ↛ 540line 539 didn't jump to line 540, because the condition on line 539 was never true

540 raise KeyError(key) 

541 del children[key] 

542 

543 def get(self, key: str) -> "Optional[FSPath]": 

544 try: 

545 return self[key] 

546 except KeyError: 

547 return None 

548 

549 def __contains__(self, item: object) -> bool: 

550 if isinstance(item, VirtualPath): 550 ↛ 551line 550 didn't jump to line 551, because the condition on line 550 was never true

551 return item.parent_dir is self 

552 if not isinstance(item, str): 552 ↛ 553line 552 didn't jump to line 553, because the condition on line 552 was never true

553 return False 

554 m = self.get(item) 

555 return m is not None 

556 

557 def _add_child(self, child: "FSPath") -> None: 

558 self._rw_check() 

559 if not self.is_dir: 559 ↛ 560line 559 didn't jump to line 560, because the condition on line 559 was never true

560 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") 

561 if self._children is None: 

562 self._children = {} 

563 

564 conflict_child = self.get(child.name) 

565 if conflict_child is not None: 565 ↛ 566line 565 didn't jump to line 566, because the condition on line 565 was never true

566 conflict_child.unlink(recursive=True) 

567 self._children[child.name] = child 

568 

569 @property 

570 def tar_path(self) -> str: 

571 path = self.path 

572 if self.is_dir: 

573 return path + "/" 

574 return path 

575 

576 @property 

577 def path(self) -> str: 

578 parent_path = self.parent_dir_path 

579 if ( 

580 self._parent_path_cache is not None 

581 and self._parent_path_cache == parent_path 

582 ): 

583 return assume_not_none(self._path_cache) 

584 if parent_path is None: 584 ↛ 585line 584 didn't jump to line 585, because the condition on line 584 was never true

585 raise ReferenceError( 

586 f"The path {self.name} is detached! {self.__class__.__name__}" 

587 ) 

588 self._parent_path_cache = parent_path 

589 ret = os.path.join(parent_path, self.name) 

590 self._path_cache = ret 

591 return ret 

592 

593 @property 

594 def parent_dir(self) -> Optional["FSPath"]: 

595 p_ref = self._parent_dir 

596 p = p_ref() if p_ref is not None else None 

597 if p is None: 597 ↛ 598line 597 didn't jump to line 598, because the condition on line 597 was never true

598 raise ReferenceError( 

599 f"The path {self.name} is detached! {self.__class__.__name__}" 

600 ) 

601 return p 

602 

603 @parent_dir.setter 

604 def parent_dir(self, new_parent: Optional["FSPath"]) -> None: 

605 self._rw_check() 

606 if new_parent is not None: 

607 if not new_parent.is_dir: 607 ↛ 608line 607 didn't jump to line 608, because the condition on line 607 was never true

608 raise ValueError( 

609 f"The parent {new_parent._orphan_safe_path()} must be a directory" 

610 ) 

611 new_parent._rw_check() 

612 old_parent = None 

613 self._last_known_parent_path = None 

614 if not self.is_detached: 

615 old_parent = self.parent_dir 

616 old_parent_children = assume_not_none(assume_not_none(old_parent)._children) 

617 del old_parent_children[self.name] 

618 if new_parent is not None: 

619 self._parent_dir = ref(new_parent) 

620 new_parent._add_child(self) 

621 else: 

622 if old_parent is not None and not old_parent.is_detached: 622 ↛ 624line 622 didn't jump to line 624, because the condition on line 622 was never false

623 self._last_known_parent_path = old_parent.path 

624 self._parent_dir = None 

625 self._parent_path_cache = None 

626 

627 @property 

628 def parent_dir_path(self) -> Optional[str]: 

629 if self.is_detached: 629 ↛ 630line 629 didn't jump to line 630, because the condition on line 629 was never true

630 return self._last_known_parent_path 

631 return assume_not_none(self.parent_dir).path 

632 

633 def chown( 

634 self, 

635 owner: Optional[StaticFileSystemOwner], 

636 group: Optional[StaticFileSystemGroup], 

637 ) -> None: 

638 """Change the owner/group of this path 

639 

640 :param owner: The desired owner definition for this path. If None, then no change of owner is performed. 

641 :param group: The desired group definition for this path. If None, then no change of group is performed. 

642 """ 

643 self._rw_check() 

644 

645 if owner is not None: 

646 self._owner = owner.ownership_definition 

647 if group is not None: 

648 self._group = group.ownership_definition 

649 

650 def stat(self) -> os.stat_result: 

651 st = self._stat_cache 

652 if st is None: 

653 st = self._uncached_stat() 

654 self._stat_cache = st 

655 return st 

656 

657 def _uncached_stat(self) -> os.stat_result: 

658 return os.lstat(self.fs_path) 

659 

660 @property 

661 def mode(self) -> int: 

662 current_mode = self._mode 

663 if current_mode is None: 663 ↛ 664line 663 didn't jump to line 664, because the condition on line 663 was never true

664 current_mode = stat.S_IMODE(self.stat().st_mode) 

665 self._mode = current_mode 

666 return current_mode 

667 

668 @mode.setter 

669 def mode(self, new_mode: int) -> None: 

670 self._rw_check() 

671 min_bit = 0o500 if self.is_dir else 0o400 

672 if (new_mode & min_bit) != min_bit: 672 ↛ 673line 672 didn't jump to line 673, because the condition on line 672 was never true

673 omode = oct(new_mode)[2:] 

674 omin = oct(min_bit)[2:] 

675 raise ValueError( 

676 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;' 

677 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)." 

678 " There are no paths that do not need these requirements met and they can cause" 

679 " problems during build or on the final system." 

680 ) 

681 self._mode = new_mode 

682 

683 @property 

684 def mtime(self) -> float: 

685 mtime = self._mtime 

686 if mtime is None: 

687 mtime = self.stat().st_mtime 

688 self._mtime = mtime 

689 return mtime 

690 

691 @mtime.setter 

692 def mtime(self, new_mtime: float) -> None: 

693 self._rw_check() 

694 self._mtime = new_mtime 

695 

696 @property 

697 def tar_owner_info(self) -> Tuple[str, int, str, int]: 

698 owner = self._owner 

699 group = self._group 

700 return ( 

701 owner.entity_name, 

702 owner.entity_id, 

703 group.entity_name, 

704 group.entity_id, 

705 ) 

706 

707 @property 

708 def _can_replace_inline(self) -> bool: 

709 return False 

710 

711 @contextlib.contextmanager 

712 def add_file( 

713 self, 

714 name: str, 

715 *, 

716 unlink_if_exists: bool = True, 

717 use_fs_path_mode: bool = False, 

718 mode: int = 0o0644, 

719 mtime: Optional[float] = None, 

720 # Special-case parameters that are not exposed in the API 

721 fs_basename_matters: bool = False, 

722 subdir_key: Optional[str] = None, 

723 ) -> Iterator["FSPath"]: 

724 if "/" in name or name in {".", ".."}: 724 ↛ 725line 724 didn't jump to line 725, because the condition on line 724 was never true

725 raise ValueError(f'Invalid file name: "{name}"') 

726 if not self.is_dir: 726 ↛ 727line 726 didn't jump to line 727, because the condition on line 726 was never true

727 raise TypeError( 

728 f"Cannot create {self._orphan_safe_path()}/{name}:" 

729 f" {self._orphan_safe_path()} is not a directory" 

730 ) 

731 self._rw_check() 

732 existing = self.get(name) 

733 if existing is not None: 733 ↛ 734line 733 didn't jump to line 734, because the condition on line 733 was never true

734 if not unlink_if_exists: 

735 raise ValueError( 

736 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

737 f" and exist_ok was False" 

738 ) 

739 existing.unlink(recursive=False) 

740 

741 if fs_basename_matters and subdir_key is None: 741 ↛ 742line 741 didn't jump to line 742, because the condition on line 741 was never true

742 raise ValueError( 

743 "When fs_basename_matters is True, a subdir_key must be provided" 

744 ) 

745 

746 directory = generated_content_dir(subdir_key=subdir_key) 

747 

748 if fs_basename_matters: 748 ↛ 749line 748 didn't jump to line 749, because the condition on line 748 was never true

749 fs_path = os.path.join(directory, name) 

750 with open(fs_path, "xb") as _: 

751 # Ensure that the fs_path exists 

752 pass 

753 child = FSBackedFilePath( 

754 name, 

755 self, 

756 fs_path, 

757 replaceable_inline=True, 

758 mtime=mtime, 

759 ) 

760 yield child 

761 else: 

762 with tempfile.NamedTemporaryFile( 

763 dir=directory, suffix=f"__{name}", delete=False 

764 ) as fd: 

765 fs_path = fd.name 

766 child = FSBackedFilePath( 

767 name, 

768 self, 

769 fs_path, 

770 replaceable_inline=True, 

771 mtime=mtime, 

772 ) 

773 fd.close() 

774 yield child 

775 

776 if use_fs_path_mode: 776 ↛ 778line 776 didn't jump to line 778, because the condition on line 776 was never true

777 # Ensure the caller can see the current mode 

778 os.chmod(fs_path, mode) 

779 _check_fs_path_is_file(fs_path, unlink_on_error=child) 

780 child._reset_caches() 

781 if not use_fs_path_mode: 781 ↛ exitline 781 didn't return from function 'add_file', because the condition on line 781 was never false

782 child.mode = mode 

783 

784 def insert_file_from_fs_path( 

785 self, 

786 name: str, 

787 fs_path: str, 

788 *, 

789 exist_ok: bool = True, 

790 use_fs_path_mode: bool = False, 

791 mode: int = 0o0644, 

792 require_copy_on_write: bool = True, 

793 follow_symlinks: bool = True, 

794 reference_path: Optional[VirtualPath] = None, 

795 ) -> "FSPath": 

796 if "/" in name or name in {".", ".."}: 796 ↛ 797line 796 didn't jump to line 797, because the condition on line 796 was never true

797 raise ValueError(f'Invalid file name: "{name}"') 

798 if not self.is_dir: 798 ↛ 799line 798 didn't jump to line 799, because the condition on line 798 was never true

799 raise TypeError( 

800 f"Cannot create {self._orphan_safe_path()}/{name}:" 

801 f" {self._orphan_safe_path()} is not a directory" 

802 ) 

803 self._rw_check() 

804 if name in self and not exist_ok: 804 ↛ 805line 804 didn't jump to line 805, because the condition on line 804 was never true

805 raise ValueError( 

806 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

807 f" and exist_ok was False" 

808 ) 

809 new_fs_path = fs_path 

810 if follow_symlinks: 

811 if reference_path is not None: 811 ↛ 812line 811 didn't jump to line 812, because the condition on line 811 was never true

812 raise ValueError( 

813 "The reference_path cannot be used with follow_symlinks" 

814 ) 

815 new_fs_path = os.path.realpath(new_fs_path, strict=True) 

816 

817 fmode: Optional[int] = mode 

818 if use_fs_path_mode: 

819 fmode = None 

820 

821 st = None 

822 if reference_path is None: 

823 st = os.lstat(new_fs_path) 

824 if stat.S_ISDIR(st.st_mode): 824 ↛ 825line 824 didn't jump to line 825, because the condition on line 824 was never true

825 raise ValueError( 

826 f'The provided path "{fs_path}" is a directory. However, this' 

827 " method does not support directories" 

828 ) 

829 

830 if not stat.S_ISREG(st.st_mode): 830 ↛ 831line 830 didn't jump to line 831, because the condition on line 830 was never true

831 if follow_symlinks: 

832 raise ValueError( 

833 f"The resolved fs_path ({new_fs_path}) was not a file." 

834 ) 

835 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.") 

836 return FSBackedFilePath( 

837 name, 

838 self, 

839 new_fs_path, 

840 initial_mode=fmode, 

841 stat_cache=st, 

842 replaceable_inline=not require_copy_on_write, 

843 reference_path=reference_path, 

844 ) 

845 

846 def add_symlink( 

847 self, 

848 link_name: str, 

849 link_target: str, 

850 *, 

851 reference_path: Optional[VirtualPath] = None, 

852 ) -> "FSPath": 

853 if "/" in link_name or link_name in {".", ".."}: 853 ↛ 854line 853 didn't jump to line 854, because the condition on line 853 was never true

854 raise ValueError( 

855 f'Invalid file name: "{link_name}" (it must be a valid basename)' 

856 ) 

857 if not self.is_dir: 857 ↛ 858line 857 didn't jump to line 858, because the condition on line 857 was never true

858 raise TypeError( 

859 f"Cannot create {self._orphan_safe_path()}/{link_name}:" 

860 f" {self._orphan_safe_path()} is not a directory" 

861 ) 

862 self._rw_check() 

863 

864 existing = self.get(link_name) 

865 if existing: 865 ↛ 867line 865 didn't jump to line 867, because the condition on line 865 was never true

866 # Emulate ln -sf with attempts a non-recursive unlink first. 

867 existing.unlink(recursive=False) 

868 

869 return SymlinkVirtualPath( 

870 link_name, 

871 self, 

872 link_target, 

873 reference_path=reference_path, 

874 ) 

875 

def mkdir(
    self,
    name: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a new directory directly inside this directory.

    :param name: Basename of the new directory. It must not contain "/" and must
      not be "." or "..".
    :param reference_path: Optional reference path for the new directory. When
      given, it must be a directory.
    :return: The newly created directory path.
    :raises ValueError: If `name` is not a valid basename, if `reference_path`
      is not a directory, or if an entry called `name` already exists.
    :raises TypeError: If this path is not a directory.
    """
    is_plain_basename = "/" not in name and name not in {".", ".."}
    if not is_plain_basename:
        raise ValueError(
            f'Invalid file name: "{name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        own_path = self._orphan_safe_path()
        raise TypeError(
            f"Cannot create {own_path}/{name}:"
            f" {own_path} is not a directory"
        )
    if reference_path is not None:
        if not reference_path.is_dir:
            raise ValueError(
                f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
            )
    self._rw_check()

    clash = self.get(name)
    if clash:
        raise ValueError(f"Path {clash.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)

901 

def mkdirs(self, path: str) -> "FSPath":
    """Delegate to the parent class's `mkdirs` and narrow the result type.

    :param path: The path handed unchanged to the parent implementation.
    :return: The parent implementation's result, typed as an `FSPath`.
    """
    created = super().mkdirs(path)
    return cast("FSPath", created)

904 

@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    A detached path is always considered mutable; an attached path defers
    to its parent directory.

    :return: Whether file system mutations are permitted.
    """
    if not self.is_detached:
        return assume_not_none(self.parent_dir).is_read_write
    return True

914 

def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Calling it on a path that is already detached does nothing.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    :raises ValueError: If the path has children and `recursive` is False.
    """
    if self.is_detached:
        return
    if not recursive:
        if any(self.iterdir):
            raise ValueError(
                f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
            )
    # Assigning through the .parent_dir setter performs the _rw_check() for us.
    self.parent_dir = None

933 

934 def _reset_caches(self) -> None: 

935 self._mtime = None 

936 self._stat_cache = None 

937 

def metadata(
    self,
    metadata_type: Type[PMT],
    *,
    owning_plugin: Optional[str] = None,
) -> PathMetadataReference[PMT]:
    """Return a reference to the metadata of the given type for this path.

    Metadata values are stored per (owning plugin, metadata type) pair. A
    missing value is created on demand unless the path is on a read-only
    file system, in which case an always-empty reference is returned.

    :param metadata_type: The type of metadata to look up.
    :param owning_plugin: Name of the plugin owning the metadata. Defaults to
      the current plugin.
    :return: A reference to the metadata.
    :raises TypeError: If no value exists yet and the path is detached.
    """
    active_plugin = self._current_plugin()
    owner = active_plugin if owning_plugin is None else owning_plugin
    key = (owner, metadata_type)
    value = self._metadata.get(key)
    if value is not None:
        return PathMetadataReferenceImplementation(
            self,
            active_plugin,
            value,
        )

    if self.is_detached:
        raise TypeError(
            f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
        )
    if not self.is_read_write:
        return AlwaysEmptyReadOnlyMetadataReference(
            owner,
            active_plugin,
            metadata_type,
        )
    value = PathMetadataValue(owner, metadata_type)
    self._metadata[key] = value
    return PathMetadataReferenceImplementation(
        self,
        active_plugin,
        value,
    )

967 

@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager yielding a file system path whose content may be replaced.

    When the path cannot be replaced inline, its current file is first copied
    (via `_cp_a`) to a new temporary file inside the generated content
    directory and the path is re-pointed to that copy. After the managed block
    finishes normally, the file is checked to still be a file, its mode is
    restored unless `use_fs_path_mode` is set, and the caches are reset.

    :param use_fs_path_mode: When False (the default), the mode the path had
      before the block is re-applied to the file afterwards.
    :return: A context manager providing the file system path to modify.
    :raises TypeError: If this path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    target_path = self.fs_path
    if not self._can_replace_inline:
        source_path = self.fs_path
        scratch_dir = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=scratch_dir, suffix=f"__{self.name}", delete=False
        ) as tmp:
            # Only the reserved name is needed; the copy helper writes the file.
            tmp.close()
            _cp_a(source_path, tmp.name)
            target_path = tmp.name
        self._replaced_path(target_path)
        assert self.fs_path == target_path

    known_mtime = self._mtime
    if known_mtime is not None:
        os.utime(target_path, (known_mtime, known_mtime))

    # Remember the mode before handing control to the caller.
    mode_before = self.mode
    yield target_path
    _check_fs_path_is_file(target_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(target_path, mode_before)
    self._reset_caches()

1002 

1003 def _replaced_path(self, new_fs_path: str) -> None: 

1004 raise NotImplementedError 

1005 

1006 

class VirtualFSPathBase(FSPath, ABC):
    """Base class for paths that are, by default, not backed by the file system.

    `has_fs_path` is False here, so `stat()` and `fs_path` raise
    `PureVirtualPathError` unless a subclass overrides `has_fs_path`.
    """

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["FSPath"],
        children: Optional[Dict[str, "FSPath"]] = None,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
    ) -> None:
        """Forward all arguments unchanged to the parent initializer."""
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    @property
    def mtime(self) -> float:
        """The cached mtime; the current time is used (and cached) when unset."""
        mtime = self._mtime
        if mtime is None:
            mtime = time.time()
            self._mtime = mtime
        return mtime

    @property
    def has_fs_path(self) -> bool:
        """Always False at this level; subclasses may override."""
        return False

    def stat(self) -> os.stat_result:
        """Return the stat result from the parent implementation.

        :raises PureVirtualPathError: If the path has no file system path.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """The file system path backing this path.

        :raises PureVirtualPathError: If the path has no file system path.
        :raises NotImplementedError: If a subclass reports `has_fs_path` as True
          without overriding this property.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This used to be `return self.fs_path`, which re-entered this very
        # property and recursed without bound. There is no path to return at
        # this level, so fail with a clear error instead.
        raise NotImplementedError(
            f"{type(self).__name__} reports has_fs_path=True but does not"
            " provide an fs_path implementation"
        )

1056 

1057 

class FSRootDir(FSPath):
    """Root directory of a virtual file system.

    The root is named ".", has no parent, is never detached and cannot be
    unlinked. It carries the read/write flag that `is_read_write` reports and
    the plugin context used by `_current_plugin`.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: Optional[str] = None) -> None:
        """Create the root directory.

        :param fs_path: Optional file system path backing the root directory.
        """
        # These two are assigned before the parent initializer runs.
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(".", None, children={}, initial_mode=0o755)
        self._plugin_context = CurrentPluginContextManager("debputy")

    @property
    def is_detached(self) -> bool:
        """The root directory is never detached."""
        return False

    def _orphan_safe_path(self) -> str:
        """Return the name of the root, which doubles as its path."""
        return self.name

    @property
    def path(self) -> str:
        """The path of the root directory, which is its name."""
        return self.name

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        """The root directory has no parent."""
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: Optional[FSPath]) -> None:
        # Assigning None is accepted and does nothing; anything else is an error.
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> Optional[str]:
        """The root directory has no parent path."""
        return None

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always fails: the root is a directory.

        :raises TypeError: Always.
        """
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """Whether a file system path was provided for the root."""
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Return `os.stat` of the backing file system path.

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The file system path backing the root.

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def is_read_write(self) -> bool:
        """The read/write flag stored on the root."""
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def prune_if_empty_dir(self) -> None:
        """Do nothing; the root directory is never pruned."""
        # No-op for the root directory. There is never a case where you want to delete this directory
        # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
        return

    def unlink(self, *, recursive: bool = False) -> None:
        """Always fails: the root directory cannot be deleted.

        :raises TypeError: Always.
        """
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        """Return the plugin name currently recorded in the plugin context."""
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager that delegates to the root's plugin context manager.

        :param new_plugin: The plugin name handed to the plugin context.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as ctx_value:
            yield ctx_value

1157 

1158 

class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that may borrow attributes from an optional reference path.

    When a reference path is present, its mode is used as the initial mode and
    mtime, fs_path, stat() and open() defer to it under the conditions coded
    in the respective members.
    """

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        default_mode: int,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the path.

        :param basename: Name of the path.
        :param parent: The parent directory.
        :param default_mode: Mode used when no (truthy) reference path is given.
        :param reference_path: Optional reference path.
        """
        if reference_path:
            mode = reference_path.mode
        else:
            mode = default_mode
        super().__init__(
            basename,
            parent=parent,
            initial_mode=mode,
        )
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        """True when there is a reference path and it has a file system path."""
        reference = self._reference_path
        if reference is None:
            return False
        return reference.has_fs_path

    @property
    def mtime(self) -> float:
        """The mtime, taken from the reference path (or the parent class) when unset."""
        cached = self._mtime
        if cached is not None:
            return cached
        reference = self._reference_path
        resolved = reference.mtime if reference else super().mtime
        self._mtime = resolved
        return resolved

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def fs_path(self) -> str:
        """The file system path, preferring the reference path's when applicable."""
        reference = self._reference_path
        if reference is None:
            return super().fs_path
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.fs_path
        return super().fs_path

    def stat(self) -> os.stat_result:
        """The stat result, preferring the reference path's when applicable."""
        reference = self._reference_path
        if reference is None:
            return super().stat()
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.stat()
        return super().stat()

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading.

        The reference path is opened when it exists and shares this path's
        file system path; otherwise the parent implementation is used.

        :param byte_io: Passed through to the underlying `open`.
        :param buffering: Passed through to the underlying `open`.
        """
        reference = self._reference_path
        if reference is not None and reference.fs_path == self.fs_path:
            return reference.open(byte_io=byte_io, buffering=buffering)
        return super().open(byte_io=byte_io, buffering=buffering)

1226 

1227 

class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A virtual directory, optionally tied to a reference directory."""

    # `_reference_path` is already a slot of VirtualPathWithReference.
    # Re-declaring it here would create a second, shadowing slot, so this
    # class adds no slots of its own.
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the directory with a default mode of 0o755.

        :param basename: Name of the directory.
        :param parent: The parent directory.
        :param reference_path: Optional reference path; must be a directory.
        """
        # The parent initializer stores `reference_path`; no need to repeat it.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always fails: this path is a directory.

        :raises TypeError: Always.
        """
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

1261 

1262 

class SymlinkVirtualPath(VirtualPathWithReference):
    """A virtual symlink that stores its link target as a string."""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        link_target: str,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the symlink.

        :param basename: Name of the symlink.
        :param parent_dir: The directory that will contain the symlink.
        :param link_target: The target returned by `readlink`.
        :param reference_path: Optional reference path.
        """
        super().__init__(
            basename,
            parent=parent_dir,
            default_mode=_SYMLINK_MODE,
            reference_path=reference_path,
        )
        self._link_target = link_target

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return True

    def readlink(self) -> str:
        """Return the link target given at construction."""
        return self._link_target

1296 

1297 

class FSBackedFilePath(VirtualPathWithReference):
    """A file whose content lives at a concrete file system path."""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the file path.

        :param basename: Name of the file.
        :param parent_dir: The directory that will contain the file.
        :param fs_path: The file system path holding the content.
        :param replaceable_inline: Whether the content may be replaced in place.
          Only permitted for paths containing "debputy/scratch-dir/".
        :param initial_mode: Optional mode overriding the default/reference mode.
        :param mtime: Optional mtime to record.
        :param stat_cache: Optional stat result to cache.
        :param reference_path: Optional reference path.
        """
        super().__init__(
            basename,
            parent_dir,
            default_mode=0o644,
            reference_path=reference_path,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        # An explicit mode wins over the one chosen by the parent initializer.
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        inline_is_acceptable = (
            not replaceable_inline or "debputy/scratch-dir/" in fs_path
        )
        assert (
            inline_is_acceptable
        ), f"{fs_path} should not be inline-replaceable -- {self.path}"

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always fails: this path is a file.

        :raises TypeError: Always.
        """
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """Always True for this class."""
        return True

    @property
    def fs_path(self) -> str:
        """The file system path currently holding the content."""
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        """Whether the content may be replaced in place."""
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        """Re-point the file to `new_fs_path`.

        The reference path is dropped and the file becomes inline-replaceable.
        """
        self._fs_path = new_fs_path
        self._reference_path = None
        self._replaceable_inline = True

1361 

1362 

1363_SYMLINK_MODE = 0o777 

1364 

1365 

class VirtualTestPath(FSPath):
    """Path implementation for the test suite.

    A test path can be a directory, a symlink or a file. A file may carry
    in-memory `content`, `materialized_content` (written to a temporary file
    the first time `fs_path` is requested) or an explicit `fs_path`.
    """

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: Optional[FSPath],
        mode: Optional[int] = None,
        mtime: Optional[float] = None,
        is_dir: bool = False,
        has_fs_path: Optional[bool] = False,
        fs_path: Optional[str] = None,
        link_target: Optional[str] = None,
        content: Optional[str] = None,
        materialized_content: Optional[str] = None,
    ) -> None:
        """Create the test path.

        :param basename: Name of the path.
        :param parent_dir: The parent directory, if any.
        :param mode: Optional mode. Defaults to 0o755 for directories and 0o644
          otherwise. For symlinks only `_SYMLINK_MODE` (or None) is accepted.
        :param mtime: Optional mtime.
        :param is_dir: Whether the path is a directory.
        :param has_fs_path: Whether the path claims to have a file system path.
          When None, it is derived from the truthiness of `fs_path`.
        :param fs_path: Optional file system path.
        :param link_target: When not None (and `is_dir` is False), the path is a
          symlink with this target.
        :param content: Optional in-memory content served by `open()`.
        :param materialized_content: Optional content written to a temporary
          file on demand by `fs_path`.
        :raises ValueError: If a symlink is given a mode other than `_SYMLINK_MODE`.
        """
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is not None:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
            assert mode is None or mode == _SYMLINK_MODE
        else:
            self._path_type = PathType.FILE

        if mode is not None:
            initial_mode = mode
        else:
            initial_mode = 0o755 if is_dir else 0o644

        self._link_target = link_target
        if has_fs_path is None:
            has_fs_path = bool(fs_path)
        self._has_fs_path = has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Return the link target.

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        link_target = self._link_target
        assert link_target is not None
        return link_target

    @property
    def mtime(self) -> float:
        """The mtime; the current time is used (and cached) when unset."""
        if self._mtime is None:
            self._mtime = time.time()
        return self._mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def has_fs_path(self) -> bool:
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Return `os.stat` of the file system path.

        :raises PureVirtualPathError: If the path has no file system path or
          the file system path does not exist.
        """
        if self.has_fs_path:
            path = self.fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide!"
                )
            try:
                return os.stat(path)
            except FileNotFoundError as e:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide! (An fs_path was provided, but it did not exist)"
                ) from e

        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def size(self) -> int:
        """Size in bytes: UTF-8 length of `content`, else the stat size, else 0."""
        if self._content is not None:
            return len(self._content.encode("utf-8"))
        if not self.has_fs_path or self.fs_path is None:
            return 0
        return self.stat().st_size

    @property
    def fs_path(self) -> str:
        """The file system path of this test path.

        When no path is set but `materialized_content` was provided, the
        content is written to a temporary file first; that file is removed at
        interpreter exit.

        :raises PureVirtualPathError: If no file system path can be provided.
        """
        if self.has_fs_path:
            if self._fs_path is None and self._materialized_content is not None:
                with tempfile.NamedTemporaryFile(
                    mode="w+t",
                    encoding="utf-8",
                    suffix=f"__{self.name}",
                    delete=False,
                ) as fd:
                    filepath = fd.name
                    fd.write(self._materialized_content)
                self._fs_path = filepath
                atexit.register(lambda: os.unlink(filepath))

            path = self._fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                    " mock path cannot provide!"
                )
            return path
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Delegate to the parent implementation unless the path has `content`.

        :raises TypeError: If the path was created with `content`.
        """
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading.

        With `content`, an in-memory stream is returned (bytes are UTF-8
        encoded). Without it, the parent implementation is used.

        :param byte_io: Return a binary stream instead of a text stream.
        :param buffering: Passed to the parent implementation.
        :raises TestPathWithNonExistentFSPathError: If the parent implementation
          raised FileNotFoundError.
        """
        if self._content is None:
            try:
                return super().open(byte_io=byte_io, buffering=buffering)
            except FileNotFoundError as e:
                # The first literal must be an f-string; without the prefix the
                # placeholders were emitted verbatim in the error message.
                raise TestPathWithNonExistentFSPathError(
                    f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                    " exist. This exception can only occur in the testsuite. Either have the"
                    " test provide content for the path (`virtual_path_def(..., content=...) or,"
                    " if that is too painful in general, have the code accept this error as a "
                    " test only-case and provide a default."
                ) from e

        if byte_io:
            return io.BytesIO(self._content.encode("utf-8"))
        return io.StringIO(self._content)

    def _replaced_path(self, new_fs_path: str) -> None:
        """Record `new_fs_path` as the file system path."""
        self._fs_path = new_fs_path

1547 

1548 

class FSROOverlay(VirtualPathBase):
    """Read-only view of a path on the real file system.

    Children are discovered lazily with `os.listdir`. The `os.lstat` result,
    a failed stat and the `os.readlink` result are cached. The parent is held
    through a weak reference. Every mutating operation raises
    `DebputyFSIsROError`.
    """

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "_stat_cache",
        "_readlink_cache",
        "_children",
        "_stat_failed_cache",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional["FSROOverlay"],
    ) -> None:
        """Create the overlay path.

        :param path: The path inside the overlay.
        :param fs_path: The backing file system path (stored normalized).
        :param parent: The parent overlay path, or None for a root.
        """
        self._path: str = path
        self._fs_path: str = _normalize_path(fs_path, with_prefix=False)
        self._parent: Optional[ReferenceType[FSROOverlay]] = (
            ref(parent) if parent is not None else None
        )
        self._stat_cache: Optional[os.stat_result] = None
        self._readlink_cache: Optional[str] = None
        self._stat_failed_cache = False
        self._children: Optional[Mapping[str, FSROOverlay]] = None

    @classmethod
    def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
        """Create a parent-less overlay path for `fs_path`."""
        return FSROOverlay(path, fs_path, None)

    @property
    def name(self) -> str:
        """The basename of the overlay path."""
        return os.path.basename(self._path)

    @property
    def iterdir(self) -> Iterable["FSROOverlay"]:
        """Iterate over the children; yields nothing for non-directories."""
        if not self.is_dir:
            return
        if self._children is None:
            self._ensure_children_are_resolved()
        yield from assume_not_none(self._children).values()

    def lookup(self, path: str) -> Optional["FSROOverlay"]:
        """Resolve `path` relative to this directory (or the root if absolute).

        :param path: The path to look up; "." and ".." segments are supported.
        :return: The matching path, or None if this is not a directory or a
          segment does not exist.
        :raises ValueError: If a ".." segment would leave the root directory.
        """
        if not self.is_dir:
            return None
        if self._children is None:
            self._ensure_children_are_resolved()

        absolute, _, path_parts = _split_path(path)
        current = cast("FSROOverlay", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                # The parent (not `current`) tells us whether we are at the
                # root; checking `current` here could never detect the escape.
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = p
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    def all_paths(self) -> Iterable["FSROOverlay"]:
        """Yield this path and then every path below it."""
        yield self
        if not self.is_dir:
            return
        # The stack is kept reversed so that pop() yields children in order.
        stack = list(self.iterdir)
        stack.reverse()
        while stack:
            current = stack.pop()
            yield current
            if current.is_dir:
                if current._children is None:
                    current._ensure_children_are_resolved()
                stack.extend(reversed(current._children.values()))

    def _ensure_children_are_resolved(self) -> None:
        """Populate the children mapping from a sorted `os.listdir` of the directory."""
        if not self.is_dir or self._children:
            return
        dir_path = self.path
        dir_fs_path = self.fs_path
        children = {}
        for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
            # Avoid a "./" prefix on children of ".".
            child_path = os.path.join(dir_path, name) if dir_path != "." else name
            child_fs_path = (
                os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
            )
            children[name] = FSROOverlay(
                child_path,
                child_fs_path,
                self,
            )
        self._children = children

    @property
    def is_detached(self) -> bool:
        """Overlay paths are never detached."""
        return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named `key` (an `FSPath` key is replaced by its name).

        :raises KeyError: If this is not a directory or there is no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        if self._children is None:
            self._ensure_children_are_resolved()
        if isinstance(key, FSPath):
            key = key.name
        return self._children[key]

    def __delitem__(self, key) -> None:
        self._error_ro_fs()

    @property
    def is_read_write(self) -> bool:
        """Always False: the overlay is read-only."""
        return False

    def _rw_check(self) -> None:
        self._error_ro_fs()

    def _error_ro_fs(self) -> NoReturn:
        """Raise the error used for every attempted mutation.

        :raises DebputyFSIsROError: Always.
        """
        raise DebputyFSIsROError(
            f'Attempt to write to "{self.path}" failed:'
            " Debputy Virtual File system is R/O."
        )

    @property
    def path(self) -> str:
        return self._path

    @property
    def parent_dir(self) -> Optional["FSROOverlay"]:
        """The parent path, or None when there is no parent.

        :raises RuntimeError: If the weakly referenced parent no longer exists.
        """
        parent = self._parent
        if parent is None:
            return None
        resolved = parent()
        if resolved is None:
            raise RuntimeError("Parent was garbage collected!")
        return resolved

    def stat(self) -> os.stat_result:
        """Return the (cached) `os.lstat` result of the backing path.

        A FileNotFoundError is remembered and re-raised on later calls without
        touching the file system again.

        :raises FileNotFoundError: If the backing path does not exist.
        """
        if self._stat_failed_cache:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
            )

        if self._stat_cache is None:
            try:
                self._stat_cache = os.lstat(self.fs_path)
            except FileNotFoundError:
                self._stat_failed_cache = True
                raise
        return self._stat_cache

    @property
    def mode(self) -> int:
        """The permission bits of the stat result."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, _unused: int) -> None:
        self._error_ro_fs()

    @property
    def mtime(self) -> float:
        """The mtime of the stat result."""
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._error_ro_fs()

    def readlink(self) -> str:
        """Return the (cached) `os.readlink` result of the backing path.

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        if self._readlink_cache is None:
            self._readlink_cache = os.readlink(self.fs_path)
        return self._readlink_cache

    @property
    def fs_path(self) -> str:
        return self._fs_path

    @property
    def is_dir(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISDIR(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_file(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISREG(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_symlink(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISLNK(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def has_fs_path(self) -> bool:
        """Always True: overlay paths are backed by the file system."""
        return True

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the backing path for reading.

        :param byte_io: Open in binary mode instead of UTF-8 text mode.
        :param buffering: Passed to the builtin `open`.
        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not self.is_file and not self.is_symlink:
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)

    def chown(
        self,
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
    ) -> None:
        self._error_ro_fs()

    def mkdir(self, name: str) -> "VirtualPath":
        self._error_ro_fs()

    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: Optional[float] = None,
    ) -> ContextManager["VirtualPath"]:
        self._error_ro_fs()

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        self._error_ro_fs()

    def unlink(self, *, recursive: bool = False) -> None:
        self._error_ro_fs()

    def metadata(
        self,
        metadata_type: Type[PMT],
        *,
        owning_plugin: Optional[str] = None,
    ) -> PathMetadataReference[PMT]:
        """Return an always-empty, read-only metadata reference.

        :param metadata_type: The type of metadata requested.
        :param owning_plugin: Name of the owning plugin; defaults to the current
          plugin.
        """
        current_plugin = self._current_plugin()
        if owning_plugin is None:
            owning_plugin = current_plugin
        return AlwaysEmptyReadOnlyMetadataReference(
            owning_plugin,
            current_plugin,
            metadata_type,
        )

1818 

1819 

class FSROOverlayRootDir(FSROOverlay):
    """Parent-less read-only overlay path that also holds the plugin context."""

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        """Create the root overlay.

        :param path: The path inside the overlay.
        :param fs_path: The backing file system path.
        """
        super().__init__(path, fs_path, None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Return the plugin name currently recorded in the plugin context."""
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager that delegates to the plugin context manager.

        :param new_plugin: The plugin name handed to the plugin context.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as ctx_value:
            yield ctx_value

1834 

1835 

def as_path_def(pd: Union[str, PathDef]) -> PathDef:
    """Return `pd` as a `PathDef`, wrapping it when it is a plain string.

    :param pd: Either a string or an existing `PathDef`.
    :return: `PathDef(pd)` for strings; otherwise `pd` itself.
    """
    if isinstance(pd, str):
        return PathDef(pd)
    return pd

1838 

1839 

def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
    """Lazily convert each entry of `paths` with `as_path_def`.

    :param paths: Strings and/or `PathDef` instances.
    :return: A generator of `PathDef` instances in input order.
    """
    for entry in paths:
        yield as_path_def(entry)

1842 

1843 

def build_virtual_fs(
    paths: Iterable[Union[str, PathDef]],
    read_write_fs: bool = False,
) -> "FSPath":
    """Build a virtual file system tree from a sequence of path definitions.

    Each entry is either a plain string or a ``PathDef`` (strings are converted
    via ``as_path_defs``).  A path ending in ``/`` is created as a directory;
    any other path is created as a non-directory.  Missing parent directories
    are created automatically.  Paths not starting with ``./`` are prefixed
    with it.

    The root may be spelled ``.``, ``./`` or ``/``.  If the *first* entry is one
    of these, its ``fs_path`` is passed to the root directory.

    :param paths: The path definitions.  Directories should be listed before
      their children, as the duplicate check may otherwise misfire.
    :param read_write_fs: Value assigned to ``is_read_write`` on the root.
    :return: The root directory of the tree (an empty root if ``paths`` is empty).
    :raises ValueError: If a path is defined twice, or if a path needs a parent
      directory that was already defined as a non-directory.
    """
    # All accepted spellings of the root directory.  Every root check below
    # must use the same set; previously the "skip this entry" check omitted
    # "/", so a "/" entry fell through and failed with a KeyError on
    # directories[""].
    root_aliases = (".", "./", "/")

    root_dir: Optional[FSRootDir] = None
    directories: Dict[str, FSPath] = {}
    # Paths (as reported by the created node's ``.path``) of non-directories.
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until an already known directory is found, collecting
        # every directory that still has to be created on the way.
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        # Create outermost first so each parent exists before its child.
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            # The root is created on the first entry; only an explicit root
            # entry can provide the root's fs_path.
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = FSRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path in root_aliases:
            # The root itself needs no further node.
            assert "." in directories
            continue

        if not path.startswith("./"):
            path = "./" + path
        _ensure_parent_dirs(path)

        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = FSRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir