From ec14b3742103754d9022782587d0b5cf2f9fd1e3 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 25 Apr 2024 04:59:48 +0200 Subject: Merging upstream version 0.1.29. Signed-off-by: Daniel Baumann --- .../d_4b9be07fb6071cd2_test_impl_py.html | 902 --------------------- 1 file changed, 902 deletions(-) delete mode 100644 coverage-report/d_4b9be07fb6071cd2_test_impl_py.html (limited to 'coverage-report/d_4b9be07fb6071cd2_test_impl_py.html') diff --git a/coverage-report/d_4b9be07fb6071cd2_test_impl_py.html b/coverage-report/d_4b9be07fb6071cd2_test_impl_py.html deleted file mode 100644 index 3e1b2fc..0000000 --- a/coverage-report/d_4b9be07fb6071cd2_test_impl_py.html +++ /dev/null @@ -1,902 +0,0 @@ - - - - - Coverage for src/debputy/plugin/api/test_api/test_impl.py: 82% - - - - - -
-
-

- Coverage for src/debputy/plugin/api/test_api/test_impl.py: - 82% -

- -

- 296 statements   - - - - -

-

- « prev     - ^ index     - » next -       - coverage.py v7.2.7, - created at 2024-04-07 12:14 +0200 -

- -
-
-
-

1import contextlib 

-

2import dataclasses 

-

3import inspect 

-

4import os.path 

-

5from io import BytesIO 

-

6from typing import ( 

-

7 Mapping, 

-

8 Dict, 

-

9 Optional, 

-

10 Tuple, 

-

11 List, 

-

12 cast, 

-

13 FrozenSet, 

-

14 Sequence, 

-

15 Union, 

-

16 Type, 

-

17 Iterator, 

-

18 Set, 

-

19 KeysView, 

-

20 Callable, 

-

21) 

-

22 

-

23from debian.deb822 import Deb822 

-

24from debian.substvars import Substvars 

-

25 

-

26from debputy import DEBPUTY_PLUGIN_ROOT_DIR 

-

27from debputy.architecture_support import faked_arch_table 

-

28from debputy.filesystem_scan import FSROOverlay, FSRootDir 

-

29from debputy.packages import BinaryPackage 

-

30from debputy.plugin.api import ( 

-

31 PluginInitializationEntryPoint, 

-

32 VirtualPath, 

-

33 PackageProcessingContext, 

-

34 DpkgTriggerType, 

-

35 Maintscript, 

-

36) 

-

37from debputy.plugin.api.example_processing import process_discard_rule_example 

-

38from debputy.plugin.api.impl import ( 

-

39 plugin_metadata_for_debputys_own_plugin, 

-

40 DebputyPluginInitializerProvider, 

-

41 parse_json_plugin_desc, 

-

42 MaintscriptAccessorProviderBase, 

-

43 BinaryCtrlAccessorProviderBase, 

-

44 PLUGIN_TEST_SUFFIX, 

-

45 find_json_plugin, 

-

46 ServiceDefinitionImpl, 

-

47) 

-

48from debputy.plugin.api.impl_types import ( 

-

49 PackagerProvidedFileClassSpec, 

-

50 DebputyPluginMetadata, 

-

51 PluginProvidedTrigger, 

-

52 ServiceManagerDetails, 

-

53) 

-

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

-

55from debputy.plugin.api.spec import ( 

-

56 MaintscriptAccessor, 

-

57 FlushableSubstvars, 

-

58 ServiceRegistry, 

-

59 DSD, 

-

60 ServiceUpgradeRule, 

-

61) 

-

62from debputy.plugin.api.test_api.test_spec import ( 

-

63 InitializedPluginUnderTest, 

-

64 RegisteredPackagerProvidedFile, 

-

65 RegisteredTrigger, 

-

66 RegisteredMaintscript, 

-

67 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

-

68 ADRExampleIssue, 

-

69 DetectedService, 

-

70 RegisteredMetadata, 

-

71) 

-

72from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features 

-

73from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

-

74from debputy.util import package_cross_check_precheck 

-

75 

-

76RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec) 

-

77 

-

78 

-

79@dataclasses.dataclass(frozen=True, slots=True) 

-

80class PackageProcessingContextTestProvider(PackageProcessingContext): 

-

81 binary_package: BinaryPackage 

-

82 binary_package_version: str 

-

83 related_udeb_package: Optional[BinaryPackage] 

-

84 related_udeb_package_version: Optional[str] 

-

85 accessible_package_roots: Callable[[], Sequence[Tuple[BinaryPackage, VirtualPath]]] 

-

86 

-

87 

-

88def _initialize_plugin_under_test( 

-

89 plugin_metadata: DebputyPluginMetadata, 

-

90 load_debputy_plugin: bool = True, 

-

91) -> "InitializedPluginUnderTest": 

-

92 feature_set = PluginProvidedFeatureSet() 

-

93 substitution = SubstitutionImpl( 

-

94 unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]), 

-

95 variable_context=VariableContext( 

-

96 FSROOverlay.create_root_dir("debian", "debian"), 

-

97 ), 

-

98 plugin_feature_set=feature_set, 

-

99 ) 

-

100 

-

101 if load_debputy_plugin: 

-

102 debputy_plugin_metadata = plugin_metadata_for_debputys_own_plugin( 

-

103 initialize_debputy_features 

-

104 ) 

-

105 # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early 

-

106 debputy_provider = DebputyPluginInitializerProvider( 

-

107 debputy_plugin_metadata, 

-

108 feature_set, 

-

109 substitution, 

-

110 ) 

-

111 debputy_provider.load_plugin() 

-

112 

-

113 plugin_under_test_provider = DebputyPluginInitializerProvider( 

-

114 plugin_metadata, 

-

115 feature_set, 

-

116 substitution, 

-

117 ) 

-

118 plugin_under_test_provider.load_plugin() 

-

119 

-

120 return InitializedPluginUnderTestImpl( 

-

121 plugin_metadata.plugin_name, 

-

122 feature_set, 

-

123 substitution, 

-

124 ) 

-

125 

-

126 

-

127def _auto_load_plugin_from_filename( 

-

128 py_test_filename: str, 

-

129) -> "InitializedPluginUnderTest": 

-

130 dirname, basename = os.path.split(py_test_filename) 

-

131 plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-") 

-

132 

-

133 test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled") 

-

134 if test_location == "uninstalled": 

-

135 json_basename = f"{plugin_name}.json" 

-

136 json_desc_file = os.path.join(dirname, json_basename) 

-

137 if "/" not in json_desc_file: 137 ↛ 138line 137 didn't jump to line 138, because the condition on line 137 was never true

-

138 json_desc_file = f"./{json_desc_file}" 

-

139 

-

140 if os.path.isfile(json_desc_file): 140 ↛ 143line 140 didn't jump to line 143, because the condition on line 140 was never false

-

141 return _initialize_plugin_from_desc(json_desc_file) 

-

142 

-

143 json_desc_file_in = f"{json_desc_file}.in" 

-

144 if os.path.isfile(json_desc_file_in): 

-

145 return _initialize_plugin_from_desc(json_desc_file) 

-

146 raise FileNotFoundError( 

-

147 f"Cannot determine the plugin JSON metadata descriptor: Expected it to be" 

-

148 f" {json_desc_file} or {json_desc_file_in}" 

-

149 ) 

-

150 

-

151 if test_location == "installed": 151 ↛ 155line 151 didn't jump to line 155, because the condition on line 151 was never false

-

152 plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name) 

-

153 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

-

154 

-

155 raise ValueError( 

-

156 'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either' 

-

157 ' unset OR one of "installed", "uninstalled".' 

-

158 ) 

-

159 

-

160 

-

161def initialize_plugin_under_test( 

-

162 *, 

-

163 plugin_desc_file: Optional[str] = None, 

-

164) -> "InitializedPluginUnderTest": 

-

165 """Load and initialize a plugin for testing it 

-

166 

-

167 This method will load the plugin via plugin description, which is the method that `debputy` does at 

-

168 run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part 

-

169 of the flow). 

-

170 

-

171 :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin. 

-

172 If omitted, `debputy` will attempt to attempt the plugin description file based on the test itself. 

-

173 This works for "single-file" plugins, where the description file and the test are right next to 

-

174 each other. 

-

175 

-

176 Note that the description file is *not* required to a valid version at this stage (e.g., "N/A" or 

-

177 "@PLACEHOLDER@") is fine. So you still use this method if you substitute in the version during 

-

178 build after running the tests. To support this flow, the file name can also end with `.json.in` 

-

179 (instead of `.json`). 

-

180 :return: The loaded plugin for testing 

-

181 """ 

-

182 if plugin_desc_file is None: 

-

183 caller_file = inspect.stack()[1].filename 

-

184 return _auto_load_plugin_from_filename(caller_file) 

-

185 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 185 ↛ 186line 185 didn't jump to line 186, because the condition on line 185 was never true

-

186 raise RuntimeError( 

-

187 "Running the test against an installed plugin does not work when" 

-

188 " plugin_desc_file is provided. Please skip this test. You can " 

-

189 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

-

190 " conditional for this purpose." 

-

191 ) 

-

192 return _initialize_plugin_from_desc(plugin_desc_file) 

-

193 

-

194 

-

195def _initialize_plugin_from_desc( 

-

196 desc_file: str, 

-

197) -> "InitializedPluginUnderTest": 

-

198 if not desc_file.endswith((".json", ".json.in")): 198 ↛ 199line 198 didn't jump to line 199, because the condition on line 198 was never true

-

199 raise ValueError("The plugin file must end with .json or .json.in") 

-

200 

-

201 plugin_metadata = parse_json_plugin_desc(desc_file) 

-

202 

-

203 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

-

204 

-

205 

-

206def initialize_plugin_under_test_from_inline_json( 

-

207 plugin_name: str, 

-

208 json_content: str, 

-

209) -> "InitializedPluginUnderTest": 

-

210 with BytesIO(json_content.encode("utf-8")) as fd: 

-

211 plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd) 

-

212 

-

213 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

-

214 

-

215 

-

216def initialize_plugin_under_test_preloaded( 

-

217 api_compat_version: int, 

-

218 plugin_initializer: PluginInitializationEntryPoint, 

-

219 /, 

-

220 plugin_name: str = "plugin-under-test", 

-

221 load_debputy_plugin: bool = True, 

-

222) -> "InitializedPluginUnderTest": 

-

223 """Internal API: Initialize a plugin for testing without loading it from a file 

-

224 

-

225 This method by-passes the standard loading mechanism, meaning you will not test that your plugin 

-

226 description file is correct. Notably, any feature provided via the JSON description file will 

-

227 **NOT** be visible for the test. 

-

228 

-

229 This API is mostly useful for testing parts of debputy itself. 

-

230 

-

231 :param api_compat_version: The API version the plugin was written for. Use the same version as the 

-

232 version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`). 

-

233 :param plugin_initializer: The entry point of the plugin 

-

234 :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will 

-

235 use a test name and version. However, you can explicitly set if you want the real name/version. 

-

236 :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more 

-

237 realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making 

-

238 the plugin unloadable in practice if such a conflict is present). This option is mostly provided 

-

239 to enable debputy to use this method for self testing. 

-

240 :return: The loaded plugin for testing 

-

241 """ 

-

242 

-

243 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 243 ↛ 244line 243 didn't jump to line 244, because the condition on line 243 was never true

-

244 raise RuntimeError( 

-

245 "Running the test against an installed plugin does not work when" 

-

246 " the plugin is preload. Please skip this test. You can " 

-

247 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

-

248 " conditional for this purpose." 

-

249 ) 

-

250 

-

251 plugin_metadata = DebputyPluginMetadata( 

-

252 plugin_name=plugin_name, 

-

253 api_compat_version=api_compat_version, 

-

254 plugin_initializer=plugin_initializer, 

-

255 plugin_loader=None, 

-

256 plugin_path="<loaded-via-test>", 

-

257 ) 

-

258 

-

259 return _initialize_plugin_under_test( 

-

260 plugin_metadata, 

-

261 load_debputy_plugin=load_debputy_plugin, 

-

262 ) 

-

263 

-

264 

-

265class _MockArchTable: 

-

266 @staticmethod 

-

267 def matches_architecture(_a: str, _b: str) -> bool: 

-

268 return True 

-

269 

-

270 

-

271FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable()) 

-

272del _MockArchTable 

-

273 

-

274 

-

275def package_metadata_context( 

-

276 *, 

-

277 host_arch: str = "amd64", 

-

278 package_fields: Optional[Dict[str, str]] = None, 

-

279 related_udeb_package_fields: Optional[Dict[str, str]] = None, 

-

280 binary_package_version: str = "1.0-1", 

-

281 related_udeb_package_version: Optional[str] = None, 

-

282 should_be_acted_on: bool = True, 

-

283 related_udeb_fs_root: Optional[VirtualPath] = None, 

-

284 accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(), 

-

285) -> PackageProcessingContext: 

-

286 process_table = faked_arch_table(host_arch) 

-

287 f = { 

-

288 "Package": "foo", 

-

289 "Architecture": "any", 

-

290 } 

-

291 if package_fields is not None: 

-

292 f.update(package_fields) 

-

293 

-

294 bin_package = BinaryPackage( 

-

295 Deb822(f), 

-

296 process_table, 

-

297 FAKE_DPKG_QUERY_TABLE, 

-

298 is_main_package=True, 

-

299 should_be_acted_on=should_be_acted_on, 

-

300 ) 

-

301 udeb_package = None 

-

302 if related_udeb_package_fields is not None: 302 ↛ 303line 302 didn't jump to line 303, because the condition on line 302 was never true

-

303 uf = dict(related_udeb_package_fields) 

-

304 uf.setdefault("Package", f'{f["Package"]}-udeb') 

-

305 uf.setdefault("Architecture", f["Architecture"]) 

-

306 uf.setdefault("Package-Type", "udeb") 

-

307 udeb_package = BinaryPackage( 

-

308 Deb822(uf), 

-

309 process_table, 

-

310 FAKE_DPKG_QUERY_TABLE, 

-

311 is_main_package=False, 

-

312 should_be_acted_on=True, 

-

313 ) 

-

314 if related_udeb_package_version is None: 

-

315 related_udeb_package_version = binary_package_version 

-

316 if accessible_package_roots: 

-

317 apr = [] 

-

318 for fields, apr_fs_root in accessible_package_roots: 

-

319 apr_fields = Deb822(dict(fields)) 

-

320 if "Package" not in apr_fields: 320 ↛ 321line 320 didn't jump to line 321, because the condition on line 320 was never true

-

321 raise ValueError( 

-

322 "Missing mandatory Package field in member of accessible_package_roots" 

-

323 ) 

-

324 if "Architecture" not in apr_fields: 324 ↛ 325line 324 didn't jump to line 325, because the condition on line 324 was never true

-

325 raise ValueError( 

-

326 "Missing mandatory Architecture field in member of accessible_package_roots" 

-

327 ) 

-

328 apr_package = BinaryPackage( 

-

329 apr_fields, 

-

330 process_table, 

-

331 FAKE_DPKG_QUERY_TABLE, 

-

332 is_main_package=False, 

-

333 should_be_acted_on=True, 

-

334 ) 

-

335 r = package_cross_check_precheck(bin_package, apr_package) 

-

336 if not r[0]: 336 ↛ 337line 336 didn't jump to line 337, because the condition on line 336 was never true

-

337 raise ValueError( 

-

338 f"{apr_package.name} would not be accessible for {bin_package.name}" 

-

339 ) 

-

340 apr.append((apr_package, apr_fs_root)) 

-

341 

-

342 if related_udeb_fs_root is not None: 342 ↛ 343line 342 didn't jump to line 343, because the condition on line 342 was never true

-

343 if udeb_package is None: 

-

344 raise ValueError( 

-

345 "related_udeb_package_fields must be given when related_udeb_fs_root is given" 

-

346 ) 

-

347 r = package_cross_check_precheck(bin_package, udeb_package) 

-

348 if not r[0]: 

-

349 raise ValueError( 

-

350 f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing" 

-

351 " related_udeb_fs_root is irrelevant" 

-

352 ) 

-

353 apr.append(udeb_package) 

-

354 apr = tuple(apr) 

-

355 else: 

-

356 apr = tuple() 

-

357 

-

358 return PackageProcessingContextTestProvider( 

-

359 binary_package=bin_package, 

-

360 related_udeb_package=udeb_package, 

-

361 binary_package_version=binary_package_version, 

-

362 related_udeb_package_version=related_udeb_package_version, 

-

363 accessible_package_roots=lambda: apr, 

-

364 ) 

-

365 

-

366 

-

367def manifest_variable_resolution_context( 

-

368 *, 

-

369 debian_dir: Optional[VirtualPath] = None, 

-

370) -> VariableContext: 

-

371 if debian_dir is None: 

-

372 debian_dir = FSRootDir() 

-

373 

-

374 return VariableContext(debian_dir) 

-

375 

-

376 

-

377class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase): 

-

378 __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container") 

-

379 

-

380 def __init__( 

-

381 self, 

-

382 plugin_metadata: DebputyPluginMetadata, 

-

383 plugin_source_id: str, 

-

384 maintscript_container: Dict[str, List[RegisteredMaintscript]], 

-

385 ): 

-

386 self._plugin_metadata = plugin_metadata 

-

387 self._plugin_source_id = plugin_source_id 

-

388 self._maintscript_container = maintscript_container 

-

389 

-

390 @classmethod 

-

391 def _apply_condition_to_script( 

-

392 cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None 

-

393 ) -> str: 

-

394 return run_snippet 

-

395 

-

396 def _append_script( 

-

397 self, 

-

398 caller_name: str, 

-

399 maintscript: Maintscript, 

-

400 full_script: str, 

-

401 /, 

-

402 perform_substitution: bool = True, 

-

403 ) -> None: 

-

404 if self._plugin_source_id not in self._maintscript_container: 

-

405 self._maintscript_container[self._plugin_source_id] = [] 

-

406 self._maintscript_container[self._plugin_source_id].append( 

-

407 RegisteredMaintscript( 

-

408 maintscript, 

-

409 caller_name, 

-

410 full_script, 

-

411 perform_substitution, 

-

412 ) 

-

413 ) 

-

414 

-

415 

-

416class RegisteredMetadataImpl(RegisteredMetadata): 

-

417 __slots__ = ( 

-

418 "_substvars", 

-

419 "_triggers", 

-

420 "_maintscripts", 

-

421 ) 

-

422 

-

423 def __init__( 

-

424 self, 

-

425 substvars: Substvars, 

-

426 triggers: List[RegisteredTrigger], 

-

427 maintscripts: List[RegisteredMaintscript], 

-

428 ) -> None: 

-

429 self._substvars = substvars 

-

430 self._triggers = triggers 

-

431 self._maintscripts = maintscripts 

-

432 

-

433 @property 

-

434 def substvars(self) -> Substvars: 

-

435 return self._substvars 

-

436 

-

437 @property 

-

438 def triggers(self) -> List[RegisteredTrigger]: 

-

439 return self._triggers 

-

440 

-

441 def maintscripts( 

-

442 self, 

-

443 *, 

-

444 maintscript: Optional[Maintscript] = None, 

-

445 ) -> List[RegisteredMaintscript]: 

-

446 if maintscript is None: 

-

447 return self._maintscripts 

-

448 return [m for m in self._maintscripts if m.maintscript == maintscript] 

-

449 

-

450 

-

451class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase): 

-

452 __slots__ = ("_maintscript_container",) 

-

453 

-

454 def __init__( 

-

455 self, 

-

456 plugin_metadata: DebputyPluginMetadata, 

-

457 plugin_source_id: str, 

-

458 context: PackageProcessingContext, 

-

459 ) -> None: 

-

460 super().__init__( 

-

461 plugin_metadata, 

-

462 plugin_source_id, 

-

463 context, 

-

464 {}, 

-

465 FlushableSubstvars(), 

-

466 (None, None), 

-

467 ) 

-

468 self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {} 

-

469 

-

470 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

-

471 return MaintscriptAccessorTestProvider( 

-

472 self._plugin_metadata, 

-

473 self._plugin_source_id, 

-

474 self._maintscript_container, 

-

475 ) 

-

476 

-

477 def registered_metadata(self) -> RegisteredMetadata: 

-

478 return RegisteredMetadataImpl( 

-

479 self._substvars, 

-

480 [ 

-

481 RegisteredTrigger.from_plugin_provided_trigger(t) 

-

482 for t in self._triggers.values() 

-

483 if t.provider_source_id == self._plugin_source_id 

-

484 ], 

-

485 self._maintscript_container.get(self._plugin_source_id, []), 

-

486 ) 

-

487 

-

488 

-

489class ServiceRegistryTestImpl(ServiceRegistry[DSD]): 

-

490 __slots__ = ("_service_manager_details", "_service_definitions") 

-

491 

-

492 def __init__( 

-

493 self, 

-

494 service_manager_details: ServiceManagerDetails, 

-

495 detected_services: List[DetectedService[DSD]], 

-

496 ) -> None: 

-

497 self._service_manager_details = service_manager_details 

-

498 self._service_definitions = detected_services 

-

499 

-

500 def register_service( 

-

501 self, 

-

502 path: VirtualPath, 

-

503 name: Union[str, List[str]], 

-

504 *, 

-

505 type_of_service: str = "service", # "timer", etc. 

-

506 service_scope: str = "system", 

-

507 enable_by_default: bool = True, 

-

508 start_by_default: bool = True, 

-

509 default_upgrade_rule: ServiceUpgradeRule = "restart", 

-

510 service_context: Optional[DSD] = None, 

-

511 ) -> None: 

-

512 names = name if isinstance(name, list) else [name] 

-

513 if len(names) < 1: 513 ↛ 514line 513 didn't jump to line 514, because the condition on line 513 was never true

-

514 raise ValueError( 

-

515 f"The service must have at least one name - {path.absolute} did not have any" 

-

516 ) 

-

517 self._service_definitions.append( 

-

518 DetectedService( 

-

519 path, 

-

520 names, 

-

521 type_of_service, 

-

522 service_scope, 

-

523 enable_by_default, 

-

524 start_by_default, 

-

525 default_upgrade_rule, 

-

526 service_context, 

-

527 ) 

-

528 ) 

-

529 

-

530 

-

531@contextlib.contextmanager 

-

532def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]: 

-

533 if fs_root.is_read_write: 533 ↛ 539line 533 didn't jump to line 539, because the condition on line 533 was never false

-

534 assert isinstance(fs_root, FSRootDir) 

-

535 fs_root.is_read_write = False 

-

536 yield fs_root 

-

537 fs_root.is_read_write = True 

-

538 else: 

-

539 yield fs_root 

-

540 

-

541 

-

542class InitializedPluginUnderTestImpl(InitializedPluginUnderTest): 

-

543 def __init__( 

-

544 self, 

-

545 plugin_name: str, 

-

546 feature_set: PluginProvidedFeatureSet, 

-

547 substitution: SubstitutionImpl, 

-

548 ) -> None: 

-

549 self._feature_set = feature_set 

-

550 self._plugin_name = plugin_name 

-

551 self._packager_provided_files: Optional[ 

-

552 Dict[str, RegisteredPackagerProvidedFile] 

-

553 ] = None 

-

554 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

-

555 self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {} 

-

556 self._substitution = substitution 

-

557 assert plugin_name in self._feature_set.plugin_data 

-

558 

-

559 @property 

-

560 def _plugin_metadata(self) -> DebputyPluginMetadata: 

-

561 return self._feature_set.plugin_data[self._plugin_name] 

-

562 

-

563 def packager_provided_files_by_stem( 

-

564 self, 

-

565 ) -> Mapping[str, RegisteredPackagerProvidedFile]: 

-

566 ppf = self._packager_provided_files 

-

567 if ppf is None: 

-

568 result: Dict[str, RegisteredPackagerProvidedFile] = {} 

-

569 for spec in self._feature_set.packager_provided_files.values(): 

-

570 if spec.debputy_plugin_metadata.plugin_name != self._plugin_name: 

-

571 continue 

-

572 # Registered as a virtual subclass, so this should always be True 

-

573 assert isinstance(spec, RegisteredPackagerProvidedFile) 

-

574 result[spec.stem] = spec 

-

575 self._packager_provided_files = result 

-

576 ppf = result 

-

577 return ppf 

-

578 

-

579 def run_metadata_detector( 

-

580 self, 

-

581 metadata_detector_id: str, 

-

582 fs_root: VirtualPath, 

-

583 context: Optional[PackageProcessingContext] = None, 

-

584 ) -> RegisteredMetadata: 

-

585 if fs_root.parent_dir is not None: 585 ↛ 586line 585 didn't jump to line 586, because the condition on line 585 was never true

-

586 raise ValueError("Provided path must be the file system root.") 

-

587 detectors = self._feature_set.metadata_maintscript_detectors[self._plugin_name] 

-

588 matching_detectors = [ 

-

589 d for d in detectors if d.detector_id == metadata_detector_id 

-

590 ] 

-

591 if len(matching_detectors) != 1: 591 ↛ 592line 591 didn't jump to line 592, because the condition on line 591 was never true

-

592 assert not matching_detectors 

-

593 raise ValueError( 

-

594 f"The plugin {self._plugin_name} did not provide a metadata detector with ID" 

-

595 f' "{metadata_detector_id}"' 

-

596 ) 

-

597 if context is None: 

-

598 context = package_metadata_context() 

-

599 detector = matching_detectors[0] 

-

600 if not detector.applies_to(context.binary_package): 

-

601 raise ValueError( 

-

602 f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the' 

-

603 " given package. Consider using `package_metadata_context()` to emulate a binary package" 

-

604 " with the correct specification. As an example: " 

-

605 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

-

606 " package." 

-

607 ) 

-

608 

-

609 ctrl = BinaryCtrlAccessorTestProvider( 

-

610 self._plugin_metadata, 

-

611 metadata_detector_id, 

-

612 context, 

-

613 ) 

-

614 with _read_only_fs_root(fs_root) as ro_root: 

-

615 detector.run_detector( 

-

616 ro_root, 

-

617 ctrl, 

-

618 context, 

-

619 ) 

-

620 return ctrl.registered_metadata() 

-

621 

-

622 def run_package_processor( 

-

623 self, 

-

624 package_processor_id: str, 

-

625 fs_root: VirtualPath, 

-

626 context: Optional[PackageProcessingContext] = None, 

-

627 ) -> None: 

-

628 if fs_root.parent_dir is not None: 628 ↛ 629line 628 didn't jump to line 629, because the condition on line 628 was never true

-

629 raise ValueError("Provided path must be the file system root.") 

-

630 pp_key = (self._plugin_name, package_processor_id) 

-

631 package_processor = self._feature_set.all_package_processors.get(pp_key) 

-

632 if package_processor is None: 632 ↛ 633line 632 didn't jump to line 633, because the condition on line 632 was never true

-

633 raise ValueError( 

-

634 f"The plugin {self._plugin_name} did not provide a package processor with ID" 

-

635 f' "{package_processor_id}"' 

-

636 ) 

-

637 if context is None: 637 ↛ 639line 637 didn't jump to line 639, because the condition on line 637 was never false

-

638 context = package_metadata_context() 

-

639 if not fs_root.is_read_write: 639 ↛ 640line 639 didn't jump to line 640, because the condition on line 639 was never true

-

640 raise ValueError( 

-

641 "The provided fs_root is read-only and it must be read-write for package processor" 

-

642 ) 

-

643 if not package_processor.applies_to(context.binary_package): 643 ↛ 644line 643 didn't jump to line 644, because the condition on line 643 was never true

-

644 raise ValueError( 

-

645 f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply' 

-

646 " to the given package. Consider using `package_metadata_context()` to emulate a binary" 

-

647 " package with the correct specification. As an example: " 

-

648 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

-

649 " package." 

-

650 ) 

-

651 package_processor.run_package_processor( 

-

652 fs_root, 

-

653 None, 

-

654 context, 

-

655 ) 

-

656 

-

657 @property 

-

658 def declared_manifest_variables(self) -> FrozenSet[str]: 

-

659 return frozenset( 

-

660 { 

-

661 k 

-

662 for k, v in self._feature_set.manifest_variables.items() 

-

663 if v.plugin_metadata.plugin_name == self._plugin_name 

-

664 } 

-

665 ) 

-

666 

-

667 def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]: 

-

668 issues = [] 

-

669 for adr in self._feature_set.auto_discard_rules.values(): 

-

670 if adr.plugin_metadata.plugin_name != self._plugin_name: 670 ↛ 671line 670 didn't jump to line 671, because the condition on line 670 was never true

-

671 continue 

-

672 for idx, example in enumerate(adr.examples): 

-

673 result = process_discard_rule_example( 

-

674 adr, 

-

675 example, 

-

676 ) 

-

677 if result.inconsistent_paths: 

-

678 issues.append( 

-

679 ADRExampleIssue( 

-

680 adr.name, 

-

681 idx, 

-

682 [ 

-

683 x.absolute + ("/" if x.is_dir else "") 

-

684 for x in result.inconsistent_paths 

-

685 ], 

-

686 ) 

-

687 ) 

-

688 return issues 

-

689 

-

690 def run_service_detection_and_integrations( 

-

691 self, 

-

692 service_manager: str, 

-

693 fs_root: VirtualPath, 

-

694 context: Optional[PackageProcessingContext] = None, 

-

695 *, 

-

696 service_context_type_hint: Optional[Type[DSD]] = None, 

-

697 ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]: 

-

698 if fs_root.parent_dir is not None: 698 ↛ 699line 698 didn't jump to line 699, because the condition on line 698 was never true

-

699 raise ValueError("Provided path must be the file system root.") 

-

700 try: 

-

701 service_manager_details = self._feature_set.service_managers[ 

-

702 service_manager 

-

703 ] 

-

704 if service_manager_details.plugin_metadata.plugin_name != self._plugin_name: 704 ↛ 705line 704 didn't jump to line 705, because the condition on line 704 was never true

-

705 raise KeyError(service_manager) 

-

706 except KeyError: 

-

707 raise ValueError( 

-

708 f"The plugin {self._plugin_name} does not provide a" 

-

709 f" service manager called {service_manager}" 

-

710 ) from None 

-

711 

-

712 if context is None: 712 ↛ 714line 712 didn't jump to line 714, because the condition on line 712 was never false

-

713 context = package_metadata_context() 

-

714 detected_services: List[DetectedService[DSD]] = [] 

-

715 registry = ServiceRegistryTestImpl(service_manager_details, detected_services) 

-

716 service_manager_details.service_detector( 

-

717 fs_root, 

-

718 registry, 

-

719 context, 

-

720 ) 

-

721 ctrl = BinaryCtrlAccessorTestProvider( 

-

722 self._plugin_metadata, 

-

723 service_manager_details.service_manager, 

-

724 context, 

-

725 ) 

-

726 if detected_services: 

-

727 service_definitions = [ 

-

728 ServiceDefinitionImpl( 

-

729 ds.names[0], 

-

730 ds.names, 

-

731 ds.path, 

-

732 ds.type_of_service, 

-

733 ds.service_scope, 

-

734 ds.enable_by_default, 

-

735 ds.start_by_default, 

-

736 ds.default_upgrade_rule, 

-

737 self._plugin_name, 

-

738 True, 

-

739 ds.service_context, 

-

740 ) 

-

741 for ds in detected_services 

-

742 ] 

-

743 service_manager_details.service_integrator( 

-

744 service_definitions, 

-

745 ctrl, 

-

746 context, 

-

747 ) 

-

748 return detected_services, ctrl.registered_metadata() 

-

749 

-

750 def manifest_variables( 

-

751 self, 

-

752 *, 

-

753 resolution_context: Optional[VariableContext] = None, 

-

754 mocked_variables: Optional[Mapping[str, str]] = None, 

-

755 ) -> Mapping[str, str]: 

-

756 valid_manifest_variables = frozenset( 

-

757 { 

-

758 n 

-

759 for n, v in self._feature_set.manifest_variables.items() 

-

760 if v.plugin_metadata.plugin_name == self._plugin_name 

-

761 } 

-

762 ) 

-

763 if resolution_context is None: 

-

764 resolution_context = manifest_variable_resolution_context() 

-

765 substitution = self._substitution.copy_for_subst_test( 

-

766 self._feature_set, 

-

767 resolution_context, 

-

768 extra_substitutions=mocked_variables, 

-

769 ) 

-

770 return SubstitutionTable( 

-

771 valid_manifest_variables, 

-

772 substitution, 

-

773 ) 

-

774 

-

775 

-

776class SubstitutionTable(Mapping[str, str]): 

-

777 def __init__( 

-

778 self, valid_manifest_variables: FrozenSet[str], substitution: Substitution 

-

779 ) -> None: 

-

780 self._valid_manifest_variables = valid_manifest_variables 

-

781 self._resolved: Set[str] = set() 

-

782 self._substitution = substitution 

-

783 

-

784 def __contains__(self, item: object) -> bool: 

-

785 return item in self._valid_manifest_variables 

-

786 

-

787 def __getitem__(self, key: str) -> str: 

-

788 if key not in self._valid_manifest_variables: 788 ↛ 789line 788 didn't jump to line 789, because the condition on line 788 was never true

-

789 raise KeyError(key) 

-

790 v = self._substitution.substitute( 

-

791 "{{" + key + "}}", f"test of manifest variable `{key}`" 

-

792 ) 

-

793 self._resolved.add(key) 

-

794 return v 

-

795 

-

796 def __len__(self) -> int: 

-

797 return len(self._valid_manifest_variables) 

-

798 

-

799 def __iter__(self) -> Iterator[str]: 

-

800 return iter(self._valid_manifest_variables) 

-

801 

-

802 def keys(self) -> KeysView[str]: 

-

803 return cast("KeysView[str]", self._valid_manifest_variables) 

-
- - - -- cgit v1.2.3