Coverage for src/debputy/commands/debputy_cmd/context.py: 42%

283 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import argparse 

2import dataclasses 

3import errno 

4import os 

5from typing import ( 

6 Optional, 

7 Tuple, 

8 Mapping, 

9 FrozenSet, 

10 Set, 

11 Union, 

12 Sequence, 

13 Iterable, 

14 Callable, 

15 Dict, 

16 TYPE_CHECKING, 

17) 

18 

19from debian.debian_support import DpkgArchTable 

20 

21from debputy._deb_options_profiles import DebBuildOptionsAndProfiles 

22from debputy.architecture_support import ( 

23 DpkgArchitectureBuildProcessValuesTable, 

24 dpkg_architecture_table, 

25) 

26from debputy.exceptions import DebputyRuntimeError 

27from debputy.filesystem_scan import FSROOverlay 

28from debputy.highlevel_manifest import HighLevelManifest 

29from debputy.highlevel_manifest_parser import YAMLManifestParser 

30from debputy.packages import SourcePackage, BinaryPackage, parse_source_debian_control 

31from debputy.plugin.api import VirtualPath 

32from debputy.plugin.api.impl import load_plugin_features 

33from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

34from debputy.substitution import ( 

35 Substitution, 

36 VariableContext, 

37 SubstitutionImpl, 

38 NULL_SUBSTITUTION, 

39) 

40from debputy.util import _error, PKGNAME_REGEX, resolve_source_date_epoch, setup_logging 

41 

42if TYPE_CHECKING: 

43 from argparse import _SubParsersAction 

44 

45 

46CommandHandler = Callable[["CommandContext"], None] 

47ArgparserConfigurator = Callable[[argparse.ArgumentParser], None] 

48 

49 

50def add_arg( 

51 *name_or_flags: str, 

52 **kwargs, 

53) -> Callable[[argparse.ArgumentParser], None]: 

54 def _configurator(argparser: argparse.ArgumentParser) -> None: 

55 argparser.add_argument( 

56 *name_or_flags, 

57 **kwargs, 

58 ) 

59 

60 return _configurator 

61 

62 

63@dataclasses.dataclass(slots=True, frozen=True) 

64class CommandArg: 

65 parsed_args: argparse.Namespace 

66 plugin_search_dirs: Sequence[str] 

67 

68 

69@dataclasses.dataclass 

70class Command: 

71 handler: Callable[["CommandContext"], None] 

72 require_substitution: bool = True 

73 requested_plugins_only: bool = False 

74 

75 

76class CommandContext: 

77 def __init__( 

78 self, 

79 parsed_args: argparse.Namespace, 

80 plugin_search_dirs: Sequence[str], 

81 require_substitution: bool = True, 

82 requested_plugins_only: bool = False, 

83 ) -> None: 

84 self.parsed_args = parsed_args 

85 self.plugin_search_dirs = plugin_search_dirs 

86 self._require_substitution = require_substitution 

87 self._requested_plugins_only = requested_plugins_only 

88 self._debputy_plugin_feature_set: PluginProvidedFeatureSet = ( 

89 PluginProvidedFeatureSet() 

90 ) 

91 self._debian_dir = FSROOverlay.create_root_dir("debian", "debian") 

92 self._mtime: Optional[int] = None 

93 self._source_variables: Optional[Mapping[str, str]] = None 

94 self._substitution: Optional[Substitution] = None 

95 self._requested_plugins: Optional[Sequence[str]] = None 

96 self._plugins_loaded = False 

97 self._dctrl_data: Optional[ 

98 Tuple[ 

99 DpkgArchitectureBuildProcessValuesTable, 

100 DpkgArchTable, 

101 DebBuildOptionsAndProfiles, 

102 "SourcePackage", 

103 Mapping[str, "BinaryPackage"], 

104 ] 

105 ] = None 

106 

107 @property 

108 def debian_dir(self) -> VirtualPath: 

109 return self._debian_dir 

110 

111 @property 

112 def mtime(self) -> int: 

113 if self._mtime is None: 

114 self._mtime = resolve_source_date_epoch( 

115 None, 

116 substitution=self.substitution, 

117 ) 

118 return self._mtime 

119 

120 def source_package(self) -> SourcePackage: 

121 _a, _b, _c, source, _d = self._parse_dctrl() 

122 return source 

123 

124 def binary_packages(self) -> Mapping[str, "BinaryPackage"]: 

125 _a, _b, _c, _source, binary_package_table = self._parse_dctrl() 

126 return binary_package_table 

127 

128 def requested_plugins(self) -> Sequence[str]: 

129 if self._requested_plugins is None: 

130 self._requested_plugins = self._resolve_requested_plugins() 

131 return self._requested_plugins 

132 

133 def required_plugins(self) -> Set[str]: 

134 return set(getattr(self.parsed_args, "required_plugins") or []) 

135 

136 @property 

137 def deb_build_options_and_profiles(self) -> "DebBuildOptionsAndProfiles": 

138 _a, _b, deb_build_options_and_profiles, _c, _d = self._parse_dctrl() 

139 return deb_build_options_and_profiles 

140 

141 @property 

142 def deb_build_options(self) -> Mapping[str, Optional[str]]: 

143 return self.deb_build_options_and_profiles.deb_build_options 

144 

145 def _create_substitution( 

146 self, 

147 parsed_args: argparse.Namespace, 

148 plugin_feature_set: PluginProvidedFeatureSet, 

149 debian_dir: VirtualPath, 

150 ) -> Substitution: 

151 requested_subst = self._require_substitution 

152 if hasattr(parsed_args, "substitution"): 

153 requested_subst = parsed_args.substitution 

154 if requested_subst is False and self._require_substitution: 

155 _error(f"--no-substitution cannot be used with {parsed_args.command}") 

156 if self._require_substitution or requested_subst is not False: 

157 variable_context = VariableContext(debian_dir) 

158 return SubstitutionImpl( 

159 plugin_feature_set=plugin_feature_set, 

160 unresolvable_substitutions=frozenset(["PACKAGE"]), 

161 variable_context=variable_context, 

162 ) 

163 return NULL_SUBSTITUTION 

164 

165 def load_plugins(self) -> PluginProvidedFeatureSet: 

166 if not self._plugins_loaded: 

167 requested_plugins = None 

168 required_plugins = self.required_plugins() 

169 if self._requested_plugins_only: 

170 requested_plugins = self.requested_plugins() 

171 debug_mode = getattr(self.parsed_args, "debug_mode", False) 

172 load_plugin_features( 

173 self.plugin_search_dirs, 

174 self.substitution, 

175 requested_plugins_only=requested_plugins, 

176 required_plugins=required_plugins, 

177 plugin_feature_set=self._debputy_plugin_feature_set, 

178 debug_mode=debug_mode, 

179 ) 

180 self._plugins_loaded = True 

181 return self._debputy_plugin_feature_set 

182 

183 @staticmethod 

184 def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]: 

185 package_prefix = "debputy-plugin-" 

186 for dep_clause in (d.strip() for d in dep_field.split(",")): 

187 dep = dep_clause.split("|")[0].strip() 

188 if not dep.startswith(package_prefix): 

189 continue 

190 m = PKGNAME_REGEX.search(dep) 

191 assert m 

192 package_name = m.group(0) 

193 plugin_name = package_name[len(package_prefix) :] 

194 yield plugin_name 

195 

196 def _resolve_requested_plugins(self) -> Sequence[str]: 

197 _a, _b, _c, source_package, _d = self._parse_dctrl() 

198 bd = source_package.fields.get("Build-Depends", "") 

199 plugins = list(self._plugin_from_dependency_field(bd)) 

200 for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"): 

201 f = source_package.fields.get(field_name) 

202 if not f: 

203 continue 

204 for plugin in self._plugin_from_dependency_field(f): 

205 raise DebputyRuntimeError( 

206 f"Cannot load plugins via {field_name}:" 

207 f" Please move debputy-plugin-{plugin} dependency to Build-Depends." 

208 ) 

209 

210 return plugins 

211 

212 @property 

213 def substitution(self) -> Substitution: 

214 if self._substitution is None: 

215 self._substitution = self._create_substitution( 

216 self.parsed_args, 

217 self._debputy_plugin_feature_set, 

218 self.debian_dir, 

219 ) 

220 return self._substitution 

221 

222 def must_be_called_in_source_root(self) -> None: 

223 if self.debian_dir.get("control") is None: 

224 _error( 

225 "This subcommand must be run from a source package root; expecting debian/control to exist." 

226 ) 

227 

228 def _parse_dctrl( 

229 self, 

230 ) -> Tuple[ 

231 DpkgArchitectureBuildProcessValuesTable, 

232 DpkgArchTable, 

233 DebBuildOptionsAndProfiles, 

234 "SourcePackage", 

235 Mapping[str, "BinaryPackage"], 

236 ]: 

237 if self._dctrl_data is None: 

238 build_env = DebBuildOptionsAndProfiles.instance() 

239 dpkg_architecture_variables = dpkg_architecture_table() 

240 dpkg_arch_query_table = DpkgArchTable.load_arch_table() 

241 

242 packages: Union[Set[str], FrozenSet[str]] = frozenset() 

243 if hasattr(self.parsed_args, "packages"): 

244 packages = self.parsed_args.packages 

245 

246 try: 

247 debian_control = self.debian_dir.get("control") 

248 if debian_control is None: 

249 raise FileNotFoundError( 

250 errno.ENOENT, 

251 os.strerror(errno.ENOENT), 

252 os.path.join(self.debian_dir.fs_path, "control"), 

253 ) 

254 source_package, binary_packages = parse_source_debian_control( 

255 debian_control, 

256 packages, # -p/--package 

257 set(), # -N/--no-package 

258 False, # -i 

259 False, # -a 

260 dpkg_architecture_variables=dpkg_architecture_variables, 

261 dpkg_arch_query_table=dpkg_arch_query_table, 

262 build_env=build_env, 

263 ) 

264 assert packages <= binary_packages.keys() 

265 except FileNotFoundError: 

266 # We are not using `must_be_called_in_source_root`, because we (in this case) require 

267 # the file to be readable (that is, parse_source_debian_control can also raise a 

268 # FileNotFoundError when trying to open the file). 

269 _error( 

270 "This subcommand must be run from a source package root; expecting debian/control to exist." 

271 ) 

272 

273 self._dctrl_data = ( 

274 dpkg_architecture_variables, 

275 dpkg_arch_query_table, 

276 build_env, 

277 source_package, 

278 binary_packages, 

279 ) 

280 

281 return self._dctrl_data 

282 

283 @property 

284 def has_dctrl_file(self) -> bool: 

285 debian_control = self.debian_dir.get("control") 

286 return debian_control is not None 

287 

288 def manifest_parser( 

289 self, 

290 *, 

291 manifest_path: Optional[str] = None, 

292 ) -> YAMLManifestParser: 

293 substitution = self.substitution 

294 

295 ( 

296 dpkg_architecture_variables, 

297 dpkg_arch_query_table, 

298 build_env, 

299 source_package, 

300 binary_packages, 

301 ) = self._parse_dctrl() 

302 

303 if self.parsed_args.debputy_manifest is not None: 

304 manifest_path = self.parsed_args.debputy_manifest 

305 if manifest_path is None: 

306 manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest") 

307 return YAMLManifestParser( 

308 manifest_path, 

309 source_package, 

310 binary_packages, 

311 substitution, 

312 dpkg_architecture_variables, 

313 dpkg_arch_query_table, 

314 build_env, 

315 self.load_plugins(), 

316 debian_dir=self.debian_dir, 

317 ) 

318 

319 def parse_manifest( 

320 self, 

321 *, 

322 manifest_path: Optional[str] = None, 

323 ) -> HighLevelManifest: 

324 substitution = self.substitution 

325 manifest_required = False 

326 

327 ( 

328 dpkg_architecture_variables, 

329 dpkg_arch_query_table, 

330 build_env, 

331 _, 

332 binary_packages, 

333 ) = self._parse_dctrl() 

334 

335 if self.parsed_args.debputy_manifest is not None: 

336 manifest_path = self.parsed_args.debputy_manifest 

337 manifest_required = True 

338 if manifest_path is None: 

339 manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest") 

340 parser = self.manifest_parser(manifest_path=manifest_path) 

341 

342 os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute( 

343 "{{SOURCE_DATE_EPOCH}}", 

344 "Internal resolution", 

345 ) 

346 if os.path.isfile(manifest_path): 

347 return parser.parse_manifest() 

348 if manifest_required: 

349 _error(f'The path "{manifest_path}" is not a file!') 

350 return parser.build_manifest() 

351 

352 

353class CommandBase: 

354 __slots__ = () 

355 

356 def configure(self, argparser: argparse.ArgumentParser) -> None: 

357 # Does nothing by default 

358 pass 

359 

360 def __call__(self, command_arg: CommandArg) -> None: 

361 raise NotImplementedError 

362 

363 

364class SubcommandBase(CommandBase): 

365 __slots__ = ("name", "aliases", "help_description") 

366 

367 def __init__( 

368 self, 

369 name: str, 

370 *, 

371 aliases: Sequence[str] = tuple(), 

372 help_description: Optional[str] = None, 

373 ) -> None: 

374 self.name = name 

375 self.aliases = aliases 

376 self.help_description = help_description 

377 

378 def add_subcommand_to_subparser( 

379 self, 

380 subparser: "_SubParsersAction", 

381 ) -> argparse.ArgumentParser: 

382 parser = subparser.add_parser( 

383 self.name, 

384 aliases=self.aliases, 

385 help=self.help_description, 

386 allow_abbrev=False, 

387 ) 

388 self.configure(parser) 

389 return parser 

390 

391 

392class GenericSubCommand(SubcommandBase): 

393 __slots__ = ( 

394 "_handler", 

395 "_configure_handler", 

396 "_require_substitution", 

397 "_requested_plugins_only", 

398 "_log_only_to_stderr", 

399 ) 

400 

401 def __init__( 

402 self, 

403 name: str, 

404 handler: Callable[[CommandContext], None], 

405 *, 

406 aliases: Sequence[str] = tuple(), 

407 help_description: Optional[str] = None, 

408 configure_handler: Optional[Callable[[argparse.ArgumentParser], None]] = None, 

409 require_substitution: bool = True, 

410 requested_plugins_only: bool = False, 

411 log_only_to_stderr: bool = False, 

412 ) -> None: 

413 super().__init__(name, aliases=aliases, help_description=help_description) 

414 self._handler = handler 

415 self._configure_handler = configure_handler 

416 self._require_substitution = require_substitution 

417 self._requested_plugins_only = requested_plugins_only 

418 self._log_only_to_stderr = log_only_to_stderr 

419 

420 def configure_handler( 

421 self, 

422 handler: Callable[[argparse.ArgumentParser], None], 

423 ) -> None: 

424 if self._configure_handler is not None: 424 ↛ 425line 424 didn't jump to line 425, because the condition on line 424 was never true

425 raise TypeError("Only one argument handler can be provided") 

426 self._configure_handler = handler 

427 

428 def configure(self, argparser: argparse.ArgumentParser) -> None: 

429 handler = self._configure_handler 

430 if handler is not None: 

431 handler(argparser) 

432 

433 def __call__(self, command_arg: CommandArg) -> None: 

434 context = CommandContext( 

435 command_arg.parsed_args, 

436 command_arg.plugin_search_dirs, 

437 self._require_substitution, 

438 self._requested_plugins_only, 

439 ) 

440 if self._log_only_to_stderr: 

441 setup_logging(reconfigure_logging=True, log_only_to_stderr=True) 

442 return self._handler(context) 

443 

444 

445class DispatchingCommandMixin(CommandBase): 

446 __slots__ = () 

447 

448 def add_subcommand(self, subcommand: SubcommandBase) -> None: 

449 raise NotImplementedError 

450 

451 def add_dispatching_subcommand( 

452 self, 

453 name: str, 

454 dest: str, 

455 *, 

456 aliases: Sequence[str] = tuple(), 

457 help_description: Optional[str] = None, 

458 metavar: str = "command", 

459 default_subcommand: Optional[str] = None, 

460 ) -> "DispatcherCommand": 

461 ds = DispatcherCommand( 

462 name, 

463 dest, 

464 aliases=aliases, 

465 help_description=help_description, 

466 metavar=metavar, 

467 default_subcommand=default_subcommand, 

468 ) 

469 self.add_subcommand(ds) 

470 return ds 

471 

472 def register_subcommand( 

473 self, 

474 name: Union[str, Sequence[str]], 

475 *, 

476 help_description: Optional[str] = None, 

477 argparser: Optional[ 

478 Union[ArgparserConfigurator, Sequence[ArgparserConfigurator]] 

479 ] = None, 

480 require_substitution: bool = True, 

481 requested_plugins_only: bool = False, 

482 log_only_to_stderr: bool = False, 

483 ) -> Callable[[CommandHandler], GenericSubCommand]: 

484 if isinstance(name, str): 

485 cmd_name = name 

486 aliases = [] 

487 else: 

488 cmd_name = name[0] 

489 aliases = name[1:] 

490 

491 if argparser is not None and not callable(argparser): 

492 args = argparser 

493 

494 def _wrapper(parser: argparse.ArgumentParser) -> None: 

495 for configurator in args: 

496 configurator(parser) 

497 

498 argparser = _wrapper 

499 

500 def _annotation_impl(func: CommandHandler) -> GenericSubCommand: 

501 subcommand = GenericSubCommand( 

502 cmd_name, 

503 func, 

504 aliases=aliases, 

505 help_description=help_description, 

506 require_substitution=require_substitution, 

507 requested_plugins_only=requested_plugins_only, 

508 log_only_to_stderr=log_only_to_stderr, 

509 ) 

510 self.add_subcommand(subcommand) 

511 if argparser is not None: 

512 subcommand.configure_handler(argparser) 

513 

514 return subcommand 

515 

516 return _annotation_impl 

517 

518 

519class DispatcherCommand(SubcommandBase, DispatchingCommandMixin): 

520 __slots__ = ( 

521 "_subcommands", 

522 "_aliases", 

523 "_dest", 

524 "_metavar", 

525 "_required", 

526 "_default_subcommand", 

527 "_argparser", 

528 ) 

529 

530 def __init__( 

531 self, 

532 name: str, 

533 dest: str, 

534 *, 

535 aliases: Sequence[str] = tuple(), 

536 help_description: Optional[str] = None, 

537 metavar: str = "command", 

538 default_subcommand: Optional[str] = None, 

539 ) -> None: 

540 super().__init__(name, aliases=aliases, help_description=help_description) 

541 self._aliases: Dict[str, SubcommandBase] = {} 

542 self._subcommands: Dict[str, SubcommandBase] = {} 

543 self._dest = dest 

544 self._metavar = metavar 

545 self._default_subcommand = default_subcommand 

546 self._argparser: Optional[argparse.ArgumentParser] = None 

547 

548 def add_subcommand(self, subcommand: SubcommandBase) -> None: 

549 all_names = [subcommand.name] 

550 if subcommand.aliases: 

551 all_names.extend(subcommand.aliases) 

552 aliases = self._aliases 

553 for n in all_names: 

554 if n in aliases: 554 ↛ 555line 554 didn't jump to line 555, because the condition on line 554 was never true

555 raise ValueError( 

556 f"Internal error: Multiple handlers for {n} on topic {self.name}" 

557 ) 

558 

559 aliases[n] = subcommand 

560 self._subcommands[subcommand.name] = subcommand 

561 

562 def configure(self, argparser: argparse.ArgumentParser) -> None: 

563 if self._argparser is not None: 

564 raise TypeError("Cannot configure twice!") 

565 self._argparser = argparser 

566 subcommands = self._subcommands 

567 if not subcommands: 

568 raise ValueError( 

569 f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)" 

570 ) 

571 default_subcommand = self._default_subcommand 

572 required = default_subcommand is None 

573 if ( 

574 default_subcommand is not None 

575 and default_subcommand not in ("--help", "-h") 

576 and default_subcommand not in subcommands 

577 ): 

578 raise ValueError( 

579 f"Internal error: Subcommand {self.name} should have {default_subcommand} as default," 

580 " but it was not registered?" 

581 ) 

582 subparser = argparser.add_subparsers( 

583 dest=self._dest, 

584 required=required, 

585 metavar=self._metavar, 

586 ) 

587 for subcommand in subcommands.values(): 

588 subcommand.add_subcommand_to_subparser(subparser) 

589 

590 def has_command(self, command: str) -> bool: 

591 return command in self._aliases 

592 

593 def __call__(self, command_arg: CommandArg) -> None: 

594 argparser = self._argparser 

595 assert argparser is not None 

596 v = getattr(command_arg.parsed_args, self._dest, None) 

597 if v is None: 

598 v = self._default_subcommand 

599 if v in ("--help", "-h"): 

600 argparser.parse_args([v]) 

601 _error("Missing command", prog=argparser.prog) 

602 

603 assert ( 

604 v is not None 

605 ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?" 

606 assert ( 

607 v in self._aliases 

608 ), f"Internal error: {v} was accepted as a topic, but it was not registered?" 

609 self._aliases[v](command_arg) 

610 

611 

612ROOT_COMMAND = DispatcherCommand( 

613 "root", 

614 dest="command", 

615 metavar="COMMAND", 

616)