From 11180d7be49c1912b7bdcba23d932747041e5b5d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 14 Apr 2024 22:18:23 +0200 Subject: Merging upstream version 0.1.28. Signed-off-by: Daniel Baumann --- coverage-report/d_64287305fe0c6642_impl_py.html | 2060 +++++++++++++++++++++++ 1 file changed, 2060 insertions(+) create mode 100644 coverage-report/d_64287305fe0c6642_impl_py.html (limited to 'coverage-report/d_64287305fe0c6642_impl_py.html') diff --git a/coverage-report/d_64287305fe0c6642_impl_py.html b/coverage-report/d_64287305fe0c6642_impl_py.html new file mode 100644 index 0000000..c2c961d --- /dev/null +++ b/coverage-report/d_64287305fe0c6642_impl_py.html @@ -0,0 +1,2060 @@ + + + + + Coverage for src/debputy/plugin/api/impl.py: 55% + + + + + +
+
+

+ Coverage for src/debputy/plugin/api/impl.py: + 55% +

+ +

+ 753 statements   + + + + +

+

+ « prev     + ^ index     + » next +       + coverage.py v7.2.7, + created at 2024-04-07 12:14 +0200 +

+ +
+
+
+

1import contextlib 

+

2import dataclasses 

+

3import functools 

+

4import importlib 

+

5import importlib.util 

+

6import itertools 

+

7import json 

+

8import os 

+

9import re 

+

10import subprocess 

+

11import sys 

+

12from abc import ABC 

+

13from json import JSONDecodeError 

+

14from typing import ( 

+

15 Optional, 

+

16 Callable, 

+

17 Dict, 

+

18 Tuple, 

+

19 Iterable, 

+

20 Sequence, 

+

21 Type, 

+

22 List, 

+

23 Union, 

+

24 Set, 

+

25 Iterator, 

+

26 IO, 

+

27 Mapping, 

+

28 AbstractSet, 

+

29 cast, 

+

30 FrozenSet, 

+

31 Any, 

+

32 Literal, 

+

33) 

+

34 

+

35from debputy import DEBPUTY_DOC_ROOT_DIR 

+

36from debputy.exceptions import ( 

+

37 DebputySubstitutionError, 

+

38 PluginConflictError, 

+

39 PluginMetadataError, 

+

40 PluginBaseError, 

+

41 PluginInitializationError, 

+

42 PluginAPIViolationError, 

+

43 PluginNotFoundError, 

+

44) 

+

45from debputy.maintscript_snippet import ( 

+

46 STD_CONTROL_SCRIPTS, 

+

47 MaintscriptSnippetContainer, 

+

48 MaintscriptSnippet, 

+

49) 

+

50from debputy.manifest_parser.base_types import TypeMapping 

+

51from debputy.manifest_parser.exceptions import ManifestParseException 

+

52from debputy.manifest_parser.parser_data import ParserContextData 

+

53from debputy.manifest_parser.util import AttributePath 

+

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

+

55from debputy.plugin.api.impl_types import ( 

+

56 DebputyPluginMetadata, 

+

57 PackagerProvidedFileClassSpec, 

+

58 MetadataOrMaintscriptDetector, 

+

59 PluginProvidedTrigger, 

+

60 TTP, 

+

61 DIPHandler, 

+

62 PF, 

+

63 SF, 

+

64 DIPKWHandler, 

+

65 PluginProvidedManifestVariable, 

+

66 PluginProvidedPackageProcessor, 

+

67 PluginProvidedDiscardRule, 

+

68 AutomaticDiscardRuleExample, 

+

69 PPFFormatParam, 

+

70 ServiceManagerDetails, 

+

71 resolve_package_type_selectors, 

+

72 KnownPackagingFileInfo, 

+

73 PluginProvidedKnownPackagingFile, 

+

74 InstallPatternDHCompatRule, 

+

75 PluginProvidedTypeMapping, 

+

76) 

+

77from debputy.plugin.api.plugin_parser import ( 

+

78 PLUGIN_METADATA_PARSER, 

+

79 PluginJsonMetadata, 

+

80 PLUGIN_PPF_PARSER, 

+

81 PackagerProvidedFileJsonDescription, 

+

82 PLUGIN_MANIFEST_VARS_PARSER, 

+

83 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

+

84) 

+

85from debputy.plugin.api.spec import ( 

+

86 MaintscriptAccessor, 

+

87 Maintscript, 

+

88 DpkgTriggerType, 

+

89 BinaryCtrlAccessor, 

+

90 PackageProcessingContext, 

+

91 MetadataAutoDetector, 

+

92 PluginInitializationEntryPoint, 

+

93 DebputyPluginInitializer, 

+

94 PackageTypeSelector, 

+

95 FlushableSubstvars, 

+

96 ParserDocumentation, 

+

97 PackageProcessor, 

+

98 VirtualPath, 

+

99 ServiceIntegrator, 

+

100 ServiceDetector, 

+

101 ServiceRegistry, 

+

102 ServiceDefinition, 

+

103 DSD, 

+

104 ServiceUpgradeRule, 

+

105 PackagerProvidedFileReferenceDocumentation, 

+

106 packager_provided_file_reference_documentation, 

+

107 TypeMappingDocumentation, 

+

108) 

+

109from debputy.substitution import ( 

+

110 Substitution, 

+

111 VariableNameState, 

+

112 SUBST_VAR_RE, 

+

113 VariableContext, 

+

114) 

+

115from debputy.util import ( 

+

116 _normalize_path, 

+

117 POSTINST_DEFAULT_CONDITION, 

+

118 _error, 

+

119 print_command, 

+

120 _warn, 

+

121) 

+

122 

+

123PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") 

+

124 

+

125 

+

126def _validate_known_packaging_file_dh_compat_rules( 

+

127 dh_compat_rules: Optional[List[InstallPatternDHCompatRule]], 

+

128) -> None: 

+

129 max_compat = None 

+

130 if not dh_compat_rules: 

+

131 return 

+

132 dh_compat_rule: InstallPatternDHCompatRule 

+

133 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

+

134 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

+

135 compat = dh_compat_rule.get("starting_with_compat_level") 

+

136 

+

137 remaining = dh_compat_rule.keys() - { 

+

138 "after_debhelper_version", 

+

139 "starting_with_compat_level", 

+

140 } 

+

141 if not remaining: 

+

142 raise ValueError( 

+

143 f"The dh compat-rule at index {idx} does not affect anything not have any rules!? So why have it?" 

+

144 ) 

+

145 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

+

146 raise ValueError( 

+

147 f"The dh compat-rule at index {idx} is not the last and is missing either" 

+

148 " before-debhelper-version or before-compat-level" 

+

149 ) 

+

150 if compat is not None and compat < 0: 

+

151 raise ValueError( 

+

152 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

+

153 f" for something that appeared when migrating from {compat} to {compat + 1}." 

+

154 ) 

+

155 

+

156 if max_compat is None: 

+

157 max_compat = compat 

+

158 elif compat is not None: 

+

159 if compat >= max_compat: 

+

160 raise ValueError( 

+

161 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

+

162 ) 

+

163 max_compat = compat 

+

164 

+

165 install_pattern = dh_compat_rule.get("install_pattern") 

+

166 if ( 

+

167 install_pattern is not None 

+

168 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

+

169 ): 

+

170 raise ValueError( 

+

171 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

+

172 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

+

173 ) 

+

174 

+

175 

+

176class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

+

177 __slots__ = ( 

+

178 "_plugin_metadata", 

+

179 "_feature_set", 

+

180 "_plugin_detector_ids", 

+

181 "_substitution", 

+

182 "_unloaders", 

+

183 "_load_started", 

+

184 ) 

+

185 

+

186 def __init__( 

+

187 self, 

+

188 plugin_metadata: DebputyPluginMetadata, 

+

189 feature_set: PluginProvidedFeatureSet, 

+

190 substitution: Substitution, 

+

191 ) -> None: 

+

192 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

+

193 self._feature_set = feature_set 

+

194 self._plugin_detector_ids: Set[str] = set() 

+

195 self._substitution = substitution 

+

196 self._unloaders: List[Callable[[], None]] = [] 

+

197 self._load_started = False 

+

198 

+

199 def unload_plugin(self) -> None: 

+

200 if self._load_started: 

+

201 for unloader in self._unloaders: 

+

202 unloader() 

+

203 del self._feature_set.plugin_data[self._plugin_name] 

+

204 

+

205 def load_plugin(self) -> None: 

+

206 metadata = self._plugin_metadata 

+

207 if metadata.plugin_name in self._feature_set.plugin_data: 207 ↛ 208line 207 didn't jump to line 208, because the condition on line 207 was never true

+

208 raise PluginConflictError( 

+

209 f'The plugin "{metadata.plugin_name}" has already been loaded!?' 

+

210 ) 

+

211 assert ( 

+

212 metadata.api_compat_version == 1 

+

213 ), f"Unsupported plugin API compat version {metadata.api_compat_version}" 

+

214 self._feature_set.plugin_data[metadata.plugin_name] = metadata 

+

215 self._load_started = True 

+

216 assert not metadata.is_initialized 

+

217 try: 

+

218 metadata.initialize_plugin(self) 

+

219 except Exception as e: 

+

220 initializer = metadata.plugin_initializer 

+

221 if ( 221 ↛ 226line 221 didn't jump to line 226

+

222 isinstance(e, TypeError) 

+

223 and initializer is not None 

+

224 and not callable(initializer) 

+

225 ): 

+

226 raise PluginMetadataError( 

+

227 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" 

+

228 f" callable (callable returns False). The specified entry point identifies" 

+

229 f' itself as "{initializer.__qualname__}".' 

+

230 ) from e 

+

231 elif isinstance(e, PluginBaseError): 231 ↛ 233line 231 didn't jump to line 233, because the condition on line 231 was never false

+

232 raise 

+

233 raise PluginInitializationError( 

+

234 f"Exception while attempting to load plugin {metadata.plugin_name}" 

+

235 ) from e 

+

236 

+

237 def packager_provided_file( 

+

238 self, 

+

239 stem: str, 

+

240 installed_path: str, 

+

241 *, 

+

242 default_mode: int = 0o0644, 

+

243 default_priority: Optional[int] = None, 

+

244 allow_name_segment: bool = True, 

+

245 allow_architecture_segment: bool = False, 

+

246 post_formatting_rewrite: Optional[Callable[[str], str]] = None, 

+

247 packageless_is_fallback_for_all_packages: bool = False, 

+

248 reservation_only: bool = False, 

+

249 format_callback: Optional[ 

+

250 Callable[[str, PPFFormatParam, VirtualPath], str] 

+

251 ] = None, 

+

252 reference_documentation: Optional[ 

+

253 PackagerProvidedFileReferenceDocumentation 

+

254 ] = None, 

+

255 ) -> None: 

+

256 packager_provided_files = self._feature_set.packager_provided_files 

+

257 existing = packager_provided_files.get(stem) 

+

258 

+

259 if format_callback is not None and self._plugin_name != "debputy": 259 ↛ 260line 259 didn't jump to line 260, because the condition on line 259 was never true

+

260 raise ValueError( 

+

261 "Sorry; Using format_callback is a debputy-internal" 

+

262 f" API. Triggered by plugin {self._plugin_name}" 

+

263 ) 

+

264 

+

265 if installed_path.endswith("/"): 265 ↛ 266line 265 didn't jump to line 266, because the condition on line 265 was never true

+

266 raise ValueError( 

+

267 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' 

+

268 f" Triggered by plugin {self._plugin_name}." 

+

269 ) 

+

270 

+

271 installed_path = _normalize_path(installed_path) 

+

272 

+

273 has_name_var = "{name}" in installed_path 

+

274 

+

275 if installed_path.startswith("./DEBIAN") or reservation_only: 

+

276 # Special-case, used for control files. 

+

277 if self._plugin_name != "debputy": 277 ↛ 278line 277 didn't jump to line 278, because the condition on line 277 was never true

+

278 raise ValueError( 

+

279 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" 

+

280 f" API. Triggered by plugin {self._plugin_name}" 

+

281 ) 

+

282 elif not has_name_var and "{owning_package}" not in installed_path: 282 ↛ 283line 282 didn't jump to line 283, because the condition on line 282 was never true

+

283 raise ValueError( 

+

284 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' 

+

285 " substitution (or have installed_path end with a slash). Otherwise, the installed" 

+

286 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" 

+

287 ) 

+

288 

+

289 if allow_name_segment and not has_name_var: 289 ↛ 290line 289 didn't jump to line 290, because the condition on line 289 was never true

+

290 raise ValueError( 

+

291 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' 

+

292 " variable. Otherwise, the name segment will not work properly. Triggered by" 

+

293 f" plugin {self._plugin_name}" 

+

294 ) 

+

295 

+

296 if ( 296 ↛ 301line 296 didn't jump to line 301

+

297 default_priority is not None 

+

298 and "{priority}" not in installed_path 

+

299 and "{priority:02}" not in installed_path 

+

300 ): 

+

301 raise ValueError( 

+

302 'When default_priority is not None, the installed_path should have a "{priority}"' 

+

303 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' 

+

304 f" Triggered by plugin {self._plugin_name}" 

+

305 ) 

+

306 

+

307 if existing is not None: 

+

308 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 308 ↛ 315line 308 didn't jump to line 315

+

309 message = ( 

+

310 f'The stem "{stem}" is registered twice for packager provided files.' 

+

311 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" 

+

312 f" by {self._plugin_name}" 

+

313 ) 

+

314 else: 

+

315 message = ( 

+

316 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

+

317 f' stem "{stem}" twice for packager provided files.' 

+

318 ) 

+

319 raise PluginConflictError( 

+

320 message, existing.debputy_plugin_metadata, self._plugin_metadata 

+

321 ) 

+

322 packager_provided_files[stem] = PackagerProvidedFileClassSpec( 

+

323 self._plugin_metadata, 

+

324 stem, 

+

325 installed_path, 

+

326 default_mode=default_mode, 

+

327 default_priority=default_priority, 

+

328 allow_name_segment=allow_name_segment, 

+

329 allow_architecture_segment=allow_architecture_segment, 

+

330 post_formatting_rewrite=post_formatting_rewrite, 

+

331 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, 

+

332 reservation_only=reservation_only, 

+

333 formatting_callback=format_callback, 

+

334 reference_documentation=reference_documentation, 

+

335 ) 

+

336 

+

337 def _unload() -> None: 

+

338 del packager_provided_files[stem] 

+

339 

+

340 self._unloaders.append(_unload) 

+

341 

+

342 def metadata_or_maintscript_detector( 

+

343 self, 

+

344 auto_detector_id: str, 

+

345 auto_detector: MetadataAutoDetector, 

+

346 *, 

+

347 package_type: PackageTypeSelector = "deb", 

+

348 ) -> None: 

+

349 if auto_detector_id in self._plugin_detector_ids: 349 ↛ 350line 349 didn't jump to line 350, because the condition on line 349 was never true

+

350 raise ValueError( 

+

351 f"The plugin {self._plugin_name} tried to register" 

+

352 f' "{auto_detector_id}" twice' 

+

353 ) 

+

354 self._plugin_detector_ids.add(auto_detector_id) 

+

355 all_detectors = self._feature_set.metadata_maintscript_detectors 

+

356 if self._plugin_name not in all_detectors: 

+

357 all_detectors[self._plugin_name] = [] 

+

358 package_types = resolve_package_type_selectors(package_type) 

+

359 all_detectors[self._plugin_name].append( 

+

360 MetadataOrMaintscriptDetector( 

+

361 detector_id=auto_detector_id, 

+

362 detector=auto_detector, 

+

363 plugin_metadata=self._plugin_metadata, 

+

364 applies_to_package_types=package_types, 

+

365 enabled=True, 

+

366 ) 

+

367 ) 

+

368 

+

369 def _unload() -> None: 

+

370 if self._plugin_name in all_detectors: 

+

371 del all_detectors[self._plugin_name] 

+

372 

+

373 self._unloaders.append(_unload) 

+

374 

+

375 def document_builtin_variable( 

+

376 self, 

+

377 variable_name: str, 

+

378 variable_reference_documentation: str, 

+

379 *, 

+

380 is_context_specific: bool = False, 

+

381 is_for_special_case: bool = False, 

+

382 ) -> None: 

+

383 manifest_variables = self._feature_set.manifest_variables 

+

384 self._restricted_api() 

+

385 state = self._substitution.variable_state(variable_name) 

+

386 if state == VariableNameState.UNDEFINED: 386 ↛ 387line 386 didn't jump to line 387, because the condition on line 386 was never true

+

387 raise ValueError( 

+

388 f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," 

+

389 f" but it is not known to be a variable" 

+

390 ) 

+

391 

+

392 assert variable_name not in manifest_variables 

+

393 

+

394 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

+

395 self._plugin_metadata, 

+

396 variable_name, 

+

397 None, 

+

398 is_context_specific_variable=is_context_specific, 

+

399 variable_reference_documentation=variable_reference_documentation, 

+

400 is_documentation_placeholder=True, 

+

401 is_for_special_case=is_for_special_case, 

+

402 ) 

+

403 

+

404 def _unload() -> None: 

+

405 del manifest_variables[variable_name] 

+

406 

+

407 self._unloaders.append(_unload) 

+

408 

+

409 def manifest_variable_provider( 

+

410 self, 

+

411 provider: Callable[[VariableContext], Mapping[str, str]], 

+

412 variables: Union[Sequence[str], Mapping[str, Optional[str]]], 

+

413 ) -> None: 

+

414 self._restricted_api() 

+

415 cached_provider = functools.lru_cache(None)(provider) 

+

416 permitted_variables = frozenset(variables) 

+

417 variables_iter: Iterable[Tuple[str, Optional[str]]] 

+

418 if not isinstance(variables, Mapping): 418 ↛ 419line 418 didn't jump to line 419, because the condition on line 418 was never true

+

419 variables_iter = zip(variables, itertools.repeat(None)) 

+

420 else: 

+

421 variables_iter = variables.items() 

+

422 

+

423 checked_vars = False 

+

424 manifest_variables = self._feature_set.manifest_variables 

+

425 plugin_name = self._plugin_name 

+

426 

+

427 def _value_resolver_generator( 

+

428 variable_name: str, 

+

429 ) -> Callable[[VariableContext], str]: 

+

430 def _value_resolver(variable_context: VariableContext) -> str: 

+

431 res = cached_provider(variable_context) 

+

432 nonlocal checked_vars 

+

433 if not checked_vars: 433 ↛ 444line 433 didn't jump to line 444, because the condition on line 433 was never false

+

434 if permitted_variables != res.keys(): 434 ↛ 435line 434 didn't jump to line 435, because the condition on line 434 was never true

+

435 expected = ", ".join(sorted(permitted_variables)) 

+

436 actual = ", ".join(sorted(res)) 

+

437 raise PluginAPIViolationError( 

+

438 f"The plugin {plugin_name} claimed to provide" 

+

439 f" the following variables {expected}," 

+

440 f" but when resolving the variables, the plugin provided" 

+

441 f" {actual}. These two lists should have been the same." 

+

442 ) 

+

443 checked_vars = False 

+

444 return res[variable_name] 

+

445 

+

446 return _value_resolver 

+

447 

+

448 for varname, vardoc in variables_iter: 

+

449 self._check_variable_name(varname) 

+

450 manifest_variables[varname] = PluginProvidedManifestVariable( 

+

451 self._plugin_metadata, 

+

452 varname, 

+

453 _value_resolver_generator(varname), 

+

454 is_context_specific_variable=False, 

+

455 variable_reference_documentation=vardoc, 

+

456 ) 

+

457 

+

458 def _unload() -> None: 

+

459 raise PluginInitializationError( 

+

460 "Cannot unload manifest_variable_provider (not implemented)" 

+

461 ) 

+

462 

+

463 self._unloaders.append(_unload) 

+

464 

+

465 def _check_variable_name(self, variable_name: str) -> None: 

+

466 manifest_variables = self._feature_set.manifest_variables 

+

467 existing = manifest_variables.get(variable_name) 

+

468 

+

469 if existing is not None: 

+

470 if existing.plugin_metadata.plugin_name == self._plugin_name: 470 ↛ 476line 470 didn't jump to line 476

+

471 message = ( 

+

472 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

+

473 f' manifest variable "{variable_name}" twice.' 

+

474 ) 

+

475 else: 

+

476 message = ( 

+

477 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

+

478 f" both tried to provide the manifest variable {variable_name}" 

+

479 ) 

+

480 raise PluginConflictError( 

+

481 message, existing.plugin_metadata, self._plugin_metadata 

+

482 ) 

+

483 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): 

+

484 raise ValueError( 

+

485 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

+

486 f" which is not a valid variable name" 

+

487 ) 

+

488 

+

489 namespace = "" 

+

490 variable_basename = variable_name 

+

491 if ":" in variable_name: 

+

492 namespace, variable_basename = variable_name.rsplit(":", 1) 

+

493 assert namespace != "" 

+

494 assert variable_name != "" 

+

495 

+

496 if namespace != "" and namespace not in ("token", "path"): 

+

497 raise ValueError( 

+

498 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

+

499 f" which is in the reserved namespace {namespace}" 

+

500 ) 

+

501 

+

502 variable_name_upper = variable_name.upper() 

+

503 if ( 

+

504 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) 

+

505 or variable_basename.startswith("_") 

+

506 or variable_basename.upper().startswith("DEBPUTY") 

+

507 ) and self._plugin_name != "debputy": 

+

508 raise ValueError( 

+

509 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

+

510 f" which is a variable name reserved by debputy" 

+

511 ) 

+

512 

+

513 state = self._substitution.variable_state(variable_name) 

+

514 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": 

+

515 raise ValueError( 

+

516 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

+

517 f" which would shadow a built-in variable" 

+

518 ) 

+

519 

+

520 def package_processor( 

+

521 self, 

+

522 processor_id: str, 

+

523 processor: PackageProcessor, 

+

524 *, 

+

525 depends_on_processor: Iterable[str] = tuple(), 

+

526 package_type: PackageTypeSelector = "deb", 

+

527 ) -> None: 

+

528 self._restricted_api(allowed_plugins={"lua"}) 

+

529 package_processors = self._feature_set.all_package_processors 

+

530 dependencies = set() 

+

531 processor_key = (self._plugin_name, processor_id) 

+

532 

+

533 if processor_key in package_processors: 533 ↛ 534line 533 didn't jump to line 534, because the condition on line 533 was never true

+

534 raise PluginConflictError( 

+

535 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", 

+

536 self._plugin_metadata, 

+

537 self._plugin_metadata, 

+

538 ) 

+

539 

+

540 for depends_ref in depends_on_processor: 

+

541 if isinstance(depends_ref, str): 541 ↛ 555line 541 didn't jump to line 555, because the condition on line 541 was never false

+

542 if (self._plugin_name, depends_ref) in package_processors: 542 ↛ 544line 542 didn't jump to line 544, because the condition on line 542 was never false

+

543 depends_key = (self._plugin_name, depends_ref) 

+

544 elif ("debputy", depends_ref) in package_processors: 

+

545 depends_key = ("debputy", depends_ref) 

+

546 else: 

+

547 raise ValueError( 

+

548 f'Could not resolve dependency "{depends_ref}" for' 

+

549 f' "{processor_id}". It was not provided by the plugin itself' 

+

550 f" ({self._plugin_name}) nor debputy." 

+

551 ) 

+

552 else: 

+

553 # TODO: Add proper dependencies first, at which point we should probably resolve "name" 

+

554 # via the direct dependencies. 

+

555 assert False 

+

556 

+

557 existing_processor = package_processors.get(depends_key) 

+

558 if existing_processor is None: 558 ↛ 561line 558 didn't jump to line 561, because the condition on line 558 was never true

+

559 # We currently require the processor to be declared already. If this ever changes, 

+

560 # PluginProvidedFeatureSet.package_processors_in_order will need an update 

+

561 dplugin_name, dprocessor_name = depends_key 

+

562 available_processors = ", ".join( 

+

563 n for p, n in package_processors.keys() if p == dplugin_name 

+

564 ) 

+

565 raise ValueError( 

+

566 f"The plugin {dplugin_name} does not provide a processor called" 

+

567 f" {dprocessor_name}. Available processors for that plugin are:" 

+

568 f" {available_processors}" 

+

569 ) 

+

570 dependencies.add(depends_key) 

+

571 

+

572 package_processors[processor_key] = PluginProvidedPackageProcessor( 

+

573 processor_id, 

+

574 resolve_package_type_selectors(package_type), 

+

575 processor, 

+

576 frozenset(dependencies), 

+

577 self._plugin_metadata, 

+

578 ) 

+

579 

+

580 def _unload() -> None: 

+

581 del package_processors[processor_key] 

+

582 

+

583 self._unloaders.append(_unload) 

+

584 

+

585 def automatic_discard_rule( 

+

586 self, 

+

587 name: str, 

+

588 should_discard: Callable[[VirtualPath], bool], 

+

589 *, 

+

590 rule_reference_documentation: Optional[str] = None, 

+

591 examples: Union[ 

+

592 AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample] 

+

593 ] = tuple(), 

+

594 ) -> None: 

+

595 """Register an automatic discard rule 

+

596 

+

597 An automatic discard rule is basically applied to *every* path about to be installed in to any package. 

+

598 If any discard rule concludes that a path should not be installed, then the path is not installed. 

+

599 In the case where the discard path is a: 

+

600 

+

601 * directory: Then the entire directory is excluded along with anything beneath it. 

+

602 * symlink: Then the symlink itself (but not its target) is excluded. 

+

603 * hardlink: Then the current hardlink will not be installed, but other instances of it will be. 

+

604 

+

605 Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file. 

+

606 

+

607 Automatic discard rules should be written with the assumption that directories will be tested 

+

608 before their content *when it is relevant* for the discard rule to examine whether the directory 

+

609 can be excluded. 

+

610 

+

611 The packager can via the manifest overrule automatic discard rules by explicitly listing the path 

+

612 without any globs. As example: 

+

613 

+

614 installations: 

+

615 - install: 

+

616 sources: 

+

617 - usr/lib/libfoo.la # <-- This path is always installed 

+

618 # (Discard rules are never asked in this case) 

+

619 # 

+

620 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches 

+

621 # Though, they will not examine `libfoo.la` as it has already been installed 

+

622 # 

+

623 # Note: usr/lib itself is never tested in this case (it is assumed to be 

+

624 # explicitly requested). But any subdir of usr/lib will be examined. 

+

625 

+

626 When an automatic discard rule is evaluated, it can see the source path currently being considered 

+

627 for installation. While it can look at "surrounding" context (like parent directory), it will not 

+

628 know whether those paths are to be installed or will be installed. 

+

629 

+

630 :param name: A user visible name discard rule. It can be used on the command line, so avoid shell 

+

631 metacharacters and spaces. 

+

632 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive 

+

633 a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the 

+

634 path is discarded. If it returns `False`, the path is not discarded (by this rule at least). 

+

635 A source path will either be from the root of the source tree or the root of a search directory such as 

+

636 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is 

+

637 evaluated. 

+

638 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user 

+

639 looks up this automatic discard rule. 

+

640 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to 

+

641 generate the examples. 

+

642 

+

643 """ 

+

644 self._restricted_api() 

+

645 auto_discard_rules = self._feature_set.auto_discard_rules 

+

646 existing = auto_discard_rules.get(name) 

+

647 if existing is not None: 647 ↛ 648line 647 didn't jump to line 648, because the condition on line 647 was never true

+

648 if existing.plugin_metadata.plugin_name == self._plugin_name: 

+

649 message = ( 

+

650 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

+

651 f' automatic discard rule "{name}" twice.' 

+

652 ) 

+

653 else: 

+

654 message = ( 

+

655 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

+

656 f" both tried to provide the automatic discard rule {name}" 

+

657 ) 

+

658 raise PluginConflictError( 

+

659 message, existing.plugin_metadata, self._plugin_metadata 

+

660 ) 

+

661 examples = ( 

+

662 (examples,) 

+

663 if isinstance(examples, AutomaticDiscardRuleExample) 

+

664 else tuple(examples) 

+

665 ) 

+

666 auto_discard_rules[name] = PluginProvidedDiscardRule( 

+

667 name, 

+

668 self._plugin_metadata, 

+

669 should_discard, 

+

670 rule_reference_documentation, 

+

671 examples, 

+

672 ) 

+

673 

+

674 def _unload() -> None: 

+

675 del auto_discard_rules[name] 

+

676 

+

677 self._unloaders.append(_unload) 

+

678 

+

679 def service_provider( 

+

680 self, 

+

681 service_manager: str, 

+

682 detector: ServiceDetector, 

+

683 integrator: ServiceIntegrator, 

+

684 ) -> None: 

+

685 self._restricted_api() 

+

686 service_managers = self._feature_set.service_managers 

+

687 existing = service_managers.get(service_manager) 

+

688 if existing is not None: 688 ↛ 689line 688 didn't jump to line 689, because the condition on line 688 was never true

+

689 if existing.plugin_metadata.plugin_name == self._plugin_name: 

+

690 message = ( 

+

691 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

+

692 f' service manager "{service_manager}" twice.' 

+

693 ) 

+

694 else: 

+

695 message = ( 

+

696 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

+

697 f' both tried to provide the service manager "{service_manager}"' 

+

698 ) 

+

699 raise PluginConflictError( 

+

700 message, existing.plugin_metadata, self._plugin_metadata 

+

701 ) 

+

702 service_managers[service_manager] = ServiceManagerDetails( 

+

703 service_manager, 

+

704 detector, 

+

705 integrator, 

+

706 self._plugin_metadata, 

+

707 ) 

+

708 

+

709 def _unload() -> None: 

+

710 del service_managers[service_manager] 

+

711 

+

712 self._unloaders.append(_unload) 

+

713 

+

714 def manifest_variable( 

+

715 self, 

+

716 variable_name: str, 

+

717 value: str, 

+

718 variable_reference_documentation: Optional[str] = None, 

+

719 ) -> None: 

+

720 self._check_variable_name(variable_name) 

+

721 manifest_variables = self._feature_set.manifest_variables 

+

722 try: 

+

723 resolved_value = self._substitution.substitute( 

+

724 value, "Plugin initialization" 

+

725 ) 

+

726 depends_on_variable = resolved_value != value 

+

727 except DebputySubstitutionError: 

+

728 depends_on_variable = True 

+

729 if depends_on_variable: 

+

730 raise ValueError( 

+

731 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." 

+

732 f" This value depends on another variable, which is not supported. This restriction may be" 

+

733 f" lifted in the future." 

+

734 ) 

+

735 

+

736 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

+

737 self._plugin_metadata, 

+

738 variable_name, 

+

739 value, 

+

740 is_context_specific_variable=False, 

+

741 variable_reference_documentation=variable_reference_documentation, 

+

742 ) 

+

743 

+

744 def _unload() -> None: 

+

745 # We need to check it was never resolved 

+

746 raise PluginInitializationError( 

+

747 "Cannot unload manifest_variable (not implemented)" 

+

748 ) 

+

749 

+

750 self._unloaders.append(_unload) 

+

751 

+

752 @property 

+

753 def _plugin_name(self) -> str: 

+

754 return self._plugin_metadata.plugin_name 

+

755 

+

756 def provide_manifest_keyword( 

+

757 self, 

+

758 rule_type: TTP, 

+

759 rule_name: Union[str, List[str]], 

+

760 handler: DIPKWHandler, 

+

761 *, 

+

762 inline_reference_documentation: Optional[ParserDocumentation] = None, 

+

763 ) -> None: 

+

764 self._restricted_api() 

+

765 parser_generator = self._feature_set.manifest_parser_generator 

+

766 if rule_type not in parser_generator.dispatchable_table_parsers: 766 ↛ 767line 766 didn't jump to line 767, because the condition on line 766 was never true

+

767 types = ", ".join( 

+

768 sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) 

+

769 ) 

+

770 raise ValueError( 

+

771 f"The rule_type was not a supported type. It must be one of {types}" 

+

772 ) 

+

773 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

+

774 dispatching_parser.register_keyword( 

+

775 rule_name, 

+

776 handler, 

+

777 self._plugin_metadata, 

+

778 inline_reference_documentation=inline_reference_documentation, 

+

779 ) 

+

780 

+

781 def _unload() -> None: 

+

782 raise PluginInitializationError( 

+

783 "Cannot unload provide_manifest_keyword (not implemented)" 

+

784 ) 

+

785 

+

786 self._unloaders.append(_unload) 

+

787 

+

788 def pluggable_object_parser( 

+

789 self, 

+

790 rule_type: str, 

+

791 rule_name: str, 

+

792 *, 

+

793 object_parser_key: Optional[str] = None, 

+

794 on_end_parse_step: Optional[ 

+

795 Callable[ 

+

796 [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData], 

+

797 None, 

+

798 ] 

+

799 ] = None, 

+

800 nested_in_package_context: bool = False, 

+

801 ) -> None: 

+

802 self._restricted_api() 

+

803 if object_parser_key is None: 803 ↛ 804line 803 didn't jump to line 804, because the condition on line 803 was never true

+

804 object_parser_key = rule_name 

+

805 

+

806 parser_generator = self._feature_set.manifest_parser_generator 

+

807 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers 

+

808 if rule_type not in dispatchable_object_parsers: 808 ↛ 809line 808 didn't jump to line 809, because the condition on line 808 was never true

+

809 types = ", ".join(sorted(dispatchable_object_parsers)) 

+

810 raise ValueError( 

+

811 f"The rule_type was not a supported type. It must be one of {types}" 

+

812 ) 

+

813 if object_parser_key not in dispatchable_object_parsers: 813 ↛ 814line 813 didn't jump to line 814, because the condition on line 813 was never true

+

814 types = ", ".join(sorted(dispatchable_object_parsers)) 

+

815 raise ValueError( 

+

816 f"The object_parser_key was not a supported type. It must be one of {types}" 

+

817 ) 

+

818 parent_dispatcher = dispatchable_object_parsers[rule_type] 

+

819 child_dispatcher = dispatchable_object_parsers[object_parser_key] 

+

820 parent_dispatcher.register_child_parser( 

+

821 rule_name, 

+

822 child_dispatcher, 

+

823 self._plugin_metadata, 

+

824 on_end_parse_step=on_end_parse_step, 

+

825 nested_in_package_context=nested_in_package_context, 

+

826 ) 

+

827 

+

828 def _unload() -> None: 

+

829 raise PluginInitializationError( 

+

830 "Cannot unload pluggable_object_parser (not implemented)" 

+

831 ) 

+

832 

+

833 self._unloaders.append(_unload) 

+

834 

+

835 def pluggable_manifest_rule( 

+

836 self, 

+

837 rule_type: Union[TTP, str], 

+

838 rule_name: Union[str, List[str]], 

+

839 parsed_format: Type[PF], 

+

840 handler: DIPHandler, 

+

841 *, 

+

842 source_format: Optional[SF] = None, 

+

843 inline_reference_documentation: Optional[ParserDocumentation] = None, 

+

844 ) -> None: 

+

845 self._restricted_api() 

+

846 feature_set = self._feature_set 

+

847 parser_generator = feature_set.manifest_parser_generator 

+

848 if isinstance(rule_type, str): 

+

849 if rule_type not in parser_generator.dispatchable_object_parsers: 849 ↛ 850line 849 didn't jump to line 850, because the condition on line 849 was never true

+

850 types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) 

+

851 raise ValueError( 

+

852 f"The rule_type was not a supported type. It must be one of {types}" 

+

853 ) 

+

854 dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] 

+

855 else: 

+

856 if rule_type not in parser_generator.dispatchable_table_parsers: 856 ↛ 857line 856 didn't jump to line 857, because the condition on line 856 was never true

+

857 types = ", ".join( 

+

858 sorted( 

+

859 x.__name__ for x in parser_generator.dispatchable_table_parsers 

+

860 ) 

+

861 ) 

+

862 raise ValueError( 

+

863 f"The rule_type was not a supported type. It must be one of {types}" 

+

864 ) 

+

865 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

+

866 

+

867 parser = feature_set.manifest_parser_generator.generate_parser( 

+

868 parsed_format, 

+

869 source_content=source_format, 

+

870 inline_reference_documentation=inline_reference_documentation, 

+

871 ) 

+

872 dispatching_parser.register_parser( 

+

873 rule_name, 

+

874 parser, 

+

875 handler, 

+

876 self._plugin_metadata, 

+

877 ) 

+

878 

+

879 def _unload() -> None: 

+

880 raise PluginInitializationError( 

+

881 "Cannot unload pluggable_manifest_rule (not implemented)" 

+

882 ) 

+

883 

+

884 self._unloaders.append(_unload) 

+

885 

+

886 def known_packaging_files( 

+

887 self, 

+

888 packaging_file_details: KnownPackagingFileInfo, 

+

889 ) -> None: 

+

890 known_packaging_files = self._feature_set.known_packaging_files 

+

891 detection_method = packaging_file_details.get( 

+

892 "detection_method", cast("Literal['path']", "path") 

+

893 ) 

+

894 path = packaging_file_details.get("path") 

+

895 dhpkgfile = packaging_file_details.get("pkgfile") 

+

896 

+

897 packaging_file_details: KnownPackagingFileInfo = packaging_file_details.copy() 

+

898 

+

899 if detection_method == "path": 

+

900 if dhpkgfile is not None: 

+

901 raise ValueError( 

+

902 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' 

+

903 ) 

+

904 if path != _normalize_path(path, with_prefix=False): 

+

905 raise ValueError( 

+

906 f"The path for known packaging files must be normalized. Please replace" 

+

907 f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' 

+

908 ) 

+

909 detection_value = path 

+

910 else: 

+

911 assert detection_method == "dh.pkgfile" 

+

912 if path is not None: 

+

913 raise ValueError( 

+

914 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' 

+

915 ) 

+

916 if "/" in dhpkgfile: 

+

917 raise ValueError( 

+

918 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' 

+

919 ) 

+

920 detection_value = dhpkgfile 

+

921 key = f"{detection_method}::{detection_value}" 

+

922 existing = known_packaging_files.get(key) 

+

923 if existing is not None: 

+

924 if existing.plugin_metadata.plugin_name != self._plugin_name: 

+

925 message = ( 

+

926 f'The key "{key}" is registered twice for known packaging files.' 

+

927 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

+

928 ) 

+

929 else: 

+

930 message = ( 

+

931 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

+

932 f' key "{key}" twice for known packaging files.' 

+

933 ) 

+

934 raise PluginConflictError( 

+

935 message, existing.plugin_metadata, self._plugin_metadata 

+

936 ) 

+

937 _validate_known_packaging_file_dh_compat_rules( 

+

938 packaging_file_details.get("dh_compat_rules") 

+

939 ) 

+

940 known_packaging_files[key] = PluginProvidedKnownPackagingFile( 

+

941 packaging_file_details, 

+

942 detection_method, 

+

943 detection_value, 

+

944 self._plugin_metadata, 

+

945 ) 

+

946 

+

947 def _unload() -> None: 

+

948 del known_packaging_files[key] 

+

949 

+

950 self._unloaders.append(_unload) 

+

951 

+

952 def register_mapped_type( 

+

953 self, 

+

954 type_mapping: TypeMapping, 

+

955 *, 

+

956 reference_documentation: Optional[TypeMappingDocumentation] = None, 

+

957 ) -> None: 

+

958 self._restricted_api() 

+

959 target_type = type_mapping.target_type 

+

960 mapped_types = self._feature_set.mapped_types 

+

961 existing = mapped_types.get(target_type) 

+

962 if existing is not None: 962 ↛ 963line 962 didn't jump to line 963, because the condition on line 962 was never true

+

963 if existing.plugin_metadata.plugin_name != self._plugin_name: 

+

964 message = ( 

+

965 f'The key "{target_type.__name__}" is registered twice for known packaging files.' 

+

966 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

+

967 ) 

+

968 else: 

+

969 message = ( 

+

970 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

+

971 f' key "{target_type.__name__}" twice for known packaging files.' 

+

972 ) 

+

973 raise PluginConflictError( 

+

974 message, existing.plugin_metadata, self._plugin_metadata 

+

975 ) 

+

976 parser_generator = self._feature_set.manifest_parser_generator 

+

977 mapped_types[target_type] = PluginProvidedTypeMapping( 

+

978 type_mapping, reference_documentation, self._plugin_metadata 

+

979 ) 

+

980 parser_generator.register_mapped_type(type_mapping) 

+

981 

+

982 def _restricted_api( 

+

983 self, 

+

984 *, 

+

985 allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(), 

+

986 ) -> None: 

+

987 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 987 ↛ 988line 987 didn't jump to line 988, because the condition on line 987 was never true

+

988 raise PluginAPIViolationError( 

+

989 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

+

990 " If you are the maintainer of this plugin and want access to this" 

+

991 " API, please file a feature request to make this public." 

+

992 " (The API is currently private as it is unstable.)" 

+

993 ) 

+

994 

+

995 

+

996class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): 

+

997 __slots__ = () 

+

998 

+

999 def _append_script( 

+

1000 self, 

+

1001 caller_name: str, 

+

1002 maintscript: Maintscript, 

+

1003 full_script: str, 

+

1004 /, 

+

1005 perform_substitution: bool = True, 

+

1006 ) -> None: 

+

1007 raise NotImplementedError 

+

1008 

+

1009 @classmethod 

+

1010 def _apply_condition_to_script( 

+

1011 cls, 

+

1012 condition: str, 

+

1013 run_snippet: str, 

+

1014 /, 

+

1015 indent: Optional[bool] = None, 

+

1016 ) -> str: 

+

1017 if indent is None: 

+

1018 # We auto-determine this based on heredocs currently 

+

1019 indent = "<<" not in run_snippet 

+

1020 

+

1021 if indent: 

+

1022 run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) 

+

1023 if not run_snippet.endswith("\n"): 

+

1024 run_snippet += "\n" 

+

1025 condition_line = f"if {condition}; then\n" 

+

1026 end_line = "fi\n" 

+

1027 return "".join((condition_line, run_snippet, end_line)) 

+

1028 

+

1029 def on_configure( 

+

1030 self, 

+

1031 run_snippet: str, 

+

1032 /, 

+

1033 indent: Optional[bool] = None, 

+

1034 perform_substitution: bool = True, 

+

1035 skip_on_rollback: bool = False, 

+

1036 ) -> None: 

+

1037 condition = POSTINST_DEFAULT_CONDITION 

+

1038 if skip_on_rollback: 1038 ↛ 1039line 1038 didn't jump to line 1039, because the condition on line 1038 was never true

+

1039 condition = '[ "$1" = "configure" ]' 

+

1040 return self._append_script( 

+

1041 "on_configure", 

+

1042 "postinst", 

+

1043 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1044 perform_substitution=perform_substitution, 

+

1045 ) 

+

1046 

+

1047 def on_initial_install( 

+

1048 self, 

+

1049 run_snippet: str, 

+

1050 /, 

+

1051 indent: Optional[bool] = None, 

+

1052 perform_substitution: bool = True, 

+

1053 ) -> None: 

+

1054 condition = '[ "$1" = "configure" -a -z "$2" ]' 

+

1055 return self._append_script( 

+

1056 "on_initial_install", 

+

1057 "postinst", 

+

1058 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1059 perform_substitution=perform_substitution, 

+

1060 ) 

+

1061 

+

1062 def on_upgrade( 

+

1063 self, 

+

1064 run_snippet: str, 

+

1065 /, 

+

1066 indent: Optional[bool] = None, 

+

1067 perform_substitution: bool = True, 

+

1068 ) -> None: 

+

1069 condition = '[ "$1" = "configure" -a -n "$2" ]' 

+

1070 return self._append_script( 

+

1071 "on_upgrade", 

+

1072 "postinst", 

+

1073 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1074 perform_substitution=perform_substitution, 

+

1075 ) 

+

1076 

+

1077 def on_upgrade_from( 

+

1078 self, 

+

1079 version: str, 

+

1080 run_snippet: str, 

+

1081 /, 

+

1082 indent: Optional[bool] = None, 

+

1083 perform_substitution: bool = True, 

+

1084 ) -> None: 

+

1085 condition = '[ "$1" = "configure" ] && dpkg --compare-versions le-nl "$2"' 

+

1086 return self._append_script( 

+

1087 "on_upgrade_from", 

+

1088 "postinst", 

+

1089 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1090 perform_substitution=perform_substitution, 

+

1091 ) 

+

1092 

+

1093 def on_before_removal( 

+

1094 self, 

+

1095 run_snippet: str, 

+

1096 /, 

+

1097 indent: Optional[bool] = None, 

+

1098 perform_substitution: bool = True, 

+

1099 ) -> None: 

+

1100 condition = '[ "$1" = "remove" ]' 

+

1101 return self._append_script( 

+

1102 "on_before_removal", 

+

1103 "prerm", 

+

1104 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1105 perform_substitution=perform_substitution, 

+

1106 ) 

+

1107 

+

1108 def on_removed( 

+

1109 self, 

+

1110 run_snippet: str, 

+

1111 /, 

+

1112 indent: Optional[bool] = None, 

+

1113 perform_substitution: bool = True, 

+

1114 ) -> None: 

+

1115 condition = '[ "$1" = "remove" ]' 

+

1116 return self._append_script( 

+

1117 "on_removed", 

+

1118 "postrm", 

+

1119 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1120 perform_substitution=perform_substitution, 

+

1121 ) 

+

1122 

+

1123 def on_purge( 

+

1124 self, 

+

1125 run_snippet: str, 

+

1126 /, 

+

1127 indent: Optional[bool] = None, 

+

1128 perform_substitution: bool = True, 

+

1129 ) -> None: 

+

1130 condition = '[ "$1" = "purge" ]' 

+

1131 return self._append_script( 

+

1132 "on_purge", 

+

1133 "postrm", 

+

1134 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

+

1135 perform_substitution=perform_substitution, 

+

1136 ) 

+

1137 

+

1138 def unconditionally_in_script( 

+

1139 self, 

+

1140 maintscript: Maintscript, 

+

1141 run_snippet: str, 

+

1142 /, 

+

1143 perform_substitution: bool = True, 

+

1144 ) -> None: 

+

1145 if maintscript not in STD_CONTROL_SCRIPTS: 1145 ↛ 1146line 1145 didn't jump to line 1146, because the condition on line 1145 was never true

+

1146 raise ValueError( 

+

1147 f'Unknown script "{maintscript}". Should have been one of:' 

+

1148 f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' 

+

1149 ) 

+

1150 return self._append_script( 

+

1151 "unconditionally_in_script", 

+

1152 maintscript, 

+

1153 run_snippet, 

+

1154 perform_substitution=perform_substitution, 

+

1155 ) 

+

1156 

+

1157 

+

1158class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): 

+

1159 __slots__ = ( 

+

1160 "_plugin_metadata", 

+

1161 "_maintscript_snippets", 

+

1162 "_plugin_source_id", 

+

1163 "_package_substitution", 

+

1164 "_default_snippet_order", 

+

1165 ) 

+

1166 

+

1167 def __init__( 

+

1168 self, 

+

1169 plugin_metadata: DebputyPluginMetadata, 

+

1170 plugin_source_id: str, 

+

1171 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

+

1172 package_substitution: Substitution, 

+

1173 *, 

+

1174 default_snippet_order: Optional[Literal["service"]] = None, 

+

1175 ): 

+

1176 self._plugin_metadata = plugin_metadata 

+

1177 self._plugin_source_id = plugin_source_id 

+

1178 self._maintscript_snippets = maintscript_snippets 

+

1179 self._package_substitution = package_substitution 

+

1180 self._default_snippet_order = default_snippet_order 

+

1181 

+

1182 def _append_script( 

+

1183 self, 

+

1184 caller_name: str, 

+

1185 maintscript: Maintscript, 

+

1186 full_script: str, 

+

1187 /, 

+

1188 perform_substitution: bool = True, 

+

1189 ) -> None: 

+

1190 def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" 

+

1191 if perform_substitution: 

+

1192 full_script = self._package_substitution.substitute(full_script, def_source) 

+

1193 

+

1194 snippet = MaintscriptSnippet( 

+

1195 snippet=full_script, 

+

1196 definition_source=def_source, 

+

1197 snippet_order=self._default_snippet_order, 

+

1198 ) 

+

1199 self._maintscript_snippets[maintscript].append(snippet) 

+

1200 

+

1201 

+

1202class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): 

+

1203 __slots__ = ( 

+

1204 "_plugin_metadata", 

+

1205 "_plugin_source_id", 

+

1206 "_package_metadata_context", 

+

1207 "_triggers", 

+

1208 "_substvars", 

+

1209 "_maintscript", 

+

1210 "_shlibs_details", 

+

1211 ) 

+

1212 

+

1213 def __init__( 

+

1214 self, 

+

1215 plugin_metadata: DebputyPluginMetadata, 

+

1216 plugin_source_id: str, 

+

1217 package_metadata_context: PackageProcessingContext, 

+

1218 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

+

1219 substvars: FlushableSubstvars, 

+

1220 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

+

1221 ) -> None: 

+

1222 self._plugin_metadata = plugin_metadata 

+

1223 self._plugin_source_id = plugin_source_id 

+

1224 self._package_metadata_context = package_metadata_context 

+

1225 self._triggers = triggers 

+

1226 self._substvars = substvars 

+

1227 self._maintscript: Optional[MaintscriptAccessor] = None 

+

1228 self._shlibs_details = shlibs_details 

+

1229 

+

1230 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

+

1231 raise NotImplementedError 

+

1232 

+

1233 def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: 

+

1234 """Register a declarative dpkg level trigger 

+

1235 

+

1236 The provided trigger will be added to the package's metadata (the triggers file of the control.tar). 

+

1237 

+

1238 If the trigger has already been added previously, a second call with the same trigger data will be ignored. 

+

1239 """ 

+

1240 key = (trigger_type, trigger_target) 

+

1241 if key in self._triggers: 1241 ↛ 1242line 1241 didn't jump to line 1242, because the condition on line 1241 was never true

+

1242 return 

+

1243 self._triggers[key] = PluginProvidedTrigger( 

+

1244 dpkg_trigger_type=trigger_type, 

+

1245 dpkg_trigger_target=trigger_target, 

+

1246 provider=self._plugin_metadata, 

+

1247 provider_source_id=self._plugin_source_id, 

+

1248 ) 

+

1249 

+

1250 @property 

+

1251 def maintscript(self) -> MaintscriptAccessor: 

+

1252 maintscript = self._maintscript 

+

1253 if maintscript is None: 

+

1254 maintscript = self._create_maintscript_accessor() 

+

1255 self._maintscript = maintscript 

+

1256 return maintscript 

+

1257 

+

1258 @property 

+

1259 def substvars(self) -> FlushableSubstvars: 

+

1260 return self._substvars 

+

1261 

+

1262 def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: 

+

1263 binary_package = self._package_metadata_context.binary_package 

+

1264 with self.substvars.flush() as substvars_file: 

+

1265 dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] 

+

1266 if binary_package.is_udeb: 

+

1267 dpkg_cmd.append("-tudeb") 

+

1268 if binary_package.is_essential: 1268 ↛ 1269line 1268 didn't jump to line 1269, because the condition on line 1268 was never true

+

1269 dpkg_cmd.append("-dPre-Depends") 

+

1270 shlibs_local, shlib_dirs = self._shlibs_details 

+

1271 if shlibs_local is not None: 1271 ↛ 1272line 1271 didn't jump to line 1272, because the condition on line 1271 was never true

+

1272 dpkg_cmd.append(f"-L{shlibs_local}") 

+

1273 if shlib_dirs: 1273 ↛ 1274line 1273 didn't jump to line 1274, because the condition on line 1273 was never true

+

1274 dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) 

+

1275 dpkg_cmd.extend(p.fs_path for p in paths) 

+

1276 print_command(*dpkg_cmd) 

+

1277 try: 

+

1278 subprocess.check_call(dpkg_cmd) 

+

1279 except subprocess.CalledProcessError: 

+

1280 _error( 

+

1281 f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" 

+

1282 " review the output from dpkg-shlibdeps above to understand what went wrong." 

+

1283 ) 

+

1284 

+

1285 

+

1286class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): 

+

1287 __slots__ = ( 

+

1288 "_maintscript", 

+

1289 "_maintscript_snippets", 

+

1290 "_package_substitution", 

+

1291 ) 

+

1292 

+

1293 def __init__( 

+

1294 self, 

+

1295 plugin_metadata: DebputyPluginMetadata, 

+

1296 plugin_source_id: str, 

+

1297 package_metadata_context: PackageProcessingContext, 

+

1298 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

+

1299 substvars: FlushableSubstvars, 

+

1300 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

+

1301 package_substitution: Substitution, 

+

1302 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

+

1303 *, 

+

1304 default_snippet_order: Optional[Literal["service"]] = None, 

+

1305 ) -> None: 

+

1306 super().__init__( 

+

1307 plugin_metadata, 

+

1308 plugin_source_id, 

+

1309 package_metadata_context, 

+

1310 triggers, 

+

1311 substvars, 

+

1312 shlibs_details, 

+

1313 ) 

+

1314 self._maintscript_snippets = maintscript_snippets 

+

1315 self._package_substitution = package_substitution 

+

1316 self._maintscript = MaintscriptAccessorProvider( 

+

1317 plugin_metadata, 

+

1318 plugin_source_id, 

+

1319 maintscript_snippets, 

+

1320 package_substitution, 

+

1321 default_snippet_order=default_snippet_order, 

+

1322 ) 

+

1323 

+

1324 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

+

1325 return MaintscriptAccessorProvider( 

+

1326 self._plugin_metadata, 

+

1327 self._plugin_source_id, 

+

1328 self._maintscript_snippets, 

+

1329 self._package_substitution, 

+

1330 ) 

+

1331 

+

1332 

+

1333class BinaryCtrlAccessorProviderCreator: 

+

1334 def __init__( 

+

1335 self, 

+

1336 package_metadata_context: PackageProcessingContext, 

+

1337 substvars: FlushableSubstvars, 

+

1338 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

+

1339 substitution: Substitution, 

+

1340 ) -> None: 

+

1341 self._package_metadata_context = package_metadata_context 

+

1342 self._substvars = substvars 

+

1343 self._maintscript_snippets = maintscript_snippets 

+

1344 self._substitution = substitution 

+

1345 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

+

1346 self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = None, None 

+

1347 

+

1348 def for_plugin( 

+

1349 self, 

+

1350 plugin_metadata: DebputyPluginMetadata, 

+

1351 plugin_source_id: str, 

+

1352 *, 

+

1353 default_snippet_order: Optional[Literal["service"]] = None, 

+

1354 ) -> BinaryCtrlAccessor: 

+

1355 return BinaryCtrlAccessorProvider( 

+

1356 plugin_metadata, 

+

1357 plugin_source_id, 

+

1358 self._package_metadata_context, 

+

1359 self._triggers, 

+

1360 self._substvars, 

+

1361 self._maintscript_snippets, 

+

1362 self._substitution, 

+

1363 self.shlibs_details, 

+

1364 default_snippet_order=default_snippet_order, 

+

1365 ) 

+

1366 

+

1367 def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: 

+

1368 return self._triggers.values() 

+

1369 

+

1370 

+

1371def plugin_metadata_for_debputys_own_plugin( 

+

1372 loader: Optional[PluginInitializationEntryPoint] = None, 

+

1373) -> DebputyPluginMetadata: 

+

1374 if loader is None: 

+

1375 from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features 

+

1376 

+

1377 loader = initialize_debputy_features 

+

1378 return DebputyPluginMetadata( 

+

1379 plugin_name="debputy", 

+

1380 api_compat_version=1, 

+

1381 plugin_initializer=loader, 

+

1382 plugin_loader=None, 

+

1383 plugin_path="<bundled>", 

+

1384 ) 

+

1385 

+

1386 

+

1387def load_plugin_features( 

+

1388 plugin_search_dirs: Sequence[str], 

+

1389 substitution: Substitution, 

+

1390 requested_plugins_only: Optional[Sequence[str]] = None, 

+

1391 required_plugins: Optional[Set[str]] = None, 

+

1392 plugin_feature_set: Optional[PluginProvidedFeatureSet] = None, 

+

1393 debug_mode: bool = False, 

+

1394) -> PluginProvidedFeatureSet: 

+

1395 if plugin_feature_set is None: 

+

1396 plugin_feature_set = PluginProvidedFeatureSet() 

+

1397 plugins = [plugin_metadata_for_debputys_own_plugin()] 

+

1398 unloadable_plugins = set() 

+

1399 if required_plugins: 

+

1400 plugins.extend( 

+

1401 find_json_plugins( 

+

1402 plugin_search_dirs, 

+

1403 required_plugins, 

+

1404 ) 

+

1405 ) 

+

1406 if requested_plugins_only is not None: 

+

1407 plugins.extend( 

+

1408 find_json_plugins( 

+

1409 plugin_search_dirs, 

+

1410 requested_plugins_only, 

+

1411 ) 

+

1412 ) 

+

1413 else: 

+

1414 auto_loaded = _find_all_json_plugins( 

+

1415 plugin_search_dirs, 

+

1416 required_plugins if required_plugins is not None else frozenset(), 

+

1417 debug_mode=debug_mode, 

+

1418 ) 

+

1419 for plugin_metadata in auto_loaded: 

+

1420 plugins.append(plugin_metadata) 

+

1421 unloadable_plugins.add(plugin_metadata.plugin_name) 

+

1422 

+

1423 for plugin_metadata in plugins: 

+

1424 api = DebputyPluginInitializerProvider( 

+

1425 plugin_metadata, plugin_feature_set, substitution 

+

1426 ) 

+

1427 try: 

+

1428 api.load_plugin() 

+

1429 except PluginBaseError as e: 

+

1430 if plugin_metadata.plugin_name not in unloadable_plugins: 

+

1431 raise 

+

1432 if debug_mode: 

+

1433 raise 

+

1434 try: 

+

1435 api.unload_plugin() 

+

1436 except Exception: 

+

1437 _warn( 

+

1438 f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" 

+

1439 " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" 

+

1440 " module might have tainted the feature set." 

+

1441 ) 

+

1442 raise e from None 

+

1443 else: 

+

1444 if debug_mode: 

+

1445 _warn( 

+

1446 f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" 

+

1447 f" to --debug/-d." 

+

1448 ) 

+

1449 _warn( 

+

1450 f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" 

+

1451 f" deactivated. Use debug mode (--debug) to show the stacktrace (the warning will become an error)" 

+

1452 ) 

+

1453 

+

1454 return plugin_feature_set 

+

1455 

+

1456 

+

1457def find_json_plugin( 

+

1458 search_dirs: Sequence[str], 

+

1459 requested_plugin: str, 

+

1460) -> DebputyPluginMetadata: 

+

1461 r = list(find_json_plugins(search_dirs, [requested_plugin])) 

+

1462 assert len(r) == 1 

+

1463 return r[0] 

+

1464 

+

1465 

+

1466def find_related_implementation_files_for_plugin( 

+

1467 plugin_metadata: DebputyPluginMetadata, 

+

1468) -> List[str]: 

+

1469 plugin_path = plugin_metadata.plugin_path 

+

1470 if not os.path.isfile(plugin_path): 

+

1471 plugin_name = plugin_metadata.plugin_name 

+

1472 _error( 

+

1473 f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" 

+

1474 " or loaded via a mechanism that does not support detecting its tests." 

+

1475 ) 

+

1476 files = [] 

+

1477 module_name, module_file = _find_plugin_implementation_file( 

+

1478 plugin_metadata.plugin_name, 

+

1479 plugin_metadata.plugin_path, 

+

1480 ) 

+

1481 if os.path.isfile(module_file): 

+

1482 files.append(module_file) 

+

1483 else: 

+

1484 if not plugin_metadata.is_loaded: 

+

1485 plugin_metadata.load_plugin() 

+

1486 if module_name in sys.modules: 

+

1487 _error( 

+

1488 f'The plugin {plugin_metadata.plugin_name} uses the "module"" key in its' 

+

1489 f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " 

+

1490 f" installed via this method. The related Python would not be installed" 

+

1491 f" (which would result in a plugin that would fail to load)" 

+

1492 ) 

+

1493 

+

1494 return files 

+

1495 

+

1496 

+

1497def find_tests_for_plugin( 

+

1498 plugin_metadata: DebputyPluginMetadata, 

+

1499) -> List[str]: 

+

1500 plugin_name = plugin_metadata.plugin_name 

+

1501 plugin_path = plugin_metadata.plugin_path 

+

1502 

+

1503 if not os.path.isfile(plugin_path): 

+

1504 _error( 

+

1505 f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" 

+

1506 " mechanism that does not support detecting its tests." 

+

1507 ) 

+

1508 

+

1509 plugin_dir = os.path.dirname(plugin_path) 

+

1510 test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") 

+

1511 tests = [] 

+

1512 with os.scandir(plugin_dir) as dir_iter: 

+

1513 for p in dir_iter: 

+

1514 if ( 

+

1515 p.is_file() 

+

1516 and p.name.startswith(test_basename_prefix) 

+

1517 and PLUGIN_TEST_SUFFIX.search(p.name) 

+

1518 ): 

+

1519 tests.append(p.path) 

+

1520 return tests 

+

1521 

+

1522 

+

1523def find_json_plugins( 

+

1524 search_dirs: Sequence[str], 

+

1525 requested_plugins: Iterable[str], 

+

1526) -> Iterable[DebputyPluginMetadata]: 

+

1527 for plugin_name_or_path in requested_plugins: 

+

1528 found = False 

+

1529 if "/" in plugin_name_or_path: 1529 ↛ 1530line 1529 didn't jump to line 1530, because the condition on line 1529 was never true

+

1530 if not os.path.isfile(plugin_name_or_path): 

+

1531 raise PluginNotFoundError( 

+

1532 f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." 

+

1533 ' (Because the plugin name contains "/", it is assumed to be a path and search path' 

+

1534 " is not used." 

+

1535 ) 

+

1536 yield parse_json_plugin_desc(plugin_name_or_path) 

+

1537 return 

+

1538 for search_dir in search_dirs: 

+

1539 path = os.path.join( 

+

1540 search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" 

+

1541 ) 

+

1542 if not os.path.isfile(path): 1542 ↛ 1543line 1542 didn't jump to line 1543, because the condition on line 1542 was never true

+

1543 continue 

+

1544 found = True 

+

1545 yield parse_json_plugin_desc(path) 

+

1546 if not found: 1546 ↛ 1547line 1546 didn't jump to line 1547, because the condition on line 1546 was never true

+

1547 search_dir_str = ":".join(search_dirs) 

+

1548 raise PluginNotFoundError( 

+

1549 f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" 

+

1550 f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" 

+

1551 ) 

+

1552 

+

1553 

+

1554def _find_all_json_plugins( 

+

1555 search_dirs: Sequence[str], 

+

1556 required_plugins: AbstractSet[str], 

+

1557 debug_mode: bool = False, 

+

1558) -> Iterable[DebputyPluginMetadata]: 

+

1559 seen = set(required_plugins) 

+

1560 error_seen = False 

+

1561 for search_dir in search_dirs: 

+

1562 try: 

+

1563 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) 

+

1564 except FileNotFoundError: 

+

1565 continue 

+

1566 with dir_fd: 

+

1567 for entry in dir_fd: 

+

1568 if ( 

+

1569 not entry.is_file(follow_symlinks=True) 

+

1570 or not entry.name.endswith(".json") 

+

1571 or entry.name in seen 

+

1572 ): 

+

1573 continue 

+

1574 try: 

+

1575 plugin_metadata = parse_json_plugin_desc(entry.path) 

+

1576 except PluginBaseError as e: 

+

1577 if debug_mode: 

+

1578 raise 

+

1579 if not error_seen: 

+

1580 error_seen = True 

+

1581 _warn( 

+

1582 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" 

+

1583 ) 

+

1584 else: 

+

1585 _warn( 

+

1586 f"Failed to load plugin in {entry.path} due to errors (not shown)." 

+

1587 ) 

+

1588 else: 

+

1589 yield plugin_metadata 

+

1590 

+

1591 

+

1592def _find_plugin_implementation_file( 

+

1593 plugin_name: str, 

+

1594 json_file_path: str, 

+

1595) -> Tuple[str, str]: 

+

1596 guessed_module_basename = plugin_name.replace("-", "_") 

+

1597 module_name = f"debputy.plugin.{guessed_module_basename}" 

+

1598 module_fs_path = os.path.join( 

+

1599 os.path.dirname(json_file_path), f"{guessed_module_basename}.py" 

+

1600 ) 

+

1601 return module_name, module_fs_path 

+

1602 

+

1603 

+

1604def _resolve_module_initializer( 

+

1605 plugin_name: str, 

+

1606 plugin_initializer_name: str, 

+

1607 module_name: Optional[str], 

+

1608 json_file_path: str, 

+

1609) -> PluginInitializationEntryPoint: 

+

1610 module = None 

+

1611 module_fs_path = None 

+

1612 if module_name is None: 1612 ↛ 1640line 1612 didn't jump to line 1640, because the condition on line 1612 was never false

+

1613 module_name, module_fs_path = _find_plugin_implementation_file( 

+

1614 plugin_name, json_file_path 

+

1615 ) 

+

1616 if os.path.isfile(module_fs_path): 1616 ↛ 1640line 1616 didn't jump to line 1640, because the condition on line 1616 was never false

+

1617 spec = importlib.util.spec_from_file_location(module_name, module_fs_path) 

+

1618 if spec is None: 1618 ↛ 1619line 1618 didn't jump to line 1619, because the condition on line 1618 was never true

+

1619 raise PluginInitializationError( 

+

1620 f"Failed to load {plugin_name} (path: {module_fs_path})." 

+

1621 " The spec_from_file_location function returned None." 

+

1622 ) 

+

1623 mod = importlib.util.module_from_spec(spec) 

+

1624 loader = spec.loader 

+

1625 if loader is None: 1625 ↛ 1626line 1625 didn't jump to line 1626, because the condition on line 1625 was never true

+

1626 raise PluginInitializationError( 

+

1627 f"Failed to load {plugin_name} (path: {module_fs_path})." 

+

1628 " Python could not find a suitable loader (spec.loader was None)" 

+

1629 ) 

+

1630 sys.modules[module_name] = mod 

+

1631 try: 

+

1632 loader.exec_module(mod) 

+

1633 except (Exception, GeneratorExit) as e: 

+

1634 raise PluginInitializationError( 

+

1635 f"Failed to load {plugin_name} (path: {module_fs_path})." 

+

1636 " The module threw an exception while being loaded." 

+

1637 ) from e 

+

1638 module = mod 

+

1639 

+

1640 if module is None: 1640 ↛ 1641line 1640 didn't jump to line 1641, because the condition on line 1640 was never true

+

1641 try: 

+

1642 module = importlib.import_module(module_name) 

+

1643 except ModuleNotFoundError as e: 

+

1644 if module_fs_path is None: 

+

1645 raise PluginMetadataError( 

+

1646 f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' 

+

1647 " this module is not available in the python search path" 

+

1648 ) from e 

+

1649 raise PluginInitializationError( 

+

1650 f"Failed to load {plugin_name}. Tried loading it from" 

+

1651 f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' 

+

1652 f" {module_name} (where it was not found either). Please ensure" 

+

1653 " the module code is installed in the correct spot or provide an" 

+

1654 f' explicit "module" definition in {json_file_path}.' 

+

1655 ) from e 

+

1656 

+

1657 plugin_initializer = getattr(module, plugin_initializer_name) 

+

1658 

+

1659 if plugin_initializer is None: 1659 ↛ 1660line 1659 didn't jump to line 1660, because the condition on line 1659 was never true

+

1660 raise PluginMetadataError( 

+

1661 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

+

1662 f" attribute called {plugin_initializer}. However, it does not. Please correct the plugin" 

+

1663 f" metadata or initializer name in the Python module." 

+

1664 ) 

+

1665 return cast("PluginInitializationEntryPoint", plugin_initializer) 

+

1666 

+

1667 

+

1668def _json_plugin_loader( 

+

1669 plugin_name: str, 

+

1670 plugin_json_metadata: PluginJsonMetadata, 

+

1671 json_file_path: str, 

+

1672 attribute_path: AttributePath, 

+

1673) -> Callable[["DebputyPluginInitializer"], None]: 

+

1674 api_compat = plugin_json_metadata["api_compat_version"] 

+

1675 module_name = plugin_json_metadata.get("module") 

+

1676 plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") 

+

1677 packager_provided_files_raw = plugin_json_metadata.get( 

+

1678 "packager_provided_files", [] 

+

1679 ) 

+

1680 manifest_variables_raw = plugin_json_metadata.get("manifest_variables") 

+

1681 known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") 

+

1682 if api_compat != 1: 1682 ↛ 1683line 1682 didn't jump to line 1683, because the condition on line 1682 was never true

+

1683 raise PluginMetadataError( 

+

1684 f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' 

+

1685 f" version of debputy only supports API compat version of 1" 

+

1686 ) 

+

1687 if plugin_initializer_name is not None and "." in plugin_initializer_name: 1687 ↛ 1688line 1687 didn't jump to line 1688, because the condition on line 1687 was never true

+

1688 p = attribute_path["plugin_initializer"] 

+

1689 raise PluginMetadataError( 

+

1690 f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' 

+

1691 ) 

+

1692 

+

1693 plugin_initializers = [] 

+

1694 

+

1695 if plugin_initializer_name is not None: 

+

1696 plugin_initializer = _resolve_module_initializer( 

+

1697 plugin_name, 

+

1698 plugin_initializer_name, 

+

1699 module_name, 

+

1700 json_file_path, 

+

1701 ) 

+

1702 plugin_initializers.append(plugin_initializer) 

+

1703 

+

1704 if known_packaging_files_raw: 1704 ↛ 1705line 1704 didn't jump to line 1705, because the condition on line 1704 was never true

+

1705 kpf_root_path = attribute_path["known_packaging_files"] 

+

1706 known_packaging_files = [] 

+

1707 for k, v in enumerate(known_packaging_files_raw): 

+

1708 kpf_path = kpf_root_path[k] 

+

1709 p = v.get("path") 

+

1710 if isinstance(p, str): 

+

1711 kpf_path.path_hint = p 

+

1712 if plugin_name.startswith("debputy-") and isinstance(v, dict): 

+

1713 docs = v.get("documentation-uris") 

+

1714 if docs is not None and isinstance(docs, list): 

+

1715 docs = [ 

+

1716 ( 

+

1717 d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR) 

+

1718 if isinstance(d, str) 

+

1719 else d 

+

1720 ) 

+

1721 for d in docs 

+

1722 ] 

+

1723 v["documentation-uris"] = docs 

+

1724 known_packaging_file: KnownPackagingFileInfo = ( 

+

1725 PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( 

+

1726 v, 

+

1727 kpf_path, 

+

1728 ) 

+

1729 ) 

+

1730 known_packaging_files.append((kpf_path, known_packaging_file)) 

+

1731 

+

1732 def _initialize_json_provided_known_packaging_files( 

+

1733 api: DebputyPluginInitializerProvider, 

+

1734 ) -> None: 

+

1735 for p, details in known_packaging_files: 

+

1736 try: 

+

1737 api.known_packaging_files(details) 

+

1738 except ValueError as ex: 

+

1739 raise PluginMetadataError( 

+

1740 f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" 

+

1741 ) 

+

1742 

+

1743 plugin_initializers.append(_initialize_json_provided_known_packaging_files) 

+

1744 

+

1745 if manifest_variables_raw: 

+

1746 manifest_var_path = attribute_path["manifest_variables"] 

+

1747 manifest_variables = [ 

+

1748 PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) 

+

1749 for i, p in enumerate(manifest_variables_raw) 

+

1750 ] 

+

1751 

+

1752 def _initialize_json_provided_manifest_vars( 

+

1753 api: DebputyPluginInitializer, 

+

1754 ) -> None: 

+

1755 for idx, manifest_variable in enumerate(manifest_variables): 

+

1756 name = manifest_variable["name"] 

+

1757 value = manifest_variable["value"] 

+

1758 doc = manifest_variable.get("reference_documentation") 

+

1759 try: 

+

1760 api.manifest_variable( 

+

1761 name, value, variable_reference_documentation=doc 

+

1762 ) 

+

1763 except ValueError as ex: 

+

1764 var_path = manifest_var_path[idx] 

+

1765 raise PluginMetadataError( 

+

1766 f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" 

+

1767 ) 

+

1768 

+

1769 plugin_initializers.append(_initialize_json_provided_manifest_vars) 

+

1770 

+

1771 if packager_provided_files_raw: 

+

1772 ppf_path = attribute_path["packager_provided_files"] 

+

1773 ppfs = [ 

+

1774 PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) 

+

1775 for i, p in enumerate(packager_provided_files_raw) 

+

1776 ] 

+

1777 

+

1778 def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: 

+

1779 ppf: PackagerProvidedFileJsonDescription 

+

1780 for idx, ppf in enumerate(ppfs): 

+

1781 c = dict(ppf) 

+

1782 stem = ppf["stem"] 

+

1783 installed_path = ppf["installed_path"] 

+

1784 default_mode = ppf.get("default_mode") 

+

1785 ref_doc_dict = ppf.get("reference_documentation") 

+

1786 if default_mode is not None: 1786 ↛ 1789line 1786 didn't jump to line 1789, because the condition on line 1786 was never false

+

1787 c["default_mode"] = default_mode.octal_mode 

+

1788 

+

1789 if ref_doc_dict is not None: 1789 ↛ 1794line 1789 didn't jump to line 1794, because the condition on line 1789 was never false

+

1790 ref_doc = packager_provided_file_reference_documentation( 

+

1791 **ref_doc_dict 

+

1792 ) 

+

1793 else: 

+

1794 ref_doc = None 

+

1795 

+

1796 for k in [ 

+

1797 "stem", 

+

1798 "installed_path", 

+

1799 "reference_documentation", 

+

1800 ]: 

+

1801 try: 

+

1802 del c[k] 

+

1803 except KeyError: 

+

1804 pass 

+

1805 

+

1806 try: 

+

1807 api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore 

+

1808 except ValueError as ex: 

+

1809 p_path = ppf_path[idx] 

+

1810 raise PluginMetadataError( 

+

1811 f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" 

+

1812 ) 

+

1813 

+

1814 plugin_initializers.append(_initialize_json_provided_ppfs) 

+

1815 

+

1816 if not plugin_initializers: 1816 ↛ 1817line 1816 didn't jump to line 1817, because the condition on line 1816 was never true

+

1817 raise PluginMetadataError( 

+

1818 f"The plugin defined in {json_file_path} does not seem to provide features, " 

+

1819 f" such as module + plugin-initializer or packager-provided-files." 

+

1820 ) 

+

1821 

+

1822 if len(plugin_initializers) == 1: 

+

1823 return plugin_initializers[0] 

+

1824 

+

1825 def _chain_loader(api: DebputyPluginInitializer) -> None: 

+

1826 for initializer in plugin_initializers: 

+

1827 initializer(api) 

+

1828 

+

1829 return _chain_loader 

+

1830 

+

1831 

+

1832@contextlib.contextmanager 

+

1833def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]: 

+

1834 if fd is not None: 

+

1835 yield fd 

+

1836 else: 

+

1837 with open(path, "rb") as fd: 

+

1838 yield fd 

+

1839 

+

1840 

+

1841def parse_json_plugin_desc( 

+

1842 path: str, *, fd: Optional[IO[bytes]] = None 

+

1843) -> DebputyPluginMetadata: 

+

1844 with _open(path, fd=fd) as rfd: 

+

1845 try: 

+

1846 raw = json.load(rfd) 

+

1847 except JSONDecodeError as e: 

+

1848 raise PluginMetadataError( 

+

1849 f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' 

+

1850 ) from e 

+

1851 plugin_name = os.path.basename(path) 

+

1852 if plugin_name.endswith(".json"): 

+

1853 plugin_name = plugin_name[:-5] 

+

1854 elif plugin_name.endswith(".json.in"): 

+

1855 plugin_name = plugin_name[:-8] 

+

1856 

+

1857 if plugin_name == "debputy": 1857 ↛ 1859line 1857 didn't jump to line 1859, because the condition on line 1857 was never true

+

1858 # Provide a better error message than "The plugin has already loaded!?" 

+

1859 raise PluginMetadataError( 

+

1860 f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' 

+

1861 f" clash with the bundled plugin of same name." 

+

1862 ) 

+

1863 

+

1864 attribute_path = AttributePath.root_path() 

+

1865 

+

1866 try: 

+

1867 plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( 

+

1868 raw, 

+

1869 attribute_path, 

+

1870 ) 

+

1871 except ManifestParseException as e: 

+

1872 raise PluginMetadataError( 

+

1873 f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' 

+

1874 ) from e 

+

1875 api_compat = plugin_json_metadata["api_compat_version"] 

+

1876 

+

1877 return DebputyPluginMetadata( 

+

1878 plugin_name=plugin_name, 

+

1879 plugin_loader=lambda: _json_plugin_loader( 

+

1880 plugin_name, 

+

1881 plugin_json_metadata, 

+

1882 path, 

+

1883 attribute_path, 

+

1884 ), 

+

1885 api_compat_version=api_compat, 

+

1886 plugin_initializer=None, 

+

1887 plugin_path=path, 

+

1888 ) 

+

1889 

+

1890 

+

1891@dataclasses.dataclass(slots=True, frozen=True) 

+

1892class ServiceDefinitionImpl(ServiceDefinition[DSD]): 

+

1893 name: str 

+

1894 names: Sequence[str] 

+

1895 path: VirtualPath 

+

1896 type_of_service: str 

+

1897 service_scope: str 

+

1898 auto_enable_on_install: bool 

+

1899 auto_start_on_install: bool 

+

1900 on_upgrade: ServiceUpgradeRule 

+

1901 definition_source: str 

+

1902 is_plugin_provided_definition: bool 

+

1903 service_context: Optional[DSD] 

+

1904 

+

1905 def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]": 

+

1906 return dataclasses.replace(self, **changes) 

+

1907 

+

1908 

+

1909class ServiceRegistryImpl(ServiceRegistry[DSD]): 

+

1910 __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services") 

+

1911 

+

1912 def __init__(self, service_manager_details: ServiceManagerDetails) -> None: 

+

1913 self._service_manager_details = service_manager_details 

+

1914 self._service_definitions: List[ServiceDefinition[DSD]] = [] 

+

1915 self._seen_services = set() 

+

1916 

+

1917 @property 

+

1918 def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: 

+

1919 return self._service_definitions 

+

1920 

+

1921 def register_service( 

+

1922 self, 

+

1923 path: VirtualPath, 

+

1924 name: Union[str, List[str]], 

+

1925 *, 

+

1926 type_of_service: str = "service", # "timer", etc. 

+

1927 service_scope: str = "system", 

+

1928 enable_by_default: bool = True, 

+

1929 start_by_default: bool = True, 

+

1930 default_upgrade_rule: ServiceUpgradeRule = "restart", 

+

1931 service_context: Optional[DSD] = None, 

+

1932 ) -> None: 

+

1933 names = name if isinstance(name, list) else [name] 

+

1934 if len(names) < 1: 

+

1935 raise ValueError( 

+

1936 f"The service must have at least one name - {path.absolute} did not have any" 

+

1937 ) 

+

1938 for n in names: 

+

1939 key = (n, type_of_service, service_scope) 

+

1940 if key in self._seen_services: 

+

1941 raise PluginAPIViolationError( 

+

1942 f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used" 

+

1943 f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not" 

+

1944 " allowed by the debputy plugin API." 

+

1945 ) 

+

1946 # TODO: We cannot create a service definition immediate once the manifest is involved 

+

1947 self._service_definitions.append( 

+

1948 ServiceDefinitionImpl( 

+

1949 names[0], 

+

1950 names, 

+

1951 path, 

+

1952 type_of_service, 

+

1953 service_scope, 

+

1954 enable_by_default, 

+

1955 start_by_default, 

+

1956 default_upgrade_rule, 

+

1957 f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", 

+

1958 True, 

+

1959 service_context, 

+

1960 ) 

+

1961 ) 

+
+ + + -- cgit v1.2.3