Coverage for src/debputy/plugin/api/test_api/test_impl.py: 82%

296 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import contextlib 

2import dataclasses 

3import inspect 

4import os.path 

5from io import BytesIO 

6from typing import ( 

7 Mapping, 

8 Dict, 

9 Optional, 

10 Tuple, 

11 List, 

12 cast, 

13 FrozenSet, 

14 Sequence, 

15 Union, 

16 Type, 

17 Iterator, 

18 Set, 

19 KeysView, 

20 Callable, 

21) 

22 

23from debian.deb822 import Deb822 

24from debian.substvars import Substvars 

25 

26from debputy import DEBPUTY_PLUGIN_ROOT_DIR 

27from debputy.architecture_support import faked_arch_table 

28from debputy.filesystem_scan import FSROOverlay, FSRootDir 

29from debputy.packages import BinaryPackage 

30from debputy.plugin.api import ( 

31 PluginInitializationEntryPoint, 

32 VirtualPath, 

33 PackageProcessingContext, 

34 DpkgTriggerType, 

35 Maintscript, 

36) 

37from debputy.plugin.api.example_processing import process_discard_rule_example 

38from debputy.plugin.api.impl import ( 

39 plugin_metadata_for_debputys_own_plugin, 

40 DebputyPluginInitializerProvider, 

41 parse_json_plugin_desc, 

42 MaintscriptAccessorProviderBase, 

43 BinaryCtrlAccessorProviderBase, 

44 PLUGIN_TEST_SUFFIX, 

45 find_json_plugin, 

46 ServiceDefinitionImpl, 

47) 

48from debputy.plugin.api.impl_types import ( 

49 PackagerProvidedFileClassSpec, 

50 DebputyPluginMetadata, 

51 PluginProvidedTrigger, 

52 ServiceManagerDetails, 

53) 

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

55from debputy.plugin.api.spec import ( 

56 MaintscriptAccessor, 

57 FlushableSubstvars, 

58 ServiceRegistry, 

59 DSD, 

60 ServiceUpgradeRule, 

61) 

62from debputy.plugin.api.test_api.test_spec import ( 

63 InitializedPluginUnderTest, 

64 RegisteredPackagerProvidedFile, 

65 RegisteredTrigger, 

66 RegisteredMaintscript, 

67 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

68 ADRExampleIssue, 

69 DetectedService, 

70 RegisteredMetadata, 

71) 

72from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features 

73from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

74from debputy.util import package_cross_check_precheck 

75 

76RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec) 

77 

78 

79@dataclasses.dataclass(frozen=True, slots=True) 

80class PackageProcessingContextTestProvider(PackageProcessingContext): 

81 binary_package: BinaryPackage 

82 binary_package_version: str 

83 related_udeb_package: Optional[BinaryPackage] 

84 related_udeb_package_version: Optional[str] 

85 accessible_package_roots: Callable[[], Sequence[Tuple[BinaryPackage, VirtualPath]]] 

86 

87 

88def _initialize_plugin_under_test( 

89 plugin_metadata: DebputyPluginMetadata, 

90 load_debputy_plugin: bool = True, 

91) -> "InitializedPluginUnderTest": 

92 feature_set = PluginProvidedFeatureSet() 

93 substitution = SubstitutionImpl( 

94 unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]), 

95 variable_context=VariableContext( 

96 FSROOverlay.create_root_dir("debian", "debian"), 

97 ), 

98 plugin_feature_set=feature_set, 

99 ) 

100 

101 if load_debputy_plugin: 

102 debputy_plugin_metadata = plugin_metadata_for_debputys_own_plugin( 

103 initialize_debputy_features 

104 ) 

105 # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early 

106 debputy_provider = DebputyPluginInitializerProvider( 

107 debputy_plugin_metadata, 

108 feature_set, 

109 substitution, 

110 ) 

111 debputy_provider.load_plugin() 

112 

113 plugin_under_test_provider = DebputyPluginInitializerProvider( 

114 plugin_metadata, 

115 feature_set, 

116 substitution, 

117 ) 

118 plugin_under_test_provider.load_plugin() 

119 

120 return InitializedPluginUnderTestImpl( 

121 plugin_metadata.plugin_name, 

122 feature_set, 

123 substitution, 

124 ) 

125 

126 

127def _auto_load_plugin_from_filename( 

128 py_test_filename: str, 

129) -> "InitializedPluginUnderTest": 

130 dirname, basename = os.path.split(py_test_filename) 

131 plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-") 

132 

133 test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled") 

134 if test_location == "uninstalled": 

135 json_basename = f"{plugin_name}.json" 

136 json_desc_file = os.path.join(dirname, json_basename) 

137 if "/" not in json_desc_file: 137 ↛ 138line 137 didn't jump to line 138, because the condition on line 137 was never true

138 json_desc_file = f"./{json_desc_file}" 

139 

140 if os.path.isfile(json_desc_file): 140 ↛ 143line 140 didn't jump to line 143, because the condition on line 140 was never false

141 return _initialize_plugin_from_desc(json_desc_file) 

142 

143 json_desc_file_in = f"{json_desc_file}.in" 

144 if os.path.isfile(json_desc_file_in): 

145 return _initialize_plugin_from_desc(json_desc_file) 

146 raise FileNotFoundError( 

147 f"Cannot determine the plugin JSON metadata descriptor: Expected it to be" 

148 f" {json_desc_file} or {json_desc_file_in}" 

149 ) 

150 

151 if test_location == "installed": 151 ↛ 155line 151 didn't jump to line 155, because the condition on line 151 was never false

152 plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name) 

153 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

154 

155 raise ValueError( 

156 'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either' 

157 ' unset OR one of "installed", "uninstalled".' 

158 ) 

159 

160 

161def initialize_plugin_under_test( 

162 *, 

163 plugin_desc_file: Optional[str] = None, 

164) -> "InitializedPluginUnderTest": 

165 """Load and initialize a plugin for testing it 

166 

167 This method will load the plugin via plugin description, which is the method that `debputy` does at 

168 run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part 

169 of the flow). 

170 

171 :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin. 

172 If omitted, `debputy` will attempt to attempt the plugin description file based on the test itself. 

173 This works for "single-file" plugins, where the description file and the test are right next to 

174 each other. 

175 

176 Note that the description file is *not* required to a valid version at this stage (e.g., "N/A" or 

177 "@PLACEHOLDER@") is fine. So you still use this method if you substitute in the version during 

178 build after running the tests. To support this flow, the file name can also end with `.json.in` 

179 (instead of `.json`). 

180 :return: The loaded plugin for testing 

181 """ 

182 if plugin_desc_file is None: 

183 caller_file = inspect.stack()[1].filename 

184 return _auto_load_plugin_from_filename(caller_file) 

185 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 185 ↛ 186line 185 didn't jump to line 186, because the condition on line 185 was never true

186 raise RuntimeError( 

187 "Running the test against an installed plugin does not work when" 

188 " plugin_desc_file is provided. Please skip this test. You can " 

189 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

190 " conditional for this purpose." 

191 ) 

192 return _initialize_plugin_from_desc(plugin_desc_file) 

193 

194 

195def _initialize_plugin_from_desc( 

196 desc_file: str, 

197) -> "InitializedPluginUnderTest": 

198 if not desc_file.endswith((".json", ".json.in")): 198 ↛ 199line 198 didn't jump to line 199, because the condition on line 198 was never true

199 raise ValueError("The plugin file must end with .json or .json.in") 

200 

201 plugin_metadata = parse_json_plugin_desc(desc_file) 

202 

203 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

204 

205 

206def initialize_plugin_under_test_from_inline_json( 

207 plugin_name: str, 

208 json_content: str, 

209) -> "InitializedPluginUnderTest": 

210 with BytesIO(json_content.encode("utf-8")) as fd: 

211 plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd) 

212 

213 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

214 

215 

216def initialize_plugin_under_test_preloaded( 

217 api_compat_version: int, 

218 plugin_initializer: PluginInitializationEntryPoint, 

219 /, 

220 plugin_name: str = "plugin-under-test", 

221 load_debputy_plugin: bool = True, 

222) -> "InitializedPluginUnderTest": 

223 """Internal API: Initialize a plugin for testing without loading it from a file 

224 

225 This method by-passes the standard loading mechanism, meaning you will not test that your plugin 

226 description file is correct. Notably, any feature provided via the JSON description file will 

227 **NOT** be visible for the test. 

228 

229 This API is mostly useful for testing parts of debputy itself. 

230 

231 :param api_compat_version: The API version the plugin was written for. Use the same version as the 

232 version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`). 

233 :param plugin_initializer: The entry point of the plugin 

234 :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will 

235 use a test name and version. However, you can explicitly set if you want the real name/version. 

236 :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more 

237 realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making 

238 the plugin unloadable in practice if such a conflict is present). This option is mostly provided 

239 to enable debputy to use this method for self testing. 

240 :return: The loaded plugin for testing 

241 """ 

242 

243 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 243 ↛ 244line 243 didn't jump to line 244, because the condition on line 243 was never true

244 raise RuntimeError( 

245 "Running the test against an installed plugin does not work when" 

246 " the plugin is preload. Please skip this test. You can " 

247 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

248 " conditional for this purpose." 

249 ) 

250 

251 plugin_metadata = DebputyPluginMetadata( 

252 plugin_name=plugin_name, 

253 api_compat_version=api_compat_version, 

254 plugin_initializer=plugin_initializer, 

255 plugin_loader=None, 

256 plugin_path="<loaded-via-test>", 

257 ) 

258 

259 return _initialize_plugin_under_test( 

260 plugin_metadata, 

261 load_debputy_plugin=load_debputy_plugin, 

262 ) 

263 

264 

265class _MockArchTable: 

266 @staticmethod 

267 def matches_architecture(_a: str, _b: str) -> bool: 

268 return True 

269 

270 

271FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable()) 

272del _MockArchTable 

273 

274 

275def package_metadata_context( 

276 *, 

277 host_arch: str = "amd64", 

278 package_fields: Optional[Dict[str, str]] = None, 

279 related_udeb_package_fields: Optional[Dict[str, str]] = None, 

280 binary_package_version: str = "1.0-1", 

281 related_udeb_package_version: Optional[str] = None, 

282 should_be_acted_on: bool = True, 

283 related_udeb_fs_root: Optional[VirtualPath] = None, 

284 accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(), 

285) -> PackageProcessingContext: 

286 process_table = faked_arch_table(host_arch) 

287 f = { 

288 "Package": "foo", 

289 "Architecture": "any", 

290 } 

291 if package_fields is not None: 

292 f.update(package_fields) 

293 

294 bin_package = BinaryPackage( 

295 Deb822(f), 

296 process_table, 

297 FAKE_DPKG_QUERY_TABLE, 

298 is_main_package=True, 

299 should_be_acted_on=should_be_acted_on, 

300 ) 

301 udeb_package = None 

302 if related_udeb_package_fields is not None: 302 ↛ 303line 302 didn't jump to line 303, because the condition on line 302 was never true

303 uf = dict(related_udeb_package_fields) 

304 uf.setdefault("Package", f'{f["Package"]}-udeb') 

305 uf.setdefault("Architecture", f["Architecture"]) 

306 uf.setdefault("Package-Type", "udeb") 

307 udeb_package = BinaryPackage( 

308 Deb822(uf), 

309 process_table, 

310 FAKE_DPKG_QUERY_TABLE, 

311 is_main_package=False, 

312 should_be_acted_on=True, 

313 ) 

314 if related_udeb_package_version is None: 

315 related_udeb_package_version = binary_package_version 

316 if accessible_package_roots: 

317 apr = [] 

318 for fields, apr_fs_root in accessible_package_roots: 

319 apr_fields = Deb822(dict(fields)) 

320 if "Package" not in apr_fields: 320 ↛ 321line 320 didn't jump to line 321, because the condition on line 320 was never true

321 raise ValueError( 

322 "Missing mandatory Package field in member of accessible_package_roots" 

323 ) 

324 if "Architecture" not in apr_fields: 324 ↛ 325line 324 didn't jump to line 325, because the condition on line 324 was never true

325 raise ValueError( 

326 "Missing mandatory Architecture field in member of accessible_package_roots" 

327 ) 

328 apr_package = BinaryPackage( 

329 apr_fields, 

330 process_table, 

331 FAKE_DPKG_QUERY_TABLE, 

332 is_main_package=False, 

333 should_be_acted_on=True, 

334 ) 

335 r = package_cross_check_precheck(bin_package, apr_package) 

336 if not r[0]: 336 ↛ 337line 336 didn't jump to line 337, because the condition on line 336 was never true

337 raise ValueError( 

338 f"{apr_package.name} would not be accessible for {bin_package.name}" 

339 ) 

340 apr.append((apr_package, apr_fs_root)) 

341 

342 if related_udeb_fs_root is not None: 342 ↛ 343line 342 didn't jump to line 343, because the condition on line 342 was never true

343 if udeb_package is None: 

344 raise ValueError( 

345 "related_udeb_package_fields must be given when related_udeb_fs_root is given" 

346 ) 

347 r = package_cross_check_precheck(bin_package, udeb_package) 

348 if not r[0]: 

349 raise ValueError( 

350 f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing" 

351 " related_udeb_fs_root is irrelevant" 

352 ) 

353 apr.append(udeb_package) 

354 apr = tuple(apr) 

355 else: 

356 apr = tuple() 

357 

358 return PackageProcessingContextTestProvider( 

359 binary_package=bin_package, 

360 related_udeb_package=udeb_package, 

361 binary_package_version=binary_package_version, 

362 related_udeb_package_version=related_udeb_package_version, 

363 accessible_package_roots=lambda: apr, 

364 ) 

365 

366 

367def manifest_variable_resolution_context( 

368 *, 

369 debian_dir: Optional[VirtualPath] = None, 

370) -> VariableContext: 

371 if debian_dir is None: 

372 debian_dir = FSRootDir() 

373 

374 return VariableContext(debian_dir) 

375 

376 

377class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase): 

378 __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container") 

379 

380 def __init__( 

381 self, 

382 plugin_metadata: DebputyPluginMetadata, 

383 plugin_source_id: str, 

384 maintscript_container: Dict[str, List[RegisteredMaintscript]], 

385 ): 

386 self._plugin_metadata = plugin_metadata 

387 self._plugin_source_id = plugin_source_id 

388 self._maintscript_container = maintscript_container 

389 

390 @classmethod 

391 def _apply_condition_to_script( 

392 cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None 

393 ) -> str: 

394 return run_snippet 

395 

396 def _append_script( 

397 self, 

398 caller_name: str, 

399 maintscript: Maintscript, 

400 full_script: str, 

401 /, 

402 perform_substitution: bool = True, 

403 ) -> None: 

404 if self._plugin_source_id not in self._maintscript_container: 

405 self._maintscript_container[self._plugin_source_id] = [] 

406 self._maintscript_container[self._plugin_source_id].append( 

407 RegisteredMaintscript( 

408 maintscript, 

409 caller_name, 

410 full_script, 

411 perform_substitution, 

412 ) 

413 ) 

414 

415 

416class RegisteredMetadataImpl(RegisteredMetadata): 

417 __slots__ = ( 

418 "_substvars", 

419 "_triggers", 

420 "_maintscripts", 

421 ) 

422 

423 def __init__( 

424 self, 

425 substvars: Substvars, 

426 triggers: List[RegisteredTrigger], 

427 maintscripts: List[RegisteredMaintscript], 

428 ) -> None: 

429 self._substvars = substvars 

430 self._triggers = triggers 

431 self._maintscripts = maintscripts 

432 

433 @property 

434 def substvars(self) -> Substvars: 

435 return self._substvars 

436 

437 @property 

438 def triggers(self) -> List[RegisteredTrigger]: 

439 return self._triggers 

440 

441 def maintscripts( 

442 self, 

443 *, 

444 maintscript: Optional[Maintscript] = None, 

445 ) -> List[RegisteredMaintscript]: 

446 if maintscript is None: 

447 return self._maintscripts 

448 return [m for m in self._maintscripts if m.maintscript == maintscript] 

449 

450 

451class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase): 

452 __slots__ = ("_maintscript_container",) 

453 

454 def __init__( 

455 self, 

456 plugin_metadata: DebputyPluginMetadata, 

457 plugin_source_id: str, 

458 context: PackageProcessingContext, 

459 ) -> None: 

460 super().__init__( 

461 plugin_metadata, 

462 plugin_source_id, 

463 context, 

464 {}, 

465 FlushableSubstvars(), 

466 (None, None), 

467 ) 

468 self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {} 

469 

470 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

471 return MaintscriptAccessorTestProvider( 

472 self._plugin_metadata, 

473 self._plugin_source_id, 

474 self._maintscript_container, 

475 ) 

476 

477 def registered_metadata(self) -> RegisteredMetadata: 

478 return RegisteredMetadataImpl( 

479 self._substvars, 

480 [ 

481 RegisteredTrigger.from_plugin_provided_trigger(t) 

482 for t in self._triggers.values() 

483 if t.provider_source_id == self._plugin_source_id 

484 ], 

485 self._maintscript_container.get(self._plugin_source_id, []), 

486 ) 

487 

488 

489class ServiceRegistryTestImpl(ServiceRegistry[DSD]): 

490 __slots__ = ("_service_manager_details", "_service_definitions") 

491 

492 def __init__( 

493 self, 

494 service_manager_details: ServiceManagerDetails, 

495 detected_services: List[DetectedService[DSD]], 

496 ) -> None: 

497 self._service_manager_details = service_manager_details 

498 self._service_definitions = detected_services 

499 

500 def register_service( 

501 self, 

502 path: VirtualPath, 

503 name: Union[str, List[str]], 

504 *, 

505 type_of_service: str = "service", # "timer", etc. 

506 service_scope: str = "system", 

507 enable_by_default: bool = True, 

508 start_by_default: bool = True, 

509 default_upgrade_rule: ServiceUpgradeRule = "restart", 

510 service_context: Optional[DSD] = None, 

511 ) -> None: 

512 names = name if isinstance(name, list) else [name] 

513 if len(names) < 1: 513 ↛ 514line 513 didn't jump to line 514, because the condition on line 513 was never true

514 raise ValueError( 

515 f"The service must have at least one name - {path.absolute} did not have any" 

516 ) 

517 self._service_definitions.append( 

518 DetectedService( 

519 path, 

520 names, 

521 type_of_service, 

522 service_scope, 

523 enable_by_default, 

524 start_by_default, 

525 default_upgrade_rule, 

526 service_context, 

527 ) 

528 ) 

529 

530 

531@contextlib.contextmanager 

532def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]: 

533 if fs_root.is_read_write: 533 ↛ 539line 533 didn't jump to line 539, because the condition on line 533 was never false

534 assert isinstance(fs_root, FSRootDir) 

535 fs_root.is_read_write = False 

536 yield fs_root 

537 fs_root.is_read_write = True 

538 else: 

539 yield fs_root 

540 

541 

542class InitializedPluginUnderTestImpl(InitializedPluginUnderTest): 

543 def __init__( 

544 self, 

545 plugin_name: str, 

546 feature_set: PluginProvidedFeatureSet, 

547 substitution: SubstitutionImpl, 

548 ) -> None: 

549 self._feature_set = feature_set 

550 self._plugin_name = plugin_name 

551 self._packager_provided_files: Optional[ 

552 Dict[str, RegisteredPackagerProvidedFile] 

553 ] = None 

554 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

555 self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {} 

556 self._substitution = substitution 

557 assert plugin_name in self._feature_set.plugin_data 

558 

559 @property 

560 def _plugin_metadata(self) -> DebputyPluginMetadata: 

561 return self._feature_set.plugin_data[self._plugin_name] 

562 

563 def packager_provided_files_by_stem( 

564 self, 

565 ) -> Mapping[str, RegisteredPackagerProvidedFile]: 

566 ppf = self._packager_provided_files 

567 if ppf is None: 

568 result: Dict[str, RegisteredPackagerProvidedFile] = {} 

569 for spec in self._feature_set.packager_provided_files.values(): 

570 if spec.debputy_plugin_metadata.plugin_name != self._plugin_name: 

571 continue 

572 # Registered as a virtual subclass, so this should always be True 

573 assert isinstance(spec, RegisteredPackagerProvidedFile) 

574 result[spec.stem] = spec 

575 self._packager_provided_files = result 

576 ppf = result 

577 return ppf 

578 

579 def run_metadata_detector( 

580 self, 

581 metadata_detector_id: str, 

582 fs_root: VirtualPath, 

583 context: Optional[PackageProcessingContext] = None, 

584 ) -> RegisteredMetadata: 

585 if fs_root.parent_dir is not None: 585 ↛ 586line 585 didn't jump to line 586, because the condition on line 585 was never true

586 raise ValueError("Provided path must be the file system root.") 

587 detectors = self._feature_set.metadata_maintscript_detectors[self._plugin_name] 

588 matching_detectors = [ 

589 d for d in detectors if d.detector_id == metadata_detector_id 

590 ] 

591 if len(matching_detectors) != 1: 591 ↛ 592line 591 didn't jump to line 592, because the condition on line 591 was never true

592 assert not matching_detectors 

593 raise ValueError( 

594 f"The plugin {self._plugin_name} did not provide a metadata detector with ID" 

595 f' "{metadata_detector_id}"' 

596 ) 

597 if context is None: 

598 context = package_metadata_context() 

599 detector = matching_detectors[0] 

600 if not detector.applies_to(context.binary_package): 

601 raise ValueError( 

602 f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the' 

603 " given package. Consider using `package_metadata_context()` to emulate a binary package" 

604 " with the correct specification. As an example: " 

605 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

606 " package." 

607 ) 

608 

609 ctrl = BinaryCtrlAccessorTestProvider( 

610 self._plugin_metadata, 

611 metadata_detector_id, 

612 context, 

613 ) 

614 with _read_only_fs_root(fs_root) as ro_root: 

615 detector.run_detector( 

616 ro_root, 

617 ctrl, 

618 context, 

619 ) 

620 return ctrl.registered_metadata() 

621 

622 def run_package_processor( 

623 self, 

624 package_processor_id: str, 

625 fs_root: VirtualPath, 

626 context: Optional[PackageProcessingContext] = None, 

627 ) -> None: 

628 if fs_root.parent_dir is not None: 628 ↛ 629line 628 didn't jump to line 629, because the condition on line 628 was never true

629 raise ValueError("Provided path must be the file system root.") 

630 pp_key = (self._plugin_name, package_processor_id) 

631 package_processor = self._feature_set.all_package_processors.get(pp_key) 

632 if package_processor is None: 632 ↛ 633line 632 didn't jump to line 633, because the condition on line 632 was never true

633 raise ValueError( 

634 f"The plugin {self._plugin_name} did not provide a package processor with ID" 

635 f' "{package_processor_id}"' 

636 ) 

637 if context is None: 637 ↛ 639line 637 didn't jump to line 639, because the condition on line 637 was never false

638 context = package_metadata_context() 

639 if not fs_root.is_read_write: 639 ↛ 640line 639 didn't jump to line 640, because the condition on line 639 was never true

640 raise ValueError( 

641 "The provided fs_root is read-only and it must be read-write for package processor" 

642 ) 

643 if not package_processor.applies_to(context.binary_package): 643 ↛ 644line 643 didn't jump to line 644, because the condition on line 643 was never true

644 raise ValueError( 

645 f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply' 

646 " to the given package. Consider using `package_metadata_context()` to emulate a binary" 

647 " package with the correct specification. As an example: " 

648 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

649 " package." 

650 ) 

651 package_processor.run_package_processor( 

652 fs_root, 

653 None, 

654 context, 

655 ) 

656 

657 @property 

658 def declared_manifest_variables(self) -> FrozenSet[str]: 

659 return frozenset( 

660 { 

661 k 

662 for k, v in self._feature_set.manifest_variables.items() 

663 if v.plugin_metadata.plugin_name == self._plugin_name 

664 } 

665 ) 

666 

667 def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]: 

668 issues = [] 

669 for adr in self._feature_set.auto_discard_rules.values(): 

670 if adr.plugin_metadata.plugin_name != self._plugin_name: 670 ↛ 671line 670 didn't jump to line 671, because the condition on line 670 was never true

671 continue 

672 for idx, example in enumerate(adr.examples): 

673 result = process_discard_rule_example( 

674 adr, 

675 example, 

676 ) 

677 if result.inconsistent_paths: 

678 issues.append( 

679 ADRExampleIssue( 

680 adr.name, 

681 idx, 

682 [ 

683 x.absolute + ("/" if x.is_dir else "") 

684 for x in result.inconsistent_paths 

685 ], 

686 ) 

687 ) 

688 return issues 

689 

690 def run_service_detection_and_integrations( 

691 self, 

692 service_manager: str, 

693 fs_root: VirtualPath, 

694 context: Optional[PackageProcessingContext] = None, 

695 *, 

696 service_context_type_hint: Optional[Type[DSD]] = None, 

697 ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]: 

698 if fs_root.parent_dir is not None: 698 ↛ 699line 698 didn't jump to line 699, because the condition on line 698 was never true

699 raise ValueError("Provided path must be the file system root.") 

700 try: 

701 service_manager_details = self._feature_set.service_managers[ 

702 service_manager 

703 ] 

704 if service_manager_details.plugin_metadata.plugin_name != self._plugin_name: 704 ↛ 705line 704 didn't jump to line 705, because the condition on line 704 was never true

705 raise KeyError(service_manager) 

706 except KeyError: 

707 raise ValueError( 

708 f"The plugin {self._plugin_name} does not provide a" 

709 f" service manager called {service_manager}" 

710 ) from None 

711 

712 if context is None: 712 ↛ 714line 712 didn't jump to line 714, because the condition on line 712 was never false

713 context = package_metadata_context() 

714 detected_services: List[DetectedService[DSD]] = [] 

715 registry = ServiceRegistryTestImpl(service_manager_details, detected_services) 

716 service_manager_details.service_detector( 

717 fs_root, 

718 registry, 

719 context, 

720 ) 

721 ctrl = BinaryCtrlAccessorTestProvider( 

722 self._plugin_metadata, 

723 service_manager_details.service_manager, 

724 context, 

725 ) 

726 if detected_services: 

727 service_definitions = [ 

728 ServiceDefinitionImpl( 

729 ds.names[0], 

730 ds.names, 

731 ds.path, 

732 ds.type_of_service, 

733 ds.service_scope, 

734 ds.enable_by_default, 

735 ds.start_by_default, 

736 ds.default_upgrade_rule, 

737 self._plugin_name, 

738 True, 

739 ds.service_context, 

740 ) 

741 for ds in detected_services 

742 ] 

743 service_manager_details.service_integrator( 

744 service_definitions, 

745 ctrl, 

746 context, 

747 ) 

748 return detected_services, ctrl.registered_metadata() 

749 

750 def manifest_variables( 

751 self, 

752 *, 

753 resolution_context: Optional[VariableContext] = None, 

754 mocked_variables: Optional[Mapping[str, str]] = None, 

755 ) -> Mapping[str, str]: 

756 valid_manifest_variables = frozenset( 

757 { 

758 n 

759 for n, v in self._feature_set.manifest_variables.items() 

760 if v.plugin_metadata.plugin_name == self._plugin_name 

761 } 

762 ) 

763 if resolution_context is None: 

764 resolution_context = manifest_variable_resolution_context() 

765 substitution = self._substitution.copy_for_subst_test( 

766 self._feature_set, 

767 resolution_context, 

768 extra_substitutions=mocked_variables, 

769 ) 

770 return SubstitutionTable( 

771 valid_manifest_variables, 

772 substitution, 

773 ) 

774 

775 

776class SubstitutionTable(Mapping[str, str]): 

777 def __init__( 

778 self, valid_manifest_variables: FrozenSet[str], substitution: Substitution 

779 ) -> None: 

780 self._valid_manifest_variables = valid_manifest_variables 

781 self._resolved: Set[str] = set() 

782 self._substitution = substitution 

783 

784 def __contains__(self, item: object) -> bool: 

785 return item in self._valid_manifest_variables 

786 

787 def __getitem__(self, key: str) -> str: 

788 if key not in self._valid_manifest_variables: 788 ↛ 789line 788 didn't jump to line 789, because the condition on line 788 was never true

789 raise KeyError(key) 

790 v = self._substitution.substitute( 

791 "{{" + key + "}}", f"test of manifest variable `{key}`" 

792 ) 

793 self._resolved.add(key) 

794 return v 

795 

796 def __len__(self) -> int: 

797 return len(self._valid_manifest_variables) 

798 

799 def __iter__(self) -> Iterator[str]: 

800 return iter(self._valid_manifest_variables) 

801 

802 def keys(self) -> KeysView[str]: 

803 return cast("KeysView[str]", self._valid_manifest_variables)