Coverage for src/debputy/plugin/api/test_api/test_impl.py: 82%
296 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import contextlib
2import dataclasses
3import inspect
4import os.path
5from io import BytesIO
6from typing import (
7 Mapping,
8 Dict,
9 Optional,
10 Tuple,
11 List,
12 cast,
13 FrozenSet,
14 Sequence,
15 Union,
16 Type,
17 Iterator,
18 Set,
19 KeysView,
20 Callable,
21)
23from debian.deb822 import Deb822
24from debian.substvars import Substvars
26from debputy import DEBPUTY_PLUGIN_ROOT_DIR
27from debputy.architecture_support import faked_arch_table
28from debputy.filesystem_scan import FSROOverlay, FSRootDir
29from debputy.packages import BinaryPackage
30from debputy.plugin.api import (
31 PluginInitializationEntryPoint,
32 VirtualPath,
33 PackageProcessingContext,
34 DpkgTriggerType,
35 Maintscript,
36)
37from debputy.plugin.api.example_processing import process_discard_rule_example
38from debputy.plugin.api.impl import (
39 plugin_metadata_for_debputys_own_plugin,
40 DebputyPluginInitializerProvider,
41 parse_json_plugin_desc,
42 MaintscriptAccessorProviderBase,
43 BinaryCtrlAccessorProviderBase,
44 PLUGIN_TEST_SUFFIX,
45 find_json_plugin,
46 ServiceDefinitionImpl,
47)
48from debputy.plugin.api.impl_types import (
49 PackagerProvidedFileClassSpec,
50 DebputyPluginMetadata,
51 PluginProvidedTrigger,
52 ServiceManagerDetails,
53)
54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
55from debputy.plugin.api.spec import (
56 MaintscriptAccessor,
57 FlushableSubstvars,
58 ServiceRegistry,
59 DSD,
60 ServiceUpgradeRule,
61)
62from debputy.plugin.api.test_api.test_spec import (
63 InitializedPluginUnderTest,
64 RegisteredPackagerProvidedFile,
65 RegisteredTrigger,
66 RegisteredMaintscript,
67 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS,
68 ADRExampleIssue,
69 DetectedService,
70 RegisteredMetadata,
71)
72from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features
73from debputy.substitution import SubstitutionImpl, VariableContext, Substitution
74from debputy.util import package_cross_check_precheck
# Register the implementation class as a virtual subclass of the public test API type,
# so that `isinstance(spec, RegisteredPackagerProvidedFile)` checks in this module succeed.
RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)
@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Frozen, slotted value object implementing `PackageProcessingContext` for tests.

    In this module, instances are created by `package_metadata_context()`.
    """

    # The binary package being processed, followed by its version string.
    binary_package: BinaryPackage
    binary_package_version: str
    # The udeb related to `binary_package` and its version (both `None` when there is none).
    related_udeb_package: Optional[BinaryPackage]
    related_udeb_package_version: Optional[str]
    # Zero-argument callable that returns the accessible (package, file system root) pairs.
    accessible_package_roots: Callable[
        [],
        Sequence[Tuple[BinaryPackage, VirtualPath]],
    ]
def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load a plugin into a fresh feature set and wrap it for testing

    :param plugin_metadata: The metadata describing the plugin under test.
    :param load_debputy_plugin: Whether debputy's own plugin should be loaded into the same
      feature set before the plugin under test.
    :return: The loaded plugin for testing
    """
    features = PluginProvidedFeatureSet()
    debian_dir = FSROOverlay.create_root_dir("debian", "debian")
    subst = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=VariableContext(debian_dir),
        plugin_feature_set=features,
    )

    load_order = []
    if load_debputy_plugin:
        # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early
        load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    load_order.append(plugin_metadata)

    for metadata in load_order:
        DebputyPluginInitializerProvider(metadata, features, subst).load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        features,
        subst,
    )
def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to a given test file

    The plugin name is derived from the basename of the test file by removing whatever
    `PLUGIN_TEST_SUFFIX` matches and replacing `_` with `-`.

    The environment variable `DEBPUTY_TEST_PLUGIN_LOCATION` selects where the plugin is
    loaded from:

     * `uninstalled` (default): `<plugin-name>.json`, or failing that `<plugin-name>.json.in`,
       next to the test file.
     * `installed`: looked up by name below `DEBPUTY_PLUGIN_ROOT_DIR`.

    :param py_test_filename: Path of the Python test file.
    :return: The loaded plugin for testing
    :raises FileNotFoundError: In `uninstalled` mode when neither descriptor file exists.
    :raises ValueError: When `DEBPUTY_TEST_PLUGIN_LOCATION` has an unsupported value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_basename = f"{plugin_name}.json"
        json_desc_file = os.path.join(dirname, json_basename)
        if "/" not in json_desc_file:
            # Make sure the path always has a directory component.
            json_desc_file = f"./{json_desc_file}"

        if os.path.isfile(json_desc_file):
            return _initialize_plugin_from_desc(json_desc_file)

        json_desc_file_in = f"{json_desc_file}.in"
        if os.path.isfile(json_desc_file_in):
            # Load the `.json.in` file that was just found. The plain `.json` path is known
            # not to exist at this point, so passing it on could never work.
            return _initialize_plugin_from_desc(json_desc_file_in)
        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )
def initialize_plugin_under_test(
    *,
    plugin_desc_file: Optional[str] = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin for testing it

    The plugin is loaded via its plugin description file, which is how `debputy` loads
    plugins at run-time (in contrast to `initialize_plugin_under_test_preloaded`, which
    bypasses this concrete part of the flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to load
      the plugin. If omitted, `debputy` will attempt to detect the plugin description file
      based on the file name of the calling test. This works for "single-file" plugins, where
      the description file and the test are right next to each other.

      Note that the description file is *not* required to have a valid version at this stage
      (e.g., "N/A" or "@PLACEHOLDER@" is fine). So you can still use this method if you
      substitute in the version during the build after running the tests. To support this
      flow, the file name can also end with `.json.in` (instead of `.json`).
    :return: The loaded plugin for testing
    :raises RuntimeError: If `plugin_desc_file` is given while
      `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Stack entry 0 is this function; entry 1 is whoever called us (i.e., the test).
    # This lookup must stay in this function for the index to remain correct.
    caller_frame_info = inspect.stack()[1]
    return _auto_load_plugin_from_filename(caller_frame_info.filename)
def _initialize_plugin_from_desc(
    desc_file: str,
) -> "InitializedPluginUnderTest":
    """Parse a plugin description file and load the plugin it describes

    debputy's own plugin is always loaded first.

    :param desc_file: Path to the description file; must end with `.json` or `.json.in`.
    :return: The loaded plugin for testing
    :raises ValueError: If `desc_file` has neither of the supported suffixes.
    """
    has_supported_suffix = desc_file.endswith((".json", ".json.in"))
    if not has_supported_suffix:
        raise ValueError("The plugin file must end with .json or .json.in")

    return _initialize_plugin_under_test(
        parse_json_plugin_desc(desc_file),
        load_debputy_plugin=True,
    )
def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin whose JSON description is provided as an in-memory string

    :param plugin_name: Passed to `parse_json_plugin_desc` in place of a file name.
    :param json_content: The JSON description; it is encoded as UTF-8 before parsing.
    :return: The loaded plugin for testing (debputy's own plugin is loaded first)
    """
    encoded_json = json_content.encode("utf-8")
    with BytesIO(encoded_json) as json_fd:
        metadata = parse_json_plugin_desc(plugin_name, fd=json_fd)

    return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)
def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    This method by-passes the standard loading mechanism, meaning you will not test that
    your plugin description file is correct. Notably, any feature provided via the JSON
    description file will **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same
      version as the version from the entry point (the `v1` part of
      `debputy.plugins.v1.initialize` translates into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the
      test, it will use a test name and version. However, you can explicitly set it if you
      want the real name/version.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides
      a more realistic test and enables the test to detect conflicts with debputy's own
      plugins (de facto making the plugin unloadable in practice if such a conflict is
      present). This option is mostly provided to enable debputy to use this method for
      self testing.
    :return: The loaded plugin for testing
    :raises RuntimeError: If `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        message = (
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )
        raise RuntimeError(message)

    # There is no file or loader behind this plugin; the metadata is built by hand.
    synthetic_metadata = DebputyPluginMetadata(
        plugin_name=plugin_name,
        api_compat_version=api_compat_version,
        plugin_initializer=plugin_initializer,
        plugin_loader=None,
        plugin_path="<loaded-via-test>",
    )
    return _initialize_plugin_under_test(
        synthetic_metadata,
        load_debputy_plugin=load_debputy_plugin,
    )
class _MockArchTable:
    """Stand-in architecture table for which every architecture query matches."""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        # Both arguments are ignored on purpose; the answer is always "yes".
        return True


# Only the instance is kept; `cast` merely tells type checkers to treat it as a
# "DpkgArchTable". The helper class name is removed from the module namespace afterwards.
_mock_arch_table_instance = _MockArchTable()
FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _mock_arch_table_instance)
del _MockArchTable, _mock_arch_table_instance
def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: Optional[Dict[str, str]] = None,
    related_udeb_package_fields: Optional[Dict[str, str]] = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: Optional[str] = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: Optional[VirtualPath] = None,
    accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(),
) -> PackageProcessingContext:
    """Create a `PackageProcessingContext` emulating a binary package for tests

    :param host_arch: The architecture passed to `faked_arch_table`.
    :param package_fields: Fields for the emulated binary package. They are applied on top
      of the defaults `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: When given, a related udeb package is emulated as
      well. `Package` (defaults to `<package>-udeb`), `Architecture` (defaults to that of
      the main package) and `Package-Type` (defaults to `udeb`) are filled in when absent.
    :param binary_package_version: The version of the emulated binary package.
    :param related_udeb_package_version: The version of the related udeb. When omitted and
      a udeb is emulated, `binary_package_version` is used.
    :param should_be_acted_on: Passed on to the emulated `BinaryPackage`.
    :param related_udeb_fs_root: File system root of the related udeb. When given, the udeb
      is added to the accessible package roots; this requires
      `related_udeb_package_fields` to be given as well.
    :param accessible_package_roots: Pairs of (package fields, file system root) for
      additional packages that should be accessible. Each set of fields must include
      `Package` and `Architecture`.
    :return: The context for the emulated package.
    :raises ValueError: If a member of `accessible_package_roots` lacks a mandatory field,
      if a package would not be accessible from the emulated package, or if
      `related_udeb_fs_root` is given without `related_udeb_package_fields`.
    """
    process_table = faked_arch_table(host_arch)
    f = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        f.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(f),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )
    udeb_package = None
    if related_udeb_package_fields is not None:
        uf = dict(related_udeb_package_fields)
        uf.setdefault("Package", f'{f["Package"]}-udeb')
        uf.setdefault("Architecture", f["Architecture"])
        uf.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(uf),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version

    apr: List[Tuple[BinaryPackage, VirtualPath]] = []
    for fields, apr_fs_root in accessible_package_roots:
        apr_fields = Deb822(dict(fields))
        if "Package" not in apr_fields:
            raise ValueError(
                "Missing mandatory Package field in member of accessible_package_roots"
            )
        if "Architecture" not in apr_fields:
            raise ValueError(
                "Missing mandatory Architecture field in member of accessible_package_roots"
            )
        apr_package = BinaryPackage(
            apr_fields,
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        r = package_cross_check_precheck(bin_package, apr_package)
        if not r[0]:
            raise ValueError(
                f"{apr_package.name} would not be accessible for {bin_package.name}"
            )
        apr.append((apr_package, apr_fs_root))

    # The udeb root is handled regardless of whether other accessible roots were given.
    # Previously it was only considered when `accessible_package_roots` was non-empty.
    if related_udeb_fs_root is not None:
        if udeb_package is None:
            raise ValueError(
                "related_udeb_package_fields must be given when related_udeb_fs_root is given"
            )
        r = package_cross_check_precheck(bin_package, udeb_package)
        if not r[0]:
            raise ValueError(
                f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                " related_udeb_fs_root is irrelevant"
            )
        # Every member must be a (package, fs root) pair; appending the bare package broke
        # the declared shape of `accessible_package_roots`.
        apr.append((udeb_package, related_udeb_fs_root))

    accessible_roots = tuple(apr)

    return PackageProcessingContextTestProvider(
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: accessible_roots,
    )
def manifest_variable_resolution_context(
    *,
    debian_dir: Optional[VirtualPath] = None,
) -> VariableContext:
    """Create a `VariableContext` for resolving manifest variables in tests

    :param debian_dir: The directory handed to `VariableContext`. When omitted, a newly
      created `FSRootDir()` is used.
    :return: The variable context wrapping `debian_dir`.
    """
    effective_debian_dir = FSRootDir() if debian_dir is None else debian_dir
    return VariableContext(effective_debian_dir)
class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records snippets in a container provided by its creator.

    Snippets are stored as `RegisteredMaintscript` entries, grouped by plugin source ID.
    """

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: Dict[str, List[RegisteredMaintscript]],
    ):
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        # Shared with the creator of this accessor (it is not copied), so entries appended
        # here are visible to the creator.
        self._maintscript_container = maintscript_container

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None
    ) -> str:
        # The test provider hands the snippet back untouched; `condition` and `indent`
        # are ignored.
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        # Get (or create) the list of entries for our source ID and record the snippet in it.
        entries = self._maintscript_container.setdefault(self._plugin_source_id, [])
        entries.append(
            RegisteredMaintscript(
                maintscript,
                caller_name,
                full_script,
                perform_substitution,
            )
        )
class RegisteredMetadataImpl(RegisteredMetadata):
    """Container holding the substvars, triggers and maintscripts it was constructed with."""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: List[RegisteredTrigger],
        maintscripts: List[RegisteredMaintscript],
    ) -> None:
        self._substvars = substvars
        self._triggers = triggers
        self._maintscripts = maintscripts

    @property
    def substvars(self) -> Substvars:
        """The substvars given at construction."""
        return self._substvars

    @property
    def triggers(self) -> List[RegisteredTrigger]:
        """The triggers given at construction."""
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Optional[Maintscript] = None,
    ) -> List[RegisteredMaintscript]:
        """Return the registered maintscript snippets

        :param maintscript: When given, only snippets for this maintscript are returned
          (as a new list). When omitted, the internal list itself is returned.
        :return: The matching snippets.
        """
        snippets = self._maintscripts
        if maintscript is not None:
            snippets = [s for s in snippets if s.maintscript == maintscript]
        return snippets
class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor for tests that can report what was registered through it."""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        # The base class is given an empty dict, fresh substvars and a `(None, None)` pair.
        # NOTE(review): the meaning of these three arguments is defined by
        # `BinaryCtrlAccessorProviderBase`, which is not visible here.
        initial_substvars = FlushableSubstvars()
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            {},
            initial_substvars,
            (None, None),
        )
        # Collects the snippets registered via the maintscript accessor, keyed by source ID.
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        # The accessor shares our container, so its registrations can be reported later.
        return MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )

    def registered_metadata(self) -> RegisteredMetadata:
        """Return the substvars plus the triggers and maintscripts for our own source ID"""
        source_id = self._plugin_source_id
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == source_id
        ]
        own_maintscripts = self._maintscript_container.get(source_id, [])
        return RegisteredMetadataImpl(
            self._substvars,
            own_triggers,
            own_maintscripts,
        )
class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry for tests that appends each registration to a caller-provided list."""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: List[DetectedService[DSD]],
    ) -> None:
        self._service_manager_details = service_manager_details
        # The caller's list is used as-is (not copied), so the caller sees registrations.
        self._service_definitions = detected_services

    def register_service(
        self,
        path: VirtualPath,
        name: Union[str, List[str]],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: Optional[DSD] = None,
    ) -> None:
        """Record a service as a `DetectedService`

        :param path: The path of the service.
        :param name: A single name or a list of names for the service.
        :raises ValueError: If `name` is an empty list.
        """
        # Normalize to a list; a list argument is used as-is.
        if isinstance(name, list):
            names = name
        else:
            names = [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        detected = DetectedService(
            path,
            names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)
@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Temporarily make a file system root read-only

    A root that is already read-only is yielded unchanged. A read-write root has its
    `is_read_write` flag cleared for the duration of the `with` block, and the flag is set
    again afterwards - also when the block raises.

    :param fs_root: The file system root. It must be a `FSRootDir` if it is read-write.
    :return: Context manager yielding `fs_root` itself.
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        # Without the `finally`, an exception in the `with` body would leave the caller's
        # root permanently read-only.
        fs_root.is_read_write = True
class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test handle for a loaded plugin, backed by the feature set it was loaded into."""

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        self._plugin_name = plugin_name
        self._feature_set = feature_set
        self._substitution = substitution
        # Lazily computed by `packager_provided_files_by_stem`.
        self._packager_provided_files: Optional[
            Dict[str, RegisteredPackagerProvidedFile]
        ] = None
        self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}
        # The plugin must already be loaded into the feature set.
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return the packager provided files registered by this plugin, keyed by stem

        The mapping is computed on the first call and reused afterwards.
        """
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: Dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> RegisteredMetadata:
        """Run one of the plugin's metadata detectors and return what it registered

        :param metadata_detector_id: The ID of the detector to run.
        :param fs_root: The file system root to run the detector on. A read-write root is
          made read-only while the detector runs.
        :param context: The package context; defaults to `package_metadata_context()`.
        :return: The metadata registered by the detector.
        :raises ValueError: If `fs_root` is not a root, if the plugin has no detector with
          the given ID, or if the detector does not apply to the package of the context.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        plugin_detectors = self._feature_set.metadata_maintscript_detectors[
            self._plugin_name
        ]
        candidates = [
            candidate
            for candidate in plugin_detectors
            if candidate.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            # More than one detector with the same ID is not expected.
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )
        if context is None:
            context = package_metadata_context()
        detector = candidates[0]
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as ro_root:
            detector.run_detector(ro_root, ctrl, context)
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> None:
        """Run one of the plugin's package processors on a read-write file system root

        :param package_processor_id: The ID of the package processor to run.
        :param fs_root: The file system root; it must be read-write.
        :param context: The package context; defaults to `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root or is read-only, if the plugin has no
          package processor with the given ID, or if the processor does not apply to the
          package of the context.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )
        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )
        # The second argument is passed as `None` in this test setup.
        processor.run_package_processor(fs_root, None, context)

    @property
    def declared_manifest_variables(self) -> FrozenSet[str]:
        """Names of the manifest variables registered by this plugin"""
        all_variables = self._feature_set.manifest_variables
        return frozenset(
            name
            for name, variable in all_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Check the examples of the plugin's automatic discard rules

        :return: One `ADRExampleIssue` per example for which
          `process_discard_rule_example` reported inconsistent paths. Directories are
          listed with a trailing slash.
        """
        found: List[ADRExampleIssue] = []
        for rule in self._feature_set.auto_discard_rules.values():
            if rule.plugin_metadata.plugin_name != self._plugin_name:
                continue
            for example_index, example in enumerate(rule.examples):
                outcome = process_discard_rule_example(rule, example)
                if not outcome.inconsistent_paths:
                    continue
                bad_paths = [
                    p.absolute + ("/" if p.is_dir else "")
                    for p in outcome.inconsistent_paths
                ]
                found.append(ADRExampleIssue(rule.name, example_index, bad_paths))
        return found

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
        *,
        service_context_type_hint: Optional[Type[DSD]] = None,
    ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]:
        """Run the detector of a service manager and, if it found services, its integrator

        :param service_manager: Name of a service manager provided by this plugin.
        :param fs_root: The file system root to run the detection on.
        :param context: The package context; defaults to `package_metadata_context()`.
        :param service_context_type_hint: Not used by this implementation.
        :return: The detected services and the metadata registered by the integrator.
        :raises ValueError: If `fs_root` is not a root or the plugin does not provide the
          requested service manager.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        try:
            manager = self._feature_set.service_managers[service_manager]
            if manager.plugin_metadata.plugin_name != self._plugin_name:
                # A manager owned by another plugin is reported like an unknown one.
                raise KeyError(service_manager)
        except KeyError:
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            ) from None

        if context is None:
            context = package_metadata_context()
        detected: List[DetectedService[DSD]] = []
        registry = ServiceRegistryTestImpl(manager, detected)
        manager.service_detector(fs_root, registry, context)
        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            manager.service_manager,
            context,
        )
        if detected:
            # The integrator is only invoked when the detector registered something.
            definitions = [
                ServiceDefinitionImpl(
                    service.names[0],
                    service.names,
                    service.path,
                    service.type_of_service,
                    service.service_scope,
                    service.enable_by_default,
                    service.start_by_default,
                    service.default_upgrade_rule,
                    self._plugin_name,
                    True,
                    service.service_context,
                )
                for service in detected
            ]
            manager.service_integrator(definitions, ctrl, context)
        return detected, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: Optional[VariableContext] = None,
        mocked_variables: Optional[Mapping[str, str]] = None,
    ) -> Mapping[str, str]:
        """Return a mapping that resolves the plugin's manifest variables on access

        :param resolution_context: The context to resolve variables in; defaults to
          `manifest_variable_resolution_context()`.
        :param mocked_variables: Passed as extra substitutions to the substitution used for
          resolving the variables.
        :return: A `SubstitutionTable` limited to the variables declared by this plugin.
        """
        own_variables = self.declared_manifest_variables
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()
        test_substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(own_variables, test_substitution)
class SubstitutionTable(Mapping[str, str]):
    """Read-only mapping from manifest variable names to their substituted values.

    Only the names given at construction are valid keys. Values are resolved through the
    substitution object each time a key is looked up.
    """

    def __init__(
        self, valid_manifest_variables: FrozenSet[str], substitution: Substitution
    ) -> None:
        self._valid_manifest_variables = valid_manifest_variables
        self._substitution = substitution
        # Names that have been looked up successfully so far.
        self._resolved: Set[str] = set()

    def __contains__(self, item: object) -> bool:
        known_names = self._valid_manifest_variables
        return item in known_names

    def __getitem__(self, key: str) -> str:
        if key in self._valid_manifest_variables:
            template = "".join(("{{", key, "}}"))
            value = self._substitution.substitute(
                template, f"test of manifest variable `{key}`"
            )
            # Only recorded once the substitution succeeded.
            self._resolved.add(key)
            return value
        raise KeyError(key)

    def __len__(self) -> int:
        known_names = self._valid_manifest_variables
        return len(known_names)

    def __iter__(self) -> Iterator[str]:
        known_names = self._valid_manifest_variables
        return iter(known_names)

    def keys(self) -> KeysView[str]:
        # The frozenset of valid names is handed out directly; the cast only affects typing.
        known_names = self._valid_manifest_variables
        return cast("KeysView[str]", known_names)