diff options
Diffstat (limited to 'src/debputy/commands/debputy_cmd/context.py')
-rw-r--r-- | src/debputy/commands/debputy_cmd/context.py | 607 |
1 files changed, 607 insertions, 0 deletions
diff --git a/src/debputy/commands/debputy_cmd/context.py b/src/debputy/commands/debputy_cmd/context.py new file mode 100644 index 0000000..3363e96 --- /dev/null +++ b/src/debputy/commands/debputy_cmd/context.py @@ -0,0 +1,607 @@ +import argparse +import dataclasses +import errno +import os +from typing import ( + Optional, + Tuple, + Mapping, + FrozenSet, + Set, + Union, + Sequence, + Iterable, + Callable, + Dict, + TYPE_CHECKING, +) + +from debian.debian_support import DpkgArchTable + +from debputy._deb_options_profiles import DebBuildOptionsAndProfiles +from debputy.architecture_support import ( + DpkgArchitectureBuildProcessValuesTable, + dpkg_architecture_table, +) +from debputy.exceptions import DebputyRuntimeError +from debputy.filesystem_scan import FSROOverlay +from debputy.highlevel_manifest import HighLevelManifest +from debputy.highlevel_manifest_parser import YAMLManifestParser +from debputy.packages import SourcePackage, BinaryPackage, parse_source_debian_control +from debputy.plugin.api import VirtualPath +from debputy.plugin.api.impl import load_plugin_features +from debputy.plugin.api.feature_set import PluginProvidedFeatureSet +from debputy.substitution import ( + Substitution, + VariableContext, + SubstitutionImpl, + NULL_SUBSTITUTION, +) +from debputy.util import _error, PKGNAME_REGEX, resolve_source_date_epoch, setup_logging + +if TYPE_CHECKING: + from argparse import _SubParsersAction + + +CommandHandler = Callable[["CommandContext"], None] +ArgparserConfigurator = Callable[[argparse.ArgumentParser], None] + + +def add_arg( + *name_or_flags: str, + **kwargs, +) -> Callable[[argparse.ArgumentParser], None]: + def _configurator(argparser: argparse.ArgumentParser) -> None: + argparser.add_argument( + *name_or_flags, + **kwargs, + ) + + return _configurator + + +@dataclasses.dataclass(slots=True, frozen=True) +class CommandArg: + parsed_args: argparse.Namespace + plugin_search_dirs: Sequence[str] + + +@dataclasses.dataclass +class Command: + handler: Callable[["CommandContext"], None] + require_substitution: bool = True + requested_plugins_only: bool = False + + +class CommandContext: + def __init__( + self, + parsed_args: argparse.Namespace, + plugin_search_dirs: Sequence[str], + require_substitution: bool = True, + requested_plugins_only: bool = False, + ) -> None: + self.parsed_args = parsed_args + self.plugin_search_dirs = plugin_search_dirs + self._require_substitution = require_substitution + self._requested_plugins_only = requested_plugins_only + self._debputy_plugin_feature_set: PluginProvidedFeatureSet = ( + PluginProvidedFeatureSet() + ) + self._debian_dir = FSROOverlay.create_root_dir("debian", "debian") + self._mtime: Optional[int] = None + self._source_variables: Optional[Mapping[str, str]] = None + self._substitution: Optional[Substitution] = None + self._requested_plugins: Optional[Sequence[str]] = None + self._plugins_loaded = False + self._dctrl_data: Optional[ + Tuple[ + DpkgArchitectureBuildProcessValuesTable, + DpkgArchTable, + DebBuildOptionsAndProfiles, + "SourcePackage", + Mapping[str, "BinaryPackage"], + ] + ] = None + + @property + def debian_dir(self) -> VirtualPath: + return self._debian_dir + + @property + def mtime(self) -> int: + if self._mtime is None: + self._mtime = resolve_source_date_epoch( + None, + substitution=self.substitution, + ) + return self._mtime + + def source_package(self) -> SourcePackage: + _a, _b, _c, source, _d = self._parse_dctrl() + return source + + def binary_packages(self) -> Mapping[str, "BinaryPackage"]: + _a, _b, _c, _source, binary_package_table = self._parse_dctrl() + return binary_package_table + + def requested_plugins(self) -> Sequence[str]: + if self._requested_plugins is None: + self._requested_plugins = self._resolve_requested_plugins() + return self._requested_plugins + + def required_plugins(self) -> Set[str]: + return set(getattr(self.parsed_args, "required_plugins") or []) + + @property + def deb_build_options_and_profiles(self) -> "DebBuildOptionsAndProfiles": + _a, _b, deb_build_options_and_profiles, _c, _d = self._parse_dctrl() + return deb_build_options_and_profiles + + @property + def deb_build_options(self) -> Mapping[str, Optional[str]]: + return self.deb_build_options_and_profiles.deb_build_options + + def _create_substitution( + self, + parsed_args: argparse.Namespace, + plugin_feature_set: PluginProvidedFeatureSet, + debian_dir: VirtualPath, + ) -> Substitution: + requested_subst = self._require_substitution + if hasattr(parsed_args, "substitution"): + requested_subst = parsed_args.substitution + if requested_subst is False and self._require_substitution: + _error(f"--no-substitution cannot be used with {parsed_args.command}") + if self._require_substitution or requested_subst is not False: + variable_context = VariableContext(debian_dir) + return SubstitutionImpl( + plugin_feature_set=plugin_feature_set, + unresolvable_substitutions=frozenset(["PACKAGE"]), + variable_context=variable_context, + ) + return NULL_SUBSTITUTION + + def load_plugins(self) -> PluginProvidedFeatureSet: + if not self._plugins_loaded: + requested_plugins = None + required_plugins = self.required_plugins() + if self._requested_plugins_only: + requested_plugins = self.requested_plugins() + debug_mode = getattr(self.parsed_args, "debug_mode", False) + load_plugin_features( + self.plugin_search_dirs, + self.substitution, + requested_plugins_only=requested_plugins, + required_plugins=required_plugins, + plugin_feature_set=self._debputy_plugin_feature_set, + debug_mode=debug_mode, + ) + self._plugins_loaded = True + return self._debputy_plugin_feature_set + + @staticmethod + def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]: + package_prefix = "debputy-plugin-" + for dep_clause in (d.strip() for d in dep_field.split(",")): + dep = dep_clause.split("|")[0].strip() + if not dep.startswith(package_prefix): + continue + m = PKGNAME_REGEX.search(dep) + assert m + package_name = m.group(0) + plugin_name = package_name[len(package_prefix) :] + yield plugin_name + + def _resolve_requested_plugins(self) -> Sequence[str]: + _a, _b, _c, source_package, _d = self._parse_dctrl() + bd = source_package.fields.get("Build-Depends", "") + plugins = list(self._plugin_from_dependency_field(bd)) + for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"): + f = source_package.fields.get(field_name) + if not f: + continue + for plugin in self._plugin_from_dependency_field(f): + raise DebputyRuntimeError( + f"Cannot load plugins via {field_name}:" + f" Please move debputy-plugin-{plugin} dependency to Build-Depends." + ) + + return plugins + + @property + def substitution(self) -> Substitution: + if self._substitution is None: + self._substitution = self._create_substitution( + self.parsed_args, + self._debputy_plugin_feature_set, + self.debian_dir, + ) + return self._substitution + + def _parse_dctrl( + self, + ) -> Tuple[ + DpkgArchitectureBuildProcessValuesTable, + DpkgArchTable, + DebBuildOptionsAndProfiles, + "SourcePackage", + Mapping[str, "BinaryPackage"], + ]: + if self._dctrl_data is None: + build_env = DebBuildOptionsAndProfiles.instance() + dpkg_architecture_variables = dpkg_architecture_table() + dpkg_arch_query_table = DpkgArchTable.load_arch_table() + + packages: Union[Set[str], FrozenSet[str]] = frozenset() + if hasattr(self.parsed_args, "packages"): + packages = self.parsed_args.packages + + try: + debian_control = self.debian_dir.get("control") + if debian_control is None: + raise FileNotFoundError( + errno.ENOENT, + os.strerror(errno.ENOENT), + os.path.join(self.debian_dir.fs_path, "control"), + ) + source_package, binary_packages = parse_source_debian_control( + debian_control, + packages, # -p/--package + set(), # -N/--no-package + False, # -i + False, # -a + dpkg_architecture_variables=dpkg_architecture_variables, + dpkg_arch_query_table=dpkg_arch_query_table, + build_env=build_env, + ) + assert packages <= binary_packages.keys() + except FileNotFoundError: + _error( + "This subcommand must be run from a source package root; expecting debian/control to exist." + ) + + self._dctrl_data = ( + dpkg_architecture_variables, + dpkg_arch_query_table, + build_env, + source_package, + binary_packages, + ) + + return self._dctrl_data + + @property + def has_dctrl_file(self) -> bool: + debian_control = self.debian_dir.get("control") + return debian_control is not None + + def manifest_parser( + self, + *, + manifest_path: Optional[str] = None, + ) -> YAMLManifestParser: + substitution = self.substitution + + ( + dpkg_architecture_variables, + dpkg_arch_query_table, + build_env, + source_package, + binary_packages, + ) = self._parse_dctrl() + + if self.parsed_args.debputy_manifest is not None: + manifest_path = self.parsed_args.debputy_manifest + if manifest_path is None: + manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest") + return YAMLManifestParser( + manifest_path, + source_package, + binary_packages, + substitution, + dpkg_architecture_variables, + dpkg_arch_query_table, + build_env, + self.load_plugins(), + debian_dir=self.debian_dir, + ) + + def parse_manifest( + self, + *, + manifest_path: Optional[str] = None, + ) -> HighLevelManifest: + substitution = self.substitution + manifest_required = False + + ( + dpkg_architecture_variables, + dpkg_arch_query_table, + build_env, + _, + binary_packages, + ) = self._parse_dctrl() + + if self.parsed_args.debputy_manifest is not None: + manifest_path = self.parsed_args.debputy_manifest + manifest_required = True + if manifest_path is None: + manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest") + parser = self.manifest_parser(manifest_path=manifest_path) + + os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute( + "{{SOURCE_DATE_EPOCH}}", + "Internal resolution", + ) + if os.path.isfile(manifest_path): + return parser.parse_manifest() + if manifest_required: + _error(f'The path "{manifest_path}" is not a file!') + return parser.build_manifest() + + +class CommandBase: + __slots__ = () + + def configure(self, argparser: argparse.ArgumentParser) -> None: + # Does nothing by default + pass + + def __call__(self, command_arg: CommandArg) -> None: + raise NotImplementedError + + +class SubcommandBase(CommandBase): + __slots__ = ("name", "aliases", "help_description") + + def __init__( + self, + name: str, + *, + aliases: Sequence[str] = tuple(), + help_description: Optional[str] = None, + ) -> None: + self.name = name + self.aliases = aliases + self.help_description = help_description + + def add_subcommand_to_subparser( + self, + subparser: "_SubParsersAction", + ) -> argparse.ArgumentParser: + parser = subparser.add_parser( + self.name, + aliases=self.aliases, + help=self.help_description, + allow_abbrev=False, + ) + self.configure(parser) + return parser + + +class GenericSubCommand(SubcommandBase): + __slots__ = ( + "_handler", + "_configure_handler", + "_require_substitution", + "_requested_plugins_only", + "_log_only_to_stderr", + ) + + def __init__( + self, + name: str, + handler: Callable[[CommandContext], None], + *, + aliases: Sequence[str] = tuple(), + help_description: Optional[str] = None, + configure_handler: Optional[Callable[[argparse.ArgumentParser], None]] = None, + require_substitution: bool = True, + requested_plugins_only: bool = False, + log_only_to_stderr: bool = False, + ) -> None: + super().__init__(name, aliases=aliases, help_description=help_description) + self._handler = handler + self._configure_handler = configure_handler + self._require_substitution = require_substitution + self._requested_plugins_only = requested_plugins_only + self._log_only_to_stderr = log_only_to_stderr + + def configure_handler( + self, + handler: Callable[[argparse.ArgumentParser], None], + ) -> None: + if self._configure_handler is not None: + raise TypeError("Only one argument handler can be provided") + self._configure_handler = handler + + def configure(self, argparser: argparse.ArgumentParser) -> None: + handler = self._configure_handler + if handler is not None: + handler(argparser) + + def __call__(self, command_arg: CommandArg) -> None: + context = CommandContext( + command_arg.parsed_args, + command_arg.plugin_search_dirs, + self._require_substitution, + self._requested_plugins_only, + ) + if self._log_only_to_stderr: + setup_logging(reconfigure_logging=True, log_only_to_stderr=True) + return self._handler(context) + + +class DispatchingCommandMixin(CommandBase): + __slots__ = () + + def add_subcommand(self, subcommand: SubcommandBase) -> None: + raise NotImplementedError + + def add_dispatching_subcommand( + self, + name: str, + dest: str, + *, + aliases: Sequence[str] = tuple(), + help_description: Optional[str] = None, + metavar: str = "command", + default_subcommand: Optional[str] = None, + ) -> "DispatcherCommand": + ds = DispatcherCommand( + name, + dest, + aliases=aliases, + help_description=help_description, + metavar=metavar, + default_subcommand=default_subcommand, + ) + self.add_subcommand(ds) + return ds + + def register_subcommand( + self, + name: Union[str, Sequence[str]], + *, + help_description: Optional[str] = None, + argparser: Optional[ + Union[ArgparserConfigurator, Sequence[ArgparserConfigurator]] + ] = None, + require_substitution: bool = True, + requested_plugins_only: bool = False, + log_only_to_stderr: bool = False, + ) -> Callable[[CommandHandler], GenericSubCommand]: + if isinstance(name, str): + cmd_name = name + aliases = [] + else: + cmd_name = name[0] + aliases = name[1:] + + if argparser is not None and not callable(argparser): + args = argparser + + def _wrapper(parser: argparse.ArgumentParser) -> None: + for configurator in args: + configurator(parser) + + argparser = _wrapper + + def _annotation_impl(func: CommandHandler) -> GenericSubCommand: + subcommand = GenericSubCommand( + cmd_name, + func, + aliases=aliases, + help_description=help_description, + require_substitution=require_substitution, + requested_plugins_only=requested_plugins_only, + log_only_to_stderr=log_only_to_stderr, + ) + self.add_subcommand(subcommand) + if argparser is not None: + subcommand.configure_handler(argparser) + + return subcommand + + return _annotation_impl + + +class DispatcherCommand(SubcommandBase, DispatchingCommandMixin): + __slots__ = ( + "_subcommands", + "_aliases", + "_dest", + "_metavar", + "_required", + "_default_subcommand", + "_argparser", + ) + + def __init__( + self, + name: str, + dest: str, + *, + aliases: Sequence[str] = tuple(), + help_description: Optional[str] = None, + metavar: str = "command", + default_subcommand: Optional[str] = None, + ) -> None: + super().__init__(name, aliases=aliases, help_description=help_description) + self._aliases: Dict[str, SubcommandBase] = {} + self._subcommands: Dict[str, SubcommandBase] = {} + self._dest = dest + self._metavar = metavar + self._default_subcommand = default_subcommand + self._argparser: Optional[argparse.ArgumentParser] = None + + def add_subcommand(self, subcommand: SubcommandBase) -> None: + all_names = [subcommand.name] + if subcommand.aliases: + all_names.extend(subcommand.aliases) + aliases = self._aliases + for n in all_names: + if n in aliases: + raise ValueError( + f"Internal error: Multiple handlers for {n} on topic {self.name}" + ) + + aliases[n] = subcommand + self._subcommands[subcommand.name] = subcommand + + def configure(self, argparser: argparse.ArgumentParser) -> None: + if self._argparser is not None: + raise TypeError("Cannot configure twice!") + self._argparser = argparser + subcommands = self._subcommands + if not subcommands: + raise ValueError( + f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)" + ) + default_subcommand = self._default_subcommand + required = default_subcommand is None + if ( + default_subcommand is not None + and default_subcommand not in ("--help", "-h") + and default_subcommand not in subcommands + ): + raise ValueError( + f"Internal error: Subcommand {self.name} should have {default_subcommand} as default," + " but it was not registered?" + ) + subparser = argparser.add_subparsers( + dest=self._dest, + required=required, + metavar=self._metavar, + ) + for subcommand in subcommands.values(): + subcommand.add_subcommand_to_subparser(subparser) + + def has_command(self, command: str) -> bool: + return command in self._aliases + + def __call__(self, command_arg: CommandArg) -> None: + argparser = self._argparser + assert argparser is not None + v = getattr(command_arg.parsed_args, self._dest, None) + if v is None: + v = self._default_subcommand + if v in ("--help", "-h"): + argparser.parse_args([v]) + _error("Missing command", prog=argparser.prog) + + assert ( + v is not None + ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?" + assert ( + v in self._aliases + ), f"Internal error: {v} was accepted as a topic, but it was not registered?" + self._aliases[v](command_arg) + + +ROOT_COMMAND = DispatcherCommand( + "root", + dest="command", + metavar="COMMAND", +) |