Coverage for src/debputy/plugin/api/impl.py: 55%
753 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import contextlib
2import dataclasses
3import functools
4import importlib
5import importlib.util
6import itertools
7import json
8import os
9import re
10import subprocess
11import sys
12from abc import ABC
13from json import JSONDecodeError
14from typing import (
15 Optional,
16 Callable,
17 Dict,
18 Tuple,
19 Iterable,
20 Sequence,
21 Type,
22 List,
23 Union,
24 Set,
25 Iterator,
26 IO,
27 Mapping,
28 AbstractSet,
29 cast,
30 FrozenSet,
31 Any,
32 Literal,
33)
35from debputy import DEBPUTY_DOC_ROOT_DIR
36from debputy.exceptions import (
37 DebputySubstitutionError,
38 PluginConflictError,
39 PluginMetadataError,
40 PluginBaseError,
41 PluginInitializationError,
42 PluginAPIViolationError,
43 PluginNotFoundError,
44)
45from debputy.maintscript_snippet import (
46 STD_CONTROL_SCRIPTS,
47 MaintscriptSnippetContainer,
48 MaintscriptSnippet,
49)
50from debputy.manifest_parser.base_types import TypeMapping
51from debputy.manifest_parser.exceptions import ManifestParseException
52from debputy.manifest_parser.parser_data import ParserContextData
53from debputy.manifest_parser.util import AttributePath
54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
55from debputy.plugin.api.impl_types import (
56 DebputyPluginMetadata,
57 PackagerProvidedFileClassSpec,
58 MetadataOrMaintscriptDetector,
59 PluginProvidedTrigger,
60 TTP,
61 DIPHandler,
62 PF,
63 SF,
64 DIPKWHandler,
65 PluginProvidedManifestVariable,
66 PluginProvidedPackageProcessor,
67 PluginProvidedDiscardRule,
68 AutomaticDiscardRuleExample,
69 PPFFormatParam,
70 ServiceManagerDetails,
71 resolve_package_type_selectors,
72 KnownPackagingFileInfo,
73 PluginProvidedKnownPackagingFile,
74 InstallPatternDHCompatRule,
75 PluginProvidedTypeMapping,
76)
77from debputy.plugin.api.plugin_parser import (
78 PLUGIN_METADATA_PARSER,
79 PluginJsonMetadata,
80 PLUGIN_PPF_PARSER,
81 PackagerProvidedFileJsonDescription,
82 PLUGIN_MANIFEST_VARS_PARSER,
83 PLUGIN_KNOWN_PACKAGING_FILES_PARSER,
84)
85from debputy.plugin.api.spec import (
86 MaintscriptAccessor,
87 Maintscript,
88 DpkgTriggerType,
89 BinaryCtrlAccessor,
90 PackageProcessingContext,
91 MetadataAutoDetector,
92 PluginInitializationEntryPoint,
93 DebputyPluginInitializer,
94 PackageTypeSelector,
95 FlushableSubstvars,
96 ParserDocumentation,
97 PackageProcessor,
98 VirtualPath,
99 ServiceIntegrator,
100 ServiceDetector,
101 ServiceRegistry,
102 ServiceDefinition,
103 DSD,
104 ServiceUpgradeRule,
105 PackagerProvidedFileReferenceDocumentation,
106 packager_provided_file_reference_documentation,
107 TypeMappingDocumentation,
108)
109from debputy.substitution import (
110 Substitution,
111 VariableNameState,
112 SUBST_VAR_RE,
113 VariableContext,
114)
115from debputy.util import (
116 _normalize_path,
117 POSTINST_DEFAULT_CONDITION,
118 _error,
119 print_command,
120 _warn,
121)
# Pattern for a trailing "_t.py", "_test.py" or "_check.py", optionally with an
# extra "_<suffix>" (lowercase letters, digits, underscores) right before
# ".py"; that optional suffix is captured as group 1.
# NOTE(review): presumably used to recognise plugin test modules by file
# name - confirm against the callers (not visible in this section).
123PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$")
def _validate_known_packaging_file_dh_compat_rules(
    dh_compat_rules: Optional[List[InstallPatternDHCompatRule]],
) -> None:
    """Sanity-check the ``dh_compat_rules`` of a known packaging file.

    Every rule is checked in list order:

    * it must contain at least one key besides the keys that merely select
      when the rule applies,
    * only the last rule may omit both ``starting_with_debhelper_version``
      and ``starting_with_compat_level``,
    * a compat level may not be negative,
    * compat levels must be strictly decreasing through the list, and
    * ``install_pattern`` (when present) must already be in normalized form.

    :param dh_compat_rules: The rules to validate. ``None`` and an empty
      list are accepted without any checks.
    :raises ValueError: If any rule violates one of the checks above.
    """
    if not dh_compat_rules:
        return

    # Keys that only say *when* a rule applies. A rule consisting solely of
    # these keys has no effect. "starting_with_debhelper_version" is the key
    # actually read below; "after_debhelper_version" is the spelling this
    # check used before and is kept so the check never gets laxer.
    selector_keys = {
        "starting_with_debhelper_version",
        "starting_with_compat_level",
        "after_debhelper_version",
    }
    last_idx = len(dh_compat_rules) - 1
    max_compat = None
    dh_compat_rule: InstallPatternDHCompatRule
    for idx, dh_compat_rule in enumerate(dh_compat_rules):
        dh_version = dh_compat_rule.get("starting_with_debhelper_version")
        compat = dh_compat_rule.get("starting_with_compat_level")

        if not (dh_compat_rule.keys() - selector_keys):
            raise ValueError(
                f"The dh compat-rule at index {idx} does not affect anything"
                " (it does not have any rules)!? So why have it?"
            )
        if dh_version is None and compat is None and idx < last_idx:
            raise ValueError(
                f"The dh compat-rule at index {idx} is not the last and is missing either"
                " starting-with-debhelper-version or starting-with-compat-level"
            )
        # NOTE(review): the message talks about "below 1" while the check
        # only rejects negative values; left as-is since the intended lower
        # bound cannot be confirmed from this function alone.
        if compat is not None and compat < 0:
            raise ValueError(
                f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule"
                f" for something that appeared when migrating from {compat} to {compat + 1}."
            )

        # Rules without a compat level do not take part in the ordering
        # check; all others must come in strictly decreasing compat order.
        if max_compat is None:
            max_compat = compat
        elif compat is not None:
            if compat >= max_compat:
                raise ValueError(
                    f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}."
                )
            max_compat = compat

        install_pattern = dh_compat_rule.get("install_pattern")
        if install_pattern is not None:
            normalized = _normalize_path(install_pattern, with_prefix=False)
            if normalized != install_pattern:
                raise ValueError(
                    f"The install-pattern in dh compat-rule at {idx} must be normalized as"
                    f' "{normalized}".'
                )
176class DebputyPluginInitializerProvider(DebputyPluginInitializer):
177 __slots__ = (
178 "_plugin_metadata",
179 "_feature_set",
180 "_plugin_detector_ids",
181 "_substitution",
182 "_unloaders",
183 "_load_started",
184 )
def __init__(
    self,
    plugin_metadata: DebputyPluginMetadata,
    feature_set: PluginProvidedFeatureSet,
    substitution: Substitution,
) -> None:
    """Set up the initializer for a single plugin.

    :param plugin_metadata: Metadata of the plugin this instance works for.
    :param feature_set: The feature set the registration methods write to.
    :param substitution: Substitution instance consulted by the
      variable-related methods.
    """
    # Collaborators handed in by the caller.
    self._substitution = substitution
    self._feature_set = feature_set
    self._plugin_metadata: DebputyPluginMetadata = plugin_metadata
    # Bookkeeping that starts out empty; filled while the plugin registers
    # features (unloaders, detector ids) and when loading begins.
    self._load_started = False
    self._unloaders: List[Callable[[], None]] = []
    self._plugin_detector_ids: Set[str] = set()
def unload_plugin(self) -> None:
    """Undo the registrations of this plugin.

    Does nothing unless loading was started. Otherwise every recorded
    unloader is called in registration order and the plugin is then removed
    from the feature set's ``plugin_data``.
    """
    if not self._load_started:
        return
    for undo in self._unloaders:
        undo()
    del self._feature_set.plugin_data[self._plugin_name]
def load_plugin(self) -> None:
    """Register the plugin in the feature set and run its initializer.

    The plugin metadata is recorded in ``plugin_data`` and the instance is
    marked as "load started" *before* the initializer runs.

    :raises PluginConflictError: If a plugin of the same name is already
      present in ``plugin_data``.
    :raises PluginMetadataError: If initialization failed with a
      ``TypeError`` and the declared entry point is not callable.
    :raises PluginInitializationError: For any other non-plugin exception
      raised during initialization (chained as the cause).
    :raises PluginBaseError: Plugin errors raised by the initializer are
      re-raised unchanged.
    """
    metadata = self._plugin_metadata
    plugin_data = self._feature_set.plugin_data
    if metadata.plugin_name in plugin_data:
        # Pass both metadata objects, like every other PluginConflictError
        # raised in this class does.
        raise PluginConflictError(
            f'The plugin "{metadata.plugin_name}" has already been loaded!?',
            plugin_data[metadata.plugin_name],
            metadata,
        )
    assert (
        metadata.api_compat_version == 1
    ), f"Unsupported plugin API compat version {metadata.api_compat_version}"
    plugin_data[metadata.plugin_name] = metadata
    self._load_started = True
    assert not metadata.is_initialized
    try:
        metadata.initialize_plugin(self)
    except Exception as e:
        initializer = metadata.plugin_initializer
        if (
            isinstance(e, TypeError)
            and initializer is not None
            and not callable(initializer)
        ):
            # A non-callable object is not guaranteed to have __qualname__;
            # fall back to its repr so the real problem is still reported
            # instead of an AttributeError.
            identity = getattr(initializer, "__qualname__", None)
            if identity is None:
                identity = repr(initializer)
            raise PluginMetadataError(
                f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a"
                f" callable (callable returns False). The specified entry point identifies"
                f' itself as "{identity}".'
            ) from e
        elif isinstance(e, PluginBaseError):
            raise
        raise PluginInitializationError(
            f"Exception while attempting to load plugin {metadata.plugin_name}"
        ) from e
def packager_provided_file(
    self,
    stem: str,
    installed_path: str,
    *,
    default_mode: int = 0o0644,
    default_priority: Optional[int] = None,
    allow_name_segment: bool = True,
    allow_architecture_segment: bool = False,
    post_formatting_rewrite: Optional[Callable[[str], str]] = None,
    packageless_is_fallback_for_all_packages: bool = False,
    reservation_only: bool = False,
    format_callback: Optional[
        Callable[[str, PPFFormatParam, VirtualPath], str]
    ] = None,
    reference_documentation: Optional[
        PackagerProvidedFileReferenceDocumentation
    ] = None,
) -> None:
    """Register a packager provided file class under ``stem``.

    The arguments are validated first; on success a
    ``PackagerProvidedFileClassSpec`` is stored in the feature set under
    ``stem`` and an unloader removing it again is recorded.

    :param stem: Key under which the file class is registered.
    :param installed_path: Install path template. It is normalized and must
      not end with "/". Unless the debputy-internal special cases apply, it
      must contain "{name}" or "{owning_package}".
    :param default_priority: When given, ``installed_path`` must contain
      "{priority}" or "{priority:02}".
    :param allow_name_segment: When true, ``installed_path`` must contain
      "{name}".
    :param reservation_only: Only usable by the "debputy" plugin (as are
      paths under "./DEBIAN" and ``format_callback``).
    :raises ValueError: If one of the validation rules is violated.
    :raises PluginConflictError: If ``stem`` is already registered.

    The remaining keyword arguments are passed through to the spec as-is.
    """
    registry = self._feature_set.packager_provided_files
    existing = registry.get(stem)
    from_debputy = self._plugin_name == "debputy"

    if format_callback is not None and not from_debputy:
        raise ValueError(
            "Sorry; Using format_callback is a debputy-internal"
            f" API. Triggered by plugin {self._plugin_name}"
        )

    if installed_path.endswith("/"):
        raise ValueError(
            f'The installed_path ends with "/" indicating it is a directory, but it must be a file.'
            f" Triggered by plugin {self._plugin_name}."
        )

    installed_path = _normalize_path(installed_path)
    mentions_name = "{name}" in installed_path

    if installed_path.startswith("./DEBIAN") or reservation_only:
        # Special-case, used for control files.
        if not from_debputy:
            raise ValueError(
                "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal"
                f" API. Triggered by plugin {self._plugin_name}"
            )
    elif not (mentions_name or "{owning_package}" in installed_path):
        raise ValueError(
            'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"'
            " substitution (or have installed_path end with a slash). Otherwise, the installed"
            f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}"
        )

    if allow_name_segment and not mentions_name:
        raise ValueError(
            'When allow_name_segment is True, the installed_path must have a "{name}" substitution'
            " variable. Otherwise, the name segment will not work properly. Triggered by"
            f" plugin {self._plugin_name}"
        )

    if default_priority is not None:
        mentions_priority = (
            "{priority}" in installed_path or "{priority:02}" in installed_path
        )
        if not mentions_priority:
            raise ValueError(
                'When default_priority is not None, the installed_path should have a "{priority}"'
                ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.'
                f" Triggered by plugin {self._plugin_name}"
            )

    if existing is not None:
        previous_owner = existing.debputy_plugin_metadata.plugin_name
        if previous_owner == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' stem "{stem}" twice for packager provided files.'
            )
        else:
            message = (
                f'The stem "{stem}" is registered twice for packager provided files.'
                f" Once by {existing.debputy_plugin_metadata.plugin_name} and once"
                f" by {self._plugin_name}"
            )
        raise PluginConflictError(
            message, existing.debputy_plugin_metadata, self._plugin_metadata
        )

    spec = PackagerProvidedFileClassSpec(
        self._plugin_metadata,
        stem,
        installed_path,
        default_mode=default_mode,
        default_priority=default_priority,
        allow_name_segment=allow_name_segment,
        allow_architecture_segment=allow_architecture_segment,
        post_formatting_rewrite=post_formatting_rewrite,
        packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages,
        reservation_only=reservation_only,
        formatting_callback=format_callback,
        reference_documentation=reference_documentation,
    )
    registry[stem] = spec

    def _forget_stem() -> None:
        del registry[stem]

    self._unloaders.append(_forget_stem)
def metadata_or_maintscript_detector(
    self,
    auto_detector_id: str,
    auto_detector: MetadataAutoDetector,
    *,
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a metadata/maintscript detector for this plugin.

    :param auto_detector_id: Identifier of the detector; each plugin may
      use a given id only once.
    :param auto_detector: The detector implementation.
    :param package_type: Package type selector, resolved via
      ``resolve_package_type_selectors``.
    :raises ValueError: If this plugin already registered the same id.
    """
    registered_ids = self._plugin_detector_ids
    if auto_detector_id in registered_ids:
        raise ValueError(
            f"The plugin {self._plugin_name} tried to register"
            f' "{auto_detector_id}" twice'
        )
    registered_ids.add(auto_detector_id)

    all_detectors = self._feature_set.metadata_maintscript_detectors
    # Make sure the per-plugin list exists before resolving the selector.
    if self._plugin_name not in all_detectors:
        all_detectors[self._plugin_name] = []
    applicable_types = resolve_package_type_selectors(package_type)
    entry = MetadataOrMaintscriptDetector(
        detector_id=auto_detector_id,
        detector=auto_detector,
        plugin_metadata=self._plugin_metadata,
        applies_to_package_types=applicable_types,
        enabled=True,
    )
    all_detectors[self._plugin_name].append(entry)

    def _drop_plugin_detectors() -> None:
        # Removes the whole per-plugin list; tolerant of it being gone
        # already since every registration records one of these.
        if self._plugin_name in all_detectors:
            del all_detectors[self._plugin_name]

    self._unloaders.append(_drop_plugin_detectors)
def document_builtin_variable(
    self,
    variable_name: str,
    variable_reference_documentation: str,
    *,
    is_context_specific: bool = False,
    is_for_special_case: bool = False,
) -> None:
    """Attach reference documentation to an already known variable.

    A documentation placeholder (no value) is stored in the feature set's
    manifest variables and an unloader removing it is recorded.

    :param variable_name: Name of the variable; the substitution must not
      report it as undefined.
    :param variable_reference_documentation: The documentation text.
    :param is_context_specific: Stored on the placeholder as-is.
    :param is_for_special_case: Stored on the placeholder as-is.
    :raises ValueError: If the substitution does not know the variable.
    """
    manifest_variables = self._feature_set.manifest_variables
    # Defined elsewhere in the class (not visible in this section).
    self._restricted_api()
    if self._substitution.variable_state(variable_name) == VariableNameState.UNDEFINED:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to document built-in {variable_name},"
            f" but it is not known to be a variable"
        )

    assert variable_name not in manifest_variables

    placeholder = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        None,
        is_context_specific_variable=is_context_specific,
        variable_reference_documentation=variable_reference_documentation,
        is_documentation_placeholder=True,
        is_for_special_case=is_for_special_case,
    )
    manifest_variables[variable_name] = placeholder

    def _remove_placeholder() -> None:
        del manifest_variables[variable_name]

    self._unloaders.append(_remove_placeholder)
def manifest_variable_provider(
    self,
    provider: Callable[[VariableContext], Mapping[str, str]],
    variables: Union[Sequence[str], Mapping[str, Optional[str]]],
) -> None:
    """Register several manifest variables backed by one provider callable.

    The provider is wrapped in an unbounded ``functools.lru_cache`` and is
    only called when one of the variables is resolved. On the first
    resolution, the key set returned by the provider is compared with the
    declared variable names.

    :param provider: Callable mapping a variable context to the values of
      all declared variables.
    :param variables: Either a sequence of variable names or a mapping of
      variable name to its (optional) reference documentation.
    :raises PluginAPIViolationError: Raised at resolution time when the
      provider's keys differ from the declared variable names.

    Each name is validated with ``_check_variable_name`` before it is
    registered. The recorded unloader always raises, as unloading is not
    implemented for this feature.
    """
    self._restricted_api()
    cached_provider = functools.lru_cache(None)(provider)
    permitted_variables = frozenset(variables)
    variables_iter: Iterable[Tuple[str, Optional[str]]]
    if isinstance(variables, Mapping):
        variables_iter = variables.items()
    else:
        # Plain sequence of names: no documentation for any of them.
        variables_iter = zip(variables, itertools.repeat(None))

    checked_vars = False
    manifest_variables = self._feature_set.manifest_variables
    plugin_name = self._plugin_name

    def _value_resolver_generator(
        variable_name: str,
    ) -> Callable[[VariableContext], str]:
        def _value_resolver(variable_context: VariableContext) -> str:
            res = cached_provider(variable_context)
            nonlocal checked_vars
            if not checked_vars:
                if permitted_variables != res.keys():
                    expected = ", ".join(sorted(permitted_variables))
                    actual = ", ".join(sorted(res))
                    raise PluginAPIViolationError(
                        f"The plugin {plugin_name} claimed to provide"
                        f" the following variables {expected},"
                        f" but when resolving the variables, the plugin provided"
                        f" {actual}. These two lists should have been the same."
                    )
                # Remember the successful check; the flag is shared by all
                # resolvers of this provider, so the comparison is done once
                # rather than on every single lookup.
                checked_vars = True
            return res[variable_name]

        return _value_resolver

    for varname, vardoc in variables_iter:
        self._check_variable_name(varname)
        manifest_variables[varname] = PluginProvidedManifestVariable(
            self._plugin_metadata,
            varname,
            _value_resolver_generator(varname),
            is_context_specific_variable=False,
            variable_reference_documentation=vardoc,
        )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload manifest_variable_provider (not implemented)"
        )

    self._unloaders.append(_unload)
def _check_variable_name(self, variable_name: str) -> None:
    """Verify that this plugin may declare ``variable_name``.

    :param variable_name: The manifest variable name to check.
    :raises PluginConflictError: If the name is already registered in the
      feature set (by this or by another plugin).
    :raises ValueError: If the name is not matched by ``SUBST_VAR_RE``, is
      in a namespace other than "token" or "path", is reserved for debputy
      (for plugins other than "debputy"), or would shadow a variable the
      substitution already knows (for plugins other than "debputy").
    """
    manifest_variables = self._feature_set.manifest_variables
    existing = manifest_variables.get(variable_name)

    if existing is not None:
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' manifest variable "{variable_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f" both tried to provide the manifest variable {variable_name}"
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    if not SUBST_VAR_RE.match("{{" + variable_name + "}}"):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is not a valid variable name"
        )

    namespace = ""
    variable_basename = variable_name
    if ":" in variable_name:
        namespace, variable_basename = variable_name.rsplit(":", 1)
        assert namespace != ""
        # The part after the last ":" is what must not be empty here;
        # variable_name itself contains ":" and can never be empty.
        assert variable_basename != ""

    if namespace != "" and namespace not in ("token", "path"):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is in the reserved namespace {namespace}"
        )

    variable_name_upper = variable_name.upper()
    if (
        variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY"))
        or variable_basename.startswith("_")
        or variable_basename.upper().startswith("DEBPUTY")
    ) and self._plugin_name != "debputy":
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is a variable name reserved by debputy"
        )

    state = self._substitution.variable_state(variable_name)
    if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy":
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which would shadow a built-in variable"
        )
def package_processor(
    self,
    processor_id: str,
    processor: PackageProcessor,
    *,
    depends_on_processor: Iterable[str] = tuple(),
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a package processor for this plugin.

    :param processor_id: Identifier of the processor; unique per plugin.
    :param processor: The processor implementation.
    :param depends_on_processor: Ids of processors this one depends on. Each
      id is looked up among this plugin's processors first and among those
      of "debputy" second; it must already be registered.
    :param package_type: Package type selector, resolved via
      ``resolve_package_type_selectors``.
    :raises PluginConflictError: If this plugin already registered
      ``processor_id``.
    :raises ValueError: If a dependency cannot be resolved.
    """
    self._restricted_api(allowed_plugins={"lua"})
    package_processors = self._feature_set.all_package_processors
    dependencies = set()
    processor_key = (self._plugin_name, processor_id)

    if processor_key in package_processors:
        raise PluginConflictError(
            f"The plugin {self._plugin_name} already registered a processor with id {processor_id}",
            self._plugin_metadata,
            self._plugin_metadata,
        )

    for depends_ref in depends_on_processor:
        if isinstance(depends_ref, str):
            own_key = (self._plugin_name, depends_ref)
            builtin_key = ("debputy", depends_ref)
            if own_key in package_processors:
                depends_key = own_key
            elif builtin_key in package_processors:
                depends_key = builtin_key
            else:
                raise ValueError(
                    f'Could not resolve dependency "{depends_ref}" for'
                    f' "{processor_id}". It was not provided by the plugin itself'
                    f" ({self._plugin_name}) nor debputy."
                )
        else:
            # TODO: Add proper dependencies first, at which point we should probably resolve "name"
            #  via the direct dependencies.
            assert False

        if package_processors.get(depends_key) is None:
            # We currently require the processor to be declared already. If this ever changes,
            # PluginProvidedFeatureSet.package_processors_in_order will need an update
            dplugin_name, dprocessor_name = depends_key
            available_processors = ", ".join(
                name for owner, name in package_processors if owner == dplugin_name
            )
            raise ValueError(
                f"The plugin {dplugin_name} does not provide a processor called"
                f" {dprocessor_name}. Available processors for that plugin are:"
                f" {available_processors}"
            )
        dependencies.add(depends_key)

    package_processors[processor_key] = PluginProvidedPackageProcessor(
        processor_id,
        resolve_package_type_selectors(package_type),
        processor,
        frozenset(dependencies),
        self._plugin_metadata,
    )

    def _remove_processor() -> None:
        del package_processors[processor_key]

    self._unloaders.append(_remove_processor)
def automatic_discard_rule(
    self,
    name: str,
    should_discard: Callable[[VirtualPath], bool],
    *,
    rule_reference_documentation: Optional[str] = None,
    examples: Union[
        AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample]
    ] = tuple(),
) -> None:
    """Register an automatic discard rule

    Automatic discard rules are consulted for basically *every* path that is about to be installed
    into any package. As soon as one rule decides that a path should not be installed, the path is
    skipped. What is skipped depends on the kind of path:

     * directory: The directory and everything beneath it is excluded.
     * symlink: Only the symlink itself is excluded (not its target).
     * hardlink: The current hardlink is not installed, but other instances of it will be.

    Note: `debputy` *never* deletes discarded files. It merely skips them.

    Rules should be written assuming that a directory is tested before its content *when it is
    relevant* for the rule to examine whether the directory can be excluded.

    The packager can overrule automatic discard rules in the manifest by listing the path
    explicitly without any globs. As example:

        installations:
          - install:
             sources:
               - usr/lib/libfoo.la  # <-- This path is always installed
                                    #     (Discard rules are never asked in this case)
                                    #
               - usr/lib/*.so*      # <-- Discard rules applies to any path beneath usr/lib and can exclude matches
                                    #     Though, they will not examine `libfoo.la` as it has already been installed
                                    #
                                    #     Note: usr/lib itself is never tested in this case (it is assumed to be
                                    #     explicitly requested). But any subdir of usr/lib will be examined.

    While a rule is evaluated, it sees the source path currently considered for installation. It
    may look at "surrounding" context (like the parent directory), but it cannot know whether
    those paths are to be installed or will be installed.

    :param name: A user visible name discard rule. It can be used on the command line, so avoid shell
      metacharacters and spaces.
    :param should_discard: The implementation of the rule. It receives a VirtualPath representing the
      *source* path about to be installed. Returning `True` discards the path; returning `False` keeps
      it (as far as this rule is concerned). A source path is either relative to the root of the source
      tree or to the root of a search directory such as `debian/tmp`. Where the path will be installed
      is not available at the time the discard rule is evaluated.
    :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user
      looks up this automatic discard rule.
    :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to
      generate the examples.

    """
    self._restricted_api()
    auto_discard_rules = self._feature_set.auto_discard_rules
    existing = auto_discard_rules.get(name)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f" both tried to provide the automatic discard rule {name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' automatic discard rule "{name}" twice.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    # Accept a single example as well as a sequence of them.
    if isinstance(examples, AutomaticDiscardRuleExample):
        examples = (examples,)
    else:
        examples = tuple(examples)

    auto_discard_rules[name] = PluginProvidedDiscardRule(
        name,
        self._plugin_metadata,
        should_discard,
        rule_reference_documentation,
        examples,
    )

    def _remove_rule() -> None:
        del auto_discard_rules[name]

    self._unloaders.append(_remove_rule)
def service_provider(
    self,
    service_manager: str,
    detector: ServiceDetector,
    integrator: ServiceIntegrator,
) -> None:
    """Register the detector and integrator for a service manager.

    :param service_manager: Name of the service manager; it can only be
      registered once across all plugins.
    :param detector: Stored in the ``ServiceManagerDetails`` entry.
    :param integrator: Stored in the ``ServiceManagerDetails`` entry.
    :raises PluginConflictError: If the service manager is already
      registered (by this or by another plugin).
    """
    self._restricted_api()
    service_managers = self._feature_set.service_managers
    existing = service_managers.get(service_manager)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f' both tried to provide the service manager "{service_manager}"'
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' service manager "{service_manager}" twice.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    details = ServiceManagerDetails(
        service_manager,
        detector,
        integrator,
        self._plugin_metadata,
    )
    service_managers[service_manager] = details

    def _remove_service_manager() -> None:
        del service_managers[service_manager]

    self._unloaders.append(_remove_service_manager)
def manifest_variable(
    self,
    variable_name: str,
    value: str,
    variable_reference_documentation: Optional[str] = None,
) -> None:
    """Register a manifest variable with a fixed value.

    :param variable_name: Name of the variable; validated with
      ``_check_variable_name``.
    :param value: The value. It must come out of substitution unchanged,
      i.e. it must not reference other variables.
    :param variable_reference_documentation: Optional documentation text.
    :raises ValueError: If substituting ``value`` changes it or fails with
      a ``DebputySubstitutionError``.

    The recorded unloader always raises, as unloading is not implemented
    for this feature.
    """
    self._check_variable_name(variable_name)
    manifest_variables = self._feature_set.manifest_variables
    try:
        resolved_value = self._substitution.substitute(
            value, "Plugin initialization"
        )
    except DebputySubstitutionError:
        depends_on_variable = True
    else:
        # Any change made by substitution means a variable was referenced.
        depends_on_variable = resolved_value != value
    if depends_on_variable:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}."
            f" This value depends on another variable, which is not supported. This restriction may be"
            f" lifted in the future."
        )

    manifest_variables[variable_name] = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        value,
        is_context_specific_variable=False,
        variable_reference_documentation=variable_reference_documentation,
    )

    def _refuse_unload() -> None:
        # We need to check it was never resolved
        raise PluginInitializationError(
            "Cannot unload manifest_variable (not implemented)"
        )

    self._unloaders.append(_refuse_unload)
@property
def _plugin_name(self) -> str:
    """The ``plugin_name`` of the plugin metadata this instance holds."""
    metadata = self._plugin_metadata
    return metadata.plugin_name
def provide_manifest_keyword(
    self,
    rule_type: TTP,
    rule_name: Union[str, List[str]],
    handler: DIPKWHandler,
    *,
    inline_reference_documentation: Optional[ParserDocumentation] = None,
) -> None:
    """Register a keyword with the table parser for ``rule_type``.

    :param rule_type: Key into the parser generator's
      ``dispatchable_table_parsers``.
    :param rule_name: Keyword name(s), passed on to ``register_keyword``.
    :param handler: Handler passed on to ``register_keyword``.
    :param inline_reference_documentation: Optional documentation passed on
      to ``register_keyword``.
    :raises ValueError: If ``rule_type`` is not a known table parser type.

    The recorded unloader always raises, as unloading is not implemented
    for this feature.
    """
    self._restricted_api()
    table_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_table_parsers
    )
    if rule_type not in table_parsers:
        types = ", ".join(sorted(x.__name__ for x in table_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )
    table_parsers[rule_type].register_keyword(
        rule_name,
        handler,
        self._plugin_metadata,
        inline_reference_documentation=inline_reference_documentation,
    )

    def _refuse_unload() -> None:
        raise PluginInitializationError(
            "Cannot unload provide_manifest_keyword (not implemented)"
        )

    self._unloaders.append(_refuse_unload)
def pluggable_object_parser(
    self,
    rule_type: str,
    rule_name: str,
    *,
    object_parser_key: Optional[str] = None,
    on_end_parse_step: Optional[
        Callable[
            [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData],
            None,
        ]
    ] = None,
    nested_in_package_context: bool = False,
) -> None:
    """Hook one object parser into another as a child parser.

    :param rule_type: Key of the parent parser in the parser generator's
      ``dispatchable_object_parsers``.
    :param rule_name: Name under which the child is registered.
    :param object_parser_key: Key of the child parser in the same table;
      defaults to ``rule_name``.
    :param on_end_parse_step: Passed on to ``register_child_parser``.
    :param nested_in_package_context: Passed on to
      ``register_child_parser``.
    :raises ValueError: If ``rule_type`` or the child key is unknown.

    The recorded unloader always raises, as unloading is not implemented
    for this feature.
    """
    self._restricted_api()
    child_key = rule_name if object_parser_key is None else object_parser_key

    object_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_object_parsers
    )
    if rule_type not in object_parsers:
        types = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )
    if child_key not in object_parsers:
        types = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The object_parser_key was not a supported type. It must be one of {types}"
        )
    parent_dispatcher = object_parsers[rule_type]
    child_dispatcher = object_parsers[child_key]
    parent_dispatcher.register_child_parser(
        rule_name,
        child_dispatcher,
        self._plugin_metadata,
        on_end_parse_step=on_end_parse_step,
        nested_in_package_context=nested_in_package_context,
    )

    def _refuse_unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_object_parser (not implemented)"
        )

    self._unloaders.append(_refuse_unload)
def pluggable_manifest_rule(
    self,
    rule_type: Union[TTP, str],
    rule_name: Union[str, List[str]],
    parsed_format: Type[PF],
    handler: DIPHandler,
    *,
    source_format: Optional[SF] = None,
    inline_reference_documentation: Optional[ParserDocumentation] = None,
) -> None:
    """Register a manifest rule together with a generated parser.

    :param rule_type: A string selects one of the parser generator's
      ``dispatchable_object_parsers``; anything else selects one of its
      ``dispatchable_table_parsers``.
    :param rule_name: Rule name(s), passed on to ``register_parser``.
    :param parsed_format: Format handed to ``generate_parser``.
    :param handler: Handler passed on to ``register_parser``.
    :param source_format: Passed to ``generate_parser`` as
      ``source_content``.
    :param inline_reference_documentation: Passed to ``generate_parser``.
    :raises ValueError: If ``rule_type`` is not known to the selected
      parser table.

    The recorded unloader always raises, as unloading is not implemented
    for this feature.
    """
    self._restricted_api()
    feature_set = self._feature_set
    parser_generator = feature_set.manifest_parser_generator
    if isinstance(rule_type, str):
        registry = parser_generator.dispatchable_object_parsers
        if rule_type not in registry:
            types = ", ".join(sorted(registry))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
    else:
        registry = parser_generator.dispatchable_table_parsers
        if rule_type not in registry:
            # Table parsers are keyed by type; list them by name.
            types = ", ".join(sorted(x.__name__ for x in registry))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
    dispatching_parser = registry[rule_type]

    parser = feature_set.manifest_parser_generator.generate_parser(
        parsed_format,
        source_content=source_format,
        inline_reference_documentation=inline_reference_documentation,
    )
    dispatching_parser.register_parser(
        rule_name,
        parser,
        handler,
        self._plugin_metadata,
    )

    def _refuse_unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_manifest_rule (not implemented)"
        )

    self._unloaders.append(_refuse_unload)
def known_packaging_files(
    self,
    packaging_file_details: KnownPackagingFileInfo,
) -> None:
    """Register a known packaging file on behalf of the current plugin.

    The entry is stored in the feature set under the key
    `<detection_method>::<detection_value>` and an unloader that removes
    the entry again is recorded.

    :param packaging_file_details: Description of the packaging file. With
      detection method "path" (the default) the "path" attribute is used as
      detection value; with "dh.pkgfile" the "pkgfile" attribute is used.
    :raises ValueError: If the attributes do not match the detection method,
      a required attribute is missing, the path is not normalized, or the
      pkgfile contains a "/".
    :raises PluginConflictError: If the key has already been registered.
    """
    known_packaging_files = self._feature_set.known_packaging_files
    detection_method = packaging_file_details.get(
        "detection_method", cast("Literal['path']", "path")
    )
    path = packaging_file_details.get("path")
    dhpkgfile = packaging_file_details.get("pkgfile")

    # Store a private copy so the registered entry is a separate mapping
    # object from the one the caller passed in.
    packaging_file_details = packaging_file_details.copy()

    if detection_method == "path":
        if dhpkgfile is not None:
            raise ValueError(
                'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)'
            )
        if path is None:
            # Report a missing attribute as ValueError (like the other
            # validation problems) instead of failing on `None` below.
            raise ValueError(
                'The "path" attribute is required when detection-method is "path" (or omitted)'
            )
        normalized_path = _normalize_path(path, with_prefix=False)
        if path != normalized_path:
            raise ValueError(
                f"The path for known packaging files must be normalized. Please replace"
                f' "{path}" with "{normalized_path}"'
            )
        detection_value = path
    elif detection_method == "dh.pkgfile":
        if path is not None:
            raise ValueError(
                'The "path" attribute cannot be used when detection-method is "dh.pkgfile"'
            )
        if dhpkgfile is None:
            raise ValueError(
                'The "pkgfile" attribute is required when detection-method is "dh.pkgfile"'
            )
        if "/" in dhpkgfile:
            raise ValueError(
                'The "pkgfile" attribute must be a name stem such as "install" (no "/" are allowed)'
            )
        detection_value = dhpkgfile
    else:
        # Explicit error rather than `assert`, which is stripped under -O.
        raise ValueError(
            f'Unsupported detection-method "{detection_method}";'
            ' expected "path" or "dh.pkgfile"'
        )

    key = f"{detection_method}::{detection_value}"
    existing = known_packaging_files.get(key)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{key}" is registered twice for known packaging files.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{key}" twice for known packaging files.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    _validate_known_packaging_file_dh_compat_rules(
        packaging_file_details.get("dh_compat_rules")
    )
    known_packaging_files[key] = PluginProvidedKnownPackagingFile(
        packaging_file_details,
        detection_method,
        detection_value,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del known_packaging_files[key]

    self._unloaders.append(_unload)
def register_mapped_type(
    self,
    type_mapping: TypeMapping,
    *,
    reference_documentation: Optional[TypeMappingDocumentation] = None,
) -> None:
    """Register a type mapping with the feature set and the parser generator.

    This is a restricted API (guarded by `_restricted_api`).

    :param type_mapping: The mapping to register; its `target_type` is the
      registration key.
    :param reference_documentation: Optional documentation stored alongside
      the mapping.
    :raises PluginConflictError: If a mapping for the same target type has
      already been registered.
    """
    self._restricted_api()
    target_type = type_mapping.target_type
    mapped_types = self._feature_set.mapped_types
    existing = mapped_types.get(target_type)
    if existing is not None:
        # The messages name the registry that actually conflicted (mapped
        # types), not the known packaging files registry.
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{target_type.__name__}" is registered twice for mapped types.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{target_type.__name__}" twice for mapped types.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    parser_generator = self._feature_set.manifest_parser_generator
    mapped_types[target_type] = PluginProvidedTypeMapping(
        type_mapping, reference_documentation, self._plugin_metadata
    )
    parser_generator.register_mapped_type(type_mapping)
def _restricted_api(
    self,
    *,
    allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(),
) -> None:
    """Reject the call unless the current plugin may use a private API.

    The plugin named "debputy" is always allowed.

    :param allowed_plugins: Names of additional plugins that are allowed.
    :raises PluginAPIViolationError: If the current plugin is not allowed.
    """
    caller = self._plugin_name
    if caller == "debputy" or caller in allowed_plugins:
        return
    raise PluginAPIViolationError(
        f"Plugin {caller} attempted to access a debputy-only API."
        " If you are the maintainer of this plugin and want access to this"
        " API, please file a feature request to make this public."
        " (The API is currently private as it is unstable.)"
    )
class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC):
    """Shared implementation of the maintscript snippet API.

    The public methods wrap the snippet in a shell condition (where relevant)
    and hand the resulting text to `_append_script`, which subclasses must
    implement.
    """

    __slots__ = ()

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record `full_script` for `maintscript`; must be overridden."""
        raise NotImplementedError

    @classmethod
    def _apply_condition_to_script(
        cls,
        condition: str,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
    ) -> str:
        """Wrap `run_snippet` in `if <condition>; then ... fi`.

        :param condition: Shell condition placed after `if`.
        :param run_snippet: The shell code to guard.
        :param indent: Whether to indent the snippet. When None, the snippet
          is indented unless it contains "<<" (taken as a sign of a heredoc).
        :return: The guarded script text, ending in "fi" and a newline.
        """
        if indent is None:
            # We auto-determine this based on heredocs currently
            indent = "<<" not in run_snippet

        if indent:
            run_snippet = "".join(" " + x for x in run_snippet.splitlines(True))
        # Always terminate the snippet so that "fi" ends up on its own line,
        # also when the snippet was left unindented.
        if not run_snippet.endswith("\n"):
            run_snippet += "\n"
        condition_line = f"if {condition}; then\n"
        end_line = "fi\n"
        return "".join((condition_line, run_snippet, end_line))

    def on_configure(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
        skip_on_rollback: bool = False,
    ) -> None:
        """Add a snippet to postinst guarded by the configure condition.

        With `skip_on_rollback`, the guard is exactly `[ "$1" = "configure" ]`;
        otherwise `POSTINST_DEFAULT_CONDITION` is used.
        """
        condition = POSTINST_DEFAULT_CONDITION
        if skip_on_rollback:
            condition = '[ "$1" = "configure" ]'
        return self._append_script(
            "on_configure",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_initial_install(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to postinst run on configure when "$2" is empty."""
        condition = '[ "$1" = "configure" -a -z "$2" ]'
        return self._append_script(
            "on_initial_install",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to postinst run on configure when "$2" is non-empty."""
        condition = '[ "$1" = "configure" -a -n "$2" ]'
        return self._append_script(
            "on_upgrade",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade_from(
        self,
        version: str,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to postinst run on configure from `version` or older.

        The guard compares "$2" against `version` with the dpkg `le-nl`
        operator.

        :param version: The version "$2" is compared against.
        """
        import shlex

        # dpkg --compare-versions takes <ver1> <op> <ver2>; the version is
        # shell-quoted as it is embedded verbatim into the script.
        condition = (
            '[ "$1" = "configure" ] && dpkg --compare-versions'
            f' "$2" le-nl {shlex.quote(version)}'
        )
        return self._append_script(
            "on_upgrade_from",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_before_removal(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to prerm guarded by `[ "$1" = "remove" ]`."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_before_removal",
            "prerm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_removed(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to postrm guarded by `[ "$1" = "remove" ]`."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_removed",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_purge(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to postrm guarded by `[ "$1" = "purge" ]`."""
        condition = '[ "$1" = "purge" ]'
        return self._append_script(
            "on_purge",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def unconditionally_in_script(
        self,
        maintscript: Maintscript,
        run_snippet: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `maintscript` without any guarding condition.

        :raises ValueError: If `maintscript` is not in `STD_CONTROL_SCRIPTS`.
        """
        if maintscript not in STD_CONTROL_SCRIPTS:
            raise ValueError(
                f'Unknown script "{maintscript}". Should have been one of:'
                f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}'
            )
        return self._append_script(
            "unconditionally_in_script",
            maintscript,
            run_snippet,
            perform_substitution=perform_substitution,
        )
class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that stores snippets in per-script containers."""

    __slots__ = (
        "_plugin_metadata",
        "_maintscript_snippets",
        "_plugin_source_id",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_snippets: Dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        *,
        default_snippet_order: Optional[Literal["service"]] = None,
    ):
        """Remember the plugin identity, snippet containers and substitution.

        :param default_snippet_order: Used as `snippet_order` of every snippet
          created via this accessor.
        """
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append `full_script` to the snippet container of `maintscript`.

        The script is run through the package substitution first unless
        `perform_substitution` is false.
        """
        source_label = (
            f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})"
        )
        script_text = (
            self._package_substitution.substitute(full_script, source_label)
            if perform_substitution
            else full_script
        )

        new_snippet = MaintscriptSnippet(
            snippet=script_text,
            definition_source=source_label,
            snippet_order=self._default_snippet_order,
        )
        self._maintscript_snippets[maintscript].append(new_snippet)
class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor):
    """Common state and behaviour of the binary control accessors.

    Subclasses supply the maintscript accessor via
    `_create_maintscript_accessor`.
    """

    __slots__ = (
        "_plugin_metadata",
        "_plugin_source_id",
        "_package_metadata_context",
        "_triggers",
        "_substvars",
        "_maintscript",
        "_shlibs_details",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        shlibs_details: Tuple[Optional[str], Optional[List[str]]],
    ) -> None:
        """Store the collaborators; the maintscript accessor starts unset."""
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._package_metadata_context = package_metadata_context
        self._triggers = triggers
        self._substvars = substvars
        self._maintscript: Optional[MaintscriptAccessor] = None
        self._shlibs_details = shlibs_details

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the maintscript accessor; must be overridden."""
        raise NotImplementedError

    def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None:
        """Register a declarative dpkg level trigger

        The provided trigger will be added to the package's metadata (the triggers file of the control.tar).

        If the trigger has already been added previously, a second call with the same trigger data will be ignored.
        """
        trigger_key = (trigger_type, trigger_target)
        if trigger_key not in self._triggers:
            self._triggers[trigger_key] = PluginProvidedTrigger(
                dpkg_trigger_type=trigger_type,
                dpkg_trigger_target=trigger_target,
                provider=self._plugin_metadata,
                provider_source_id=self._plugin_source_id,
            )

    @property
    def maintscript(self) -> MaintscriptAccessor:
        """The maintscript accessor, created on first access if unset."""
        if self._maintscript is None:
            self._maintscript = self._create_maintscript_accessor()
        return self._maintscript

    @property
    def substvars(self) -> FlushableSubstvars:
        """The substvars object given at construction."""
        return self._substvars

    def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None:
        """Run dpkg-shlibdeps on `paths` against the flushed substvars file.

        The command is printed before it is run; a non-zero exit status is
        reported via `_error`.
        """
        package = self._package_metadata_context.binary_package
        # The whole invocation happens while the substvars are flushed to a
        # file, since that file is passed to dpkg-shlibdeps via -T.
        with self.substvars.flush() as substvars_file:
            argv = ["dpkg-shlibdeps", f"-T{substvars_file}"]
            if package.is_udeb:
                argv.append("-tudeb")
            if package.is_essential:
                argv.append("-dPre-Depends")
            shlibs_local, shlib_dirs = self._shlibs_details
            if shlibs_local is not None:
                argv.append(f"-L{shlibs_local}")
            if shlib_dirs:
                argv += [f"-l{sd}" for sd in shlib_dirs]
            argv += [p.fs_path for p in paths]
            print_command(*argv)
            try:
                subprocess.check_call(argv)
            except subprocess.CalledProcessError:
                _error(
                    f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {package.name} failed. Please"
                    " review the output from dpkg-shlibdeps above to understand what went wrong."
                )
class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor backed by `MaintscriptAccessorProvider`."""

    # "_maintscript" is already a slot of the base class, so it is not
    # declared again here.
    __slots__ = (
        "_maintscript_snippets",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        maintscript_snippets: Dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        shlibs_details: Tuple[Optional[str], Optional[List[str]]],
        *,
        default_snippet_order: Optional[Literal["service"]] = None,
    ) -> None:
        """Initialize the base state and create the maintscript accessor.

        :param default_snippet_order: Passed on to the maintscript accessor.
        """
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            package_metadata_context,
            triggers,
            substvars,
            shlibs_details,
        )
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order
        # Created eagerly through the factory so that the eager and the lazy
        # (base class property) code paths build identical accessors.
        self._maintscript = self._create_maintscript_accessor()

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the maintscript accessor for this plugin and package."""
        return MaintscriptAccessorProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_snippets,
            self._package_substitution,
            default_snippet_order=self._default_snippet_order,
        )
class BinaryCtrlAccessorProviderCreator:
    """Factory for per-plugin accessors that share one trigger table."""

    def __init__(
        self,
        package_metadata_context: PackageProcessingContext,
        substvars: FlushableSubstvars,
        maintscript_snippets: Dict[str, MaintscriptSnippetContainer],
        substitution: Substitution,
    ) -> None:
        """Store the shared state; triggers start empty, shlibs details unset."""
        self._package_metadata_context = package_metadata_context
        self._substvars = substvars
        self._maintscript_snippets = maintscript_snippets
        self._substitution = substitution
        self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = (None, None)

    def for_plugin(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        *,
        default_snippet_order: Optional[Literal["service"]] = None,
    ) -> BinaryCtrlAccessor:
        """Create an accessor for the given plugin.

        The accessor receives the value `shlibs_details` has at the time of
        this call.
        """
        return BinaryCtrlAccessorProvider(
            plugin_metadata=plugin_metadata,
            plugin_source_id=plugin_source_id,
            package_metadata_context=self._package_metadata_context,
            triggers=self._triggers,
            substvars=self._substvars,
            maintscript_snippets=self._maintscript_snippets,
            package_substitution=self._substitution,
            shlibs_details=self.shlibs_details,
            default_snippet_order=default_snippet_order,
        )

    def generated_triggers(self) -> Iterable[PluginProvidedTrigger]:
        """Return the triggers registered via the created accessors."""
        return self._triggers.values()
def plugin_metadata_for_debputys_own_plugin(
    loader: Optional[PluginInitializationEntryPoint] = None,
) -> DebputyPluginMetadata:
    """Create the plugin metadata for the bundled "debputy" plugin.

    :param loader: The plugin initializer to use. When omitted,
      `initialize_debputy_features` is imported and used.
    """
    if loader is None:
        # Imported on demand; binds the default initializer to `loader`.
        from debputy.plugin.debputy.debputy_plugin import (
            initialize_debputy_features as loader,
        )

    return DebputyPluginMetadata(
        plugin_name="debputy",
        api_compat_version=1,
        plugin_initializer=loader,
        plugin_loader=None,
        plugin_path="<bundled>",
    )
def load_plugin_features(
    plugin_search_dirs: Sequence[str],
    substitution: Substitution,
    requested_plugins_only: Optional[Sequence[str]] = None,
    required_plugins: Optional[Set[str]] = None,
    plugin_feature_set: Optional[PluginProvidedFeatureSet] = None,
    debug_mode: bool = False,
) -> PluginProvidedFeatureSet:
    """Load debputy's own plugin plus the JSON plugins into a feature set.

    :param plugin_search_dirs: Directories searched for JSON plugins.
    :param substitution: Passed on to every plugin initializer provider.
    :param requested_plugins_only: When given, exactly these plugins are
      loaded (in addition to the required ones) instead of auto-discovery.
    :param required_plugins: Plugins that must load.
    :param plugin_feature_set: The feature set to populate; a new one is
      created when omitted.
    :param debug_mode: When true, failures of auto-discovered plugins are
      re-raised instead of deactivating the plugin.
    :return: The populated feature set.
    """
    if plugin_feature_set is None:
        plugin_feature_set = PluginProvidedFeatureSet()
    plugins = [plugin_metadata_for_debputys_own_plugin()]
    # Names of the auto-discovered plugins; only these may be deactivated
    # when they fail to load.
    unloadable_plugins = set()
    if required_plugins:
        plugins.extend(
            find_json_plugins(
                plugin_search_dirs,
                required_plugins,
            )
        )
    if requested_plugins_only is not None:
        plugins.extend(
            find_json_plugins(
                plugin_search_dirs,
                requested_plugins_only,
            )
        )
    else:
        auto_loaded = _find_all_json_plugins(
            plugin_search_dirs,
            required_plugins if required_plugins is not None else frozenset(),
            debug_mode=debug_mode,
        )
        for plugin_metadata in auto_loaded:
            plugins.append(plugin_metadata)
            unloadable_plugins.add(plugin_metadata.plugin_name)

    for plugin_metadata in plugins:
        api = DebputyPluginInitializerProvider(
            plugin_metadata, plugin_feature_set, substitution
        )
        try:
            api.load_plugin()
        except PluginBaseError as e:
            if plugin_metadata.plugin_name not in unloadable_plugins:
                raise
            if debug_mode:
                # Announce the re-raise before it happens; placed after the
                # raise, this warning could never be emitted.
                _warn(
                    f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due"
                    f" to --debug/-d."
                )
                raise
            try:
                api.unload_plugin()
            except Exception:
                _warn(
                    f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to"
                    " clean up after the half-initialized plugin. Re-raising load error as the partially loaded"
                    " module might have tainted the feature set."
                )
                raise e from None
            _warn(
                f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was"
                f" deactivated. Use debug mode (--debug) to show the stacktrace (the warning will become an error)"
            )

    return plugin_feature_set
def find_json_plugin(
    search_dirs: Sequence[str],
    requested_plugin: str,
) -> DebputyPluginMetadata:
    """Look up a single plugin via `find_json_plugins`.

    Exactly one result is expected from the lookup (asserted).
    """
    matches = list(find_json_plugins(search_dirs, [requested_plugin]))
    assert len(matches) == 1
    return matches[0]
def find_related_implementation_files_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> List[str]:
    """List the implementation files that belong to a JSON plugin.

    The only candidate is the Python file next to the JSON descriptor, as
    computed by `_find_plugin_implementation_file`.

    :return: The existing implementation files (possibly none).
    """
    plugin_path = plugin_metadata.plugin_path
    if not os.path.isfile(plugin_path):
        plugin_name = plugin_metadata.plugin_name
        _error(
            f"Cannot find related files for {plugin_name}: The plugin seems to be bundled"
            " or loaded via a mechanism that does not support detecting its tests."
        )
    files = []
    module_name, module_file = _find_plugin_implementation_file(
        plugin_metadata.plugin_name,
        plugin_metadata.plugin_path,
    )
    if os.path.isfile(module_file):
        files.append(module_file)
    else:
        # No file next to the descriptor: load the plugin and check whether
        # the conventionally named module got imported from elsewhere.
        if not plugin_metadata.is_loaded:
            plugin_metadata.load_plugin()
        if module_name in sys.modules:
            _error(
                f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its'
                f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be"
                f" installed via this method. The related Python would not be installed"
                f" (which would result in a plugin that would fail to load)"
            )

    return files
def find_tests_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> List[str]:
    """List the test files located next to a plugin's JSON descriptor.

    A file counts as a test when its name starts with the plugin name (with
    "-" replaced by "_") and matches `PLUGIN_TEST_SUFFIX`.
    """
    plugin_name = plugin_metadata.plugin_name
    plugin_path = plugin_metadata.plugin_path

    if not os.path.isfile(plugin_path):
        _error(
            f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a"
            " mechanism that does not support detecting its tests."
        )

    wanted_prefix = plugin_metadata.plugin_name.replace("-", "_")
    with os.scandir(os.path.dirname(plugin_path)) as entries:
        return [
            entry.path
            for entry in entries
            if entry.is_file()
            and entry.name.startswith(wanted_prefix)
            and PLUGIN_TEST_SUFFIX.search(entry.name)
        ]
def find_json_plugins(
    search_dirs: Sequence[str],
    requested_plugins: Iterable[str],
) -> Iterable[DebputyPluginMetadata]:
    """Yield the metadata of each requested plugin.

    A request containing "/" is treated as the path of the JSON descriptor.
    Any other request is a plugin name that is looked up as
    `debputy/plugins/<name>.json` in each of the search directories.

    :raises PluginNotFoundError: If a requested plugin cannot be found.
    """
    for plugin_name_or_path in requested_plugins:
        found = False
        if "/" in plugin_name_or_path:
            if not os.path.isfile(plugin_name_or_path):
                raise PluginNotFoundError(
                    f"Unable to load the plugin {plugin_name_or_path}: The path is not a file."
                    ' (Because the plugin name contains "/", it is assumed to be a path and search path'
                    " is not used.)"
                )
            yield parse_json_plugin_desc(plugin_name_or_path)
            # Move on to the next request; returning here would silently
            # drop all remaining requested plugins.
            continue
        for search_dir in search_dirs:
            path = os.path.join(
                search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json"
            )
            if not os.path.isfile(path):
                continue
            found = True
            yield parse_json_plugin_desc(path)
        if not found:
            search_dir_str = ":".join(search_dirs)
            raise PluginNotFoundError(
                f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the"
                f" debputy/plugins subdir of any of the search dirs ({search_dir_str})"
            )
def _find_all_json_plugins(
    search_dirs: Sequence[str],
    required_plugins: AbstractSet[str],
    debug_mode: bool = False,
) -> Iterable[DebputyPluginMetadata]:
    """Yield the metadata of every JSON plugin found in the search dirs.

    Plugins named in `required_plugins` are skipped, and a plugin name is
    only considered the first time it is encountered.

    :param search_dirs: Directories whose `debputy/plugins` subdirectory is
      scanned; missing directories are ignored.
    :param required_plugins: Plugin names to leave out of the result.
    :param debug_mode: When true, descriptor parse errors are re-raised
      instead of being reported as warnings.
    """
    seen = set(required_plugins)
    error_seen = False
    for search_dir in search_dirs:
        try:
            dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins"))
        except FileNotFoundError:
            continue
        with dir_fd:
            for entry in dir_fd:
                if not entry.is_file(follow_symlinks=True) or not entry.name.endswith(
                    ".json"
                ):
                    continue
                # `seen` holds plugin names, so compare the name without the
                # ".json" suffix rather than the file name.
                plugin_name = entry.name[: -len(".json")]
                if plugin_name in seen:
                    continue
                seen.add(plugin_name)
                try:
                    plugin_metadata = parse_json_plugin_desc(entry.path)
                except PluginBaseError as e:
                    if debug_mode:
                        raise
                    # Only the first error is shown in detail.
                    if not error_seen:
                        error_seen = True
                        _warn(
                            f"Failed to load the plugin in {entry.path} due to the following error: {e.message}"
                        )
                    else:
                        _warn(
                            f"Failed to load plugin in {entry.path} due to errors (not shown)."
                        )
                else:
                    yield plugin_metadata
def _find_plugin_implementation_file(
    plugin_name: str,
    json_file_path: str,
) -> Tuple[str, str]:
    """Compute the conventional module name and file of a plugin.

    :param plugin_name: The plugin name; "-" is replaced by "_" to form the
      module basename.
    :param json_file_path: Path of the plugin's JSON descriptor; the module
      file is placed in the same directory.
    :return: A tuple of the module name (`debputy.plugin.<basename>`) and
      the path of `<basename>.py`.
    """
    basename = plugin_name.replace("-", "_")
    plugin_dir = os.path.dirname(json_file_path)
    return (
        f"debputy.plugin.{basename}",
        os.path.join(plugin_dir, f"{basename}.py"),
    )
def _resolve_module_initializer(
    plugin_name: str,
    plugin_initializer_name: str,
    module_name: Optional[str],
    json_file_path: str,
) -> PluginInitializationEntryPoint:
    """Load a plugin's module and return its initializer.

    Without an explicit `module_name`, the module is loaded from the Python
    file next to the JSON descriptor if that file exists. Otherwise (or with
    an explicit name) the module is imported by name.

    :param plugin_name: The plugin name (used for the conventional module
      location and in error messages).
    :param plugin_initializer_name: Name of the attribute to fetch from the
      module.
    :param module_name: Explicit module name, or None to use the convention.
    :param json_file_path: Path of the plugin's JSON descriptor.
    :raises PluginInitializationError: If the module cannot be loaded.
    :raises PluginMetadataError: If an explicitly named module is not
      importable or the module lacks the initializer attribute.
    """
    module = None
    module_fs_path = None
    if module_name is None:
        module_name, module_fs_path = _find_plugin_implementation_file(
            plugin_name, json_file_path
        )
        if os.path.isfile(module_fs_path):
            spec = importlib.util.spec_from_file_location(module_name, module_fs_path)
            if spec is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The spec_from_file_location function returned None."
                )
            mod = importlib.util.module_from_spec(spec)
            loader = spec.loader
            if loader is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " Python could not find a suitable loader (spec.loader was None)"
                )
            sys.modules[module_name] = mod
            try:
                loader.exec_module(mod)
            except (Exception, GeneratorExit) as e:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The module threw an exception while being loaded."
                ) from e
            module = mod

    if module is None:
        try:
            module = importlib.import_module(module_name)
        except ModuleNotFoundError as e:
            if module_fs_path is None:
                raise PluginMetadataError(
                    f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but'
                    " this module is not available in the python search path"
                ) from e
            raise PluginInitializationError(
                f"Failed to load {plugin_name}. Tried loading it from"
                f' "{module_fs_path}" (which did not exist) and PYTHONPATH as'
                f" {module_name} (where it was not found either). Please ensure"
                " the module code is installed in the correct spot or provide an"
                f' explicit "module" definition in {json_file_path}.'
            ) from e

    # Use a default so a missing attribute reaches the descriptive error
    # below instead of escaping as a bare AttributeError.
    plugin_initializer = getattr(module, plugin_initializer_name, None)

    if plugin_initializer is None:
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f" attribute called {plugin_initializer_name}. However, it does not. Please correct the plugin"
            f" metadata or initializer name in the Python module."
        )
    return cast("PluginInitializationEntryPoint", plugin_initializer)
def _json_plugin_loader(
    plugin_name: str,
    plugin_json_metadata: PluginJsonMetadata,
    json_file_path: str,
    attribute_path: AttributePath,
) -> Callable[["DebputyPluginInitializer"], None]:
    """Build the initializer for a plugin described by JSON metadata.

    The metadata can contribute up to four initializers: the initializer of
    the plugin's Python module, the known packaging files, the manifest
    variables and the packager provided files. With several of them, the
    returned callable runs them in that order.

    :param plugin_name: Name of the plugin.
    :param plugin_json_metadata: The parsed JSON metadata.
    :param json_file_path: Path of the JSON file (used in error messages).
    :param attribute_path: Attribute path of the metadata root, used to
      derive the paths of the individual entries.
    :raises PluginMetadataError: If the API compat level is unsupported, the
      initializer name contains ".", or no feature is provided at all.
    """
    api_compat = plugin_json_metadata["api_compat_version"]
    module_name = plugin_json_metadata.get("module")
    plugin_initializer_name = plugin_json_metadata.get("plugin_initializer")
    packager_provided_files_raw = plugin_json_metadata.get(
        "packager_provided_files", []
    )
    manifest_variables_raw = plugin_json_metadata.get("manifest_variables")
    known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files")
    if api_compat != 1:
        raise PluginMetadataError(
            f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this'
            f" version of debputy only supports API compat version of 1"
        )
    if plugin_initializer_name is not None and "." in plugin_initializer_name:
        p = attribute_path["plugin_initializer"]
        raise PluginMetadataError(
            f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".'
        )

    plugin_initializers = []

    if plugin_initializer_name is not None:
        plugin_initializer = _resolve_module_initializer(
            plugin_name,
            plugin_initializer_name,
            module_name,
            json_file_path,
        )
        plugin_initializers.append(plugin_initializer)

    if known_packaging_files_raw:
        kpf_root_path = attribute_path["known_packaging_files"]
        known_packaging_files = []
        for k, v in enumerate(known_packaging_files_raw):
            kpf_path = kpf_root_path[k]
            # Only peek into the entry when it is a dict; anything else is
            # left for the parser below to reject.
            if isinstance(v, dict):
                p = v.get("path")
                if isinstance(p, str):
                    kpf_path.path_hint = p
                if plugin_name.startswith("debputy-"):
                    docs = v.get("documentation-uris")
                    if isinstance(docs, list):
                        v["documentation-uris"] = [
                            (
                                d.replace(
                                    "@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR
                                )
                                if isinstance(d, str)
                                else d
                            )
                            for d in docs
                        ]
            known_packaging_file: KnownPackagingFileInfo = (
                PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input(
                    v,
                    kpf_path,
                )
            )
            known_packaging_files.append((kpf_path, known_packaging_file))

        def _initialize_json_provided_known_packaging_files(
            api: DebputyPluginInitializerProvider,
        ) -> None:
            for p, details in known_packaging_files:
                try:
                    api.known_packaging_files(details)
                except ValueError as ex:
                    raise PluginMetadataError(
                        f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_known_packaging_files)

    if manifest_variables_raw:
        manifest_var_path = attribute_path["manifest_variables"]
        manifest_variables = [
            PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i])
            for i, p in enumerate(manifest_variables_raw)
        ]

        def _initialize_json_provided_manifest_vars(
            api: DebputyPluginInitializer,
        ) -> None:
            for idx, manifest_variable in enumerate(manifest_variables):
                name = manifest_variable["name"]
                value = manifest_variable["value"]
                doc = manifest_variable.get("reference_documentation")
                try:
                    api.manifest_variable(
                        name, value, variable_reference_documentation=doc
                    )
                except ValueError as ex:
                    var_path = manifest_var_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_manifest_vars)

    if packager_provided_files_raw:
        ppf_path = attribute_path["packager_provided_files"]
        ppfs = [
            PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i])
            for i, p in enumerate(packager_provided_files_raw)
        ]

        def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None:
            ppf: PackagerProvidedFileJsonDescription
            for idx, ppf in enumerate(ppfs):
                # `c` ends up holding the remaining keyword arguments.
                c = dict(ppf)
                stem = ppf["stem"]
                installed_path = ppf["installed_path"]
                default_mode = ppf.get("default_mode")
                ref_doc_dict = ppf.get("reference_documentation")
                if default_mode is not None:
                    c["default_mode"] = default_mode.octal_mode

                if ref_doc_dict is not None:
                    ref_doc = packager_provided_file_reference_documentation(
                        **ref_doc_dict
                    )
                else:
                    ref_doc = None

                # These are passed explicitly below.
                for k in ("stem", "installed_path", "reference_documentation"):
                    c.pop(k, None)

                try:
                    api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c)  # type: ignore
                except ValueError as ex:
                    p_path = ppf_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_ppfs)

    if not plugin_initializers:
        raise PluginMetadataError(
            f"The plugin defined in {json_file_path} does not seem to provide features, "
            f" such as module + plugin-initializer or packager-provided-files."
        )

    if len(plugin_initializers) == 1:
        return plugin_initializers[0]

    def _chain_loader(api: DebputyPluginInitializer) -> None:
        for initializer in plugin_initializers:
            initializer(api)

    return _chain_loader
1832@contextlib.contextmanager
1833def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]:
1834 if fd is not None:
1835 yield fd
1836 else:
1837 with open(path, "rb") as fd:
1838 yield fd
def parse_json_plugin_desc(
    path: str, *, fd: Optional[IO[bytes]] = None
) -> DebputyPluginMetadata:
    """Parse a JSON plugin description into a `DebputyPluginMetadata`.

    The plugin name is the basename of `path` with a trailing `.json` or
    `.json.in` removed. The plugin's features are not loaded here; the
    returned metadata carries a loader callable that does so on demand.

    :param path: Path of the JSON description. Used for the plugin name, in
      error messages and as the plugin path of the result.
    :param fd: Optional already open binary handle to read the JSON from
      instead of opening `path`.
    :return: The metadata describing the plugin.
    :raises PluginMetadataError: If the content is not valid JSON, if the
      plugin would be named `debputy`, or if the JSON does not match the
      plugin metadata format.
    """
    with _open(path, fd=fd) as json_fd:
        try:
            raw = json.load(json_fd)
        except JSONDecodeError as e:
            raise PluginMetadataError(
                f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}'
            ) from e

    plugin_name = os.path.basename(path)
    # Strip at most one known suffix; `.json` takes priority over `.json.in`.
    for suffix in (".json", ".json.in"):
        if plugin_name.endswith(suffix):
            plugin_name = plugin_name[: -len(suffix)]
            break

    if plugin_name == "debputy":
        # Provide a better error message than "The plugin has already loaded!?"
        raise PluginMetadataError(
            f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not'
            f" clash with the bundled plugin of same name."
        )

    attribute_path = AttributePath.root_path()

    try:
        plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input(raw, attribute_path)
    except ManifestParseException as e:
        raise PluginMetadataError(
            f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}'
        ) from e

    api_compat = plugin_json_metadata["api_compat_version"]
    metadata_kwargs = {
        "plugin_name": plugin_name,
        # Deferred: the JSON-declared features are only set up when the
        # loader is actually invoked.
        "plugin_loader": lambda: _json_plugin_loader(
            plugin_name,
            plugin_json_metadata,
            path,
            attribute_path,
        ),
        "api_compat_version": api_compat,
        "plugin_initializer": None,
        "plugin_path": path,
    }
    return DebputyPluginMetadata(**metadata_kwargs)
@dataclasses.dataclass(slots=True, frozen=True)
class ServiceDefinitionImpl(ServiceDefinition[DSD]):
    """Frozen, slotted dataclass implementation of `ServiceDefinition`.

    Instances cannot be mutated after creation; use `replace` to obtain a
    copy with some fields changed.

    The field order is significant: `ServiceRegistryImpl.register_service`
    constructs instances with positional arguments.
    """

    # NOTE(review): the per-field notes below are derived from how
    # `ServiceRegistryImpl.register_service` fills the fields - confirm the
    # intended contract against `ServiceDefinition`.

    # Primary name; `register_service` passes the first entry of `names`.
    name: str
    # All names of the service, including the primary one.
    names: Sequence[str]
    # The path passed to `register_service` for this service.
    path: VirtualPath
    # Kind of service; `register_service` defaults this to "service".
    type_of_service: str
    # Scope of the service; `register_service` defaults this to "system".
    service_scope: str
    # Filled from `enable_by_default` by `register_service`.
    auto_enable_on_install: bool
    # Filled from `start_by_default` by `register_service`.
    auto_start_on_install: bool
    # Filled from `default_upgrade_rule` by `register_service`.
    on_upgrade: ServiceUpgradeRule
    # Human-readable description of where the definition came from
    # (e.g. "Auto-detected by plugin <name>").
    definition_source: str
    # `register_service` sets this to True for definitions it creates.
    is_plugin_provided_definition: bool
    # Optional plugin-specific data passed through `register_service`.
    service_context: Optional[DSD]

    def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]":
        """Return a copy of this definition with the given fields replaced.

        :param changes: Field names mapped to their new values; forwarded to
          `dataclasses.replace`.
        :return: A new instance; `self` is left unchanged.
        """
        return dataclasses.replace(self, **changes)
class ServiceRegistryImpl(ServiceRegistry[DSD]):
    """Registry collecting the services registered by a service manager.

    Every call to `register_service` appends one service definition, which
    is exposed via `detected_services`. Each combination of service name,
    type and scope may only be registered once per registry.
    """

    __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services")

    def __init__(self, service_manager_details: ServiceManagerDetails) -> None:
        """Create an empty registry.

        :param service_manager_details: Details of the service manager using
          this registry; its plugin name is used in error messages and in
          the source description of registered definitions.
        """
        self._service_manager_details = service_manager_details
        self._service_definitions: List[ServiceDefinition[DSD]] = []
        # (name, type_of_service, service_scope) for every registered name.
        self._seen_services: Set[Tuple[str, str, str]] = set()

    @property
    def detected_services(self) -> Sequence[ServiceDefinition[DSD]]:
        """The service definitions registered so far, in registration order."""
        return self._service_definitions

    def register_service(
        self,
        path: VirtualPath,
        name: Union[str, List[str]],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: Optional[DSD] = None,
    ) -> None:
        """Register a service under one or more names.

        :param path: The path associated with the service.
        :param name: A single name or a list of names. The first name
          becomes the primary name of the definition.
        :param type_of_service: Kind of service (e.g. "service", "timer").
        :param service_scope: Scope of the service.
        :param enable_by_default: Stored as the definition's
          `auto_enable_on_install`.
        :param start_by_default: Stored as the definition's
          `auto_start_on_install`.
        :param default_upgrade_rule: Stored as the definition's `on_upgrade`.
        :param service_context: Optional plugin-specific data stored with
          the definition.
        :raises ValueError: If `name` is an empty list.
        :raises PluginAPIViolationError: If any of the names has already
          been registered with the same type and scope.
        """
        names = name if isinstance(name, list) else [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        keys = [(n, type_of_service, service_scope) for n in names]
        # Validate every name before recording any of them, so a rejected
        # registration leaves the registry untouched.
        for n, key in zip(names, keys):
            if key in self._seen_services:
                raise PluginAPIViolationError(
                    f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used"
                    f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not"
                    " allowed by the debputy plugin API."
                )
        # Record the names; without this the duplicate check above could
        # never trigger because the set would stay empty.
        self._seen_services.update(keys)
        # TODO: We cannot create a service definition immediate once the manifest is involved
        self._service_definitions.append(
            ServiceDefinitionImpl(
                names[0],
                names,
                path,
                type_of_service,
                service_scope,
                enable_by_default,
                start_by_default,
                default_upgrade_rule,
                f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}",
                True,
                service_context,
            )
        )