Coverage for src/debputy/commands/debputy_cmd/context.py: 42%
283 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import argparse
2import dataclasses
3import errno
4import os
5from typing import (
6 Optional,
7 Tuple,
8 Mapping,
9 FrozenSet,
10 Set,
11 Union,
12 Sequence,
13 Iterable,
14 Callable,
15 Dict,
16 TYPE_CHECKING,
17)
19from debian.debian_support import DpkgArchTable
21from debputy._deb_options_profiles import DebBuildOptionsAndProfiles
22from debputy.architecture_support import (
23 DpkgArchitectureBuildProcessValuesTable,
24 dpkg_architecture_table,
25)
26from debputy.exceptions import DebputyRuntimeError
27from debputy.filesystem_scan import FSROOverlay
28from debputy.highlevel_manifest import HighLevelManifest
29from debputy.highlevel_manifest_parser import YAMLManifestParser
30from debputy.packages import SourcePackage, BinaryPackage, parse_source_debian_control
31from debputy.plugin.api import VirtualPath
32from debputy.plugin.api.impl import load_plugin_features
33from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
34from debputy.substitution import (
35 Substitution,
36 VariableContext,
37 SubstitutionImpl,
38 NULL_SUBSTITUTION,
39)
40from debputy.util import _error, PKGNAME_REGEX, resolve_source_date_epoch, setup_logging
42if TYPE_CHECKING:
43 from argparse import _SubParsersAction
# Signature of a subcommand implementation: it is called with the CommandContext
# created for the invocation and returns nothing.
CommandHandler = Callable[["CommandContext"], None]
# Signature of a callback that adds arguments/options to an argparse parser.
ArgparserConfigurator = Callable[[argparse.ArgumentParser], None]
def add_arg(
    *name_or_flags: str,
    **kwargs,
) -> Callable[[argparse.ArgumentParser], None]:
    """Capture an `add_argument` call so it can be applied to a parser later.

    :param name_or_flags: Positional arguments forwarded unchanged to
      `ArgumentParser.add_argument` (the argument name or its option strings).
    :param kwargs: Keyword arguments forwarded unchanged to `add_argument`.
    :return: A callable that, given a parser, registers the described argument on it.
    """

    def _apply(parser: argparse.ArgumentParser) -> None:
        # Nothing is registered until the parser is actually handed over.
        parser.add_argument(*name_or_flags, **kwargs)

    return _apply
63@dataclasses.dataclass(slots=True, frozen=True)
64class CommandArg:
65 parsed_args: argparse.Namespace
66 plugin_search_dirs: Sequence[str]
@dataclasses.dataclass
class Command:
    """A command handler together with the flags describing how to run it.

    The two flags carry the same names and defaults as the corresponding
    parameters of `CommandContext`.
    """

    # Callable invoked with the CommandContext of the invocation.
    handler: Callable[["CommandContext"], None]
    # Whether the command needs a real substitution.
    require_substitution: bool = True
    # Whether plugin loading is limited to the requested plugins.
    requested_plugins_only: bool = False
class CommandContext:
    """Lazily computed, per-invocation state shared with subcommand handlers.

    The context wraps the parsed command line and the plugin search path.
    Everything derived from them (the parsed `debian/control`, the substitution,
    the requested plugins, the loaded plugin feature set) is computed on first
    use and cached on the instance.
    """

    def __init__(
        self,
        parsed_args: argparse.Namespace,
        plugin_search_dirs: Sequence[str],
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
    ) -> None:
        """Create a context for one command invocation.

        :param parsed_args: The argparse namespace of the invocation.
        :param plugin_search_dirs: Directories passed to `load_plugin_features`.
        :param require_substitution: When true, a real substitution is always
          created and an explicit request to disable it is rejected.
        :param requested_plugins_only: When true, the plugins named in the
          build dependencies are passed to `load_plugin_features` as its
          `requested_plugins_only` argument.
        """
        self.parsed_args = parsed_args
        self.plugin_search_dirs = plugin_search_dirs
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._debputy_plugin_feature_set: PluginProvidedFeatureSet = (
            PluginProvidedFeatureSet()
        )
        self._debian_dir = FSROOverlay.create_root_dir("debian", "debian")
        # Caches for the lazily computed values; None means "not computed yet".
        self._mtime: Optional[int] = None
        self._source_variables: Optional[Mapping[str, str]] = None
        self._substitution: Optional[Substitution] = None
        self._requested_plugins: Optional[Sequence[str]] = None
        self._plugins_loaded = False
        self._dctrl_data: Optional[
            Tuple[
                DpkgArchitectureBuildProcessValuesTable,
                DpkgArchTable,
                DebBuildOptionsAndProfiles,
                "SourcePackage",
                Mapping[str, "BinaryPackage"],
            ]
        ] = None

    @property
    def debian_dir(self) -> VirtualPath:
        """The overlay rooted at the `debian` directory."""
        return self._debian_dir

    @property
    def mtime(self) -> int:
        """The value of `resolve_source_date_epoch`, resolved once and cached."""
        if self._mtime is None:
            self._mtime = resolve_source_date_epoch(
                None,
                substitution=self.substitution,
            )
        return self._mtime

    def source_package(self) -> SourcePackage:
        """Return the source package parsed from `debian/control`."""
        _a, _b, _c, source, _d = self._parse_dctrl()
        return source

    def binary_packages(self) -> Mapping[str, "BinaryPackage"]:
        """Return the binary package table parsed from `debian/control`."""
        _a, _b, _c, _source, binary_package_table = self._parse_dctrl()
        return binary_package_table

    def requested_plugins(self) -> Sequence[str]:
        """Return the plugins named by the build dependencies (cached)."""
        if self._requested_plugins is None:
            self._requested_plugins = self._resolve_requested_plugins()
        return self._requested_plugins

    def required_plugins(self) -> Set[str]:
        """Return the set of plugins listed in `parsed_args.required_plugins`.

        A namespace where the attribute is missing, `None` or empty yields an
        empty set.
        """
        # The default keeps namespaces without the attribute from raising
        # AttributeError; they are treated like an unset (None) value.
        return set(getattr(self.parsed_args, "required_plugins", None) or ())

    @property
    def deb_build_options_and_profiles(self) -> "DebBuildOptionsAndProfiles":
        """The build options/profiles object obtained while parsing `debian/control`."""
        _a, _b, deb_build_options_and_profiles, _c, _d = self._parse_dctrl()
        return deb_build_options_and_profiles

    @property
    def deb_build_options(self) -> Mapping[str, Optional[str]]:
        """Shortcut for `deb_build_options_and_profiles.deb_build_options`."""
        return self.deb_build_options_and_profiles.deb_build_options

    def _create_substitution(
        self,
        parsed_args: argparse.Namespace,
        plugin_feature_set: PluginProvidedFeatureSet,
        debian_dir: VirtualPath,
    ) -> Substitution:
        """Build the substitution for this invocation.

        :param parsed_args: Namespace that may carry a `substitution` flag.
        :param plugin_feature_set: Feature set handed to `SubstitutionImpl`.
        :param debian_dir: Directory used to create the `VariableContext`.
        :return: A `SubstitutionImpl`, or `NULL_SUBSTITUTION` when substitution
          is neither required by the command nor requested on the command line.
        """
        requested_subst = self._require_substitution
        if hasattr(parsed_args, "substitution"):
            requested_subst = parsed_args.substitution
        if requested_subst is False and self._require_substitution:
            # The command cannot work without substitution; reject the opt-out.
            _error(f"--no-substitution cannot be used with {parsed_args.command}")
        if self._require_substitution or requested_subst is not False:
            variable_context = VariableContext(debian_dir)
            return SubstitutionImpl(
                plugin_feature_set=plugin_feature_set,
                unresolvable_substitutions=frozenset(["PACKAGE"]),
                variable_context=variable_context,
            )
        return NULL_SUBSTITUTION

    def load_plugins(self) -> PluginProvidedFeatureSet:
        """Load the plugin features on first call and return the feature set."""
        if not self._plugins_loaded:
            requested_plugins = None
            required_plugins = self.required_plugins()
            if self._requested_plugins_only:
                requested_plugins = self.requested_plugins()
            debug_mode = getattr(self.parsed_args, "debug_mode", False)
            load_plugin_features(
                self.plugin_search_dirs,
                self.substitution,
                requested_plugins_only=requested_plugins,
                required_plugins=required_plugins,
                plugin_feature_set=self._debputy_plugin_feature_set,
                debug_mode=debug_mode,
            )
            self._plugins_loaded = True
        return self._debputy_plugin_feature_set

    @staticmethod
    def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]:
        """Yield plugin names for `debputy-plugin-*` entries of a dependency field.

        Only the first alternative of each comma-separated clause is considered.

        :param dep_field: The raw value of a dependency field.
        """
        package_prefix = "debputy-plugin-"
        for dep_clause in (d.strip() for d in dep_field.split(",")):
            # Only the first alternative of an "a | b" clause is inspected.
            dep = dep_clause.split("|")[0].strip()
            if not dep.startswith(package_prefix):
                continue
            m = PKGNAME_REGEX.search(dep)
            assert m
            package_name = m.group(0)
            plugin_name = package_name[len(package_prefix) :]
            yield plugin_name

    def _resolve_requested_plugins(self) -> Sequence[str]:
        """Collect the plugins named in the source package's `Build-Depends`.

        :raises DebputyRuntimeError: If a plugin dependency is listed in
          `Build-Depends-Arch` or `Build-Depends-Indep`.
        """
        _a, _b, _c, source_package, _d = self._parse_dctrl()
        bd = source_package.fields.get("Build-Depends", "")
        plugins = list(self._plugin_from_dependency_field(bd))
        for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"):
            f = source_package.fields.get(field_name)
            if not f:
                continue
            # Any plugin found here is an error, so the first hit is reported.
            for plugin in self._plugin_from_dependency_field(f):
                raise DebputyRuntimeError(
                    f"Cannot load plugins via {field_name}:"
                    f" Please move debputy-plugin-{plugin} dependency to Build-Depends."
                )

        return plugins

    @property
    def substitution(self) -> Substitution:
        """The substitution for this invocation, created on first access."""
        if self._substitution is None:
            self._substitution = self._create_substitution(
                self.parsed_args,
                self._debputy_plugin_feature_set,
                self.debian_dir,
            )
        return self._substitution

    def must_be_called_in_source_root(self) -> None:
        """Report an error via `_error` unless `debian/control` exists."""
        if not self.has_dctrl_file:
            _error(
                "This subcommand must be run from a source package root; expecting debian/control to exist."
            )

    def _parse_dctrl(
        self,
    ) -> Tuple[
        DpkgArchitectureBuildProcessValuesTable,
        DpkgArchTable,
        DebBuildOptionsAndProfiles,
        "SourcePackage",
        Mapping[str, "BinaryPackage"],
    ]:
        """Parse `debian/control` once and return the cached result.

        :return: A tuple of the dpkg architecture variables, the dpkg
          architecture query table, the build options/profiles, the source
          package and the binary package table.
        """
        if self._dctrl_data is None:
            build_env = DebBuildOptionsAndProfiles.instance()
            dpkg_architecture_variables = dpkg_architecture_table()
            dpkg_arch_query_table = DpkgArchTable.load_arch_table()

            # Restrict to the packages selected on the command line, if any.
            packages: Union[Set[str], FrozenSet[str]] = frozenset()
            if hasattr(self.parsed_args, "packages"):
                packages = self.parsed_args.packages

            try:
                debian_control = self.debian_dir.get("control")
                if debian_control is None:
                    raise FileNotFoundError(
                        errno.ENOENT,
                        os.strerror(errno.ENOENT),
                        os.path.join(self.debian_dir.fs_path, "control"),
                    )
                source_package, binary_packages = parse_source_debian_control(
                    debian_control,
                    packages,  # -p/--package
                    set(),  # -N/--no-package
                    False,  # -i
                    False,  # -a
                    dpkg_architecture_variables=dpkg_architecture_variables,
                    dpkg_arch_query_table=dpkg_arch_query_table,
                    build_env=build_env,
                )
                assert packages <= binary_packages.keys()
            except FileNotFoundError:
                # We are not using `must_be_called_in_source_root`, because we (in this case) require
                # the file to be readable (that is, parse_source_debian_control can also raise a
                # FileNotFoundError when trying to open the file).
                _error(
                    "This subcommand must be run from a source package root; expecting debian/control to exist."
                )

            self._dctrl_data = (
                dpkg_architecture_variables,
                dpkg_arch_query_table,
                build_env,
                source_package,
                binary_packages,
            )

        return self._dctrl_data

    @property
    def has_dctrl_file(self) -> bool:
        """Whether the `debian` directory contains a `control` entry."""
        debian_control = self.debian_dir.get("control")
        return debian_control is not None

    def manifest_parser(
        self,
        *,
        manifest_path: Optional[str] = None,
    ) -> YAMLManifestParser:
        """Create a manifest parser for this source package.

        :param manifest_path: Path of the manifest. A `debputy_manifest` value
          given on the command line takes precedence; when neither is set, the
          path defaults to `debputy.manifest` inside the `debian` directory.
        :return: A `YAMLManifestParser` set up with the parsed control data,
          the substitution and the loaded plugins.
        """
        substitution = self.substitution

        (
            dpkg_architecture_variables,
            dpkg_arch_query_table,
            build_env,
            source_package,
            binary_packages,
        ) = self._parse_dctrl()

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        return YAMLManifestParser(
            manifest_path,
            source_package,
            binary_packages,
            substitution,
            dpkg_architecture_variables,
            dpkg_arch_query_table,
            build_env,
            self.load_plugins(),
            debian_dir=self.debian_dir,
        )

    def parse_manifest(
        self,
        *,
        manifest_path: Optional[str] = None,
    ) -> HighLevelManifest:
        """Parse the manifest, or build one when no manifest file exists.

        As a side effect, `SOURCE_DATE_EPOCH` is set in `os.environ` from the
        substitution.

        :param manifest_path: Path of the manifest; resolved the same way as in
          `manifest_parser`. A path given on the command line must be a file.
        :return: The manifest parsed from the file, or the result of
          `build_manifest()` when the (optional) file does not exist.
        """
        substitution = self.substitution
        manifest_required = False

        # Called only for its side effect: debian/control is parsed (and cached)
        # before the manifest path is resolved. The values themselves are
        # fetched again by manifest_parser().
        self._parse_dctrl()

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
            # An explicitly named manifest has to exist.
            manifest_required = True
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        parser = self.manifest_parser(manifest_path=manifest_path)

        os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute(
            "{{SOURCE_DATE_EPOCH}}",
            "Internal resolution",
        )
        if os.path.isfile(manifest_path):
            return parser.parse_manifest()
        if manifest_required:
            _error(f'The path "{manifest_path}" is not a file!')
        return parser.build_manifest()
class CommandBase:
    """Common base of all commands: optionally configurable, and callable."""

    __slots__ = ()

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Hook for adding arguments to `argparser`; the default adds nothing."""
        return None

    def __call__(self, command_arg: CommandArg) -> None:
        """Run the command; must be provided by subclasses."""
        raise NotImplementedError()
class SubcommandBase(CommandBase):
    """A command that can be attached to an argparse subparsers action by name."""

    __slots__ = ("name", "aliases", "help_description")

    def __init__(
        self,
        name: str,
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
    ) -> None:
        """Record how the subcommand is presented on the command line.

        :param name: Primary name of the subcommand.
        :param aliases: Alternative names for the subcommand.
        :param help_description: Help text passed to argparse (may be omitted).
        """
        self.name = name
        self.aliases = aliases
        self.help_description = help_description

    def add_subcommand_to_subparser(
        self,
        subparser: "_SubParsersAction",
    ) -> argparse.ArgumentParser:
        """Create this subcommand's parser on `subparser` and configure it.

        :param subparser: The subparsers action to register the subcommand with.
        :return: The newly created (and configured) argument parser.
        """
        parser_options = {
            "aliases": self.aliases,
            "help": self.help_description,
            "allow_abbrev": False,
        }
        own_parser = subparser.add_parser(self.name, **parser_options)
        # Let the concrete command add its own arguments.
        self.configure(own_parser)
        return own_parser
class GenericSubCommand(SubcommandBase):
    """Subcommand that runs a plain handler function with a fresh `CommandContext`."""

    __slots__ = (
        "_handler",
        "_configure_handler",
        "_require_substitution",
        "_requested_plugins_only",
        "_log_only_to_stderr",
    )

    def __init__(
        self,
        name: str,
        handler: Callable[[CommandContext], None],
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
        configure_handler: Optional[Callable[[argparse.ArgumentParser], None]] = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
    ) -> None:
        """Set up the subcommand.

        :param name: Primary name of the subcommand.
        :param handler: Function called with the `CommandContext` when the
          subcommand runs.
        :param aliases: Alternative names for the subcommand.
        :param help_description: Help text for the subcommand.
        :param configure_handler: Optional callback that adds arguments to the
          subcommand's argument parser.
        :param require_substitution: Forwarded to `CommandContext`.
        :param requested_plugins_only: Forwarded to `CommandContext`.
        :param log_only_to_stderr: When true, logging is reconfigured with
          `log_only_to_stderr=True` before the handler is called.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        self._handler = handler
        self._configure_handler = configure_handler
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._log_only_to_stderr = log_only_to_stderr

    def configure_handler(
        self,
        handler: Callable[[argparse.ArgumentParser], None],
    ) -> None:
        """Install the argument parser callback.

        :param handler: Callback that adds arguments to the argument parser.
        :raises TypeError: If a callback has already been provided.
        """
        already_set = self._configure_handler is not None
        if already_set:
            raise TypeError("Only one argument handler can be provided")
        self._configure_handler = handler

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Apply the argument parser callback, if there is one."""
        callback = self._configure_handler
        if callback is None:
            return
        callback(argparser)

    def __call__(self, command_arg: CommandArg) -> None:
        """Build the `CommandContext` for `command_arg` and run the handler."""
        context = CommandContext(
            command_arg.parsed_args,
            command_arg.plugin_search_dirs,
            self._require_substitution,
            self._requested_plugins_only,
        )
        if self._log_only_to_stderr:
            setup_logging(reconfigure_logging=True, log_only_to_stderr=True)
        run = self._handler
        return run(context)
class DispatchingCommandMixin(CommandBase):
    """Helpers for commands that own a set of subcommands.

    Implementations provide `add_subcommand`; the helpers here create
    subcommand objects and register them through it.
    """

    __slots__ = ()

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register `subcommand`; must be provided by the implementing class."""
        raise NotImplementedError()

    def add_dispatching_subcommand(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
        metavar: str = "command",
        default_subcommand: Optional[str] = None,
    ) -> "DispatcherCommand":
        """Create a nested `DispatcherCommand`, register it and return it.

        All parameters are passed on unchanged to `DispatcherCommand`.
        """
        nested_dispatcher = DispatcherCommand(
            name,
            dest,
            aliases=aliases,
            help_description=help_description,
            metavar=metavar,
            default_subcommand=default_subcommand,
        )
        self.add_subcommand(nested_dispatcher)
        return nested_dispatcher

    def register_subcommand(
        self,
        name: Union[str, Sequence[str]],
        *,
        help_description: Optional[str] = None,
        argparser: Optional[
            Union[ArgparserConfigurator, Sequence[ArgparserConfigurator]]
        ] = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
    ) -> Callable[[CommandHandler], GenericSubCommand]:
        """Return a decorator that registers a handler function as a subcommand.

        :param name: The subcommand name, or a sequence whose first element is
          the name and whose remaining elements are aliases.
        :param help_description: Help text for the subcommand.
        :param argparser: A single parser configurator or a sequence of them;
          a sequence is applied in order.
        :param require_substitution: Forwarded to `GenericSubCommand`.
        :param requested_plugins_only: Forwarded to `GenericSubCommand`.
        :param log_only_to_stderr: Forwarded to `GenericSubCommand`.
        :return: A decorator that turns the handler into a registered
          `GenericSubCommand` (which replaces the decorated function).
        """
        if isinstance(name, str):
            cmd_name, aliases = name, []
        else:
            cmd_name, aliases = name[0], name[1:]

        configure_cb = argparser
        if not (configure_cb is None or callable(configure_cb)):
            # A sequence of configurators is folded into a single callback.
            configurators = configure_cb

            def _run_all(parser: argparse.ArgumentParser) -> None:
                for configurator in configurators:
                    configurator(parser)

            configure_cb = _run_all

        def _decorator(func: CommandHandler) -> GenericSubCommand:
            generic = GenericSubCommand(
                cmd_name,
                func,
                aliases=aliases,
                help_description=help_description,
                require_substitution=require_substitution,
                requested_plugins_only=requested_plugins_only,
                log_only_to_stderr=log_only_to_stderr,
            )
            self.add_subcommand(generic)
            if configure_cb is not None:
                generic.configure_handler(configure_cb)

            return generic

        return _decorator
class DispatcherCommand(SubcommandBase, DispatchingCommandMixin):
    """Subcommand that forwards to one of its registered subcommands.

    The selected subcommand is read from the attribute `dest` of the parsed
    arguments; an optional default is used when none was given.
    """

    __slots__ = (
        "_subcommands",
        "_aliases",
        "_dest",
        "_metavar",
        "_required",
        "_default_subcommand",
        "_argparser",
    )

    def __init__(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
        metavar: str = "command",
        default_subcommand: Optional[str] = None,
    ) -> None:
        """Create an empty dispatcher.

        :param name: Primary name of this dispatcher.
        :param dest: Attribute of the parsed arguments holding the chosen
          subcommand.
        :param aliases: Alternative names for this dispatcher.
        :param help_description: Help text for this dispatcher.
        :param metavar: Metavar passed to `add_subparsers`.
        :param default_subcommand: Subcommand used when none was given; may
          also be `--help` or `-h`.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        # Every accepted name (primary names and aliases) -> subcommand.
        self._aliases: Dict[str, SubcommandBase] = {}
        # Primary name -> subcommand.
        self._subcommands: Dict[str, SubcommandBase] = {}
        self._dest = dest
        self._metavar = metavar
        self._default_subcommand = default_subcommand
        # Set by configure(); needed later to show the help output.
        self._argparser: Optional[argparse.ArgumentParser] = None

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register `subcommand` under its name and all of its aliases.

        :raises ValueError: If one of the names is already taken.
        """
        registry = self._aliases
        # The primary name is claimed first, followed by the aliases (if any).
        for handle in [subcommand.name, *(subcommand.aliases or ())]:
            if handle in registry:
                raise ValueError(
                    f"Internal error: Multiple handlers for {handle} on topic {self.name}"
                )

            registry[handle] = subcommand
        self._subcommands[subcommand.name] = subcommand

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Add a subparser for every registered subcommand to `argparser`.

        :raises TypeError: If the dispatcher has already been configured.
        :raises ValueError: If no subcommands are registered, or the default
          subcommand is neither a help flag nor a registered subcommand.
        """
        if self._argparser is not None:
            raise TypeError("Cannot configure twice!")
        self._argparser = argparser
        registered = self._subcommands
        if not registered:
            raise ValueError(
                f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)"
            )
        fallback = self._default_subcommand
        fallback_is_valid = (
            fallback is None or fallback in ("--help", "-h") or fallback in registered
        )
        if not fallback_is_valid:
            raise ValueError(
                f"Internal error: Subcommand {self.name} should have {fallback} as default,"
                " but it was not registered?"
            )
        subparsers = argparser.add_subparsers(
            dest=self._dest,
            # Without a default, argparse has to insist on a subcommand.
            required=fallback is None,
            metavar=self._metavar,
        )
        for entry in registered.values():
            entry.add_subcommand_to_subparser(subparsers)

    def has_command(self, command: str) -> bool:
        """Return whether `command` is a registered name or alias."""
        return command in self._aliases

    def __call__(self, command_arg: CommandArg) -> None:
        """Dispatch to the subcommand selected in the parsed arguments."""
        argparser = self._argparser
        assert argparser is not None
        chosen = getattr(command_arg.parsed_args, self._dest, None)
        if chosen is None:
            chosen = self._default_subcommand
            if chosen in ("--help", "-h"):
                # The default is to show the help output via argparse.
                argparser.parse_args([chosen])
                _error("Missing command", prog=argparser.prog)

        assert (
            chosen is not None
        ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?"
        assert (
            chosen in self._aliases
        ), f"Internal error: {chosen} was accepted as a topic, but it was not registered?"
        target = self._aliases[chosen]
        target(command_arg)
# Module-level dispatcher named "root"; the selected subcommand is read from the
# "command" attribute of the parsed arguments.
ROOT_COMMAND = DispatcherCommand(
    "root",
    dest="command",
    metavar="COMMAND",
)