From ec14b3742103754d9022782587d0b5cf2f9fd1e3 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 25 Apr 2024 04:59:48 +0200 Subject: Merging upstream version 0.1.29. Signed-off-by: Daniel Baumann --- coverage-report/d_64287305fe0c6642_impl_py.html | 2060 ----------------------- 1 file changed, 2060 deletions(-) delete mode 100644 coverage-report/d_64287305fe0c6642_impl_py.html (limited to 'coverage-report/d_64287305fe0c6642_impl_py.html') diff --git a/coverage-report/d_64287305fe0c6642_impl_py.html b/coverage-report/d_64287305fe0c6642_impl_py.html deleted file mode 100644 index c2c961d..0000000 --- a/coverage-report/d_64287305fe0c6642_impl_py.html +++ /dev/null @@ -1,2060 +0,0 @@ - - - - - Coverage for src/debputy/plugin/api/impl.py: 55% - - - - - -
-
-

- Coverage for src/debputy/plugin/api/impl.py: - 55% -

- -

- 753 statements   - - - - -

-

- « prev     - ^ index     - » next -       - coverage.py v7.2.7, - created at 2024-04-07 12:14 +0200 -

- -
-
-
-

1import contextlib 

-

2import dataclasses 

-

3import functools 

-

4import importlib 

-

5import importlib.util 

-

6import itertools 

-

7import json 

-

8import os 

-

9import re 

-

10import subprocess 

-

11import sys 

-

12from abc import ABC 

-

13from json import JSONDecodeError 

-

14from typing import ( 

-

15 Optional, 

-

16 Callable, 

-

17 Dict, 

-

18 Tuple, 

-

19 Iterable, 

-

20 Sequence, 

-

21 Type, 

-

22 List, 

-

23 Union, 

-

24 Set, 

-

25 Iterator, 

-

26 IO, 

-

27 Mapping, 

-

28 AbstractSet, 

-

29 cast, 

-

30 FrozenSet, 

-

31 Any, 

-

32 Literal, 

-

33) 

-

34 

-

35from debputy import DEBPUTY_DOC_ROOT_DIR 

-

36from debputy.exceptions import ( 

-

37 DebputySubstitutionError, 

-

38 PluginConflictError, 

-

39 PluginMetadataError, 

-

40 PluginBaseError, 

-

41 PluginInitializationError, 

-

42 PluginAPIViolationError, 

-

43 PluginNotFoundError, 

-

44) 

-

45from debputy.maintscript_snippet import ( 

-

46 STD_CONTROL_SCRIPTS, 

-

47 MaintscriptSnippetContainer, 

-

48 MaintscriptSnippet, 

-

49) 

-

50from debputy.manifest_parser.base_types import TypeMapping 

-

51from debputy.manifest_parser.exceptions import ManifestParseException 

-

52from debputy.manifest_parser.parser_data import ParserContextData 

-

53from debputy.manifest_parser.util import AttributePath 

-

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

-

55from debputy.plugin.api.impl_types import ( 

-

56 DebputyPluginMetadata, 

-

57 PackagerProvidedFileClassSpec, 

-

58 MetadataOrMaintscriptDetector, 

-

59 PluginProvidedTrigger, 

-

60 TTP, 

-

61 DIPHandler, 

-

62 PF, 

-

63 SF, 

-

64 DIPKWHandler, 

-

65 PluginProvidedManifestVariable, 

-

66 PluginProvidedPackageProcessor, 

-

67 PluginProvidedDiscardRule, 

-

68 AutomaticDiscardRuleExample, 

-

69 PPFFormatParam, 

-

70 ServiceManagerDetails, 

-

71 resolve_package_type_selectors, 

-

72 KnownPackagingFileInfo, 

-

73 PluginProvidedKnownPackagingFile, 

-

74 InstallPatternDHCompatRule, 

-

75 PluginProvidedTypeMapping, 

-

76) 

-

77from debputy.plugin.api.plugin_parser import ( 

-

78 PLUGIN_METADATA_PARSER, 

-

79 PluginJsonMetadata, 

-

80 PLUGIN_PPF_PARSER, 

-

81 PackagerProvidedFileJsonDescription, 

-

82 PLUGIN_MANIFEST_VARS_PARSER, 

-

83 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

-

84) 

-

85from debputy.plugin.api.spec import ( 

-

86 MaintscriptAccessor, 

-

87 Maintscript, 

-

88 DpkgTriggerType, 

-

89 BinaryCtrlAccessor, 

-

90 PackageProcessingContext, 

-

91 MetadataAutoDetector, 

-

92 PluginInitializationEntryPoint, 

-

93 DebputyPluginInitializer, 

-

94 PackageTypeSelector, 

-

95 FlushableSubstvars, 

-

96 ParserDocumentation, 

-

97 PackageProcessor, 

-

98 VirtualPath, 

-

99 ServiceIntegrator, 

-

100 ServiceDetector, 

-

101 ServiceRegistry, 

-

102 ServiceDefinition, 

-

103 DSD, 

-

104 ServiceUpgradeRule, 

-

105 PackagerProvidedFileReferenceDocumentation, 

-

106 packager_provided_file_reference_documentation, 

-

107 TypeMappingDocumentation, 

-

108) 

-

109from debputy.substitution import ( 

-

110 Substitution, 

-

111 VariableNameState, 

-

112 SUBST_VAR_RE, 

-

113 VariableContext, 

-

114) 

-

115from debputy.util import ( 

-

116 _normalize_path, 

-

117 POSTINST_DEFAULT_CONDITION, 

-

118 _error, 

-

119 print_command, 

-

120 _warn, 

-

121) 

-

122 

-

123PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") 

-

124 

-

125 

-

126def _validate_known_packaging_file_dh_compat_rules( 

-

127 dh_compat_rules: Optional[List[InstallPatternDHCompatRule]], 

-

128) -> None: 

-

129 max_compat = None 

-

130 if not dh_compat_rules: 

-

131 return 

-

132 dh_compat_rule: InstallPatternDHCompatRule 

-

133 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

-

134 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

-

135 compat = dh_compat_rule.get("starting_with_compat_level") 

-

136 

-

137 remaining = dh_compat_rule.keys() - { 

-

138 "after_debhelper_version", 

-

139 "starting_with_compat_level", 

-

140 } 

-

141 if not remaining: 

-

142 raise ValueError( 

-

143 f"The dh compat-rule at index {idx} does not affect anything not have any rules!? So why have it?" 

-

144 ) 

-

145 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

-

146 raise ValueError( 

-

147 f"The dh compat-rule at index {idx} is not the last and is missing either" 

-

148 " before-debhelper-version or before-compat-level" 

-

149 ) 

-

150 if compat is not None and compat < 0: 

-

151 raise ValueError( 

-

152 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

-

153 f" for something that appeared when migrating from {compat} to {compat + 1}." 

-

154 ) 

-

155 

-

156 if max_compat is None: 

-

157 max_compat = compat 

-

158 elif compat is not None: 

-

159 if compat >= max_compat: 

-

160 raise ValueError( 

-

161 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

-

162 ) 

-

163 max_compat = compat 

-

164 

-

165 install_pattern = dh_compat_rule.get("install_pattern") 

-

166 if ( 

-

167 install_pattern is not None 

-

168 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

-

169 ): 

-

170 raise ValueError( 

-

171 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

-

172 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

-

173 ) 

-

174 

-

175 

-

176class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

-

177 __slots__ = ( 

-

178 "_plugin_metadata", 

-

179 "_feature_set", 

-

180 "_plugin_detector_ids", 

-

181 "_substitution", 

-

182 "_unloaders", 

-

183 "_load_started", 

-

184 ) 

-

185 

-

186 def __init__( 

-

187 self, 

-

188 plugin_metadata: DebputyPluginMetadata, 

-

189 feature_set: PluginProvidedFeatureSet, 

-

190 substitution: Substitution, 

-

191 ) -> None: 

-

192 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

-

193 self._feature_set = feature_set 

-

194 self._plugin_detector_ids: Set[str] = set() 

-

195 self._substitution = substitution 

-

196 self._unloaders: List[Callable[[], None]] = [] 

-

197 self._load_started = False 

-

198 

-

199 def unload_plugin(self) -> None: 

-

200 if self._load_started: 

-

201 for unloader in self._unloaders: 

-

202 unloader() 

-

203 del self._feature_set.plugin_data[self._plugin_name] 

-

204 

-

205 def load_plugin(self) -> None: 

-

206 metadata = self._plugin_metadata 

-

207 if metadata.plugin_name in self._feature_set.plugin_data: 207 ↛ 208line 207 didn't jump to line 208, because the condition on line 207 was never true

-

208 raise PluginConflictError( 

-

209 f'The plugin "{metadata.plugin_name}" has already been loaded!?' 

-

210 ) 

-

211 assert ( 

-

212 metadata.api_compat_version == 1 

-

213 ), f"Unsupported plugin API compat version {metadata.api_compat_version}" 

-

214 self._feature_set.plugin_data[metadata.plugin_name] = metadata 

-

215 self._load_started = True 

-

216 assert not metadata.is_initialized 

-

217 try: 

-

218 metadata.initialize_plugin(self) 

-

219 except Exception as e: 

-

220 initializer = metadata.plugin_initializer 

-

221 if ( 221 ↛ 226line 221 didn't jump to line 226

-

222 isinstance(e, TypeError) 

-

223 and initializer is not None 

-

224 and not callable(initializer) 

-

225 ): 

-

226 raise PluginMetadataError( 

-

227 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" 

-

228 f" callable (callable returns False). The specified entry point identifies" 

-

229 f' itself as "{initializer.__qualname__}".' 

-

230 ) from e 

-

231 elif isinstance(e, PluginBaseError): 231 ↛ 233line 231 didn't jump to line 233, because the condition on line 231 was never false

-

232 raise 

-

233 raise PluginInitializationError( 

-

234 f"Exception while attempting to load plugin {metadata.plugin_name}" 

-

235 ) from e 

-

236 

-

237 def packager_provided_file( 

-

238 self, 

-

239 stem: str, 

-

240 installed_path: str, 

-

241 *, 

-

242 default_mode: int = 0o0644, 

-

243 default_priority: Optional[int] = None, 

-

244 allow_name_segment: bool = True, 

-

245 allow_architecture_segment: bool = False, 

-

246 post_formatting_rewrite: Optional[Callable[[str], str]] = None, 

-

247 packageless_is_fallback_for_all_packages: bool = False, 

-

248 reservation_only: bool = False, 

-

249 format_callback: Optional[ 

-

250 Callable[[str, PPFFormatParam, VirtualPath], str] 

-

251 ] = None, 

-

252 reference_documentation: Optional[ 

-

253 PackagerProvidedFileReferenceDocumentation 

-

254 ] = None, 

-

255 ) -> None: 

-

256 packager_provided_files = self._feature_set.packager_provided_files 

-

257 existing = packager_provided_files.get(stem) 

-

258 

-

259 if format_callback is not None and self._plugin_name != "debputy": 259 ↛ 260line 259 didn't jump to line 260, because the condition on line 259 was never true

-

260 raise ValueError( 

-

261 "Sorry; Using format_callback is a debputy-internal" 

-

262 f" API. Triggered by plugin {self._plugin_name}" 

-

263 ) 

-

264 

-

265 if installed_path.endswith("/"): 265 ↛ 266line 265 didn't jump to line 266, because the condition on line 265 was never true

-

266 raise ValueError( 

-

267 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' 

-

268 f" Triggered by plugin {self._plugin_name}." 

-

269 ) 

-

270 

-

271 installed_path = _normalize_path(installed_path) 

-

272 

-

273 has_name_var = "{name}" in installed_path 

-

274 

-

275 if installed_path.startswith("./DEBIAN") or reservation_only: 

-

276 # Special-case, used for control files. 

-

277 if self._plugin_name != "debputy": 277 ↛ 278line 277 didn't jump to line 278, because the condition on line 277 was never true

-

278 raise ValueError( 

-

279 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" 

-

280 f" API. Triggered by plugin {self._plugin_name}" 

-

281 ) 

-

282 elif not has_name_var and "{owning_package}" not in installed_path: 282 ↛ 283line 282 didn't jump to line 283, because the condition on line 282 was never true

-

283 raise ValueError( 

-

284 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' 

-

285 " substitution (or have installed_path end with a slash). Otherwise, the installed" 

-

286 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" 

-

287 ) 

-

288 

-

289 if allow_name_segment and not has_name_var: 289 ↛ 290line 289 didn't jump to line 290, because the condition on line 289 was never true

-

290 raise ValueError( 

-

291 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' 

-

292 " variable. Otherwise, the name segment will not work properly. Triggered by" 

-

293 f" plugin {self._plugin_name}" 

-

294 ) 

-

295 

-

296 if ( 296 ↛ 301line 296 didn't jump to line 301

-

297 default_priority is not None 

-

298 and "{priority}" not in installed_path 

-

299 and "{priority:02}" not in installed_path 

-

300 ): 

-

301 raise ValueError( 

-

302 'When default_priority is not None, the installed_path should have a "{priority}"' 

-

303 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' 

-

304 f" Triggered by plugin {self._plugin_name}" 

-

305 ) 

-

306 

-

307 if existing is not None: 

-

308 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 308 ↛ 315line 308 didn't jump to line 315

-

309 message = ( 

-

310 f'The stem "{stem}" is registered twice for packager provided files.' 

-

311 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" 

-

312 f" by {self._plugin_name}" 

-

313 ) 

-

314 else: 

-

315 message = ( 

-

316 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

-

317 f' stem "{stem}" twice for packager provided files.' 

-

318 ) 

-

319 raise PluginConflictError( 

-

320 message, existing.debputy_plugin_metadata, self._plugin_metadata 

-

321 ) 

-

322 packager_provided_files[stem] = PackagerProvidedFileClassSpec( 

-

323 self._plugin_metadata, 

-

324 stem, 

-

325 installed_path, 

-

326 default_mode=default_mode, 

-

327 default_priority=default_priority, 

-

328 allow_name_segment=allow_name_segment, 

-

329 allow_architecture_segment=allow_architecture_segment, 

-

330 post_formatting_rewrite=post_formatting_rewrite, 

-

331 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, 

-

332 reservation_only=reservation_only, 

-

333 formatting_callback=format_callback, 

-

334 reference_documentation=reference_documentation, 

-

335 ) 

-

336 

-

337 def _unload() -> None: 

-

338 del packager_provided_files[stem] 

-

339 

-

340 self._unloaders.append(_unload) 

-

341 

-

342 def metadata_or_maintscript_detector( 

-

343 self, 

-

344 auto_detector_id: str, 

-

345 auto_detector: MetadataAutoDetector, 

-

346 *, 

-

347 package_type: PackageTypeSelector = "deb", 

-

348 ) -> None: 

-

349 if auto_detector_id in self._plugin_detector_ids: 349 ↛ 350line 349 didn't jump to line 350, because the condition on line 349 was never true

-

350 raise ValueError( 

-

351 f"The plugin {self._plugin_name} tried to register" 

-

352 f' "{auto_detector_id}" twice' 

-

353 ) 

-

354 self._plugin_detector_ids.add(auto_detector_id) 

-

355 all_detectors = self._feature_set.metadata_maintscript_detectors 

-

356 if self._plugin_name not in all_detectors: 

-

357 all_detectors[self._plugin_name] = [] 

-

358 package_types = resolve_package_type_selectors(package_type) 

-

359 all_detectors[self._plugin_name].append( 

-

360 MetadataOrMaintscriptDetector( 

-

361 detector_id=auto_detector_id, 

-

362 detector=auto_detector, 

-

363 plugin_metadata=self._plugin_metadata, 

-

364 applies_to_package_types=package_types, 

-

365 enabled=True, 

-

366 ) 

-

367 ) 

-

368 

-

369 def _unload() -> None: 

-

370 if self._plugin_name in all_detectors: 

-

371 del all_detectors[self._plugin_name] 

-

372 

-

373 self._unloaders.append(_unload) 

-

374 

-

375 def document_builtin_variable( 

-

376 self, 

-

377 variable_name: str, 

-

378 variable_reference_documentation: str, 

-

379 *, 

-

380 is_context_specific: bool = False, 

-

381 is_for_special_case: bool = False, 

-

382 ) -> None: 

-

383 manifest_variables = self._feature_set.manifest_variables 

-

384 self._restricted_api() 

-

385 state = self._substitution.variable_state(variable_name) 

-

386 if state == VariableNameState.UNDEFINED: 386 ↛ 387line 386 didn't jump to line 387, because the condition on line 386 was never true

-

387 raise ValueError( 

-

388 f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," 

-

389 f" but it is not known to be a variable" 

-

390 ) 

-

391 

-

392 assert variable_name not in manifest_variables 

-

393 

-

394 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

-

395 self._plugin_metadata, 

-

396 variable_name, 

-

397 None, 

-

398 is_context_specific_variable=is_context_specific, 

-

399 variable_reference_documentation=variable_reference_documentation, 

-

400 is_documentation_placeholder=True, 

-

401 is_for_special_case=is_for_special_case, 

-

402 ) 

-

403 

-

404 def _unload() -> None: 

-

405 del manifest_variables[variable_name] 

-

406 

-

407 self._unloaders.append(_unload) 

-

408 

-

409 def manifest_variable_provider( 

-

410 self, 

-

411 provider: Callable[[VariableContext], Mapping[str, str]], 

-

412 variables: Union[Sequence[str], Mapping[str, Optional[str]]], 

-

413 ) -> None: 

-

414 self._restricted_api() 

-

415 cached_provider = functools.lru_cache(None)(provider) 

-

416 permitted_variables = frozenset(variables) 

-

417 variables_iter: Iterable[Tuple[str, Optional[str]]] 

-

418 if not isinstance(variables, Mapping): 418 ↛ 419line 418 didn't jump to line 419, because the condition on line 418 was never true

-

419 variables_iter = zip(variables, itertools.repeat(None)) 

-

420 else: 

-

421 variables_iter = variables.items() 

-

422 

-

423 checked_vars = False 

-

424 manifest_variables = self._feature_set.manifest_variables 

-

425 plugin_name = self._plugin_name 

-

426 

-

427 def _value_resolver_generator( 

-

428 variable_name: str, 

-

429 ) -> Callable[[VariableContext], str]: 

-

430 def _value_resolver(variable_context: VariableContext) -> str: 

-

431 res = cached_provider(variable_context) 

-

432 nonlocal checked_vars 

-

433 if not checked_vars: 433 ↛ 444line 433 didn't jump to line 444, because the condition on line 433 was never false

-

434 if permitted_variables != res.keys(): 434 ↛ 435line 434 didn't jump to line 435, because the condition on line 434 was never true

-

435 expected = ", ".join(sorted(permitted_variables)) 

-

436 actual = ", ".join(sorted(res)) 

-

437 raise PluginAPIViolationError( 

-

438 f"The plugin {plugin_name} claimed to provide" 

-

439 f" the following variables {expected}," 

-

440 f" but when resolving the variables, the plugin provided" 

-

441 f" {actual}. These two lists should have been the same." 

-

442 ) 

-

443 checked_vars = False 

-

444 return res[variable_name] 

-

445 

-

446 return _value_resolver 

-

447 

-

448 for varname, vardoc in variables_iter: 

-

449 self._check_variable_name(varname) 

-

450 manifest_variables[varname] = PluginProvidedManifestVariable( 

-

451 self._plugin_metadata, 

-

452 varname, 

-

453 _value_resolver_generator(varname), 

-

454 is_context_specific_variable=False, 

-

455 variable_reference_documentation=vardoc, 

-

456 ) 

-

457 

-

458 def _unload() -> None: 

-

459 raise PluginInitializationError( 

-

460 "Cannot unload manifest_variable_provider (not implemented)" 

-

461 ) 

-

462 

-

463 self._unloaders.append(_unload) 

-

464 

-

465 def _check_variable_name(self, variable_name: str) -> None: 

-

466 manifest_variables = self._feature_set.manifest_variables 

-

467 existing = manifest_variables.get(variable_name) 

-

468 

-

469 if existing is not None: 

-

470 if existing.plugin_metadata.plugin_name == self._plugin_name: 470 ↛ 476line 470 didn't jump to line 476

-

471 message = ( 

-

472 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

-

473 f' manifest variable "{variable_name}" twice.' 

-

474 ) 

-

475 else: 

-

476 message = ( 

-

477 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

-

478 f" both tried to provide the manifest variable {variable_name}" 

-

479 ) 

-

480 raise PluginConflictError( 

-

481 message, existing.plugin_metadata, self._plugin_metadata 

-

482 ) 

-

483 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): 

-

484 raise ValueError( 

-

485 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

-

486 f" which is not a valid variable name" 

-

487 ) 

-

488 

-

489 namespace = "" 

-

490 variable_basename = variable_name 

-

491 if ":" in variable_name: 

-

492 namespace, variable_basename = variable_name.rsplit(":", 1) 

-

493 assert namespace != "" 

-

494 assert variable_name != "" 

-

495 

-

496 if namespace != "" and namespace not in ("token", "path"): 

-

497 raise ValueError( 

-

498 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

-

499 f" which is in the reserved namespace {namespace}" 

-

500 ) 

-

501 

-

502 variable_name_upper = variable_name.upper() 

-

503 if ( 

-

504 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) 

-

505 or variable_basename.startswith("_") 

-

506 or variable_basename.upper().startswith("DEBPUTY") 

-

507 ) and self._plugin_name != "debputy": 

-

508 raise ValueError( 

-

509 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

-

510 f" which is a variable name reserved by debputy" 

-

511 ) 

-

512 

-

513 state = self._substitution.variable_state(variable_name) 

-

514 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": 

-

515 raise ValueError( 

-

516 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

-

517 f" which would shadow a built-in variable" 

-

518 ) 

-

519 

-

520 def package_processor( 

-

521 self, 

-

522 processor_id: str, 

-

523 processor: PackageProcessor, 

-

524 *, 

-

525 depends_on_processor: Iterable[str] = tuple(), 

-

526 package_type: PackageTypeSelector = "deb", 

-

527 ) -> None: 

-

528 self._restricted_api(allowed_plugins={"lua"}) 

-

529 package_processors = self._feature_set.all_package_processors 

-

530 dependencies = set() 

-

531 processor_key = (self._plugin_name, processor_id) 

-

532 

-

533 if processor_key in package_processors: 533 ↛ 534line 533 didn't jump to line 534, because the condition on line 533 was never true

-

534 raise PluginConflictError( 

-

535 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", 

-

536 self._plugin_metadata, 

-

537 self._plugin_metadata, 

-

538 ) 

-

539 

-

540 for depends_ref in depends_on_processor: 

-

541 if isinstance(depends_ref, str): 541 ↛ 555line 541 didn't jump to line 555, because the condition on line 541 was never false

-

542 if (self._plugin_name, depends_ref) in package_processors: 542 ↛ 544line 542 didn't jump to line 544, because the condition on line 542 was never false

-

543 depends_key = (self._plugin_name, depends_ref) 

-

544 elif ("debputy", depends_ref) in package_processors: 

-

545 depends_key = ("debputy", depends_ref) 

-

546 else: 

-

547 raise ValueError( 

-

548 f'Could not resolve dependency "{depends_ref}" for' 

-

549 f' "{processor_id}". It was not provided by the plugin itself' 

-

550 f" ({self._plugin_name}) nor debputy." 

-

551 ) 

-

552 else: 

-

553 # TODO: Add proper dependencies first, at which point we should probably resolve "name" 

-

554 # via the direct dependencies. 

-

555 assert False 

-

556 

-

557 existing_processor = package_processors.get(depends_key) 

-

558 if existing_processor is None: 558 ↛ 561line 558 didn't jump to line 561, because the condition on line 558 was never true

-

559 # We currently require the processor to be declared already. If this ever changes, 

-

560 # PluginProvidedFeatureSet.package_processors_in_order will need an update 

-

561 dplugin_name, dprocessor_name = depends_key 

-

562 available_processors = ", ".join( 

-

563 n for p, n in package_processors.keys() if p == dplugin_name 

-

564 ) 

-

565 raise ValueError( 

-

566 f"The plugin {dplugin_name} does not provide a processor called" 

-

567 f" {dprocessor_name}. Available processors for that plugin are:" 

-

568 f" {available_processors}" 

-

569 ) 

-

570 dependencies.add(depends_key) 

-

571 

-

572 package_processors[processor_key] = PluginProvidedPackageProcessor( 

-

573 processor_id, 

-

574 resolve_package_type_selectors(package_type), 

-

575 processor, 

-

576 frozenset(dependencies), 

-

577 self._plugin_metadata, 

-

578 ) 

-

579 

-

580 def _unload() -> None: 

-

581 del package_processors[processor_key] 

-

582 

-

583 self._unloaders.append(_unload) 

-

584 

-

585 def automatic_discard_rule( 

-

586 self, 

-

587 name: str, 

-

588 should_discard: Callable[[VirtualPath], bool], 

-

589 *, 

-

590 rule_reference_documentation: Optional[str] = None, 

-

591 examples: Union[ 

-

592 AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample] 

-

593 ] = tuple(), 

-

594 ) -> None: 

-

595 """Register an automatic discard rule 

-

596 

-

597 An automatic discard rule is basically applied to *every* path about to be installed in to any package. 

-

598 If any discard rule concludes that a path should not be installed, then the path is not installed. 

-

599 In the case where the discard path is a: 

-

600 

-

601 * directory: Then the entire directory is excluded along with anything beneath it. 

-

602 * symlink: Then the symlink itself (but not its target) is excluded. 

-

603 * hardlink: Then the current hardlink will not be installed, but other instances of it will be. 

-

604 

-

605 Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file. 

-

606 

-

607 Automatic discard rules should be written with the assumption that directories will be tested 

-

608 before their content *when it is relevant* for the discard rule to examine whether the directory 

-

609 can be excluded. 

-

610 

-

611 The packager can via the manifest overrule automatic discard rules by explicitly listing the path 

-

612 without any globs. As example: 

-

613 

-

614 installations: 

-

615 - install: 

-

616 sources: 

-

617 - usr/lib/libfoo.la # <-- This path is always installed 

-

618 # (Discard rules are never asked in this case) 

-

619 # 

-

620 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches 

-

621 # Though, they will not examine `libfoo.la` as it has already been installed 

-

622 # 

-

623 # Note: usr/lib itself is never tested in this case (it is assumed to be 

-

624 # explicitly requested). But any subdir of usr/lib will be examined. 

-

625 

-

626 When an automatic discard rule is evaluated, it can see the source path currently being considered 

-

627 for installation. While it can look at "surrounding" context (like parent directory), it will not 

-

628 know whether those paths are to be installed or will be installed. 

-

629 

-

630 :param name: A user visible name discard rule. It can be used on the command line, so avoid shell 

-

631 metacharacters and spaces. 

-

632 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive 

-

633 a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the 

-

634 path is discarded. If it returns `False`, the path is not discarded (by this rule at least). 

-

635 A source path will either be from the root of the source tree or the root of a search directory such as 

-

636 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is 

-

637 evaluated. 

-

638 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user 

-

639 looks up this automatic discard rule. 

-

640 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to 

-

641 generate the examples. 

-

642 

-

643 """ 

-

644 self._restricted_api() 

-

645 auto_discard_rules = self._feature_set.auto_discard_rules 

-

646 existing = auto_discard_rules.get(name) 

-

647 if existing is not None: 647 ↛ 648line 647 didn't jump to line 648, because the condition on line 647 was never true

-

648 if existing.plugin_metadata.plugin_name == self._plugin_name: 

-

649 message = ( 

-

650 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

-

651 f' automatic discard rule "{name}" twice.' 

-

652 ) 

-

653 else: 

-

654 message = ( 

-

655 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

-

656 f" both tried to provide the automatic discard rule {name}" 

-

657 ) 

-

658 raise PluginConflictError( 

-

659 message, existing.plugin_metadata, self._plugin_metadata 

-

660 ) 

-

661 examples = ( 

-

662 (examples,) 

-

663 if isinstance(examples, AutomaticDiscardRuleExample) 

-

664 else tuple(examples) 

-

665 ) 

-

666 auto_discard_rules[name] = PluginProvidedDiscardRule( 

-

667 name, 

-

668 self._plugin_metadata, 

-

669 should_discard, 

-

670 rule_reference_documentation, 

-

671 examples, 

-

672 ) 

-

673 

-

674 def _unload() -> None: 

-

675 del auto_discard_rules[name] 

-

676 

-

677 self._unloaders.append(_unload) 

-

678 

-

679 def service_provider( 

-

680 self, 

-

681 service_manager: str, 

-

682 detector: ServiceDetector, 

-

683 integrator: ServiceIntegrator, 

-

684 ) -> None: 

-

685 self._restricted_api() 

-

686 service_managers = self._feature_set.service_managers 

-

687 existing = service_managers.get(service_manager) 

-

688 if existing is not None: 688 ↛ 689line 688 didn't jump to line 689, because the condition on line 688 was never true

-

689 if existing.plugin_metadata.plugin_name == self._plugin_name: 

-

690 message = ( 

-

691 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

-

692 f' service manager "{service_manager}" twice.' 

-

693 ) 

-

694 else: 

-

695 message = ( 

-

696 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

-

697 f' both tried to provide the service manager "{service_manager}"' 

-

698 ) 

-

699 raise PluginConflictError( 

-

700 message, existing.plugin_metadata, self._plugin_metadata 

-

701 ) 

-

702 service_managers[service_manager] = ServiceManagerDetails( 

-

703 service_manager, 

-

704 detector, 

-

705 integrator, 

-

706 self._plugin_metadata, 

-

707 ) 

-

708 

-

709 def _unload() -> None: 

-

710 del service_managers[service_manager] 

-

711 

-

712 self._unloaders.append(_unload) 

-

713 

-

714 def manifest_variable( 

-

715 self, 

-

716 variable_name: str, 

-

717 value: str, 

-

718 variable_reference_documentation: Optional[str] = None, 

-

719 ) -> None: 

-

720 self._check_variable_name(variable_name) 

-

721 manifest_variables = self._feature_set.manifest_variables 

-

722 try: 

-

723 resolved_value = self._substitution.substitute( 

-

724 value, "Plugin initialization" 

-

725 ) 

-

726 depends_on_variable = resolved_value != value 

-

727 except DebputySubstitutionError: 

-

728 depends_on_variable = True 

-

729 if depends_on_variable: 

-

730 raise ValueError( 

-

731 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." 

-

732 f" This value depends on another variable, which is not supported. This restriction may be" 

-

733 f" lifted in the future." 

-

734 ) 

-

735 

-

736 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

-

737 self._plugin_metadata, 

-

738 variable_name, 

-

739 value, 

-

740 is_context_specific_variable=False, 

-

741 variable_reference_documentation=variable_reference_documentation, 

-

742 ) 

-

743 

-

744 def _unload() -> None: 

-

745 # We need to check it was never resolved 

-

746 raise PluginInitializationError( 

-

747 "Cannot unload manifest_variable (not implemented)" 

-

748 ) 

-

749 

-

750 self._unloaders.append(_unload) 

-

751 

-

752 @property 

-

753 def _plugin_name(self) -> str: 

-

754 return self._plugin_metadata.plugin_name 

-

755 

-

756 def provide_manifest_keyword( 

-

757 self, 

-

758 rule_type: TTP, 

-

759 rule_name: Union[str, List[str]], 

-

760 handler: DIPKWHandler, 

-

761 *, 

-

762 inline_reference_documentation: Optional[ParserDocumentation] = None, 

-

763 ) -> None: 

-

764 self._restricted_api() 

-

765 parser_generator = self._feature_set.manifest_parser_generator 

-

766 if rule_type not in parser_generator.dispatchable_table_parsers: 766 ↛ 767line 766 didn't jump to line 767, because the condition on line 766 was never true

-

767 types = ", ".join( 

-

768 sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) 

-

769 ) 

-

770 raise ValueError( 

-

771 f"The rule_type was not a supported type. It must be one of {types}" 

-

772 ) 

-

773 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

-

774 dispatching_parser.register_keyword( 

-

775 rule_name, 

-

776 handler, 

-

777 self._plugin_metadata, 

-

778 inline_reference_documentation=inline_reference_documentation, 

-

779 ) 

-

780 

-

781 def _unload() -> None: 

-

782 raise PluginInitializationError( 

-

783 "Cannot unload provide_manifest_keyword (not implemented)" 

-

784 ) 

-

785 

-

786 self._unloaders.append(_unload) 

-

787 

-

788 def pluggable_object_parser( 

-

789 self, 

-

790 rule_type: str, 

-

791 rule_name: str, 

-

792 *, 

-

793 object_parser_key: Optional[str] = None, 

-

794 on_end_parse_step: Optional[ 

-

795 Callable[ 

-

796 [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData], 

-

797 None, 

-

798 ] 

-

799 ] = None, 

-

800 nested_in_package_context: bool = False, 

-

801 ) -> None: 

-

802 self._restricted_api() 

-

803 if object_parser_key is None: 803 ↛ 804line 803 didn't jump to line 804, because the condition on line 803 was never true

-

804 object_parser_key = rule_name 

-

805 

-

806 parser_generator = self._feature_set.manifest_parser_generator 

-

807 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers 

-

808 if rule_type not in dispatchable_object_parsers: 808 ↛ 809line 808 didn't jump to line 809, because the condition on line 808 was never true

-

809 types = ", ".join(sorted(dispatchable_object_parsers)) 

-

810 raise ValueError( 

-

811 f"The rule_type was not a supported type. It must be one of {types}" 

-

812 ) 

-

813 if object_parser_key not in dispatchable_object_parsers: 813 ↛ 814line 813 didn't jump to line 814, because the condition on line 813 was never true

-

814 types = ", ".join(sorted(dispatchable_object_parsers)) 

-

815 raise ValueError( 

-

816 f"The object_parser_key was not a supported type. It must be one of {types}" 

-

817 ) 

-

818 parent_dispatcher = dispatchable_object_parsers[rule_type] 

-

819 child_dispatcher = dispatchable_object_parsers[object_parser_key] 

-

820 parent_dispatcher.register_child_parser( 

-

821 rule_name, 

-

822 child_dispatcher, 

-

823 self._plugin_metadata, 

-

824 on_end_parse_step=on_end_parse_step, 

-

825 nested_in_package_context=nested_in_package_context, 

-

826 ) 

-

827 

-

828 def _unload() -> None: 

-

829 raise PluginInitializationError( 

-

830 "Cannot unload pluggable_object_parser (not implemented)" 

-

831 ) 

-

832 

-

833 self._unloaders.append(_unload) 

-

834 

-

835 def pluggable_manifest_rule( 

-

836 self, 

-

837 rule_type: Union[TTP, str], 

-

838 rule_name: Union[str, List[str]], 

-

839 parsed_format: Type[PF], 

-

840 handler: DIPHandler, 

-

841 *, 

-

842 source_format: Optional[SF] = None, 

-

843 inline_reference_documentation: Optional[ParserDocumentation] = None, 

-

844 ) -> None: 

-

845 self._restricted_api() 

-

846 feature_set = self._feature_set 

-

847 parser_generator = feature_set.manifest_parser_generator 

-

848 if isinstance(rule_type, str): 

-

849 if rule_type not in parser_generator.dispatchable_object_parsers: 849 ↛ 850line 849 didn't jump to line 850, because the condition on line 849 was never true

-

850 types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) 

-

851 raise ValueError( 

-

852 f"The rule_type was not a supported type. It must be one of {types}" 

-

853 ) 

-

854 dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] 

-

855 else: 

-

856 if rule_type not in parser_generator.dispatchable_table_parsers: 856 ↛ 857line 856 didn't jump to line 857, because the condition on line 856 was never true

-

857 types = ", ".join( 

-

858 sorted( 

-

859 x.__name__ for x in parser_generator.dispatchable_table_parsers 

-

860 ) 

-

861 ) 

-

862 raise ValueError( 

-

863 f"The rule_type was not a supported type. It must be one of {types}" 

-

864 ) 

-

865 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

-

866 

-

867 parser = feature_set.manifest_parser_generator.generate_parser( 

-

868 parsed_format, 

-

869 source_content=source_format, 

-

870 inline_reference_documentation=inline_reference_documentation, 

-

871 ) 

-

872 dispatching_parser.register_parser( 

-

873 rule_name, 

-

874 parser, 

-

875 handler, 

-

876 self._plugin_metadata, 

-

877 ) 

-

878 

-

879 def _unload() -> None: 

-

880 raise PluginInitializationError( 

-

881 "Cannot unload pluggable_manifest_rule (not implemented)" 

-

882 ) 

-

883 

-

884 self._unloaders.append(_unload) 

-

885 

-

886 def known_packaging_files( 

-

887 self, 

-

888 packaging_file_details: KnownPackagingFileInfo, 

-

889 ) -> None: 

-

890 known_packaging_files = self._feature_set.known_packaging_files 

-

891 detection_method = packaging_file_details.get( 

-

892 "detection_method", cast("Literal['path']", "path") 

-

893 ) 

-

894 path = packaging_file_details.get("path") 

-

895 dhpkgfile = packaging_file_details.get("pkgfile") 

-

896 

-

897 packaging_file_details: KnownPackagingFileInfo = packaging_file_details.copy() 

-

898 

-

899 if detection_method == "path": 

-

900 if dhpkgfile is not None: 

-

901 raise ValueError( 

-

902 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' 

-

903 ) 

-

904 if path != _normalize_path(path, with_prefix=False): 

-

905 raise ValueError( 

-

906 f"The path for known packaging files must be normalized. Please replace" 

-

907 f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' 

-

908 ) 

-

909 detection_value = path 

-

910 else: 

-

911 assert detection_method == "dh.pkgfile" 

-

912 if path is not None: 

-

913 raise ValueError( 

-

914 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' 

-

915 ) 

-

916 if "/" in dhpkgfile: 

-

917 raise ValueError( 

-

918 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' 

-

919 ) 

-

920 detection_value = dhpkgfile 

-

921 key = f"{detection_method}::{detection_value}" 

-

922 existing = known_packaging_files.get(key) 

-

923 if existing is not None: 

-

924 if existing.plugin_metadata.plugin_name != self._plugin_name: 

-

925 message = ( 

-

926 f'The key "{key}" is registered twice for known packaging files.' 

-

927 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

-

928 ) 

-

929 else: 

-

930 message = ( 

-

931 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

-

932 f' key "{key}" twice for known packaging files.' 

-

933 ) 

-

934 raise PluginConflictError( 

-

935 message, existing.plugin_metadata, self._plugin_metadata 

-

936 ) 

-

937 _validate_known_packaging_file_dh_compat_rules( 

-

938 packaging_file_details.get("dh_compat_rules") 

-

939 ) 

-

940 known_packaging_files[key] = PluginProvidedKnownPackagingFile( 

-

941 packaging_file_details, 

-

942 detection_method, 

-

943 detection_value, 

-

944 self._plugin_metadata, 

-

945 ) 

-

946 

-

947 def _unload() -> None: 

-

948 del known_packaging_files[key] 

-

949 

-

950 self._unloaders.append(_unload) 

-

951 

-

952 def register_mapped_type( 

-

953 self, 

-

954 type_mapping: TypeMapping, 

-

955 *, 

-

956 reference_documentation: Optional[TypeMappingDocumentation] = None, 

-

957 ) -> None: 

-

958 self._restricted_api() 

-

959 target_type = type_mapping.target_type 

-

960 mapped_types = self._feature_set.mapped_types 

-

961 existing = mapped_types.get(target_type) 

-

962 if existing is not None: 962 ↛ 963line 962 didn't jump to line 963, because the condition on line 962 was never true

-

963 if existing.plugin_metadata.plugin_name != self._plugin_name: 

-

964 message = ( 

-

965 f'The key "{target_type.__name__}" is registered twice for known packaging files.' 

-

966 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

-

967 ) 

-

968 else: 

-

969 message = ( 

-

970 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

-

971 f' key "{target_type.__name__}" twice for known packaging files.' 

-

972 ) 

-

973 raise PluginConflictError( 

-

974 message, existing.plugin_metadata, self._plugin_metadata 

-

975 ) 

-

976 parser_generator = self._feature_set.manifest_parser_generator 

-

977 mapped_types[target_type] = PluginProvidedTypeMapping( 

-

978 type_mapping, reference_documentation, self._plugin_metadata 

-

979 ) 

-

980 parser_generator.register_mapped_type(type_mapping) 

-

981 

-

982 def _restricted_api( 

-

983 self, 

-

984 *, 

-

985 allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(), 

-

986 ) -> None: 

-

987 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 987 ↛ 988line 987 didn't jump to line 988, because the condition on line 987 was never true

-

988 raise PluginAPIViolationError( 

-

989 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

-

990 " If you are the maintainer of this plugin and want access to this" 

-

991 " API, please file a feature request to make this public." 

-

992 " (The API is currently private as it is unstable.)" 

-

993 ) 

-

994 

-

995 

-

996class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): 

-

997 __slots__ = () 

-

998 

-

999 def _append_script( 

-

1000 self, 

-

1001 caller_name: str, 

-

1002 maintscript: Maintscript, 

-

1003 full_script: str, 

-

1004 /, 

-

1005 perform_substitution: bool = True, 

-

1006 ) -> None: 

-

1007 raise NotImplementedError 

-

1008 

-

1009 @classmethod 

-

1010 def _apply_condition_to_script( 

-

1011 cls, 

-

1012 condition: str, 

-

1013 run_snippet: str, 

-

1014 /, 

-

1015 indent: Optional[bool] = None, 

-

1016 ) -> str: 

-

1017 if indent is None: 

-

1018 # We auto-determine this based on heredocs currently 

-

1019 indent = "<<" not in run_snippet 

-

1020 

-

1021 if indent: 

-

1022 run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) 

-

1023 if not run_snippet.endswith("\n"): 

-

1024 run_snippet += "\n" 

-

1025 condition_line = f"if {condition}; then\n" 

-

1026 end_line = "fi\n" 

-

1027 return "".join((condition_line, run_snippet, end_line)) 

-

1028 

-

1029 def on_configure( 

-

1030 self, 

-

1031 run_snippet: str, 

-

1032 /, 

-

1033 indent: Optional[bool] = None, 

-

1034 perform_substitution: bool = True, 

-

1035 skip_on_rollback: bool = False, 

-

1036 ) -> None: 

-

1037 condition = POSTINST_DEFAULT_CONDITION 

-

1038 if skip_on_rollback: 1038 ↛ 1039line 1038 didn't jump to line 1039, because the condition on line 1038 was never true

-

1039 condition = '[ "$1" = "configure" ]' 

-

1040 return self._append_script( 

-

1041 "on_configure", 

-

1042 "postinst", 

-

1043 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1044 perform_substitution=perform_substitution, 

-

1045 ) 

-

1046 

-

1047 def on_initial_install( 

-

1048 self, 

-

1049 run_snippet: str, 

-

1050 /, 

-

1051 indent: Optional[bool] = None, 

-

1052 perform_substitution: bool = True, 

-

1053 ) -> None: 

-

1054 condition = '[ "$1" = "configure" -a -z "$2" ]' 

-

1055 return self._append_script( 

-

1056 "on_initial_install", 

-

1057 "postinst", 

-

1058 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1059 perform_substitution=perform_substitution, 

-

1060 ) 

-

1061 

-

1062 def on_upgrade( 

-

1063 self, 

-

1064 run_snippet: str, 

-

1065 /, 

-

1066 indent: Optional[bool] = None, 

-

1067 perform_substitution: bool = True, 

-

1068 ) -> None: 

-

1069 condition = '[ "$1" = "configure" -a -n "$2" ]' 

-

1070 return self._append_script( 

-

1071 "on_upgrade", 

-

1072 "postinst", 

-

1073 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1074 perform_substitution=perform_substitution, 

-

1075 ) 

-

1076 

-

1077 def on_upgrade_from( 

-

1078 self, 

-

1079 version: str, 

-

1080 run_snippet: str, 

-

1081 /, 

-

1082 indent: Optional[bool] = None, 

-

1083 perform_substitution: bool = True, 

-

1084 ) -> None: 

-

1085 condition = '[ "$1" = "configure" ] && dpkg --compare-versions le-nl "$2"' 

-

1086 return self._append_script( 

-

1087 "on_upgrade_from", 

-

1088 "postinst", 

-

1089 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1090 perform_substitution=perform_substitution, 

-

1091 ) 

-

1092 

-

1093 def on_before_removal( 

-

1094 self, 

-

1095 run_snippet: str, 

-

1096 /, 

-

1097 indent: Optional[bool] = None, 

-

1098 perform_substitution: bool = True, 

-

1099 ) -> None: 

-

1100 condition = '[ "$1" = "remove" ]' 

-

1101 return self._append_script( 

-

1102 "on_before_removal", 

-

1103 "prerm", 

-

1104 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1105 perform_substitution=perform_substitution, 

-

1106 ) 

-

1107 

-

1108 def on_removed( 

-

1109 self, 

-

1110 run_snippet: str, 

-

1111 /, 

-

1112 indent: Optional[bool] = None, 

-

1113 perform_substitution: bool = True, 

-

1114 ) -> None: 

-

1115 condition = '[ "$1" = "remove" ]' 

-

1116 return self._append_script( 

-

1117 "on_removed", 

-

1118 "postrm", 

-

1119 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1120 perform_substitution=perform_substitution, 

-

1121 ) 

-

1122 

-

1123 def on_purge( 

-

1124 self, 

-

1125 run_snippet: str, 

-

1126 /, 

-

1127 indent: Optional[bool] = None, 

-

1128 perform_substitution: bool = True, 

-

1129 ) -> None: 

-

1130 condition = '[ "$1" = "purge" ]' 

-

1131 return self._append_script( 

-

1132 "on_purge", 

-

1133 "postrm", 

-

1134 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

-

1135 perform_substitution=perform_substitution, 

-

1136 ) 

-

1137 

-

1138 def unconditionally_in_script( 

-

1139 self, 

-

1140 maintscript: Maintscript, 

-

1141 run_snippet: str, 

-

1142 /, 

-

1143 perform_substitution: bool = True, 

-

1144 ) -> None: 

-

1145 if maintscript not in STD_CONTROL_SCRIPTS: 1145 ↛ 1146line 1145 didn't jump to line 1146, because the condition on line 1145 was never true

-

1146 raise ValueError( 

-

1147 f'Unknown script "{maintscript}". Should have been one of:' 

-

1148 f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' 

-

1149 ) 

-

1150 return self._append_script( 

-

1151 "unconditionally_in_script", 

-

1152 maintscript, 

-

1153 run_snippet, 

-

1154 perform_substitution=perform_substitution, 

-

1155 ) 

-

1156 

-

1157 

-

1158class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): 

-

1159 __slots__ = ( 

-

1160 "_plugin_metadata", 

-

1161 "_maintscript_snippets", 

-

1162 "_plugin_source_id", 

-

1163 "_package_substitution", 

-

1164 "_default_snippet_order", 

-

1165 ) 

-

1166 

-

1167 def __init__( 

-

1168 self, 

-

1169 plugin_metadata: DebputyPluginMetadata, 

-

1170 plugin_source_id: str, 

-

1171 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

-

1172 package_substitution: Substitution, 

-

1173 *, 

-

1174 default_snippet_order: Optional[Literal["service"]] = None, 

-

1175 ): 

-

1176 self._plugin_metadata = plugin_metadata 

-

1177 self._plugin_source_id = plugin_source_id 

-

1178 self._maintscript_snippets = maintscript_snippets 

-

1179 self._package_substitution = package_substitution 

-

1180 self._default_snippet_order = default_snippet_order 

-

1181 

-

1182 def _append_script( 

-

1183 self, 

-

1184 caller_name: str, 

-

1185 maintscript: Maintscript, 

-

1186 full_script: str, 

-

1187 /, 

-

1188 perform_substitution: bool = True, 

-

1189 ) -> None: 

-

1190 def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" 

-

1191 if perform_substitution: 

-

1192 full_script = self._package_substitution.substitute(full_script, def_source) 

-

1193 

-

1194 snippet = MaintscriptSnippet( 

-

1195 snippet=full_script, 

-

1196 definition_source=def_source, 

-

1197 snippet_order=self._default_snippet_order, 

-

1198 ) 

-

1199 self._maintscript_snippets[maintscript].append(snippet) 

-

1200 

-

1201 

-

1202class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): 

-

1203 __slots__ = ( 

-

1204 "_plugin_metadata", 

-

1205 "_plugin_source_id", 

-

1206 "_package_metadata_context", 

-

1207 "_triggers", 

-

1208 "_substvars", 

-

1209 "_maintscript", 

-

1210 "_shlibs_details", 

-

1211 ) 

-

1212 

-

1213 def __init__( 

-

1214 self, 

-

1215 plugin_metadata: DebputyPluginMetadata, 

-

1216 plugin_source_id: str, 

-

1217 package_metadata_context: PackageProcessingContext, 

-

1218 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

-

1219 substvars: FlushableSubstvars, 

-

1220 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

-

1221 ) -> None: 

-

1222 self._plugin_metadata = plugin_metadata 

-

1223 self._plugin_source_id = plugin_source_id 

-

1224 self._package_metadata_context = package_metadata_context 

-

1225 self._triggers = triggers 

-

1226 self._substvars = substvars 

-

1227 self._maintscript: Optional[MaintscriptAccessor] = None 

-

1228 self._shlibs_details = shlibs_details 

-

1229 

-

1230 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

-

1231 raise NotImplementedError 

-

1232 

-

1233 def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: 

-

1234 """Register a declarative dpkg level trigger 

-

1235 

-

1236 The provided trigger will be added to the package's metadata (the triggers file of the control.tar). 

-

1237 

-

1238 If the trigger has already been added previously, a second call with the same trigger data will be ignored. 

-

1239 """ 

-

1240 key = (trigger_type, trigger_target) 

-

1241 if key in self._triggers: 1241 ↛ 1242line 1241 didn't jump to line 1242, because the condition on line 1241 was never true

-

1242 return 

-

1243 self._triggers[key] = PluginProvidedTrigger( 

-

1244 dpkg_trigger_type=trigger_type, 

-

1245 dpkg_trigger_target=trigger_target, 

-

1246 provider=self._plugin_metadata, 

-

1247 provider_source_id=self._plugin_source_id, 

-

1248 ) 

-

1249 

-

1250 @property 

-

1251 def maintscript(self) -> MaintscriptAccessor: 

-

1252 maintscript = self._maintscript 

-

1253 if maintscript is None: 

-

1254 maintscript = self._create_maintscript_accessor() 

-

1255 self._maintscript = maintscript 

-

1256 return maintscript 

-

1257 

-

1258 @property 

-

1259 def substvars(self) -> FlushableSubstvars: 

-

1260 return self._substvars 

-

1261 

-

1262 def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: 

-

1263 binary_package = self._package_metadata_context.binary_package 

-

1264 with self.substvars.flush() as substvars_file: 

-

1265 dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] 

-

1266 if binary_package.is_udeb: 

-

1267 dpkg_cmd.append("-tudeb") 

-

1268 if binary_package.is_essential: 1268 ↛ 1269line 1268 didn't jump to line 1269, because the condition on line 1268 was never true

-

1269 dpkg_cmd.append("-dPre-Depends") 

-

1270 shlibs_local, shlib_dirs = self._shlibs_details 

-

1271 if shlibs_local is not None: 1271 ↛ 1272line 1271 didn't jump to line 1272, because the condition on line 1271 was never true

-

1272 dpkg_cmd.append(f"-L{shlibs_local}") 

-

1273 if shlib_dirs: 1273 ↛ 1274line 1273 didn't jump to line 1274, because the condition on line 1273 was never true

-

1274 dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) 

-

1275 dpkg_cmd.extend(p.fs_path for p in paths) 

-

1276 print_command(*dpkg_cmd) 

-

1277 try: 

-

1278 subprocess.check_call(dpkg_cmd) 

-

1279 except subprocess.CalledProcessError: 

-

1280 _error( 

-

1281 f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" 

-

1282 " review the output from dpkg-shlibdeps above to understand what went wrong." 

-

1283 ) 

-

1284 

-

1285 

-

1286class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): 

-

1287 __slots__ = ( 

-

1288 "_maintscript", 

-

1289 "_maintscript_snippets", 

-

1290 "_package_substitution", 

-

1291 ) 

-

1292 

-

1293 def __init__( 

-

1294 self, 

-

1295 plugin_metadata: DebputyPluginMetadata, 

-

1296 plugin_source_id: str, 

-

1297 package_metadata_context: PackageProcessingContext, 

-

1298 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

-

1299 substvars: FlushableSubstvars, 

-

1300 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

-

1301 package_substitution: Substitution, 

-

1302 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

-

1303 *, 

-

1304 default_snippet_order: Optional[Literal["service"]] = None, 

-

1305 ) -> None: 

-

1306 super().__init__( 

-

1307 plugin_metadata, 

-

1308 plugin_source_id, 

-

1309 package_metadata_context, 

-

1310 triggers, 

-

1311 substvars, 

-

1312 shlibs_details, 

-

1313 ) 

-

1314 self._maintscript_snippets = maintscript_snippets 

-

1315 self._package_substitution = package_substitution 

-

1316 self._maintscript = MaintscriptAccessorProvider( 

-

1317 plugin_metadata, 

-

1318 plugin_source_id, 

-

1319 maintscript_snippets, 

-

1320 package_substitution, 

-

1321 default_snippet_order=default_snippet_order, 

-

1322 ) 

-

1323 

-

1324 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

-

1325 return MaintscriptAccessorProvider( 

-

1326 self._plugin_metadata, 

-

1327 self._plugin_source_id, 

-

1328 self._maintscript_snippets, 

-

1329 self._package_substitution, 

-

1330 ) 

-

1331 

-

1332 

-

1333class BinaryCtrlAccessorProviderCreator: 

-

1334 def __init__( 

-

1335 self, 

-

1336 package_metadata_context: PackageProcessingContext, 

-

1337 substvars: FlushableSubstvars, 

-

1338 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

-

1339 substitution: Substitution, 

-

1340 ) -> None: 

-

1341 self._package_metadata_context = package_metadata_context 

-

1342 self._substvars = substvars 

-

1343 self._maintscript_snippets = maintscript_snippets 

-

1344 self._substitution = substitution 

-

1345 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

-

1346 self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = None, None 

-

1347 

-

1348 def for_plugin( 

-

1349 self, 

-

1350 plugin_metadata: DebputyPluginMetadata, 

-

1351 plugin_source_id: str, 

-

1352 *, 

-

1353 default_snippet_order: Optional[Literal["service"]] = None, 

-

1354 ) -> BinaryCtrlAccessor: 

-

1355 return BinaryCtrlAccessorProvider( 

-

1356 plugin_metadata, 

-

1357 plugin_source_id, 

-

1358 self._package_metadata_context, 

-

1359 self._triggers, 

-

1360 self._substvars, 

-

1361 self._maintscript_snippets, 

-

1362 self._substitution, 

-

1363 self.shlibs_details, 

-

1364 default_snippet_order=default_snippet_order, 

-

1365 ) 

-

1366 

-

1367 def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: 

-

1368 return self._triggers.values() 

-

1369 

-

1370 

-

1371def plugin_metadata_for_debputys_own_plugin( 

-

1372 loader: Optional[PluginInitializationEntryPoint] = None, 

-

1373) -> DebputyPluginMetadata: 

-

1374 if loader is None: 

-

1375 from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features 

-

1376 

-

1377 loader = initialize_debputy_features 

-

1378 return DebputyPluginMetadata( 

-

1379 plugin_name="debputy", 

-

1380 api_compat_version=1, 

-

1381 plugin_initializer=loader, 

-

1382 plugin_loader=None, 

-

1383 plugin_path="<bundled>", 

-

1384 ) 

-

1385 

-

1386 

-

1387def load_plugin_features( 

-

1388 plugin_search_dirs: Sequence[str], 

-

1389 substitution: Substitution, 

-

1390 requested_plugins_only: Optional[Sequence[str]] = None, 

-

1391 required_plugins: Optional[Set[str]] = None, 

-

1392 plugin_feature_set: Optional[PluginProvidedFeatureSet] = None, 

-

1393 debug_mode: bool = False, 

-

1394) -> PluginProvidedFeatureSet: 

-

1395 if plugin_feature_set is None: 

-

1396 plugin_feature_set = PluginProvidedFeatureSet() 

-

1397 plugins = [plugin_metadata_for_debputys_own_plugin()] 

-

1398 unloadable_plugins = set() 

-

1399 if required_plugins: 

-

1400 plugins.extend( 

-

1401 find_json_plugins( 

-

1402 plugin_search_dirs, 

-

1403 required_plugins, 

-

1404 ) 

-

1405 ) 

-

1406 if requested_plugins_only is not None: 

-

1407 plugins.extend( 

-

1408 find_json_plugins( 

-

1409 plugin_search_dirs, 

-

1410 requested_plugins_only, 

-

1411 ) 

-

1412 ) 

-

1413 else: 

-

1414 auto_loaded = _find_all_json_plugins( 

-

1415 plugin_search_dirs, 

-

1416 required_plugins if required_plugins is not None else frozenset(), 

-

1417 debug_mode=debug_mode, 

-

1418 ) 

-

1419 for plugin_metadata in auto_loaded: 

-

1420 plugins.append(plugin_metadata) 

-

1421 unloadable_plugins.add(plugin_metadata.plugin_name) 

-

1422 

-

1423 for plugin_metadata in plugins: 

-

1424 api = DebputyPluginInitializerProvider( 

-

1425 plugin_metadata, plugin_feature_set, substitution 

-

1426 ) 

-

1427 try: 

-

1428 api.load_plugin() 

-

1429 except PluginBaseError as e: 

-

1430 if plugin_metadata.plugin_name not in unloadable_plugins: 

-

1431 raise 

-

1432 if debug_mode: 

-

1433 raise 

-

1434 try: 

-

1435 api.unload_plugin() 

-

1436 except Exception: 

-

1437 _warn( 

-

1438 f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" 

-

1439 " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" 

-

1440 " module might have tainted the feature set." 

-

1441 ) 

-

1442 raise e from None 

-

1443 else: 

-

1444 if debug_mode: 

-

1445 _warn( 

-

1446 f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" 

-

1447 f" to --debug/-d." 

-

1448 ) 

-

1449 _warn( 

-

1450 f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" 

-

1451 f" deactivated. Use debug mode (--debug) to show the stacktrace (the warning will become an error)" 

-

1452 ) 

-

1453 

-

1454 return plugin_feature_set 

-

1455 

-

1456 

-

1457def find_json_plugin( 

-

1458 search_dirs: Sequence[str], 

-

1459 requested_plugin: str, 

-

1460) -> DebputyPluginMetadata: 

-

1461 r = list(find_json_plugins(search_dirs, [requested_plugin])) 

-

1462 assert len(r) == 1 

-

1463 return r[0] 

-

1464 

-

1465 

-

1466def find_related_implementation_files_for_plugin( 

-

1467 plugin_metadata: DebputyPluginMetadata, 

-

1468) -> List[str]: 

-

1469 plugin_path = plugin_metadata.plugin_path 

-

1470 if not os.path.isfile(plugin_path): 

-

1471 plugin_name = plugin_metadata.plugin_name 

-

1472 _error( 

-

1473 f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" 

-

1474 " or loaded via a mechanism that does not support detecting its tests." 

-

1475 ) 

-

1476 files = [] 

-

1477 module_name, module_file = _find_plugin_implementation_file( 

-

1478 plugin_metadata.plugin_name, 

-

1479 plugin_metadata.plugin_path, 

-

1480 ) 

-

1481 if os.path.isfile(module_file): 

-

1482 files.append(module_file) 

-

1483 else: 

-

1484 if not plugin_metadata.is_loaded: 

-

1485 plugin_metadata.load_plugin() 

-

1486 if module_name in sys.modules: 

-

1487 _error( 

-

1488 f'The plugin {plugin_metadata.plugin_name} uses the "module"" key in its' 

-

1489 f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " 

-

1490 f" installed via this method. The related Python would not be installed" 

-

1491 f" (which would result in a plugin that would fail to load)" 

-

1492 ) 

-

1493 

-

1494 return files 

-

1495 

-

1496 

-

1497def find_tests_for_plugin( 

-

1498 plugin_metadata: DebputyPluginMetadata, 

-

1499) -> List[str]: 

-

1500 plugin_name = plugin_metadata.plugin_name 

-

1501 plugin_path = plugin_metadata.plugin_path 

-

1502 

-

1503 if not os.path.isfile(plugin_path): 

-

1504 _error( 

-

1505 f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" 

-

1506 " mechanism that does not support detecting its tests." 

-

1507 ) 

-

1508 

-

1509 plugin_dir = os.path.dirname(plugin_path) 

-

1510 test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") 

-

1511 tests = [] 

-

1512 with os.scandir(plugin_dir) as dir_iter: 

-

1513 for p in dir_iter: 

-

1514 if ( 

-

1515 p.is_file() 

-

1516 and p.name.startswith(test_basename_prefix) 

-

1517 and PLUGIN_TEST_SUFFIX.search(p.name) 

-

1518 ): 

-

1519 tests.append(p.path) 

-

1520 return tests 

-

1521 

-

1522 

-

1523def find_json_plugins( 

-

1524 search_dirs: Sequence[str], 

-

1525 requested_plugins: Iterable[str], 

-

1526) -> Iterable[DebputyPluginMetadata]: 

-

1527 for plugin_name_or_path in requested_plugins: 

-

1528 found = False 

-

1529 if "/" in plugin_name_or_path: 1529 ↛ 1530line 1529 didn't jump to line 1530, because the condition on line 1529 was never true

-

1530 if not os.path.isfile(plugin_name_or_path): 

-

1531 raise PluginNotFoundError( 

-

1532 f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." 

-

1533 ' (Because the plugin name contains "/", it is assumed to be a path and search path' 

-

1534 " is not used." 

-

1535 ) 

-

1536 yield parse_json_plugin_desc(plugin_name_or_path) 

-

1537 return 

-

1538 for search_dir in search_dirs: 

-

1539 path = os.path.join( 

-

1540 search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" 

-

1541 ) 

-

1542 if not os.path.isfile(path): 1542 ↛ 1543line 1542 didn't jump to line 1543, because the condition on line 1542 was never true

-

1543 continue 

-

1544 found = True 

-

1545 yield parse_json_plugin_desc(path) 

-

1546 if not found: 1546 ↛ 1547line 1546 didn't jump to line 1547, because the condition on line 1546 was never true

-

1547 search_dir_str = ":".join(search_dirs) 

-

1548 raise PluginNotFoundError( 

-

1549 f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" 

-

1550 f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" 

-

1551 ) 

-

1552 

-

1553 

-

1554def _find_all_json_plugins( 

-

1555 search_dirs: Sequence[str], 

-

1556 required_plugins: AbstractSet[str], 

-

1557 debug_mode: bool = False, 

-

1558) -> Iterable[DebputyPluginMetadata]: 

-

1559 seen = set(required_plugins) 

-

1560 error_seen = False 

-

1561 for search_dir in search_dirs: 

-

1562 try: 

-

1563 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) 

-

1564 except FileNotFoundError: 

-

1565 continue 

-

1566 with dir_fd: 

-

1567 for entry in dir_fd: 

-

1568 if ( 

-

1569 not entry.is_file(follow_symlinks=True) 

-

1570 or not entry.name.endswith(".json") 

-

1571 or entry.name in seen 

-

1572 ): 

-

1573 continue 

-

1574 try: 

-

1575 plugin_metadata = parse_json_plugin_desc(entry.path) 

-

1576 except PluginBaseError as e: 

-

1577 if debug_mode: 

-

1578 raise 

-

1579 if not error_seen: 

-

1580 error_seen = True 

-

1581 _warn( 

-

1582 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" 

-

1583 ) 

-

1584 else: 

-

1585 _warn( 

-

1586 f"Failed to load plugin in {entry.path} due to errors (not shown)." 

-

1587 ) 

-

1588 else: 

-

1589 yield plugin_metadata 

-

1590 

-

1591 

-

1592def _find_plugin_implementation_file( 

-

1593 plugin_name: str, 

-

1594 json_file_path: str, 

-

1595) -> Tuple[str, str]: 

-

1596 guessed_module_basename = plugin_name.replace("-", "_") 

-

1597 module_name = f"debputy.plugin.{guessed_module_basename}" 

-

1598 module_fs_path = os.path.join( 

-

1599 os.path.dirname(json_file_path), f"{guessed_module_basename}.py" 

-

1600 ) 

-

1601 return module_name, module_fs_path 

-

1602 

-

1603 

-

1604def _resolve_module_initializer( 

-

1605 plugin_name: str, 

-

1606 plugin_initializer_name: str, 

-

1607 module_name: Optional[str], 

-

1608 json_file_path: str, 

-

1609) -> PluginInitializationEntryPoint: 

-

1610 module = None 

-

1611 module_fs_path = None 

-

1612 if module_name is None: 1612 ↛ 1640line 1612 didn't jump to line 1640, because the condition on line 1612 was never false

-

1613 module_name, module_fs_path = _find_plugin_implementation_file( 

-

1614 plugin_name, json_file_path 

-

1615 ) 

-

1616 if os.path.isfile(module_fs_path): 1616 ↛ 1640line 1616 didn't jump to line 1640, because the condition on line 1616 was never false

-

1617 spec = importlib.util.spec_from_file_location(module_name, module_fs_path) 

-

1618 if spec is None: 1618 ↛ 1619line 1618 didn't jump to line 1619, because the condition on line 1618 was never true

-

1619 raise PluginInitializationError( 

-

1620 f"Failed to load {plugin_name} (path: {module_fs_path})." 

-

1621 " The spec_from_file_location function returned None." 

-

1622 ) 

-

1623 mod = importlib.util.module_from_spec(spec) 

-

1624 loader = spec.loader 

-

1625 if loader is None: 1625 ↛ 1626line 1625 didn't jump to line 1626, because the condition on line 1625 was never true

-

1626 raise PluginInitializationError( 

-

1627 f"Failed to load {plugin_name} (path: {module_fs_path})." 

-

1628 " Python could not find a suitable loader (spec.loader was None)" 

-

1629 ) 

-

1630 sys.modules[module_name] = mod 

-

1631 try: 

-

1632 loader.exec_module(mod) 

-

1633 except (Exception, GeneratorExit) as e: 

-

1634 raise PluginInitializationError( 

-

1635 f"Failed to load {plugin_name} (path: {module_fs_path})." 

-

1636 " The module threw an exception while being loaded." 

-

1637 ) from e 

-

1638 module = mod 

-

1639 

-

1640 if module is None: 1640 ↛ 1641line 1640 didn't jump to line 1641, because the condition on line 1640 was never true

-

1641 try: 

-

1642 module = importlib.import_module(module_name) 

-

1643 except ModuleNotFoundError as e: 

-

1644 if module_fs_path is None: 

-

1645 raise PluginMetadataError( 

-

1646 f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' 

-

1647 " this module is not available in the python search path" 

-

1648 ) from e 

-

1649 raise PluginInitializationError( 

-

1650 f"Failed to load {plugin_name}. Tried loading it from" 

-

1651 f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' 

-

1652 f" {module_name} (where it was not found either). Please ensure" 

-

1653 " the module code is installed in the correct spot or provide an" 

-

1654 f' explicit "module" definition in {json_file_path}.' 

-

1655 ) from e 

-

1656 

-

1657 plugin_initializer = getattr(module, plugin_initializer_name) 

-

1658 

-

1659 if plugin_initializer is None: 1659 ↛ 1660line 1659 didn't jump to line 1660, because the condition on line 1659 was never true

-

1660 raise PluginMetadataError( 

-

1661 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

-

1662 f" attribute called {plugin_initializer}. However, it does not. Please correct the plugin" 

-

1663 f" metadata or initializer name in the Python module." 

-

1664 ) 

-

1665 return cast("PluginInitializationEntryPoint", plugin_initializer) 

-

1666 

-

1667 

-

1668def _json_plugin_loader( 

-

1669 plugin_name: str, 

-

1670 plugin_json_metadata: PluginJsonMetadata, 

-

1671 json_file_path: str, 

-

1672 attribute_path: AttributePath, 

-

1673) -> Callable[["DebputyPluginInitializer"], None]: 

-

1674 api_compat = plugin_json_metadata["api_compat_version"] 

-

1675 module_name = plugin_json_metadata.get("module") 

-

1676 plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") 

-

1677 packager_provided_files_raw = plugin_json_metadata.get( 

-

1678 "packager_provided_files", [] 

-

1679 ) 

-

1680 manifest_variables_raw = plugin_json_metadata.get("manifest_variables") 

-

1681 known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") 

-

1682 if api_compat != 1: 1682 ↛ 1683line 1682 didn't jump to line 1683, because the condition on line 1682 was never true

-

1683 raise PluginMetadataError( 

-

1684 f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' 

-

1685 f" version of debputy only supports API compat version of 1" 

-

1686 ) 

-

1687 if plugin_initializer_name is not None and "." in plugin_initializer_name: 1687 ↛ 1688line 1687 didn't jump to line 1688, because the condition on line 1687 was never true

-

1688 p = attribute_path["plugin_initializer"] 

-

1689 raise PluginMetadataError( 

-

1690 f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' 

-

1691 ) 

-

1692 

-

1693 plugin_initializers = [] 

-

1694 

-

1695 if plugin_initializer_name is not None: 

-

1696 plugin_initializer = _resolve_module_initializer( 

-

1697 plugin_name, 

-

1698 plugin_initializer_name, 

-

1699 module_name, 

-

1700 json_file_path, 

-

1701 ) 

-

1702 plugin_initializers.append(plugin_initializer) 

-

1703 

-

1704 if known_packaging_files_raw: 1704 ↛ 1705line 1704 didn't jump to line 1705, because the condition on line 1704 was never true

-

1705 kpf_root_path = attribute_path["known_packaging_files"] 

-

1706 known_packaging_files = [] 

-

1707 for k, v in enumerate(known_packaging_files_raw): 

-

1708 kpf_path = kpf_root_path[k] 

-

1709 p = v.get("path") 

-

1710 if isinstance(p, str): 

-

1711 kpf_path.path_hint = p 

-

1712 if plugin_name.startswith("debputy-") and isinstance(v, dict): 

-

1713 docs = v.get("documentation-uris") 

-

1714 if docs is not None and isinstance(docs, list): 

-

1715 docs = [ 

-

1716 ( 

-

1717 d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR) 

-

1718 if isinstance(d, str) 

-

1719 else d 

-

1720 ) 

-

1721 for d in docs 

-

1722 ] 

-

1723 v["documentation-uris"] = docs 

-

1724 known_packaging_file: KnownPackagingFileInfo = ( 

-

1725 PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( 

-

1726 v, 

-

1727 kpf_path, 

-

1728 ) 

-

1729 ) 

-

1730 known_packaging_files.append((kpf_path, known_packaging_file)) 

-

1731 

-

1732 def _initialize_json_provided_known_packaging_files( 

-

1733 api: DebputyPluginInitializerProvider, 

-

1734 ) -> None: 

-

1735 for p, details in known_packaging_files: 

-

1736 try: 

-

1737 api.known_packaging_files(details) 

-

1738 except ValueError as ex: 

-

1739 raise PluginMetadataError( 

-

1740 f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" 

-

1741 ) 

-

1742 

-

1743 plugin_initializers.append(_initialize_json_provided_known_packaging_files) 

-

1744 

-

1745 if manifest_variables_raw: 

-

1746 manifest_var_path = attribute_path["manifest_variables"] 

-

1747 manifest_variables = [ 

-

1748 PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) 

-

1749 for i, p in enumerate(manifest_variables_raw) 

-

1750 ] 

-

1751 

-

1752 def _initialize_json_provided_manifest_vars( 

-

1753 api: DebputyPluginInitializer, 

-

1754 ) -> None: 

-

1755 for idx, manifest_variable in enumerate(manifest_variables): 

-

1756 name = manifest_variable["name"] 

-

1757 value = manifest_variable["value"] 

-

1758 doc = manifest_variable.get("reference_documentation") 

-

1759 try: 

-

1760 api.manifest_variable( 

-

1761 name, value, variable_reference_documentation=doc 

-

1762 ) 

-

1763 except ValueError as ex: 

-

1764 var_path = manifest_var_path[idx] 

-

1765 raise PluginMetadataError( 

-

1766 f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" 

-

1767 ) 

-

1768 

-

1769 plugin_initializers.append(_initialize_json_provided_manifest_vars) 

-

1770 

-

1771 if packager_provided_files_raw: 

-

1772 ppf_path = attribute_path["packager_provided_files"] 

-

1773 ppfs = [ 

-

1774 PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) 

-

1775 for i, p in enumerate(packager_provided_files_raw) 

-

1776 ] 

-

1777 

-

1778 def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: 

-

1779 ppf: PackagerProvidedFileJsonDescription 

-

1780 for idx, ppf in enumerate(ppfs): 

-

1781 c = dict(ppf) 

-

1782 stem = ppf["stem"] 

-

1783 installed_path = ppf["installed_path"] 

-

1784 default_mode = ppf.get("default_mode") 

-

1785 ref_doc_dict = ppf.get("reference_documentation") 

-

1786 if default_mode is not None: 1786 ↛ 1789line 1786 didn't jump to line 1789, because the condition on line 1786 was never false

-

1787 c["default_mode"] = default_mode.octal_mode 

-

1788 

-

1789 if ref_doc_dict is not None: 1789 ↛ 1794line 1789 didn't jump to line 1794, because the condition on line 1789 was never false

-

1790 ref_doc = packager_provided_file_reference_documentation( 

-

1791 **ref_doc_dict 

-

1792 ) 

-

1793 else: 

-

1794 ref_doc = None 

-

1795 

-

1796 for k in [ 

-

1797 "stem", 

-

1798 "installed_path", 

-

1799 "reference_documentation", 

-

1800 ]: 

-

1801 try: 

-

1802 del c[k] 

-

1803 except KeyError: 

-

1804 pass 

-

1805 

-

1806 try: 

-

1807 api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore 

-

1808 except ValueError as ex: 

-

1809 p_path = ppf_path[idx] 

-

1810 raise PluginMetadataError( 

-

1811 f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" 

-

1812 ) 

-

1813 

-

1814 plugin_initializers.append(_initialize_json_provided_ppfs) 

-

1815 

-

1816 if not plugin_initializers: 1816 ↛ 1817line 1816 didn't jump to line 1817, because the condition on line 1816 was never true

-

1817 raise PluginMetadataError( 

-

1818 f"The plugin defined in {json_file_path} does not seem to provide features, " 

-

1819 f" such as module + plugin-initializer or packager-provided-files." 

-

1820 ) 

-

1821 

-

1822 if len(plugin_initializers) == 1: 

-

1823 return plugin_initializers[0] 

-

1824 

-

1825 def _chain_loader(api: DebputyPluginInitializer) -> None: 

-

1826 for initializer in plugin_initializers: 

-

1827 initializer(api) 

-

1828 

-

1829 return _chain_loader 

-

1830 

-

1831 

-

1832@contextlib.contextmanager 

-

1833def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]: 

-

1834 if fd is not None: 

-

1835 yield fd 

-

1836 else: 

-

1837 with open(path, "rb") as fd: 

-

1838 yield fd 

-

1839 

-

1840 

-

1841def parse_json_plugin_desc( 

-

1842 path: str, *, fd: Optional[IO[bytes]] = None 

-

1843) -> DebputyPluginMetadata: 

-

1844 with _open(path, fd=fd) as rfd: 

-

1845 try: 

-

1846 raw = json.load(rfd) 

-

1847 except JSONDecodeError as e: 

-

1848 raise PluginMetadataError( 

-

1849 f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' 

-

1850 ) from e 

-

1851 plugin_name = os.path.basename(path) 

-

1852 if plugin_name.endswith(".json"): 

-

1853 plugin_name = plugin_name[:-5] 

-

1854 elif plugin_name.endswith(".json.in"): 

-

1855 plugin_name = plugin_name[:-8] 

-

1856 

-

1857 if plugin_name == "debputy": 1857 ↛ 1859line 1857 didn't jump to line 1859, because the condition on line 1857 was never true

-

1858 # Provide a better error message than "The plugin has already loaded!?" 

-

1859 raise PluginMetadataError( 

-

1860 f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' 

-

1861 f" clash with the bundled plugin of same name." 

-

1862 ) 

-

1863 

-

1864 attribute_path = AttributePath.root_path() 

-

1865 

-

1866 try: 

-

1867 plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( 

-

1868 raw, 

-

1869 attribute_path, 

-

1870 ) 

-

1871 except ManifestParseException as e: 

-

1872 raise PluginMetadataError( 

-

1873 f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' 

-

1874 ) from e 

-

1875 api_compat = plugin_json_metadata["api_compat_version"] 

-

1876 

-

1877 return DebputyPluginMetadata( 

-

1878 plugin_name=plugin_name, 

-

1879 plugin_loader=lambda: _json_plugin_loader( 

-

1880 plugin_name, 

-

1881 plugin_json_metadata, 

-

1882 path, 

-

1883 attribute_path, 

-

1884 ), 

-

1885 api_compat_version=api_compat, 

-

1886 plugin_initializer=None, 

-

1887 plugin_path=path, 

-

1888 ) 

-

1889 

-

1890 

-

1891@dataclasses.dataclass(slots=True, frozen=True) 

-

1892class ServiceDefinitionImpl(ServiceDefinition[DSD]): 

-

1893 name: str 

-

1894 names: Sequence[str] 

-

1895 path: VirtualPath 

-

1896 type_of_service: str 

-

1897 service_scope: str 

-

1898 auto_enable_on_install: bool 

-

1899 auto_start_on_install: bool 

-

1900 on_upgrade: ServiceUpgradeRule 

-

1901 definition_source: str 

-

1902 is_plugin_provided_definition: bool 

-

1903 service_context: Optional[DSD] 

-

1904 

-

1905 def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]": 

-

1906 return dataclasses.replace(self, **changes) 

-

1907 

-

1908 

-

1909class ServiceRegistryImpl(ServiceRegistry[DSD]): 

-

1910 __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services") 

-

1911 

-

1912 def __init__(self, service_manager_details: ServiceManagerDetails) -> None: 

-

1913 self._service_manager_details = service_manager_details 

-

1914 self._service_definitions: List[ServiceDefinition[DSD]] = [] 

-

1915 self._seen_services = set() 

-

1916 

-

1917 @property 

-

1918 def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: 

-

1919 return self._service_definitions 

-

1920 

-

1921 def register_service( 

-

1922 self, 

-

1923 path: VirtualPath, 

-

1924 name: Union[str, List[str]], 

-

1925 *, 

-

1926 type_of_service: str = "service", # "timer", etc. 

-

1927 service_scope: str = "system", 

-

1928 enable_by_default: bool = True, 

-

1929 start_by_default: bool = True, 

-

1930 default_upgrade_rule: ServiceUpgradeRule = "restart", 

-

1931 service_context: Optional[DSD] = None, 

-

1932 ) -> None: 

-

1933 names = name if isinstance(name, list) else [name] 

-

1934 if len(names) < 1: 

-

1935 raise ValueError( 

-

1936 f"The service must have at least one name - {path.absolute} did not have any" 

-

1937 ) 

-

1938 for n in names: 

-

1939 key = (n, type_of_service, service_scope) 

-

1940 if key in self._seen_services: 

-

1941 raise PluginAPIViolationError( 

-

1942 f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used" 

-

1943 f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not" 

-

1944 " allowed by the debputy plugin API." 

-

1945 ) 

-

1946 # TODO: We cannot create a service definition immediate once the manifest is involved 

-

1947 self._service_definitions.append( 

-

1948 ServiceDefinitionImpl( 

-

1949 names[0], 

-

1950 names, 

-

1951 path, 

-

1952 type_of_service, 

-

1953 service_scope, 

-

1954 enable_by_default, 

-

1955 start_by_default, 

-

1956 default_upgrade_rule, 

-

1957 f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", 

-

1958 True, 

-

1959 service_context, 

-

1960 ) 

-

1961 ) 

-
- - - -- cgit v1.2.3