Coverage for src/debputy/plugin/api/impl.py: 55%

753 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import contextlib 

2import dataclasses 

3import functools 

4import importlib 

5import importlib.util 

6import itertools 

7import json 

8import os 

9import re 

10import subprocess 

11import sys 

12from abc import ABC 

13from json import JSONDecodeError 

14from typing import ( 

15 Optional, 

16 Callable, 

17 Dict, 

18 Tuple, 

19 Iterable, 

20 Sequence, 

21 Type, 

22 List, 

23 Union, 

24 Set, 

25 Iterator, 

26 IO, 

27 Mapping, 

28 AbstractSet, 

29 cast, 

30 FrozenSet, 

31 Any, 

32 Literal, 

33) 

34 

35from debputy import DEBPUTY_DOC_ROOT_DIR 

36from debputy.exceptions import ( 

37 DebputySubstitutionError, 

38 PluginConflictError, 

39 PluginMetadataError, 

40 PluginBaseError, 

41 PluginInitializationError, 

42 PluginAPIViolationError, 

43 PluginNotFoundError, 

44) 

45from debputy.maintscript_snippet import ( 

46 STD_CONTROL_SCRIPTS, 

47 MaintscriptSnippetContainer, 

48 MaintscriptSnippet, 

49) 

50from debputy.manifest_parser.base_types import TypeMapping 

51from debputy.manifest_parser.exceptions import ManifestParseException 

52from debputy.manifest_parser.parser_data import ParserContextData 

53from debputy.manifest_parser.util import AttributePath 

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

55from debputy.plugin.api.impl_types import ( 

56 DebputyPluginMetadata, 

57 PackagerProvidedFileClassSpec, 

58 MetadataOrMaintscriptDetector, 

59 PluginProvidedTrigger, 

60 TTP, 

61 DIPHandler, 

62 PF, 

63 SF, 

64 DIPKWHandler, 

65 PluginProvidedManifestVariable, 

66 PluginProvidedPackageProcessor, 

67 PluginProvidedDiscardRule, 

68 AutomaticDiscardRuleExample, 

69 PPFFormatParam, 

70 ServiceManagerDetails, 

71 resolve_package_type_selectors, 

72 KnownPackagingFileInfo, 

73 PluginProvidedKnownPackagingFile, 

74 InstallPatternDHCompatRule, 

75 PluginProvidedTypeMapping, 

76) 

77from debputy.plugin.api.plugin_parser import ( 

78 PLUGIN_METADATA_PARSER, 

79 PluginJsonMetadata, 

80 PLUGIN_PPF_PARSER, 

81 PackagerProvidedFileJsonDescription, 

82 PLUGIN_MANIFEST_VARS_PARSER, 

83 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

84) 

85from debputy.plugin.api.spec import ( 

86 MaintscriptAccessor, 

87 Maintscript, 

88 DpkgTriggerType, 

89 BinaryCtrlAccessor, 

90 PackageProcessingContext, 

91 MetadataAutoDetector, 

92 PluginInitializationEntryPoint, 

93 DebputyPluginInitializer, 

94 PackageTypeSelector, 

95 FlushableSubstvars, 

96 ParserDocumentation, 

97 PackageProcessor, 

98 VirtualPath, 

99 ServiceIntegrator, 

100 ServiceDetector, 

101 ServiceRegistry, 

102 ServiceDefinition, 

103 DSD, 

104 ServiceUpgradeRule, 

105 PackagerProvidedFileReferenceDocumentation, 

106 packager_provided_file_reference_documentation, 

107 TypeMappingDocumentation, 

108) 

109from debputy.substitution import ( 

110 Substitution, 

111 VariableNameState, 

112 SUBST_VAR_RE, 

113 VariableContext, 

114) 

115from debputy.util import ( 

116 _normalize_path, 

117 POSTINST_DEFAULT_CONDITION, 

118 _error, 

119 print_command, 

120 _warn, 

121) 

122 

123PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") 

124 

125 

126def _validate_known_packaging_file_dh_compat_rules( 

127 dh_compat_rules: Optional[List[InstallPatternDHCompatRule]], 

128) -> None: 

129 max_compat = None 

130 if not dh_compat_rules: 

131 return 

132 dh_compat_rule: InstallPatternDHCompatRule 

133 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

134 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

135 compat = dh_compat_rule.get("starting_with_compat_level") 

136 

137 remaining = dh_compat_rule.keys() - { 

138 "after_debhelper_version", 

139 "starting_with_compat_level", 

140 } 

141 if not remaining: 

142 raise ValueError( 

143 f"The dh compat-rule at index {idx} does not affect anything not have any rules!? So why have it?" 

144 ) 

145 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

146 raise ValueError( 

147 f"The dh compat-rule at index {idx} is not the last and is missing either" 

148 " before-debhelper-version or before-compat-level" 

149 ) 

150 if compat is not None and compat < 0: 

151 raise ValueError( 

152 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

153 f" for something that appeared when migrating from {compat} to {compat + 1}." 

154 ) 

155 

156 if max_compat is None: 

157 max_compat = compat 

158 elif compat is not None: 

159 if compat >= max_compat: 

160 raise ValueError( 

161 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

162 ) 

163 max_compat = compat 

164 

165 install_pattern = dh_compat_rule.get("install_pattern") 

166 if ( 

167 install_pattern is not None 

168 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

169 ): 

170 raise ValueError( 

171 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

172 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

173 ) 

174 

175 

176class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

177 __slots__ = ( 

178 "_plugin_metadata", 

179 "_feature_set", 

180 "_plugin_detector_ids", 

181 "_substitution", 

182 "_unloaders", 

183 "_load_started", 

184 ) 

185 

186 def __init__( 

187 self, 

188 plugin_metadata: DebputyPluginMetadata, 

189 feature_set: PluginProvidedFeatureSet, 

190 substitution: Substitution, 

191 ) -> None: 

192 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

193 self._feature_set = feature_set 

194 self._plugin_detector_ids: Set[str] = set() 

195 self._substitution = substitution 

196 self._unloaders: List[Callable[[], None]] = [] 

197 self._load_started = False 

198 

199 def unload_plugin(self) -> None: 

200 if self._load_started: 

201 for unloader in self._unloaders: 

202 unloader() 

203 del self._feature_set.plugin_data[self._plugin_name] 

204 

205 def load_plugin(self) -> None: 

206 metadata = self._plugin_metadata 

207 if metadata.plugin_name in self._feature_set.plugin_data: 207 ↛ 208line 207 didn't jump to line 208, because the condition on line 207 was never true

208 raise PluginConflictError( 

209 f'The plugin "{metadata.plugin_name}" has already been loaded!?' 

210 ) 

211 assert ( 

212 metadata.api_compat_version == 1 

213 ), f"Unsupported plugin API compat version {metadata.api_compat_version}" 

214 self._feature_set.plugin_data[metadata.plugin_name] = metadata 

215 self._load_started = True 

216 assert not metadata.is_initialized 

217 try: 

218 metadata.initialize_plugin(self) 

219 except Exception as e: 

220 initializer = metadata.plugin_initializer 

221 if ( 221 ↛ 226line 221 didn't jump to line 226

222 isinstance(e, TypeError) 

223 and initializer is not None 

224 and not callable(initializer) 

225 ): 

226 raise PluginMetadataError( 

227 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" 

228 f" callable (callable returns False). The specified entry point identifies" 

229 f' itself as "{initializer.__qualname__}".' 

230 ) from e 

231 elif isinstance(e, PluginBaseError): 231 ↛ 233line 231 didn't jump to line 233, because the condition on line 231 was never false

232 raise 

233 raise PluginInitializationError( 

234 f"Exception while attempting to load plugin {metadata.plugin_name}" 

235 ) from e 

236 

237 def packager_provided_file( 

238 self, 

239 stem: str, 

240 installed_path: str, 

241 *, 

242 default_mode: int = 0o0644, 

243 default_priority: Optional[int] = None, 

244 allow_name_segment: bool = True, 

245 allow_architecture_segment: bool = False, 

246 post_formatting_rewrite: Optional[Callable[[str], str]] = None, 

247 packageless_is_fallback_for_all_packages: bool = False, 

248 reservation_only: bool = False, 

249 format_callback: Optional[ 

250 Callable[[str, PPFFormatParam, VirtualPath], str] 

251 ] = None, 

252 reference_documentation: Optional[ 

253 PackagerProvidedFileReferenceDocumentation 

254 ] = None, 

255 ) -> None: 

256 packager_provided_files = self._feature_set.packager_provided_files 

257 existing = packager_provided_files.get(stem) 

258 

259 if format_callback is not None and self._plugin_name != "debputy": 259 ↛ 260line 259 didn't jump to line 260, because the condition on line 259 was never true

260 raise ValueError( 

261 "Sorry; Using format_callback is a debputy-internal" 

262 f" API. Triggered by plugin {self._plugin_name}" 

263 ) 

264 

265 if installed_path.endswith("/"): 265 ↛ 266line 265 didn't jump to line 266, because the condition on line 265 was never true

266 raise ValueError( 

267 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' 

268 f" Triggered by plugin {self._plugin_name}." 

269 ) 

270 

271 installed_path = _normalize_path(installed_path) 

272 

273 has_name_var = "{name}" in installed_path 

274 

275 if installed_path.startswith("./DEBIAN") or reservation_only: 

276 # Special-case, used for control files. 

277 if self._plugin_name != "debputy": 277 ↛ 278line 277 didn't jump to line 278, because the condition on line 277 was never true

278 raise ValueError( 

279 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" 

280 f" API. Triggered by plugin {self._plugin_name}" 

281 ) 

282 elif not has_name_var and "{owning_package}" not in installed_path: 282 ↛ 283line 282 didn't jump to line 283, because the condition on line 282 was never true

283 raise ValueError( 

284 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' 

285 " substitution (or have installed_path end with a slash). Otherwise, the installed" 

286 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" 

287 ) 

288 

289 if allow_name_segment and not has_name_var: 289 ↛ 290line 289 didn't jump to line 290, because the condition on line 289 was never true

290 raise ValueError( 

291 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' 

292 " variable. Otherwise, the name segment will not work properly. Triggered by" 

293 f" plugin {self._plugin_name}" 

294 ) 

295 

296 if ( 296 ↛ 301line 296 didn't jump to line 301

297 default_priority is not None 

298 and "{priority}" not in installed_path 

299 and "{priority:02}" not in installed_path 

300 ): 

301 raise ValueError( 

302 'When default_priority is not None, the installed_path should have a "{priority}"' 

303 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' 

304 f" Triggered by plugin {self._plugin_name}" 

305 ) 

306 

307 if existing is not None: 

308 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 308 ↛ 315line 308 didn't jump to line 315

309 message = ( 

310 f'The stem "{stem}" is registered twice for packager provided files.' 

311 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" 

312 f" by {self._plugin_name}" 

313 ) 

314 else: 

315 message = ( 

316 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

317 f' stem "{stem}" twice for packager provided files.' 

318 ) 

319 raise PluginConflictError( 

320 message, existing.debputy_plugin_metadata, self._plugin_metadata 

321 ) 

322 packager_provided_files[stem] = PackagerProvidedFileClassSpec( 

323 self._plugin_metadata, 

324 stem, 

325 installed_path, 

326 default_mode=default_mode, 

327 default_priority=default_priority, 

328 allow_name_segment=allow_name_segment, 

329 allow_architecture_segment=allow_architecture_segment, 

330 post_formatting_rewrite=post_formatting_rewrite, 

331 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, 

332 reservation_only=reservation_only, 

333 formatting_callback=format_callback, 

334 reference_documentation=reference_documentation, 

335 ) 

336 

337 def _unload() -> None: 

338 del packager_provided_files[stem] 

339 

340 self._unloaders.append(_unload) 

341 

342 def metadata_or_maintscript_detector( 

343 self, 

344 auto_detector_id: str, 

345 auto_detector: MetadataAutoDetector, 

346 *, 

347 package_type: PackageTypeSelector = "deb", 

348 ) -> None: 

349 if auto_detector_id in self._plugin_detector_ids: 349 ↛ 350line 349 didn't jump to line 350, because the condition on line 349 was never true

350 raise ValueError( 

351 f"The plugin {self._plugin_name} tried to register" 

352 f' "{auto_detector_id}" twice' 

353 ) 

354 self._plugin_detector_ids.add(auto_detector_id) 

355 all_detectors = self._feature_set.metadata_maintscript_detectors 

356 if self._plugin_name not in all_detectors: 

357 all_detectors[self._plugin_name] = [] 

358 package_types = resolve_package_type_selectors(package_type) 

359 all_detectors[self._plugin_name].append( 

360 MetadataOrMaintscriptDetector( 

361 detector_id=auto_detector_id, 

362 detector=auto_detector, 

363 plugin_metadata=self._plugin_metadata, 

364 applies_to_package_types=package_types, 

365 enabled=True, 

366 ) 

367 ) 

368 

369 def _unload() -> None: 

370 if self._plugin_name in all_detectors: 

371 del all_detectors[self._plugin_name] 

372 

373 self._unloaders.append(_unload) 

374 

375 def document_builtin_variable( 

376 self, 

377 variable_name: str, 

378 variable_reference_documentation: str, 

379 *, 

380 is_context_specific: bool = False, 

381 is_for_special_case: bool = False, 

382 ) -> None: 

383 manifest_variables = self._feature_set.manifest_variables 

384 self._restricted_api() 

385 state = self._substitution.variable_state(variable_name) 

386 if state == VariableNameState.UNDEFINED: 386 ↛ 387line 386 didn't jump to line 387, because the condition on line 386 was never true

387 raise ValueError( 

388 f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," 

389 f" but it is not known to be a variable" 

390 ) 

391 

392 assert variable_name not in manifest_variables 

393 

394 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

395 self._plugin_metadata, 

396 variable_name, 

397 None, 

398 is_context_specific_variable=is_context_specific, 

399 variable_reference_documentation=variable_reference_documentation, 

400 is_documentation_placeholder=True, 

401 is_for_special_case=is_for_special_case, 

402 ) 

403 

404 def _unload() -> None: 

405 del manifest_variables[variable_name] 

406 

407 self._unloaders.append(_unload) 

408 

409 def manifest_variable_provider( 

410 self, 

411 provider: Callable[[VariableContext], Mapping[str, str]], 

412 variables: Union[Sequence[str], Mapping[str, Optional[str]]], 

413 ) -> None: 

414 self._restricted_api() 

415 cached_provider = functools.lru_cache(None)(provider) 

416 permitted_variables = frozenset(variables) 

417 variables_iter: Iterable[Tuple[str, Optional[str]]] 

418 if not isinstance(variables, Mapping): 418 ↛ 419line 418 didn't jump to line 419, because the condition on line 418 was never true

419 variables_iter = zip(variables, itertools.repeat(None)) 

420 else: 

421 variables_iter = variables.items() 

422 

423 checked_vars = False 

424 manifest_variables = self._feature_set.manifest_variables 

425 plugin_name = self._plugin_name 

426 

427 def _value_resolver_generator( 

428 variable_name: str, 

429 ) -> Callable[[VariableContext], str]: 

430 def _value_resolver(variable_context: VariableContext) -> str: 

431 res = cached_provider(variable_context) 

432 nonlocal checked_vars 

433 if not checked_vars: 433 ↛ 444line 433 didn't jump to line 444, because the condition on line 433 was never false

434 if permitted_variables != res.keys(): 434 ↛ 435line 434 didn't jump to line 435, because the condition on line 434 was never true

435 expected = ", ".join(sorted(permitted_variables)) 

436 actual = ", ".join(sorted(res)) 

437 raise PluginAPIViolationError( 

438 f"The plugin {plugin_name} claimed to provide" 

439 f" the following variables {expected}," 

440 f" but when resolving the variables, the plugin provided" 

441 f" {actual}. These two lists should have been the same." 

442 ) 

443 checked_vars = False 

444 return res[variable_name] 

445 

446 return _value_resolver 

447 

448 for varname, vardoc in variables_iter: 

449 self._check_variable_name(varname) 

450 manifest_variables[varname] = PluginProvidedManifestVariable( 

451 self._plugin_metadata, 

452 varname, 

453 _value_resolver_generator(varname), 

454 is_context_specific_variable=False, 

455 variable_reference_documentation=vardoc, 

456 ) 

457 

458 def _unload() -> None: 

459 raise PluginInitializationError( 

460 "Cannot unload manifest_variable_provider (not implemented)" 

461 ) 

462 

463 self._unloaders.append(_unload) 

464 

465 def _check_variable_name(self, variable_name: str) -> None: 

466 manifest_variables = self._feature_set.manifest_variables 

467 existing = manifest_variables.get(variable_name) 

468 

469 if existing is not None: 

470 if existing.plugin_metadata.plugin_name == self._plugin_name: 470 ↛ 476line 470 didn't jump to line 476

471 message = ( 

472 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

473 f' manifest variable "{variable_name}" twice.' 

474 ) 

475 else: 

476 message = ( 

477 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

478 f" both tried to provide the manifest variable {variable_name}" 

479 ) 

480 raise PluginConflictError( 

481 message, existing.plugin_metadata, self._plugin_metadata 

482 ) 

483 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): 

484 raise ValueError( 

485 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

486 f" which is not a valid variable name" 

487 ) 

488 

489 namespace = "" 

490 variable_basename = variable_name 

491 if ":" in variable_name: 

492 namespace, variable_basename = variable_name.rsplit(":", 1) 

493 assert namespace != "" 

494 assert variable_name != "" 

495 

496 if namespace != "" and namespace not in ("token", "path"): 

497 raise ValueError( 

498 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

499 f" which is in the reserved namespace {namespace}" 

500 ) 

501 

502 variable_name_upper = variable_name.upper() 

503 if ( 

504 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) 

505 or variable_basename.startswith("_") 

506 or variable_basename.upper().startswith("DEBPUTY") 

507 ) and self._plugin_name != "debputy": 

508 raise ValueError( 

509 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

510 f" which is a variable name reserved by debputy" 

511 ) 

512 

513 state = self._substitution.variable_state(variable_name) 

514 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": 

515 raise ValueError( 

516 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

517 f" which would shadow a built-in variable" 

518 ) 

519 

520 def package_processor( 

521 self, 

522 processor_id: str, 

523 processor: PackageProcessor, 

524 *, 

525 depends_on_processor: Iterable[str] = tuple(), 

526 package_type: PackageTypeSelector = "deb", 

527 ) -> None: 

528 self._restricted_api(allowed_plugins={"lua"}) 

529 package_processors = self._feature_set.all_package_processors 

530 dependencies = set() 

531 processor_key = (self._plugin_name, processor_id) 

532 

533 if processor_key in package_processors: 533 ↛ 534line 533 didn't jump to line 534, because the condition on line 533 was never true

534 raise PluginConflictError( 

535 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", 

536 self._plugin_metadata, 

537 self._plugin_metadata, 

538 ) 

539 

540 for depends_ref in depends_on_processor: 

541 if isinstance(depends_ref, str): 541 ↛ 555line 541 didn't jump to line 555, because the condition on line 541 was never false

542 if (self._plugin_name, depends_ref) in package_processors: 542 ↛ 544line 542 didn't jump to line 544, because the condition on line 542 was never false

543 depends_key = (self._plugin_name, depends_ref) 

544 elif ("debputy", depends_ref) in package_processors: 

545 depends_key = ("debputy", depends_ref) 

546 else: 

547 raise ValueError( 

548 f'Could not resolve dependency "{depends_ref}" for' 

549 f' "{processor_id}". It was not provided by the plugin itself' 

550 f" ({self._plugin_name}) nor debputy." 

551 ) 

552 else: 

553 # TODO: Add proper dependencies first, at which point we should probably resolve "name" 

554 # via the direct dependencies. 

555 assert False 

556 

557 existing_processor = package_processors.get(depends_key) 

558 if existing_processor is None: 558 ↛ 561line 558 didn't jump to line 561, because the condition on line 558 was never true

559 # We currently require the processor to be declared already. If this ever changes, 

560 # PluginProvidedFeatureSet.package_processors_in_order will need an update 

561 dplugin_name, dprocessor_name = depends_key 

562 available_processors = ", ".join( 

563 n for p, n in package_processors.keys() if p == dplugin_name 

564 ) 

565 raise ValueError( 

566 f"The plugin {dplugin_name} does not provide a processor called" 

567 f" {dprocessor_name}. Available processors for that plugin are:" 

568 f" {available_processors}" 

569 ) 

570 dependencies.add(depends_key) 

571 

572 package_processors[processor_key] = PluginProvidedPackageProcessor( 

573 processor_id, 

574 resolve_package_type_selectors(package_type), 

575 processor, 

576 frozenset(dependencies), 

577 self._plugin_metadata, 

578 ) 

579 

580 def _unload() -> None: 

581 del package_processors[processor_key] 

582 

583 self._unloaders.append(_unload) 

584 

585 def automatic_discard_rule( 

586 self, 

587 name: str, 

588 should_discard: Callable[[VirtualPath], bool], 

589 *, 

590 rule_reference_documentation: Optional[str] = None, 

591 examples: Union[ 

592 AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample] 

593 ] = tuple(), 

594 ) -> None: 

595 """Register an automatic discard rule 

596 

597 An automatic discard rule is basically applied to *every* path about to be installed in to any package. 

598 If any discard rule concludes that a path should not be installed, then the path is not installed. 

599 In the case where the discard path is a: 

600 

601 * directory: Then the entire directory is excluded along with anything beneath it. 

602 * symlink: Then the symlink itself (but not its target) is excluded. 

603 * hardlink: Then the current hardlink will not be installed, but other instances of it will be. 

604 

605 Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file. 

606 

607 Automatic discard rules should be written with the assumption that directories will be tested 

608 before their content *when it is relevant* for the discard rule to examine whether the directory 

609 can be excluded. 

610 

611 The packager can via the manifest overrule automatic discard rules by explicitly listing the path 

612 without any globs. As example: 

613 

614 installations: 

615 - install: 

616 sources: 

617 - usr/lib/libfoo.la # <-- This path is always installed 

618 # (Discard rules are never asked in this case) 

619 # 

620 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches 

621 # Though, they will not examine `libfoo.la` as it has already been installed 

622 # 

623 # Note: usr/lib itself is never tested in this case (it is assumed to be 

624 # explicitly requested). But any subdir of usr/lib will be examined. 

625 

626 When an automatic discard rule is evaluated, it can see the source path currently being considered 

627 for installation. While it can look at "surrounding" context (like parent directory), it will not 

628 know whether those paths are to be installed or will be installed. 

629 

630 :param name: A user visible name discard rule. It can be used on the command line, so avoid shell 

631 metacharacters and spaces. 

632 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive 

633 a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the 

634 path is discarded. If it returns `False`, the path is not discarded (by this rule at least). 

635 A source path will either be from the root of the source tree or the root of a search directory such as 

636 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is 

637 evaluated. 

638 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user 

639 looks up this automatic discard rule. 

640 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to 

641 generate the examples. 

642 

643 """ 

644 self._restricted_api() 

645 auto_discard_rules = self._feature_set.auto_discard_rules 

646 existing = auto_discard_rules.get(name) 

647 if existing is not None: 647 ↛ 648line 647 didn't jump to line 648, because the condition on line 647 was never true

648 if existing.plugin_metadata.plugin_name == self._plugin_name: 

649 message = ( 

650 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

651 f' automatic discard rule "{name}" twice.' 

652 ) 

653 else: 

654 message = ( 

655 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

656 f" both tried to provide the automatic discard rule {name}" 

657 ) 

658 raise PluginConflictError( 

659 message, existing.plugin_metadata, self._plugin_metadata 

660 ) 

661 examples = ( 

662 (examples,) 

663 if isinstance(examples, AutomaticDiscardRuleExample) 

664 else tuple(examples) 

665 ) 

666 auto_discard_rules[name] = PluginProvidedDiscardRule( 

667 name, 

668 self._plugin_metadata, 

669 should_discard, 

670 rule_reference_documentation, 

671 examples, 

672 ) 

673 

674 def _unload() -> None: 

675 del auto_discard_rules[name] 

676 

677 self._unloaders.append(_unload) 

678 

679 def service_provider( 

680 self, 

681 service_manager: str, 

682 detector: ServiceDetector, 

683 integrator: ServiceIntegrator, 

684 ) -> None: 

685 self._restricted_api() 

686 service_managers = self._feature_set.service_managers 

687 existing = service_managers.get(service_manager) 

688 if existing is not None: 688 ↛ 689line 688 didn't jump to line 689, because the condition on line 688 was never true

689 if existing.plugin_metadata.plugin_name == self._plugin_name: 

690 message = ( 

691 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

692 f' service manager "{service_manager}" twice.' 

693 ) 

694 else: 

695 message = ( 

696 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

697 f' both tried to provide the service manager "{service_manager}"' 

698 ) 

699 raise PluginConflictError( 

700 message, existing.plugin_metadata, self._plugin_metadata 

701 ) 

702 service_managers[service_manager] = ServiceManagerDetails( 

703 service_manager, 

704 detector, 

705 integrator, 

706 self._plugin_metadata, 

707 ) 

708 

709 def _unload() -> None: 

710 del service_managers[service_manager] 

711 

712 self._unloaders.append(_unload) 

713 

714 def manifest_variable( 

715 self, 

716 variable_name: str, 

717 value: str, 

718 variable_reference_documentation: Optional[str] = None, 

719 ) -> None: 

720 self._check_variable_name(variable_name) 

721 manifest_variables = self._feature_set.manifest_variables 

722 try: 

723 resolved_value = self._substitution.substitute( 

724 value, "Plugin initialization" 

725 ) 

726 depends_on_variable = resolved_value != value 

727 except DebputySubstitutionError: 

728 depends_on_variable = True 

729 if depends_on_variable: 

730 raise ValueError( 

731 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." 

732 f" This value depends on another variable, which is not supported. This restriction may be" 

733 f" lifted in the future." 

734 ) 

735 

736 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

737 self._plugin_metadata, 

738 variable_name, 

739 value, 

740 is_context_specific_variable=False, 

741 variable_reference_documentation=variable_reference_documentation, 

742 ) 

743 

744 def _unload() -> None: 

745 # We need to check it was never resolved 

746 raise PluginInitializationError( 

747 "Cannot unload manifest_variable (not implemented)" 

748 ) 

749 

750 self._unloaders.append(_unload) 

751 

752 @property 

753 def _plugin_name(self) -> str: 

754 return self._plugin_metadata.plugin_name 

755 

756 def provide_manifest_keyword( 

757 self, 

758 rule_type: TTP, 

759 rule_name: Union[str, List[str]], 

760 handler: DIPKWHandler, 

761 *, 

762 inline_reference_documentation: Optional[ParserDocumentation] = None, 

763 ) -> None: 

764 self._restricted_api() 

765 parser_generator = self._feature_set.manifest_parser_generator 

766 if rule_type not in parser_generator.dispatchable_table_parsers: 766 ↛ 767line 766 didn't jump to line 767, because the condition on line 766 was never true

767 types = ", ".join( 

768 sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) 

769 ) 

770 raise ValueError( 

771 f"The rule_type was not a supported type. It must be one of {types}" 

772 ) 

773 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

774 dispatching_parser.register_keyword( 

775 rule_name, 

776 handler, 

777 self._plugin_metadata, 

778 inline_reference_documentation=inline_reference_documentation, 

779 ) 

780 

781 def _unload() -> None: 

782 raise PluginInitializationError( 

783 "Cannot unload provide_manifest_keyword (not implemented)" 

784 ) 

785 

786 self._unloaders.append(_unload) 

787 

788 def pluggable_object_parser( 

789 self, 

790 rule_type: str, 

791 rule_name: str, 

792 *, 

793 object_parser_key: Optional[str] = None, 

794 on_end_parse_step: Optional[ 

795 Callable[ 

796 [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData], 

797 None, 

798 ] 

799 ] = None, 

800 nested_in_package_context: bool = False, 

801 ) -> None: 

802 self._restricted_api() 

803 if object_parser_key is None: 803 ↛ 804line 803 didn't jump to line 804, because the condition on line 803 was never true

804 object_parser_key = rule_name 

805 

806 parser_generator = self._feature_set.manifest_parser_generator 

807 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers 

808 if rule_type not in dispatchable_object_parsers: 808 ↛ 809line 808 didn't jump to line 809, because the condition on line 808 was never true

809 types = ", ".join(sorted(dispatchable_object_parsers)) 

810 raise ValueError( 

811 f"The rule_type was not a supported type. It must be one of {types}" 

812 ) 

813 if object_parser_key not in dispatchable_object_parsers: 813 ↛ 814line 813 didn't jump to line 814, because the condition on line 813 was never true

814 types = ", ".join(sorted(dispatchable_object_parsers)) 

815 raise ValueError( 

816 f"The object_parser_key was not a supported type. It must be one of {types}" 

817 ) 

818 parent_dispatcher = dispatchable_object_parsers[rule_type] 

819 child_dispatcher = dispatchable_object_parsers[object_parser_key] 

820 parent_dispatcher.register_child_parser( 

821 rule_name, 

822 child_dispatcher, 

823 self._plugin_metadata, 

824 on_end_parse_step=on_end_parse_step, 

825 nested_in_package_context=nested_in_package_context, 

826 ) 

827 

828 def _unload() -> None: 

829 raise PluginInitializationError( 

830 "Cannot unload pluggable_object_parser (not implemented)" 

831 ) 

832 

833 self._unloaders.append(_unload) 

834 

835 def pluggable_manifest_rule( 

836 self, 

837 rule_type: Union[TTP, str], 

838 rule_name: Union[str, List[str]], 

839 parsed_format: Type[PF], 

840 handler: DIPHandler, 

841 *, 

842 source_format: Optional[SF] = None, 

843 inline_reference_documentation: Optional[ParserDocumentation] = None, 

844 ) -> None: 

845 self._restricted_api() 

846 feature_set = self._feature_set 

847 parser_generator = feature_set.manifest_parser_generator 

848 if isinstance(rule_type, str): 

849 if rule_type not in parser_generator.dispatchable_object_parsers: 849 ↛ 850line 849 didn't jump to line 850, because the condition on line 849 was never true

850 types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) 

851 raise ValueError( 

852 f"The rule_type was not a supported type. It must be one of {types}" 

853 ) 

854 dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] 

855 else: 

856 if rule_type not in parser_generator.dispatchable_table_parsers: 856 ↛ 857line 856 didn't jump to line 857, because the condition on line 856 was never true

857 types = ", ".join( 

858 sorted( 

859 x.__name__ for x in parser_generator.dispatchable_table_parsers 

860 ) 

861 ) 

862 raise ValueError( 

863 f"The rule_type was not a supported type. It must be one of {types}" 

864 ) 

865 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

866 

867 parser = feature_set.manifest_parser_generator.generate_parser( 

868 parsed_format, 

869 source_content=source_format, 

870 inline_reference_documentation=inline_reference_documentation, 

871 ) 

872 dispatching_parser.register_parser( 

873 rule_name, 

874 parser, 

875 handler, 

876 self._plugin_metadata, 

877 ) 

878 

879 def _unload() -> None: 

880 raise PluginInitializationError( 

881 "Cannot unload pluggable_manifest_rule (not implemented)" 

882 ) 

883 

884 self._unloaders.append(_unload) 

885 

886 def known_packaging_files( 

887 self, 

888 packaging_file_details: KnownPackagingFileInfo, 

889 ) -> None: 

890 known_packaging_files = self._feature_set.known_packaging_files 

891 detection_method = packaging_file_details.get( 

892 "detection_method", cast("Literal['path']", "path") 

893 ) 

894 path = packaging_file_details.get("path") 

895 dhpkgfile = packaging_file_details.get("pkgfile") 

896 

897 packaging_file_details: KnownPackagingFileInfo = packaging_file_details.copy() 

898 

899 if detection_method == "path": 

900 if dhpkgfile is not None: 

901 raise ValueError( 

902 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' 

903 ) 

904 if path != _normalize_path(path, with_prefix=False): 

905 raise ValueError( 

906 f"The path for known packaging files must be normalized. Please replace" 

907 f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' 

908 ) 

909 detection_value = path 

910 else: 

911 assert detection_method == "dh.pkgfile" 

912 if path is not None: 

913 raise ValueError( 

914 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' 

915 ) 

916 if "/" in dhpkgfile: 

917 raise ValueError( 

918 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' 

919 ) 

920 detection_value = dhpkgfile 

921 key = f"{detection_method}::{detection_value}" 

922 existing = known_packaging_files.get(key) 

923 if existing is not None: 

924 if existing.plugin_metadata.plugin_name != self._plugin_name: 

925 message = ( 

926 f'The key "{key}" is registered twice for known packaging files.' 

927 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

928 ) 

929 else: 

930 message = ( 

931 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

932 f' key "{key}" twice for known packaging files.' 

933 ) 

934 raise PluginConflictError( 

935 message, existing.plugin_metadata, self._plugin_metadata 

936 ) 

937 _validate_known_packaging_file_dh_compat_rules( 

938 packaging_file_details.get("dh_compat_rules") 

939 ) 

940 known_packaging_files[key] = PluginProvidedKnownPackagingFile( 

941 packaging_file_details, 

942 detection_method, 

943 detection_value, 

944 self._plugin_metadata, 

945 ) 

946 

947 def _unload() -> None: 

948 del known_packaging_files[key] 

949 

950 self._unloaders.append(_unload) 

951 

952 def register_mapped_type( 

953 self, 

954 type_mapping: TypeMapping, 

955 *, 

956 reference_documentation: Optional[TypeMappingDocumentation] = None, 

957 ) -> None: 

958 self._restricted_api() 

959 target_type = type_mapping.target_type 

960 mapped_types = self._feature_set.mapped_types 

961 existing = mapped_types.get(target_type) 

962 if existing is not None: 962 ↛ 963line 962 didn't jump to line 963, because the condition on line 962 was never true

963 if existing.plugin_metadata.plugin_name != self._plugin_name: 

964 message = ( 

965 f'The key "{target_type.__name__}" is registered twice for known packaging files.' 

966 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

967 ) 

968 else: 

969 message = ( 

970 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

971 f' key "{target_type.__name__}" twice for known packaging files.' 

972 ) 

973 raise PluginConflictError( 

974 message, existing.plugin_metadata, self._plugin_metadata 

975 ) 

976 parser_generator = self._feature_set.manifest_parser_generator 

977 mapped_types[target_type] = PluginProvidedTypeMapping( 

978 type_mapping, reference_documentation, self._plugin_metadata 

979 ) 

980 parser_generator.register_mapped_type(type_mapping) 

981 

982 def _restricted_api( 

983 self, 

984 *, 

985 allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(), 

986 ) -> None: 

987 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 987 ↛ 988line 987 didn't jump to line 988, because the condition on line 987 was never true

988 raise PluginAPIViolationError( 

989 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

990 " If you are the maintainer of this plugin and want access to this" 

991 " API, please file a feature request to make this public." 

992 " (The API is currently private as it is unstable.)" 

993 ) 

994 

995 

996class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): 

997 __slots__ = () 

998 

999 def _append_script( 

1000 self, 

1001 caller_name: str, 

1002 maintscript: Maintscript, 

1003 full_script: str, 

1004 /, 

1005 perform_substitution: bool = True, 

1006 ) -> None: 

1007 raise NotImplementedError 

1008 

1009 @classmethod 

1010 def _apply_condition_to_script( 

1011 cls, 

1012 condition: str, 

1013 run_snippet: str, 

1014 /, 

1015 indent: Optional[bool] = None, 

1016 ) -> str: 

1017 if indent is None: 

1018 # We auto-determine this based on heredocs currently 

1019 indent = "<<" not in run_snippet 

1020 

1021 if indent: 

1022 run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) 

1023 if not run_snippet.endswith("\n"): 

1024 run_snippet += "\n" 

1025 condition_line = f"if {condition}; then\n" 

1026 end_line = "fi\n" 

1027 return "".join((condition_line, run_snippet, end_line)) 

1028 

1029 def on_configure( 

1030 self, 

1031 run_snippet: str, 

1032 /, 

1033 indent: Optional[bool] = None, 

1034 perform_substitution: bool = True, 

1035 skip_on_rollback: bool = False, 

1036 ) -> None: 

1037 condition = POSTINST_DEFAULT_CONDITION 

1038 if skip_on_rollback: 1038 ↛ 1039line 1038 didn't jump to line 1039, because the condition on line 1038 was never true

1039 condition = '[ "$1" = "configure" ]' 

1040 return self._append_script( 

1041 "on_configure", 

1042 "postinst", 

1043 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1044 perform_substitution=perform_substitution, 

1045 ) 

1046 

1047 def on_initial_install( 

1048 self, 

1049 run_snippet: str, 

1050 /, 

1051 indent: Optional[bool] = None, 

1052 perform_substitution: bool = True, 

1053 ) -> None: 

1054 condition = '[ "$1" = "configure" -a -z "$2" ]' 

1055 return self._append_script( 

1056 "on_initial_install", 

1057 "postinst", 

1058 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1059 perform_substitution=perform_substitution, 

1060 ) 

1061 

1062 def on_upgrade( 

1063 self, 

1064 run_snippet: str, 

1065 /, 

1066 indent: Optional[bool] = None, 

1067 perform_substitution: bool = True, 

1068 ) -> None: 

1069 condition = '[ "$1" = "configure" -a -n "$2" ]' 

1070 return self._append_script( 

1071 "on_upgrade", 

1072 "postinst", 

1073 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1074 perform_substitution=perform_substitution, 

1075 ) 

1076 

1077 def on_upgrade_from( 

1078 self, 

1079 version: str, 

1080 run_snippet: str, 

1081 /, 

1082 indent: Optional[bool] = None, 

1083 perform_substitution: bool = True, 

1084 ) -> None: 

1085 condition = '[ "$1" = "configure" ] && dpkg --compare-versions le-nl "$2"' 

1086 return self._append_script( 

1087 "on_upgrade_from", 

1088 "postinst", 

1089 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1090 perform_substitution=perform_substitution, 

1091 ) 

1092 

1093 def on_before_removal( 

1094 self, 

1095 run_snippet: str, 

1096 /, 

1097 indent: Optional[bool] = None, 

1098 perform_substitution: bool = True, 

1099 ) -> None: 

1100 condition = '[ "$1" = "remove" ]' 

1101 return self._append_script( 

1102 "on_before_removal", 

1103 "prerm", 

1104 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1105 perform_substitution=perform_substitution, 

1106 ) 

1107 

1108 def on_removed( 

1109 self, 

1110 run_snippet: str, 

1111 /, 

1112 indent: Optional[bool] = None, 

1113 perform_substitution: bool = True, 

1114 ) -> None: 

1115 condition = '[ "$1" = "remove" ]' 

1116 return self._append_script( 

1117 "on_removed", 

1118 "postrm", 

1119 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1120 perform_substitution=perform_substitution, 

1121 ) 

1122 

1123 def on_purge( 

1124 self, 

1125 run_snippet: str, 

1126 /, 

1127 indent: Optional[bool] = None, 

1128 perform_substitution: bool = True, 

1129 ) -> None: 

1130 condition = '[ "$1" = "purge" ]' 

1131 return self._append_script( 

1132 "on_purge", 

1133 "postrm", 

1134 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1135 perform_substitution=perform_substitution, 

1136 ) 

1137 

1138 def unconditionally_in_script( 

1139 self, 

1140 maintscript: Maintscript, 

1141 run_snippet: str, 

1142 /, 

1143 perform_substitution: bool = True, 

1144 ) -> None: 

1145 if maintscript not in STD_CONTROL_SCRIPTS: 1145 ↛ 1146line 1145 didn't jump to line 1146, because the condition on line 1145 was never true

1146 raise ValueError( 

1147 f'Unknown script "{maintscript}". Should have been one of:' 

1148 f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' 

1149 ) 

1150 return self._append_script( 

1151 "unconditionally_in_script", 

1152 maintscript, 

1153 run_snippet, 

1154 perform_substitution=perform_substitution, 

1155 ) 

1156 

1157 

1158class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): 

1159 __slots__ = ( 

1160 "_plugin_metadata", 

1161 "_maintscript_snippets", 

1162 "_plugin_source_id", 

1163 "_package_substitution", 

1164 "_default_snippet_order", 

1165 ) 

1166 

1167 def __init__( 

1168 self, 

1169 plugin_metadata: DebputyPluginMetadata, 

1170 plugin_source_id: str, 

1171 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

1172 package_substitution: Substitution, 

1173 *, 

1174 default_snippet_order: Optional[Literal["service"]] = None, 

1175 ): 

1176 self._plugin_metadata = plugin_metadata 

1177 self._plugin_source_id = plugin_source_id 

1178 self._maintscript_snippets = maintscript_snippets 

1179 self._package_substitution = package_substitution 

1180 self._default_snippet_order = default_snippet_order 

1181 

1182 def _append_script( 

1183 self, 

1184 caller_name: str, 

1185 maintscript: Maintscript, 

1186 full_script: str, 

1187 /, 

1188 perform_substitution: bool = True, 

1189 ) -> None: 

1190 def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" 

1191 if perform_substitution: 

1192 full_script = self._package_substitution.substitute(full_script, def_source) 

1193 

1194 snippet = MaintscriptSnippet( 

1195 snippet=full_script, 

1196 definition_source=def_source, 

1197 snippet_order=self._default_snippet_order, 

1198 ) 

1199 self._maintscript_snippets[maintscript].append(snippet) 

1200 

1201 

1202class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): 

1203 __slots__ = ( 

1204 "_plugin_metadata", 

1205 "_plugin_source_id", 

1206 "_package_metadata_context", 

1207 "_triggers", 

1208 "_substvars", 

1209 "_maintscript", 

1210 "_shlibs_details", 

1211 ) 

1212 

1213 def __init__( 

1214 self, 

1215 plugin_metadata: DebputyPluginMetadata, 

1216 plugin_source_id: str, 

1217 package_metadata_context: PackageProcessingContext, 

1218 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1219 substvars: FlushableSubstvars, 

1220 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

1221 ) -> None: 

1222 self._plugin_metadata = plugin_metadata 

1223 self._plugin_source_id = plugin_source_id 

1224 self._package_metadata_context = package_metadata_context 

1225 self._triggers = triggers 

1226 self._substvars = substvars 

1227 self._maintscript: Optional[MaintscriptAccessor] = None 

1228 self._shlibs_details = shlibs_details 

1229 

1230 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1231 raise NotImplementedError 

1232 

1233 def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: 

1234 """Register a declarative dpkg level trigger 

1235 

1236 The provided trigger will be added to the package's metadata (the triggers file of the control.tar). 

1237 

1238 If the trigger has already been added previously, a second call with the same trigger data will be ignored. 

1239 """ 

1240 key = (trigger_type, trigger_target) 

1241 if key in self._triggers: 1241 ↛ 1242line 1241 didn't jump to line 1242, because the condition on line 1241 was never true

1242 return 

1243 self._triggers[key] = PluginProvidedTrigger( 

1244 dpkg_trigger_type=trigger_type, 

1245 dpkg_trigger_target=trigger_target, 

1246 provider=self._plugin_metadata, 

1247 provider_source_id=self._plugin_source_id, 

1248 ) 

1249 

1250 @property 

1251 def maintscript(self) -> MaintscriptAccessor: 

1252 maintscript = self._maintscript 

1253 if maintscript is None: 

1254 maintscript = self._create_maintscript_accessor() 

1255 self._maintscript = maintscript 

1256 return maintscript 

1257 

1258 @property 

1259 def substvars(self) -> FlushableSubstvars: 

1260 return self._substvars 

1261 

1262 def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: 

1263 binary_package = self._package_metadata_context.binary_package 

1264 with self.substvars.flush() as substvars_file: 

1265 dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] 

1266 if binary_package.is_udeb: 

1267 dpkg_cmd.append("-tudeb") 

1268 if binary_package.is_essential: 1268 ↛ 1269line 1268 didn't jump to line 1269, because the condition on line 1268 was never true

1269 dpkg_cmd.append("-dPre-Depends") 

1270 shlibs_local, shlib_dirs = self._shlibs_details 

1271 if shlibs_local is not None: 1271 ↛ 1272line 1271 didn't jump to line 1272, because the condition on line 1271 was never true

1272 dpkg_cmd.append(f"-L{shlibs_local}") 

1273 if shlib_dirs: 1273 ↛ 1274line 1273 didn't jump to line 1274, because the condition on line 1273 was never true

1274 dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) 

1275 dpkg_cmd.extend(p.fs_path for p in paths) 

1276 print_command(*dpkg_cmd) 

1277 try: 

1278 subprocess.check_call(dpkg_cmd) 

1279 except subprocess.CalledProcessError: 

1280 _error( 

1281 f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" 

1282 " review the output from dpkg-shlibdeps above to understand what went wrong." 

1283 ) 

1284 

1285 

1286class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): 

1287 __slots__ = ( 

1288 "_maintscript", 

1289 "_maintscript_snippets", 

1290 "_package_substitution", 

1291 ) 

1292 

1293 def __init__( 

1294 self, 

1295 plugin_metadata: DebputyPluginMetadata, 

1296 plugin_source_id: str, 

1297 package_metadata_context: PackageProcessingContext, 

1298 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1299 substvars: FlushableSubstvars, 

1300 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

1301 package_substitution: Substitution, 

1302 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

1303 *, 

1304 default_snippet_order: Optional[Literal["service"]] = None, 

1305 ) -> None: 

1306 super().__init__( 

1307 plugin_metadata, 

1308 plugin_source_id, 

1309 package_metadata_context, 

1310 triggers, 

1311 substvars, 

1312 shlibs_details, 

1313 ) 

1314 self._maintscript_snippets = maintscript_snippets 

1315 self._package_substitution = package_substitution 

1316 self._maintscript = MaintscriptAccessorProvider( 

1317 plugin_metadata, 

1318 plugin_source_id, 

1319 maintscript_snippets, 

1320 package_substitution, 

1321 default_snippet_order=default_snippet_order, 

1322 ) 

1323 

1324 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1325 return MaintscriptAccessorProvider( 

1326 self._plugin_metadata, 

1327 self._plugin_source_id, 

1328 self._maintscript_snippets, 

1329 self._package_substitution, 

1330 ) 

1331 

1332 

1333class BinaryCtrlAccessorProviderCreator: 

1334 def __init__( 

1335 self, 

1336 package_metadata_context: PackageProcessingContext, 

1337 substvars: FlushableSubstvars, 

1338 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

1339 substitution: Substitution, 

1340 ) -> None: 

1341 self._package_metadata_context = package_metadata_context 

1342 self._substvars = substvars 

1343 self._maintscript_snippets = maintscript_snippets 

1344 self._substitution = substitution 

1345 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

1346 self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = None, None 

1347 

1348 def for_plugin( 

1349 self, 

1350 plugin_metadata: DebputyPluginMetadata, 

1351 plugin_source_id: str, 

1352 *, 

1353 default_snippet_order: Optional[Literal["service"]] = None, 

1354 ) -> BinaryCtrlAccessor: 

1355 return BinaryCtrlAccessorProvider( 

1356 plugin_metadata, 

1357 plugin_source_id, 

1358 self._package_metadata_context, 

1359 self._triggers, 

1360 self._substvars, 

1361 self._maintscript_snippets, 

1362 self._substitution, 

1363 self.shlibs_details, 

1364 default_snippet_order=default_snippet_order, 

1365 ) 

1366 

1367 def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: 

1368 return self._triggers.values() 

1369 

1370 

1371def plugin_metadata_for_debputys_own_plugin( 

1372 loader: Optional[PluginInitializationEntryPoint] = None, 

1373) -> DebputyPluginMetadata: 

1374 if loader is None: 

1375 from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features 

1376 

1377 loader = initialize_debputy_features 

1378 return DebputyPluginMetadata( 

1379 plugin_name="debputy", 

1380 api_compat_version=1, 

1381 plugin_initializer=loader, 

1382 plugin_loader=None, 

1383 plugin_path="<bundled>", 

1384 ) 

1385 

1386 

1387def load_plugin_features( 

1388 plugin_search_dirs: Sequence[str], 

1389 substitution: Substitution, 

1390 requested_plugins_only: Optional[Sequence[str]] = None, 

1391 required_plugins: Optional[Set[str]] = None, 

1392 plugin_feature_set: Optional[PluginProvidedFeatureSet] = None, 

1393 debug_mode: bool = False, 

1394) -> PluginProvidedFeatureSet: 

1395 if plugin_feature_set is None: 

1396 plugin_feature_set = PluginProvidedFeatureSet() 

1397 plugins = [plugin_metadata_for_debputys_own_plugin()] 

1398 unloadable_plugins = set() 

1399 if required_plugins: 

1400 plugins.extend( 

1401 find_json_plugins( 

1402 plugin_search_dirs, 

1403 required_plugins, 

1404 ) 

1405 ) 

1406 if requested_plugins_only is not None: 

1407 plugins.extend( 

1408 find_json_plugins( 

1409 plugin_search_dirs, 

1410 requested_plugins_only, 

1411 ) 

1412 ) 

1413 else: 

1414 auto_loaded = _find_all_json_plugins( 

1415 plugin_search_dirs, 

1416 required_plugins if required_plugins is not None else frozenset(), 

1417 debug_mode=debug_mode, 

1418 ) 

1419 for plugin_metadata in auto_loaded: 

1420 plugins.append(plugin_metadata) 

1421 unloadable_plugins.add(plugin_metadata.plugin_name) 

1422 

1423 for plugin_metadata in plugins: 

1424 api = DebputyPluginInitializerProvider( 

1425 plugin_metadata, plugin_feature_set, substitution 

1426 ) 

1427 try: 

1428 api.load_plugin() 

1429 except PluginBaseError as e: 

1430 if plugin_metadata.plugin_name not in unloadable_plugins: 

1431 raise 

1432 if debug_mode: 

1433 raise 

1434 try: 

1435 api.unload_plugin() 

1436 except Exception: 

1437 _warn( 

1438 f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" 

1439 " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" 

1440 " module might have tainted the feature set." 

1441 ) 

1442 raise e from None 

1443 else: 

1444 if debug_mode: 

1445 _warn( 

1446 f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" 

1447 f" to --debug/-d." 

1448 ) 

1449 _warn( 

1450 f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" 

1451 f" deactivated. Use debug mode (--debug) to show the stacktrace (the warning will become an error)" 

1452 ) 

1453 

1454 return plugin_feature_set 

1455 

1456 

1457def find_json_plugin( 

1458 search_dirs: Sequence[str], 

1459 requested_plugin: str, 

1460) -> DebputyPluginMetadata: 

1461 r = list(find_json_plugins(search_dirs, [requested_plugin])) 

1462 assert len(r) == 1 

1463 return r[0] 

1464 

1465 

1466def find_related_implementation_files_for_plugin( 

1467 plugin_metadata: DebputyPluginMetadata, 

1468) -> List[str]: 

1469 plugin_path = plugin_metadata.plugin_path 

1470 if not os.path.isfile(plugin_path): 

1471 plugin_name = plugin_metadata.plugin_name 

1472 _error( 

1473 f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" 

1474 " or loaded via a mechanism that does not support detecting its tests." 

1475 ) 

1476 files = [] 

1477 module_name, module_file = _find_plugin_implementation_file( 

1478 plugin_metadata.plugin_name, 

1479 plugin_metadata.plugin_path, 

1480 ) 

1481 if os.path.isfile(module_file): 

1482 files.append(module_file) 

1483 else: 

1484 if not plugin_metadata.is_loaded: 

1485 plugin_metadata.load_plugin() 

1486 if module_name in sys.modules: 

1487 _error( 

1488 f'The plugin {plugin_metadata.plugin_name} uses the "module"" key in its' 

1489 f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " 

1490 f" installed via this method. The related Python would not be installed" 

1491 f" (which would result in a plugin that would fail to load)" 

1492 ) 

1493 

1494 return files 

1495 

1496 

1497def find_tests_for_plugin( 

1498 plugin_metadata: DebputyPluginMetadata, 

1499) -> List[str]: 

1500 plugin_name = plugin_metadata.plugin_name 

1501 plugin_path = plugin_metadata.plugin_path 

1502 

1503 if not os.path.isfile(plugin_path): 

1504 _error( 

1505 f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" 

1506 " mechanism that does not support detecting its tests." 

1507 ) 

1508 

1509 plugin_dir = os.path.dirname(plugin_path) 

1510 test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") 

1511 tests = [] 

1512 with os.scandir(plugin_dir) as dir_iter: 

1513 for p in dir_iter: 

1514 if ( 

1515 p.is_file() 

1516 and p.name.startswith(test_basename_prefix) 

1517 and PLUGIN_TEST_SUFFIX.search(p.name) 

1518 ): 

1519 tests.append(p.path) 

1520 return tests 

1521 

1522 

1523def find_json_plugins( 

1524 search_dirs: Sequence[str], 

1525 requested_plugins: Iterable[str], 

1526) -> Iterable[DebputyPluginMetadata]: 

1527 for plugin_name_or_path in requested_plugins: 

1528 found = False 

1529 if "/" in plugin_name_or_path: 1529 ↛ 1530line 1529 didn't jump to line 1530, because the condition on line 1529 was never true

1530 if not os.path.isfile(plugin_name_or_path): 

1531 raise PluginNotFoundError( 

1532 f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." 

1533 ' (Because the plugin name contains "/", it is assumed to be a path and search path' 

1534 " is not used." 

1535 ) 

1536 yield parse_json_plugin_desc(plugin_name_or_path) 

1537 return 

1538 for search_dir in search_dirs: 

1539 path = os.path.join( 

1540 search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" 

1541 ) 

1542 if not os.path.isfile(path): 1542 ↛ 1543line 1542 didn't jump to line 1543, because the condition on line 1542 was never true

1543 continue 

1544 found = True 

1545 yield parse_json_plugin_desc(path) 

1546 if not found: 1546 ↛ 1547line 1546 didn't jump to line 1547, because the condition on line 1546 was never true

1547 search_dir_str = ":".join(search_dirs) 

1548 raise PluginNotFoundError( 

1549 f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" 

1550 f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" 

1551 ) 

1552 

1553 

1554def _find_all_json_plugins( 

1555 search_dirs: Sequence[str], 

1556 required_plugins: AbstractSet[str], 

1557 debug_mode: bool = False, 

1558) -> Iterable[DebputyPluginMetadata]: 

1559 seen = set(required_plugins) 

1560 error_seen = False 

1561 for search_dir in search_dirs: 

1562 try: 

1563 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) 

1564 except FileNotFoundError: 

1565 continue 

1566 with dir_fd: 

1567 for entry in dir_fd: 

1568 if ( 

1569 not entry.is_file(follow_symlinks=True) 

1570 or not entry.name.endswith(".json") 

1571 or entry.name in seen 

1572 ): 

1573 continue 

1574 try: 

1575 plugin_metadata = parse_json_plugin_desc(entry.path) 

1576 except PluginBaseError as e: 

1577 if debug_mode: 

1578 raise 

1579 if not error_seen: 

1580 error_seen = True 

1581 _warn( 

1582 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" 

1583 ) 

1584 else: 

1585 _warn( 

1586 f"Failed to load plugin in {entry.path} due to errors (not shown)." 

1587 ) 

1588 else: 

1589 yield plugin_metadata 

1590 

1591 

1592def _find_plugin_implementation_file( 

1593 plugin_name: str, 

1594 json_file_path: str, 

1595) -> Tuple[str, str]: 

1596 guessed_module_basename = plugin_name.replace("-", "_") 

1597 module_name = f"debputy.plugin.{guessed_module_basename}" 

1598 module_fs_path = os.path.join( 

1599 os.path.dirname(json_file_path), f"{guessed_module_basename}.py" 

1600 ) 

1601 return module_name, module_fs_path 

1602 

1603 

1604def _resolve_module_initializer( 

1605 plugin_name: str, 

1606 plugin_initializer_name: str, 

1607 module_name: Optional[str], 

1608 json_file_path: str, 

1609) -> PluginInitializationEntryPoint: 

1610 module = None 

1611 module_fs_path = None 

1612 if module_name is None: 1612 ↛ 1640line 1612 didn't jump to line 1640, because the condition on line 1612 was never false

1613 module_name, module_fs_path = _find_plugin_implementation_file( 

1614 plugin_name, json_file_path 

1615 ) 

1616 if os.path.isfile(module_fs_path): 1616 ↛ 1640line 1616 didn't jump to line 1640, because the condition on line 1616 was never false

1617 spec = importlib.util.spec_from_file_location(module_name, module_fs_path) 

1618 if spec is None: 1618 ↛ 1619line 1618 didn't jump to line 1619, because the condition on line 1618 was never true

1619 raise PluginInitializationError( 

1620 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1621 " The spec_from_file_location function returned None." 

1622 ) 

1623 mod = importlib.util.module_from_spec(spec) 

1624 loader = spec.loader 

1625 if loader is None: 1625 ↛ 1626line 1625 didn't jump to line 1626, because the condition on line 1625 was never true

1626 raise PluginInitializationError( 

1627 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1628 " Python could not find a suitable loader (spec.loader was None)" 

1629 ) 

1630 sys.modules[module_name] = mod 

1631 try: 

1632 loader.exec_module(mod) 

1633 except (Exception, GeneratorExit) as e: 

1634 raise PluginInitializationError( 

1635 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1636 " The module threw an exception while being loaded." 

1637 ) from e 

1638 module = mod 

1639 

1640 if module is None: 1640 ↛ 1641line 1640 didn't jump to line 1641, because the condition on line 1640 was never true

1641 try: 

1642 module = importlib.import_module(module_name) 

1643 except ModuleNotFoundError as e: 

1644 if module_fs_path is None: 

1645 raise PluginMetadataError( 

1646 f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' 

1647 " this module is not available in the python search path" 

1648 ) from e 

1649 raise PluginInitializationError( 

1650 f"Failed to load {plugin_name}. Tried loading it from" 

1651 f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' 

1652 f" {module_name} (where it was not found either). Please ensure" 

1653 " the module code is installed in the correct spot or provide an" 

1654 f' explicit "module" definition in {json_file_path}.' 

1655 ) from e 

1656 

1657 plugin_initializer = getattr(module, plugin_initializer_name) 

1658 

1659 if plugin_initializer is None: 1659 ↛ 1660line 1659 didn't jump to line 1660, because the condition on line 1659 was never true

1660 raise PluginMetadataError( 

1661 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

1662 f" attribute called {plugin_initializer}. However, it does not. Please correct the plugin" 

1663 f" metadata or initializer name in the Python module." 

1664 ) 

1665 return cast("PluginInitializationEntryPoint", plugin_initializer) 

1666 

1667 

1668def _json_plugin_loader( 

1669 plugin_name: str, 

1670 plugin_json_metadata: PluginJsonMetadata, 

1671 json_file_path: str, 

1672 attribute_path: AttributePath, 

1673) -> Callable[["DebputyPluginInitializer"], None]: 

1674 api_compat = plugin_json_metadata["api_compat_version"] 

1675 module_name = plugin_json_metadata.get("module") 

1676 plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") 

1677 packager_provided_files_raw = plugin_json_metadata.get( 

1678 "packager_provided_files", [] 

1679 ) 

1680 manifest_variables_raw = plugin_json_metadata.get("manifest_variables") 

1681 known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") 

1682 if api_compat != 1: 1682 ↛ 1683line 1682 didn't jump to line 1683, because the condition on line 1682 was never true

1683 raise PluginMetadataError( 

1684 f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' 

1685 f" version of debputy only supports API compat version of 1" 

1686 ) 

1687 if plugin_initializer_name is not None and "." in plugin_initializer_name: 1687 ↛ 1688line 1687 didn't jump to line 1688, because the condition on line 1687 was never true

1688 p = attribute_path["plugin_initializer"] 

1689 raise PluginMetadataError( 

1690 f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' 

1691 ) 

1692 

1693 plugin_initializers = [] 

1694 

1695 if plugin_initializer_name is not None: 

1696 plugin_initializer = _resolve_module_initializer( 

1697 plugin_name, 

1698 plugin_initializer_name, 

1699 module_name, 

1700 json_file_path, 

1701 ) 

1702 plugin_initializers.append(plugin_initializer) 

1703 

1704 if known_packaging_files_raw: 1704 ↛ 1705line 1704 didn't jump to line 1705, because the condition on line 1704 was never true

1705 kpf_root_path = attribute_path["known_packaging_files"] 

1706 known_packaging_files = [] 

1707 for k, v in enumerate(known_packaging_files_raw): 

1708 kpf_path = kpf_root_path[k] 

1709 p = v.get("path") 

1710 if isinstance(p, str): 

1711 kpf_path.path_hint = p 

1712 if plugin_name.startswith("debputy-") and isinstance(v, dict): 

1713 docs = v.get("documentation-uris") 

1714 if docs is not None and isinstance(docs, list): 

1715 docs = [ 

1716 ( 

1717 d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR) 

1718 if isinstance(d, str) 

1719 else d 

1720 ) 

1721 for d in docs 

1722 ] 

1723 v["documentation-uris"] = docs 

1724 known_packaging_file: KnownPackagingFileInfo = ( 

1725 PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( 

1726 v, 

1727 kpf_path, 

1728 ) 

1729 ) 

1730 known_packaging_files.append((kpf_path, known_packaging_file)) 

1731 

1732 def _initialize_json_provided_known_packaging_files( 

1733 api: DebputyPluginInitializerProvider, 

1734 ) -> None: 

1735 for p, details in known_packaging_files: 

1736 try: 

1737 api.known_packaging_files(details) 

1738 except ValueError as ex: 

1739 raise PluginMetadataError( 

1740 f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" 

1741 ) 

1742 

1743 plugin_initializers.append(_initialize_json_provided_known_packaging_files) 

1744 

1745 if manifest_variables_raw: 

1746 manifest_var_path = attribute_path["manifest_variables"] 

1747 manifest_variables = [ 

1748 PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) 

1749 for i, p in enumerate(manifest_variables_raw) 

1750 ] 

1751 

1752 def _initialize_json_provided_manifest_vars( 

1753 api: DebputyPluginInitializer, 

1754 ) -> None: 

1755 for idx, manifest_variable in enumerate(manifest_variables): 

1756 name = manifest_variable["name"] 

1757 value = manifest_variable["value"] 

1758 doc = manifest_variable.get("reference_documentation") 

1759 try: 

1760 api.manifest_variable( 

1761 name, value, variable_reference_documentation=doc 

1762 ) 

1763 except ValueError as ex: 

1764 var_path = manifest_var_path[idx] 

1765 raise PluginMetadataError( 

1766 f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" 

1767 ) 

1768 

1769 plugin_initializers.append(_initialize_json_provided_manifest_vars) 

1770 

1771 if packager_provided_files_raw: 

1772 ppf_path = attribute_path["packager_provided_files"] 

1773 ppfs = [ 

1774 PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) 

1775 for i, p in enumerate(packager_provided_files_raw) 

1776 ] 

1777 

1778 def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: 

1779 ppf: PackagerProvidedFileJsonDescription 

1780 for idx, ppf in enumerate(ppfs): 

1781 c = dict(ppf) 

1782 stem = ppf["stem"] 

1783 installed_path = ppf["installed_path"] 

1784 default_mode = ppf.get("default_mode") 

1785 ref_doc_dict = ppf.get("reference_documentation") 

1786 if default_mode is not None: 1786 ↛ 1789line 1786 didn't jump to line 1789, because the condition on line 1786 was never false

1787 c["default_mode"] = default_mode.octal_mode 

1788 

1789 if ref_doc_dict is not None: 1789 ↛ 1794line 1789 didn't jump to line 1794, because the condition on line 1789 was never false

1790 ref_doc = packager_provided_file_reference_documentation( 

1791 **ref_doc_dict 

1792 ) 

1793 else: 

1794 ref_doc = None 

1795 

1796 for k in [ 

1797 "stem", 

1798 "installed_path", 

1799 "reference_documentation", 

1800 ]: 

1801 try: 

1802 del c[k] 

1803 except KeyError: 

1804 pass 

1805 

1806 try: 

1807 api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore 

1808 except ValueError as ex: 

1809 p_path = ppf_path[idx] 

1810 raise PluginMetadataError( 

1811 f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" 

1812 ) 

1813 

1814 plugin_initializers.append(_initialize_json_provided_ppfs) 

1815 

1816 if not plugin_initializers: 1816 ↛ 1817line 1816 didn't jump to line 1817, because the condition on line 1816 was never true

1817 raise PluginMetadataError( 

1818 f"The plugin defined in {json_file_path} does not seem to provide features, " 

1819 f" such as module + plugin-initializer or packager-provided-files." 

1820 ) 

1821 

1822 if len(plugin_initializers) == 1: 

1823 return plugin_initializers[0] 

1824 

1825 def _chain_loader(api: DebputyPluginInitializer) -> None: 

1826 for initializer in plugin_initializers: 

1827 initializer(api) 

1828 

1829 return _chain_loader 

1830 

1831 

1832@contextlib.contextmanager 

1833def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]: 

1834 if fd is not None: 

1835 yield fd 

1836 else: 

1837 with open(path, "rb") as fd: 

1838 yield fd 

1839 

1840 

1841def parse_json_plugin_desc( 

1842 path: str, *, fd: Optional[IO[bytes]] = None 

1843) -> DebputyPluginMetadata: 

1844 with _open(path, fd=fd) as rfd: 

1845 try: 

1846 raw = json.load(rfd) 

1847 except JSONDecodeError as e: 

1848 raise PluginMetadataError( 

1849 f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' 

1850 ) from e 

1851 plugin_name = os.path.basename(path) 

1852 if plugin_name.endswith(".json"): 

1853 plugin_name = plugin_name[:-5] 

1854 elif plugin_name.endswith(".json.in"): 

1855 plugin_name = plugin_name[:-8] 

1856 

1857 if plugin_name == "debputy": 1857 ↛ 1859line 1857 didn't jump to line 1859, because the condition on line 1857 was never true

1858 # Provide a better error message than "The plugin has already loaded!?" 

1859 raise PluginMetadataError( 

1860 f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' 

1861 f" clash with the bundled plugin of same name." 

1862 ) 

1863 

1864 attribute_path = AttributePath.root_path() 

1865 

1866 try: 

1867 plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( 

1868 raw, 

1869 attribute_path, 

1870 ) 

1871 except ManifestParseException as e: 

1872 raise PluginMetadataError( 

1873 f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' 

1874 ) from e 

1875 api_compat = plugin_json_metadata["api_compat_version"] 

1876 

1877 return DebputyPluginMetadata( 

1878 plugin_name=plugin_name, 

1879 plugin_loader=lambda: _json_plugin_loader( 

1880 plugin_name, 

1881 plugin_json_metadata, 

1882 path, 

1883 attribute_path, 

1884 ), 

1885 api_compat_version=api_compat, 

1886 plugin_initializer=None, 

1887 plugin_path=path, 

1888 ) 

1889 

1890 

1891@dataclasses.dataclass(slots=True, frozen=True) 

1892class ServiceDefinitionImpl(ServiceDefinition[DSD]): 

1893 name: str 

1894 names: Sequence[str] 

1895 path: VirtualPath 

1896 type_of_service: str 

1897 service_scope: str 

1898 auto_enable_on_install: bool 

1899 auto_start_on_install: bool 

1900 on_upgrade: ServiceUpgradeRule 

1901 definition_source: str 

1902 is_plugin_provided_definition: bool 

1903 service_context: Optional[DSD] 

1904 

1905 def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]": 

1906 return dataclasses.replace(self, **changes) 

1907 

1908 

1909class ServiceRegistryImpl(ServiceRegistry[DSD]): 

1910 __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services") 

1911 

1912 def __init__(self, service_manager_details: ServiceManagerDetails) -> None: 

1913 self._service_manager_details = service_manager_details 

1914 self._service_definitions: List[ServiceDefinition[DSD]] = [] 

1915 self._seen_services = set() 

1916 

1917 @property 

1918 def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: 

1919 return self._service_definitions 

1920 

1921 def register_service( 

1922 self, 

1923 path: VirtualPath, 

1924 name: Union[str, List[str]], 

1925 *, 

1926 type_of_service: str = "service", # "timer", etc. 

1927 service_scope: str = "system", 

1928 enable_by_default: bool = True, 

1929 start_by_default: bool = True, 

1930 default_upgrade_rule: ServiceUpgradeRule = "restart", 

1931 service_context: Optional[DSD] = None, 

1932 ) -> None: 

1933 names = name if isinstance(name, list) else [name] 

1934 if len(names) < 1: 

1935 raise ValueError( 

1936 f"The service must have at least one name - {path.absolute} did not have any" 

1937 ) 

1938 for n in names: 

1939 key = (n, type_of_service, service_scope) 

1940 if key in self._seen_services: 

1941 raise PluginAPIViolationError( 

1942 f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used" 

1943 f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not" 

1944 " allowed by the debputy plugin API." 

1945 ) 

1946 # TODO: We cannot create a service definition immediate once the manifest is involved 

1947 self._service_definitions.append( 

1948 ServiceDefinitionImpl( 

1949 names[0], 

1950 names, 

1951 path, 

1952 type_of_service, 

1953 service_scope, 

1954 enable_by_default, 

1955 start_by_default, 

1956 default_upgrade_rule, 

1957 f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", 

1958 True, 

1959 service_context, 

1960 ) 

1961 )