Coverage for src/debputy/path_matcher.py: 72%

279 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import fnmatch 

2import glob 

3import itertools 

4import os 

5import re 

6from enum import Enum 

7from typing import ( 

8 Callable, 

9 Optional, 

10 TypeVar, 

11 Iterable, 

12 Union, 

13 Sequence, 

14 Tuple, 

15) 

16 

17from debputy.intermediate_manifest import PathType 

18from debputy.plugin.api import VirtualPath 

19from debputy.substitution import Substitution, NULL_SUBSTITUTION 

20from debputy.types import VP 

21from debputy.util import _normalize_path, _error, escape_shell 

22 

# Generic type variable. NOTE(review): not referenced by any definition visible in this part of the file.
MR = TypeVar("MR")

# Matches a single glob token: "*", "?", or a bracket expression such as "[abc]" / "[]abc]"
# (an opening "[", an optional literal "]", one or more non-"]" characters and a closing "]").
_GLOB_PARTS = re.compile(r"[*?]|\[]?[^]]+]")

25 

26 

27def _lookup_path(fs_root: VP, path: str) -> Optional[VP]: 

28 if not path.startswith("./"): 28 ↛ 29line 28 didn't jump to line 29, because the condition on line 28 was never true

29 raise ValueError("Directory must be normalized (and not the root directory)") 

30 if fs_root.name != "." or fs_root.parent_dir is not None: 30 ↛ 31line 30 didn't jump to line 31, because the condition on line 30 was never true

31 raise ValueError("Provided fs_root must be the root directory") 

32 # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup) 

33 return fs_root.lookup(path[2:]) 

34 

35 

36def _compile_basename_glob( 

37 basename_glob: str, 

38) -> Tuple[Optional[str], Callable[[str], bool]]: 

39 remainder = None 

40 if not glob.has_magic(basename_glob): 40 ↛ 41line 40 didn't jump to line 41, because the condition on line 40 was never true

41 return escape_shell(basename_glob), lambda x: x == basename_glob 

42 

43 if basename_glob.startswith("*"): 

44 if basename_glob.endswith("*"): 

45 remainder = basename_glob[1:-1] 

46 possible_quick_match = lambda x: remainder in x 

47 escaped_pattern = "*" + escape_shell(remainder) + "*" 

48 else: 

49 remainder = basename_glob[1:] 

50 possible_quick_match = lambda x: x.endswith(remainder) 

51 escaped_pattern = "*" + escape_shell(remainder) 

52 else: 

53 remainder = basename_glob[:-1] 

54 possible_quick_match = lambda x: x.startswith(remainder) 

55 escaped_pattern = escape_shell(remainder) + "*" 

56 

57 if not glob.has_magic(remainder): 

58 return escaped_pattern, possible_quick_match 

59 slow_pattern = re.compile(fnmatch.translate(basename_glob)) 

60 return None, lambda x: bool(slow_pattern.match(x)) 60 ↛ exitline 60 didn't run the lambda on line 60

61 

62 

63def _apply_match( 

64 fs_path: VP, 

65 match_part: Union[Callable[[str], bool], str], 

66) -> Iterable[VP]: 

67 if isinstance(match_part, str): 

68 m = fs_path.lookup(match_part) 

69 if m: 

70 yield m 

71 else: 

72 yield from (p for p in fs_path.iterdir if match_part(p.name)) 

73 

74 

class MatchRuleType(Enum):
    """The kinds of match rules; every `MatchRule` carries exactly one of these."""

    # Used by ExactFileSystemPath (a path without any glob).
    EXACT_MATCH = "exact"
    # Used by BasenameGlobMatch (a glob applied to the basename only).
    BASENAME_GLOB = "basename-glob"
    # Used by DirectoryBasedMatch for "<dir>/*".
    DIRECT_CHILDREN_OF_DIR = "direct-children-of-dir"
    # Used by DirectoryBasedMatch for "<dir>/**/*".
    ANYTHING_BENEATH_DIR = "anything-beneath-dir"
    # Used by GenericGlobImplementation (globs in more than the basename).
    GENERIC_GLOB = "generic-glob"
    # Used by the MATCH_ANYTHING singleton.
    MATCH_ANYTHING = "match-anything"

82 

83 

class MatchRule:
    """Base class for rules that select paths from a virtual file system.

    Subclasses implement `finditer`, `_full_pattern` and `describe_match_exact`. Use the
    `from_path_or_glob` and `recursive_beneath_directory` class methods to create rules.
    """

    __slots__ = ("_rule_type",)

    def __init__(self, rule_type: MatchRuleType) -> None:
        self._rule_type = rule_type

    @property
    def rule_type(self) -> MatchRuleType:
        """The kind of rule this instance implements."""
        return self._rule_type

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Iterate over the paths beneath `fs_root` matched by this rule.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a true value
          are left out of the result.
        :return: An iterable of the matching paths.
        """
        # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
        raise NotImplementedError

    def _full_pattern(self) -> str:
        """Return the pattern of this rule as a string (implemented by subclasses)."""
        raise NotImplementedError

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the rule is restricted to; the base implementation has none."""
        return None

    def describe_match_short(self) -> str:
        """Return a short description of the rule (by default, its full pattern)."""
        return self._full_pattern()

    def describe_match_exact(self) -> str:
        """Return a detailed description of the rule (implemented by subclasses)."""
        raise NotImplementedError

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        :raises TypeError: Always in the base implementation; subclasses override this
          when they can express themselves as a shell pattern.
        """
        raise TypeError("Pattern not suitable or not supported for shell escape")

    @classmethod
    def recursive_beneath_directory(
        cls,
        directory: str,
        definition_source: str,
        path_type: Optional[PathType] = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create a rule matching everything beneath `directory`.

        :param directory: The directory to match beneath; must not contain glob characters.
        :param definition_source: Passed to `substitution.substitute` along with the
          normalized directory.
        :param path_type: If given, only paths of this type are matched.
        :param substitution: The substitution applied to the normalized directory.
        :return: `MATCH_ANYTHING` when the directory is "." or "/"; otherwise a
          `DirectoryBasedMatch` of type `ANYTHING_BENEATH_DIR`.
        """
        if directory in (".", "/"):
            # NOTE: path_type is not applied in this case.
            return MATCH_ANYTHING
        assert not glob.has_magic(directory)
        return DirectoryBasedMatch(
            MatchRuleType.ANYTHING_BENEATH_DIR,
            substitution.substitute(_normalize_path(directory), definition_source),
            path_type=path_type,
        )

    @classmethod
    def from_path_or_glob(
        cls,
        path_or_glob: str,
        definition_source: str,
        path_type: Optional[PathType] = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create the most specific rule that can represent `path_or_glob`.

        The checks are done in this order:

         1. "*", "**/*", "." and "/" give `MATCH_ANYTHING`.
         2. Something that looks like a brace expansion is reported via `_error`.
         3. A path without glob characters gives an `ExactFileSystemPath`.
         4. "<dir>/*" with a glob-free directory gives a `DirectoryBasedMatch`.
         5. "**/<glob>" or "<dir>/<glob>" with a glob-free directory gives a
            `BasenameGlobMatch`.
         6. Anything else gives a `GenericGlobImplementation`.

        :param path_or_glob: The path or glob to create a rule for.
        :param definition_source: Where the pattern was defined; used in error messages
          and passed to `substitution.substitute`.
        :param path_type: If given, only paths of this type are matched. Must be `None`
          for cases 1 and 3 (checked with an assertion).
        :param substitution: The substitution applied to the parts of the pattern.
        :return: The rule created for the pattern.
        :raises ValueError: If "**" is used anywhere other than as the "**/" directory
          prefix of a basename glob.
        """
        # TODO: Handle '{a,b,c}' patterns too
        # FIXME: Better error handling!
        normalized_no_prefix = _normalize_path(path_or_glob, with_prefix=False)
        if path_or_glob in ("*", "**/*", ".", "/"):
            assert path_type is None
            return MATCH_ANYTHING

        # We do not support {a,b} at the moment. This check is not perfect, but it should catch the most obvious
        # unsupported usage.
        if (
            "{" in path_or_glob
            and ("," in path_or_glob or ".." in path_or_glob)
            and re.search(r"[{][^},.]*(?:,|[.][.])[^},.]*[}]", path_or_glob)
        ):
            # Same pattern as above, but with groups: group 1 is everything before the
            # opening brace and group 2 is the remainder of the expansion (incl. "}").
            m = re.search(r"(.*)[{]([^},.]*(?:,|[.][.])[^},.]*[}])", path_or_glob)
            assert m is not None
            replacement = m.group(1) + "{{OPEN_CURLY_BRACE}}" + m.group(2)
            # NOTE(review): presumably `_error` does not return - confirm in debputy.util.
            _error(
                f'The pattern "{path_or_glob}" (defined in {definition_source}) looks like it contains a'
                f' brace expansion (such as "{{a,b}}" or "{{a..b}}"). Brace expansions are not supported.'
                " If you wanted to match the literal path a brace in it, please use a substitution to insert"
                f' the opening brace. As an example: "{replacement}"'
            )

        normalized_with_prefix = "./" + normalized_no_prefix
        # TODO: Check for escapes here "foo[?]/bar" can be written as an exact match for foo?/bar
        # - similar holds for "foo[?]/*" being a directory match (etc.).
        if not glob.has_magic(normalized_with_prefix):
            assert path_type is None
            return ExactFileSystemPath(
                substitution.substitute(normalized_with_prefix, definition_source)
            )

        directory = os.path.dirname(normalized_with_prefix)
        basename = os.path.basename(normalized_with_prefix)

        if ("**" in directory and directory != "./**") or "**" in basename:
            raise ValueError(
                f'Cannot process pattern "{path_or_glob}" from {definition_source}: The double-star'
                ' glob ("**") is not supported in general. Only "**/<basename-glob>" supported.'
            )

        if basename == "*" and not glob.has_magic(directory):
            return DirectoryBasedMatch(
                MatchRuleType.DIRECT_CHILDREN_OF_DIR,
                substitution.substitute(directory, definition_source),
                path_type=path_type,
            )
        elif directory == "./**" or not glob.has_magic(directory):
            basename_glob = substitution.substitute(
                basename, definition_source, escape_glob_characters=True
            )
            # Both a glob at the top level and a "**/" prefix are matched at any depth.
            if directory in (".", "./**"):
                return BasenameGlobMatch(
                    basename_glob,
                    path_type=path_type,
                    recursive_match=True,
                )
            return BasenameGlobMatch(
                basename_glob,
                only_when_in_directory=substitution.substitute(
                    directory, definition_source
                ),
                path_type=path_type,
                recursive_match=False,
            )

        # NOTE: No substitution is applied to the pattern in this case.
        return GenericGlobImplementation(normalized_with_prefix, path_type=path_type)

212 

213 

def _match_file_type(path_type: PathType, path: VirtualPath) -> bool:
    """Check whether `path` is of the requested path type.

    :param path_type: One of `PathType.FILE`, `PathType.DIRECTORY` or `PathType.SYMLINK`.
    :param path: The path to check.
    :return: `True` if the path is of the requested type and `False` otherwise.
    """
    if path_type == PathType.FILE:
        if path.is_file:
            return True
    elif path_type == PathType.DIRECTORY:
        if path.is_dir:
            return True
    elif path_type == PathType.SYMLINK:
        if path.is_symlink:
            return True
    else:
        # Any other path type is a programming error.
        assert path_type in (PathType.FILE, PathType.DIRECTORY, PathType.SYMLINK)
    return False

223 

224 

class MatchAnything(MatchRule):
    """Rule that matches every path beneath the root ("**/*")."""

    def __init__(self) -> None:
        super().__init__(MatchRuleType.MATCH_ANYTHING)

    def _full_pattern(self) -> str:
        return "**/*"

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Iterate over everything returned by `fs_root.all_paths()`.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a true value
          are left out of the result.
        :return: An iterable of the matching paths, each path at most once.
        """
        if ignore_paths is None:
            yield from fs_root.all_paths()
        else:
            # The two cases are exclusive. Falling through to the unfiltered iteration
            # would emit every path a second time, including the ignored ones.
            yield from (p for p in fs_root.all_paths() if not ignore_paths(p))

    def describe_match_exact(self) -> str:
        return "**/* (Match anything)"

239 

240 

# The shared instance of the "match anything" rule.
MATCH_ANYTHING: MatchRule = MatchAnything()

# Remove the class name from the module namespace, so that the instance above is the
# only way to reach the rule by name.
del MatchAnything

244 

245 

class ExactFileSystemPath(MatchRule):
    """Rule matching one exact path (no globbing involved)."""

    __slots__ = "_path"

    def __init__(self, path: str) -> None:
        super().__init__(MatchRuleType.EXACT_MATCH)
        self._path = path

    @property
    def path(self) -> str:
        """The path matched by this rule."""
        return self._path

    def _full_pattern(self) -> str:
        return self._path

    def finditer(self, fs_root: VP, *, ignore_paths=None) -> Iterable[VP]:
        """Yield the path if it exists beneath `fs_root` and is not ignored.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; the path is left out if it returns a
          true value for it.
        :return: An iterable with at most one path.
        """
        found = _lookup_path(fs_root, self._path)
        if found is None:
            return
        if ignore_paths is not None and ignore_paths(found):
            return
        yield found

    def describe_match_exact(self) -> str:
        return f"{self._path} (the exact path / no globbing)"

    def shell_escape_pattern(self) -> str:
        """Return the path, without its leading dots, escaped via `escape_shell`."""
        without_leading_dots = self._path.lstrip(".")
        return escape_shell(without_leading_dots)

270 

271 

class DirectoryBasedMatch(MatchRule):
    """Rule matching the content of a directory.

    Depending on the rule type, this is either the direct children of the directory
    ("<dir>/*") or everything beneath it ("<dir>/**/*"). The match can optionally be
    restricted to a single path type.
    """

    __slots__ = "_directory", "_path_type"

    def __init__(
        self,
        rule_type: MatchRuleType,
        directory: str,
        path_type: Optional[PathType] = None,
    ) -> None:
        super().__init__(rule_type)
        self._directory = directory
        self._path_type = path_type
        assert rule_type in (
            MatchRuleType.DIRECT_CHILDREN_OF_DIR,
            MatchRuleType.ANYTHING_BENEATH_DIR,
        )
        assert not self._directory.endswith("/")

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the match is restricted to (if any)."""
        return self._path_type

    @property
    def directory(self) -> str:
        """The directory whose content is matched."""
        return self._directory

    def _full_pattern(self) -> str:
        return self._directory

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Iterate over the matching paths in (or beneath) the directory.

        Nothing is matched if the directory does not exist or is not a directory.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a true value
          are left out of the result.
        :return: An iterable of the matching paths.
        """
        base_dir = _lookup_path(fs_root, self._directory)
        if base_dir is None or not base_dir.is_dir:
            return

        is_recursive = self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR
        candidates = base_dir.all_paths() if is_recursive else base_dir.iterdir
        wanted_type = self._path_type

        for candidate in candidates:
            # The ignore filter is consulted before the path type, as the type check
            # only applies to the paths that were not ignored.
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            if wanted_type is not None and not _match_file_type(
                wanted_type, candidate
            ):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        if self._path_type is None:
            path_type_match = ""
        else:
            path_type_match = f" <only for path type {self._path_type.manifest_key}>"

        if self._rule_type != MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self._directory}/*{path_type_match}"
        return f"{self._directory}/**/*{path_type_match}"

    def describe_match_exact(self) -> str:
        if self._rule_type != MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self.describe_match_short()} (anything directly in the directory)"
        return f"{self._directory}/**/* (anything below the directory)"

    def shell_escape_pattern(self) -> str:
        """Return "<escaped directory>/*" for the direct children variant.

        The "anything beneath" variant defers to the base class implementation.
        """
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return super().shell_escape_pattern()
        escaped_directory = escape_shell(self._directory.lstrip("."))
        return escaped_directory + "/*"

340 

341 

class BasenameGlobMatch(MatchRule):
    """Rule matching paths by applying a glob to their basename.

    The match is either limited to the direct children of one directory or applied
    recursively to everything beneath the search root. It can optionally be restricted
    to a single path type.
    """

    __slots__ = (
        "_basename_glob",
        "_directory",
        "_matcher",
        "_path_type",
        "_recursive_match",
        "_escaped_basename_pattern",
    )

    def __init__(
        self,
        basename_glob: str,
        only_when_in_directory: Optional[str] = None,
        path_type: Optional[PathType] = None,
        recursive_match: Optional[bool] = None,  # TODO: Can this just be = False (?)
    ) -> None:
        """Create the rule.

        :param basename_glob: The glob for the basename; must not contain "/" or "**".
        :param only_when_in_directory: If given, the directory to search in (without a
          trailing "/"). Otherwise, the search starts from the root.
        :param path_type: If given, only paths of this type are matched.
        :param recursive_match: Whether to search everything beneath the search root
          rather than only its direct children. Always enabled when no directory is
          given.
        """
        super().__init__(MatchRuleType.BASENAME_GLOB)
        self._basename_glob = basename_glob
        self._directory = only_when_in_directory
        self._path_type = path_type
        # Stored as a real bool, so that rules created with `None` and `False` (which
        # behave the same) also compare equal. Without a directory, the match is
        # always recursive.
        self._recursive_match = bool(recursive_match) or self._directory is None
        assert self._directory is None or not self._directory.endswith("/")
        assert "/" not in basename_glob  # Not a basename if it contains /
        assert "**" not in basename_glob  # Also not a (true) basename if it has **
        self._escaped_basename_pattern, self._matcher = _compile_basename_glob(
            basename_glob
        )

    def _full_pattern(self) -> str:
        if self._directory is not None:
            maybe_recursive = "**/" if self._recursive_match else ""
            return f"{self._directory}/{maybe_recursive}{self._basename_glob}"
        return self._basename_glob

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Iterate over the paths whose basename matches the glob.

        Nothing is matched if a directory was given and it does not exist or is not a
        directory.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a true value
          are left out of the result.
        :return: An iterable of the matching paths.
        """
        search_root = fs_root
        if self._directory is not None:
            p = _lookup_path(fs_root, self._directory)
            if p is None or not p.is_dir:
                return
            search_root = p
        path_iter = (
            search_root.all_paths() if self._recursive_match else search_root.iterdir
        )
        if ignore_paths is not None:
            path_iter = (p for p in path_iter if not ignore_paths(p))
        if self._path_type is None:
            yield from (m for m in path_iter if self._matcher(m.name))
        else:
            yield from (
                m
                for m in path_iter
                if self._matcher(m.name) and _match_file_type(self._path_type, m)
            )

    def describe_match_short(self) -> str:
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        return (
            self._full_pattern()
            if path_type_match == ""
            else f"{self._full_pattern()}{path_type_match}"
        )

    def describe_match_exact(self) -> str:
        if self._directory is not None:
            return f"{self.describe_match_short()} (glob / directly in the directory)"
        return f"{self.describe_match_short()} (basename match)"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BasenameGlobMatch):
            return NotImplemented
        return (
            self._basename_glob == other._basename_glob
            and self._directory == other._directory
            and self._path_type == other._path_type
            and self._recursive_match == other._recursive_match
        )

    def __hash__(self) -> int:
        # Defining __eq__ on its own would make the instances unhashable; hash the same
        # fields that __eq__ compares.
        return hash(
            (
                self._basename_glob,
                self._directory,
                self._path_type,
                self._recursive_match,
            )
        )

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the match is restricted to (if any)."""
        return self._path_type

    @property
    def directory(self) -> Optional[str]:
        """The directory the match is limited to (if any)."""
        return self._directory

    def shell_escape_pattern(self) -> str:
        """Return "<escaped directory>/<escaped basename pattern>".

        Defers to the base class implementation when there is no directory or the
        basename glob has no shell-escaped form.
        """
        if self._directory is None or self._escaped_basename_pattern is None:
            return super().shell_escape_pattern()
        return (
            escape_shell(self._directory.lstrip("."))
            + f"/{self._escaped_basename_pattern}"
        )

442 

443 

class GenericGlobImplementation(MatchRule):
    """Rule for globs that have glob characters in more than the basename.

    The pattern is split on "/" and applied one path segment at a time, starting from
    the root. Recursive globs ("**") are not supported.
    """

    __slots__ = "_glob_pattern", "_path_type", "_match_parts"

    def __init__(
        self,
        glob_pattern: str,
        path_type: Optional[PathType] = None,
    ) -> None:
        """Create the rule.

        :param glob_pattern: The glob; a leading "./" is removed. It must contain a
          glob character and a "/", and must not contain "**".
        :param path_type: If given, only paths of this type are matched.
        """
        super().__init__(MatchRuleType.GENERIC_GLOB)
        if glob_pattern.startswith("./"):
            glob_pattern = glob_pattern[2:]
        self._glob_pattern = glob_pattern
        self._path_type = path_type
        assert "**" not in glob_pattern  # No recursive globs
        assert glob.has_magic(
            glob_pattern
        )  # If it has no glob, then it could have been an exact match
        assert (
            "/" in glob_pattern
        )  # If it does not have a / then a BasenameGlob could have been used instead
        self._match_parts = self._compile_glob()

    def _full_pattern(self) -> str:
        return self._glob_pattern

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Iterate over the paths matching the glob.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a true value
          are left out of the result. It is only applied to the final matches.
        :return: An iterable of the matching paths.
        """
        search_history = [fs_root]
        for part in self._match_parts:
            next_layer = itertools.chain.from_iterable(
                _apply_match(m, part) for m in search_history
            )
            # The layer must be materialized before `part` is rebound by the next
            # iteration: the generator expression looks `part` up when it runs, so a
            # lazily chained layer would be evaluated with a later part.
            search_history = list(next_layer)
            if not search_history:
                # While we have it as a list, we might as well have an "early exit".
                return

        if self._path_type is None:
            if ignore_paths is None:
                yield from search_history
            else:
                yield from (p for p in search_history if not ignore_paths(p))
        elif ignore_paths is None:
            yield from (
                m for m in search_history if _match_file_type(self._path_type, m)
            )
        else:
            yield from (
                m
                for m in search_history
                if _match_file_type(self._path_type, m) and not ignore_paths(m)
            )

    def describe_match_short(self) -> str:
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        return (
            self._full_pattern()
            if path_type_match == ""
            else f"{self._full_pattern()}{path_type_match}"
        )

    def describe_match_exact(self) -> str:
        return f"{self.describe_match_short()} (glob)"

    def _compile_glob(self) -> Sequence[Union[Callable[[str], bool], str]]:
        """Split the glob into one match part per path segment.

        :return: A sequence with, for each segment, either the segment itself (when it
          has no glob characters) or a predicate for matching basenames against it.
        """
        assert self._glob_pattern.strip("/") == self._glob_pattern
        match_parts: list = []
        for part in self._glob_pattern.split("/"):
            if glob.has_magic(part):
                # `_compile_basename_glob` returns (escaped pattern, matcher); only the
                # matcher can be used as a match part.
                _, matcher = _compile_basename_glob(part)
                match_parts.append(matcher)
            else:
                match_parts.append(part)
        return match_parts

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenericGlobImplementation):
            return NotImplemented
        return (
            self._glob_pattern == other._glob_pattern
            and self._path_type == other._path_type
        )

    def __hash__(self) -> int:
        # Defining __eq__ on its own would make the instances unhashable; hash the same
        # fields that __eq__ compares.
        return hash((self._glob_pattern, self._path_type))

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the match is restricted to (if any)."""
        return self._path_type