diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/parsers/__init__.py')
-rw-r--r-- | test/lib/ansible_test/_internal/cli/parsers/__init__.py | 303 |
1 files changed, 303 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/parsers/__init__.py b/test/lib/ansible_test/_internal/cli/parsers/__init__.py new file mode 100644 index 0000000..1aedf63 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/__init__.py @@ -0,0 +1,303 @@ +"""Composite argument parsers for ansible-test specific command-line arguments.""" +from __future__ import annotations + +import typing as t + +from ...constants import ( + SUPPORTED_PYTHON_VERSIONS, +) + +from ...ci import ( + get_ci_provider, +) + +from ...host_configs import ( + ControllerConfig, + NetworkConfig, + NetworkInventoryConfig, + PosixConfig, + WindowsConfig, + WindowsInventoryConfig, +) + +from ..argparsing.parsers import ( + DocumentationState, + Parser, + ParserState, + TypeParser, +) + +from .value_parsers import ( + PythonParser, +) + +from .host_config_parsers import ( + ControllerParser, + DockerParser, + NetworkInventoryParser, + NetworkRemoteParser, + OriginParser, + PosixRemoteParser, + PosixSshParser, + WindowsInventoryParser, + WindowsRemoteParser, +) + + +from .base_argument_parsers import ( + ControllerNamespaceParser, + TargetNamespaceParser, + TargetsNamespaceParser, +) + + +class OriginControllerParser(ControllerNamespaceParser, TypeParser): + """Composite argument parser for the controller when delegation is not supported.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return dict( + origin=OriginParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = '--controller options:' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class DelegatedControllerParser(ControllerNamespaceParser, TypeParser): + """Composite argument parser for the controller when delegation is supported.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = dict( + origin=OriginParser(), + docker=DockerParser(controller=True), + ) + + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=PosixRemoteParser(controller=True), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = '--controller options:' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class PosixTargetParser(TargetNamespaceParser, TypeParser): + """Composite argument parser for a POSIX target.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = dict( + controller=ControllerParser(), + docker=DockerParser(controller=False), + ) + + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=PosixRemoteParser(controller=False), + ) + + parsers.update( + ssh=PosixSshParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class WindowsTargetParser(TargetsNamespaceParser, TypeParser): + """Composite argument parser for a Windows target.""" + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return True + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers(state.root_namespace.targets) + + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers([]) + + def get_internal_parsers(self, targets: list[WindowsConfig]) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = {} + + if self.allow_inventory and not targets: + parsers.update( + inventory=WindowsInventoryParser(), + ) + + if not targets or not any(isinstance(target, WindowsInventoryConfig) for target in targets): + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=WindowsRemoteParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class NetworkTargetParser(TargetsNamespaceParser, TypeParser): + """Composite argument parser for a network target.""" + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return True + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers(state.root_namespace.targets) + + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers([]) + + def get_internal_parsers(self, targets: list[NetworkConfig]) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = {} + + if self.allow_inventory and not targets: + parsers.update( + inventory=NetworkInventoryParser(), + ) + + if not targets or not any(isinstance(target, NetworkInventoryConfig) for target in targets): + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=NetworkRemoteParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class PythonTargetParser(TargetsNamespaceParser, Parser): + """Composite argument parser for a Python target.""" + def __init__(self, allow_venv: bool) -> None: + super().__init__() + + self.allow_venv = allow_venv + + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-python' + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + versions = list(SUPPORTED_PYTHON_VERSIONS) + + for target in state.root_namespace.targets or []: # type: PosixConfig + versions.remove(target.python.version) + + parser = PythonParser(versions, allow_venv=self.allow_venv, allow_default=True) + python = parser.parse(state) + + value = ControllerConfig(python=python) + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '\n'.join([ + f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller', + f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller', + ]) + + return None + + +class SanityPythonTargetParser(PythonTargetParser): + """Composite argument parser for a sanity Python target.""" + def __init__(self) -> None: + super().__init__(allow_venv=False) + + +class UnitsPythonTargetParser(PythonTargetParser): + """Composite argument parser for a units Python target.""" + def __init__(self) -> None: + super().__init__(allow_venv=True) + + +class PosixSshTargetParser(PosixTargetParser): + """Composite argument parser for a POSIX SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-posix' + + +class WindowsSshTargetParser(WindowsTargetParser): + """Composite argument parser for a Windows SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-windows' + + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True + + +class NetworkSshTargetParser(NetworkTargetParser): + """Composite argument parser for a network SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-network' + + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True |