diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/parsers')
6 files changed, 1161 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/parsers/__init__.py b/test/lib/ansible_test/_internal/cli/parsers/__init__.py new file mode 100644 index 0000000..1aedf63 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/__init__.py @@ -0,0 +1,303 @@ +"""Composite argument parsers for ansible-test specific command-line arguments.""" +from __future__ import annotations + +import typing as t + +from ...constants import ( + SUPPORTED_PYTHON_VERSIONS, +) + +from ...ci import ( + get_ci_provider, +) + +from ...host_configs import ( + ControllerConfig, + NetworkConfig, + NetworkInventoryConfig, + PosixConfig, + WindowsConfig, + WindowsInventoryConfig, +) + +from ..argparsing.parsers import ( + DocumentationState, + Parser, + ParserState, + TypeParser, +) + +from .value_parsers import ( + PythonParser, +) + +from .host_config_parsers import ( + ControllerParser, + DockerParser, + NetworkInventoryParser, + NetworkRemoteParser, + OriginParser, + PosixRemoteParser, + PosixSshParser, + WindowsInventoryParser, + WindowsRemoteParser, +) + + +from .base_argument_parsers import ( + ControllerNamespaceParser, + TargetNamespaceParser, + TargetsNamespaceParser, +) + + +class OriginControllerParser(ControllerNamespaceParser, TypeParser): + """Composite argument parser for the controller when delegation is not supported.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return dict( + origin=OriginParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = '--controller options:' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class DelegatedControllerParser(ControllerNamespaceParser, TypeParser): + """Composite argument parser for the controller when delegation is supported.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = dict( + origin=OriginParser(), + docker=DockerParser(controller=True), + ) + + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=PosixRemoteParser(controller=True), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = '--controller options:' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class PosixTargetParser(TargetNamespaceParser, TypeParser): + """Composite argument parser for a POSIX target.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = dict( + controller=ControllerParser(), + docker=DockerParser(controller=False), + ) + + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=PosixRemoteParser(controller=False), + ) + + parsers.update( + ssh=PosixSshParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class WindowsTargetParser(TargetsNamespaceParser, TypeParser): + """Composite argument parser for a Windows target.""" + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return True + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers(state.root_namespace.targets) + + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers([]) + + def get_internal_parsers(self, targets: list[WindowsConfig]) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = {} + + if self.allow_inventory and not targets: + parsers.update( + inventory=WindowsInventoryParser(), + ) + + if not targets or not any(isinstance(target, WindowsInventoryConfig) for target in targets): + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=WindowsRemoteParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class NetworkTargetParser(TargetsNamespaceParser, TypeParser): + """Composite argument parser for a network target.""" + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return True + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers(state.root_namespace.targets) + + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers([]) + + def get_internal_parsers(self, targets: list[NetworkConfig]) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = {} + + if self.allow_inventory and not targets: + parsers.update( + inventory=NetworkInventoryParser(), + ) + + if not targets or not any(isinstance(target, NetworkInventoryConfig) for target in targets): + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=NetworkRemoteParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class PythonTargetParser(TargetsNamespaceParser, Parser): + """Composite argument parser for a Python target.""" + def __init__(self, allow_venv: bool) -> None: + super().__init__() + + self.allow_venv = allow_venv + + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-python' + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + versions = list(SUPPORTED_PYTHON_VERSIONS) + + for target in state.root_namespace.targets or []: # type: PosixConfig + versions.remove(target.python.version) + + parser = PythonParser(versions, allow_venv=self.allow_venv, allow_default=True) + python = parser.parse(state) + + value = ControllerConfig(python=python) + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '\n'.join([ + f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller', + f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller', + ]) + + return None + + +class SanityPythonTargetParser(PythonTargetParser): + """Composite argument parser for a sanity Python target.""" + def __init__(self) -> None: + super().__init__(allow_venv=False) + + +class UnitsPythonTargetParser(PythonTargetParser): + """Composite argument parser for a units Python target.""" + def __init__(self) -> None: + super().__init__(allow_venv=True) + + +class PosixSshTargetParser(PosixTargetParser): + """Composite argument parser for a POSIX SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-posix' + + +class WindowsSshTargetParser(WindowsTargetParser): + """Composite argument parser for a Windows SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-windows' + + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True + + +class NetworkSshTargetParser(NetworkTargetParser): + """Composite argument parser for a network SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-network' + + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True diff --git a/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py new file mode 100644 index 0000000..aac7a69 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py @@ -0,0 +1,73 @@ +"""Base classes for the primary parsers for composite command line arguments.""" +from __future__ import annotations + +import abc +import typing as t + +from ..argparsing.parsers import ( + CompletionError, + NamespaceParser, + ParserState, +) + + +class ControllerNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta): + """Base class for controller namespace parsers.""" + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return 'controller' + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + if state.root_namespace.targets: + raise ControllerRequiredFirstError() + + return super().parse(state) + + +class TargetNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta): + """Base class for target namespace parsers involving a single target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target' + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return 'targets' + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return True + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True + + +class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta): + """Base class for controller namespace parsers involving multiple targets.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target' + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return 'targets' + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return True + + +class ControllerRequiredFirstError(CompletionError): + """Exception raised when controller and target options are specified out-of-order.""" + def __init__(self) -> None: + super().__init__('The `--controller` option must be specified before `--target` option(s).') diff --git a/test/lib/ansible_test/_internal/cli/parsers/helpers.py b/test/lib/ansible_test/_internal/cli/parsers/helpers.py new file mode 100644 index 0000000..836a893 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/helpers.py @@ -0,0 +1,57 @@ +"""Helper functions for composite parsers.""" +from __future__ import annotations + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...completion import ( + docker_completion, + remote_completion, + filter_completion, +) + +from ...host_configs import ( + DockerConfig, + HostConfig, + PosixRemoteConfig, +) + + +def get_docker_pythons(name: str, controller: bool, strict: bool) -> list[str]: + """Return a list of docker instance Python versions supported by the specified host config.""" + image_config = filter_completion(docker_completion()).get(name) + available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS + + if not image_config: + return [] if strict else list(available_pythons) + + supported_pythons = [python for python in image_config.supported_pythons if python in available_pythons] + + return supported_pythons + + +def get_remote_pythons(name: str, controller: bool, strict: bool) -> list[str]: + """Return a list of remote instance Python versions supported by the specified host config.""" + platform_config = filter_completion(remote_completion()).get(name) + available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS + + if not platform_config: + return [] if strict else list(available_pythons) + + supported_pythons = [python for python in platform_config.supported_pythons if python in available_pythons] + + return supported_pythons + + +def get_controller_pythons(controller_config: HostConfig, strict: bool) -> list[str]: + """Return a list of controller Python versions supported by the specified host config.""" + if isinstance(controller_config, DockerConfig): + pythons = get_docker_pythons(controller_config.name, False, strict) + elif isinstance(controller_config, PosixRemoteConfig): + pythons = get_remote_pythons(controller_config.name, False, strict) + else: + pythons = list(SUPPORTED_PYTHON_VERSIONS) + + return pythons diff --git a/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py new file mode 100644 index 0000000..ee6f146 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py @@ -0,0 +1,310 @@ +"""Composite parsers for the various types of hosts.""" +from __future__ import annotations + +import typing as t + +from ...completion import ( + docker_completion, + network_completion, + remote_completion, + windows_completion, + filter_completion, +) + +from ...host_configs import ( + ControllerConfig, + DockerConfig, + NetworkInventoryConfig, + NetworkRemoteConfig, + OriginConfig, + PosixRemoteConfig, + PosixSshConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ..compat import ( + get_fallback_remote_controller, +) + +from ..argparsing.parsers import ( + ChoicesParser, + DocumentationState, + FileParser, + MatchConditions, + NamespaceWrappedParser, + PairParser, + Parser, + ParserError, + ParserState, +) + +from .value_parsers import ( + PlatformParser, + SshConnectionParser, +) + +from .key_value_parsers import ( + ControllerKeyValueParser, + DockerKeyValueParser, + EmptyKeyValueParser, + NetworkRemoteKeyValueParser, + OriginKeyValueParser, + PosixRemoteKeyValueParser, + PosixSshKeyValueParser, + WindowsRemoteKeyValueParser, +) + +from .helpers import ( + get_docker_pythons, + get_remote_pythons, +) + + +class OriginParser(Parser): + """Composite argument parser for the origin.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = OriginConfig() + + state.set_namespace(namespace) + + parser = OriginKeyValueParser() + parser.parse(state) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return OriginKeyValueParser().document(state) + + +class ControllerParser(Parser): + """Composite argument parser for the controller.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = ControllerConfig() + + state.set_namespace(namespace) + + parser = ControllerKeyValueParser() + parser.parse(state) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return ControllerKeyValueParser().document(state) + + +class DockerParser(PairParser): + """Composite argument parser for a docker host.""" + def __init__(self, controller: bool) -> None: + self.controller = controller + + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return DockerConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('name', ChoicesParser(list(filter_completion(docker_completion(), controller_only=self.controller)), + conditions=MatchConditions.CHOICE | MatchConditions.ANY)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return DockerKeyValueParser(choice, self.controller) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value: DockerConfig = super().parse(state) + + if not value.python and not get_docker_pythons(value.name, self.controller, True): + raise ParserError(f'Python version required for docker image: {value.name}') + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + default = 'default' + content = '\n'.join([f' {image} ({", ".join(get_docker_pythons(image, self.controller, False))})' + for image, item in filter_completion(docker_completion(), controller_only=self.controller).items()]) + + content += '\n'.join([ + '', + ' {image} # python must be specified for custom images', + ]) + + state.sections[f'{"controller" if self.controller else "target"} docker images and supported python version (choose one):'] = content + + return f'{{image}}[,{DockerKeyValueParser(default, self.controller).document(state)}]' + + +class PosixRemoteParser(PairParser): + """Composite argument parser for a POSIX remote host.""" + def __init__(self, controller: bool) -> None: + self.controller = controller + + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return PosixRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('name', PlatformParser(list(filter_completion(remote_completion(), controller_only=self.controller)))) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return PosixRemoteKeyValueParser(choice, self.controller) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value: PosixRemoteConfig = super().parse(state) + + if not value.python and not get_remote_pythons(value.name, self.controller, True): + raise ParserError(f'Python version required for remote: {value.name}') + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + default = get_fallback_remote_controller() + content = '\n'.join([f' {name} ({", ".join(get_remote_pythons(name, self.controller, False))})' + for name, item in filter_completion(remote_completion(), controller_only=self.controller).items()]) + + content += '\n'.join([ + '', + ' {platform}/{version} # python must be specified for unknown systems', + ]) + + state.sections[f'{"controller" if self.controller else "target"} remote systems and supported python versions (choose one):'] = content + + return f'{{system}}[,{PosixRemoteKeyValueParser(default, self.controller).document(state)}]' + + +class WindowsRemoteParser(PairParser): + """Composite argument parser for a Windows remote host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return WindowsRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + names = list(filter_completion(windows_completion())) + + for target in state.root_namespace.targets or []: # type: WindowsRemoteConfig + names.remove(target.name) + + return NamespaceWrappedParser('name', PlatformParser(names)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return WindowsRemoteKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + content = '\n'.join([f' {name}' for name, item in filter_completion(windows_completion()).items()]) + + content += '\n'.join([ + '', + ' windows/{version} # use an unknown windows version', + ]) + + state.sections['target remote systems (choose one):'] = content + + return f'{{system}}[,{WindowsRemoteKeyValueParser().document(state)}]' + + +class NetworkRemoteParser(PairParser): + """Composite argument parser for a network remote host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return NetworkRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + names = list(filter_completion(network_completion())) + + for target in state.root_namespace.targets or []: # type: NetworkRemoteConfig + names.remove(target.name) + + return NamespaceWrappedParser('name', PlatformParser(names)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return NetworkRemoteKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + content = '\n'.join([f' {name}' for name, item in filter_completion(network_completion()).items()]) + + content += '\n'.join([ + '', + ' {platform}/{version} # use an unknown platform and version', + ]) + + state.sections['target remote systems (choose one):'] = content + + return f'{{system}}[,{NetworkRemoteKeyValueParser().document(state)}]' + + +class WindowsInventoryParser(PairParser): + """Composite argument parser for a Windows inventory.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return WindowsInventoryConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('path', FileParser()) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return EmptyKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{path} # INI format inventory file' + + +class NetworkInventoryParser(PairParser): + """Composite argument parser for a network inventory.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return NetworkInventoryConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('path', FileParser()) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return EmptyKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{path} # INI format inventory file' + + +class PosixSshParser(PairParser): + """Composite argument parser for a POSIX SSH host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return PosixSshConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return SshConnectionParser() + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return PosixSshKeyValueParser() + + @property + def required(self) -> bool: + """True if the delimiter (and thus right parser) is required, otherwise False.""" + return True + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return f'{SshConnectionParser().document(state)}[,{PosixSshKeyValueParser().document(state)}]' diff --git a/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py new file mode 100644 index 0000000..049b71e --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py @@ -0,0 +1,239 @@ +"""Composite argument key-value parsers used by other parsers.""" +from __future__ import annotations + +import typing as t + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_PROVIDERS, + SECCOMP_CHOICES, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...completion import ( + AuditMode, + CGroupVersion, +) + +from ...util import ( + REMOTE_ARCHITECTURES, +) + +from ...host_configs import ( + OriginConfig, +) + +from ...become import ( + SUPPORTED_BECOME_METHODS, +) + +from ..argparsing.parsers import ( + AnyParser, + BooleanParser, + ChoicesParser, + DocumentationState, + EnumValueChoicesParser, + IntegerParser, + KeyValueParser, + Parser, + ParserState, +) + +from .value_parsers import ( + PythonParser, +) + +from .helpers import ( + get_controller_pythons, + get_remote_pythons, + get_docker_pythons, +) + + +class OriginKeyValueParser(KeyValueParser): + """Composite argument parser for origin key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + versions = CONTROLLER_PYTHON_VERSIONS + + return dict( + python=PythonParser(versions=versions, allow_venv=True, allow_default=True), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=CONTROLLER_PYTHON_VERSIONS, allow_venv=True, allow_default=True) + + section_name = 'origin options' + + state.sections[f'controller {section_name} (comma separated):'] = '\n'.join([ + f' python={python_parser.document(state)}', + ]) + + return f'{{{section_name}}} # default' + + +class ControllerKeyValueParser(KeyValueParser): + """Composite argument parser for controller key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + versions = get_controller_pythons(state.root_namespace.controller, False) + allow_default = bool(get_controller_pythons(state.root_namespace.controller, True)) + allow_venv = isinstance(state.root_namespace.controller, OriginConfig) or not state.root_namespace.controller + + return dict( + python=PythonParser(versions=versions, allow_venv=allow_venv, allow_default=allow_default), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section_name = 'controller options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller', + f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller', + ]) + + return f'{{{section_name}}} # default' + + +class DockerKeyValueParser(KeyValueParser): + """Composite argument parser for docker key/value pairs.""" + def __init__(self, image: str, controller: bool) -> None: + self.controller = controller + self.versions = get_docker_pythons(image, controller, False) + self.allow_default = bool(get_docker_pythons(image, controller, True)) + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default), + seccomp=ChoicesParser(SECCOMP_CHOICES), + cgroup=EnumValueChoicesParser(CGroupVersion), + audit=EnumValueChoicesParser(AuditMode), + privileged=BooleanParser(), + memory=IntegerParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default) + + section_name = 'docker options' + + state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([ + f' python={python_parser.document(state)}', + f' seccomp={ChoicesParser(SECCOMP_CHOICES).document(state)}', + f' cgroup={EnumValueChoicesParser(CGroupVersion).document(state)}', + f' audit={EnumValueChoicesParser(AuditMode).document(state)}', + f' privileged={BooleanParser().document(state)}', + f' memory={IntegerParser().document(state)} # bytes', + ]) + + return f'{{{section_name}}}' + + +class PosixRemoteKeyValueParser(KeyValueParser): + """Composite argument parser for POSIX remote key/value pairs.""" + def __init__(self, name: str, controller: bool) -> None: + self.controller = controller + self.versions = get_remote_pythons(name, controller, False) + self.allow_default = bool(get_remote_pythons(name, controller, True)) + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + become=ChoicesParser(list(SUPPORTED_BECOME_METHODS)), + provider=ChoicesParser(REMOTE_PROVIDERS), + arch=ChoicesParser(REMOTE_ARCHITECTURES), + python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default) + + section_name = 'remote options' + + state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([ + f' become={ChoicesParser(list(SUPPORTED_BECOME_METHODS)).document(state)}', + f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}', + f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}', + f' python={python_parser.document(state)}', + ]) + + return f'{{{section_name}}}' + + +class WindowsRemoteKeyValueParser(KeyValueParser): + """Composite argument parser for Windows remote key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + provider=ChoicesParser(REMOTE_PROVIDERS), + arch=ChoicesParser(REMOTE_ARCHITECTURES), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section_name = 'remote options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}', + f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}', + ]) + + return f'{{{section_name}}}' + + +class NetworkRemoteKeyValueParser(KeyValueParser): + """Composite argument parser for network remote key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + provider=ChoicesParser(REMOTE_PROVIDERS), + arch=ChoicesParser(REMOTE_ARCHITECTURES), + collection=AnyParser(), + connection=AnyParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section_name = 'remote options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}', + f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}', + ' collection={collection}', + ' connection={connection}', + ]) + + return f'{{{section_name}}}' + + +class PosixSshKeyValueParser(KeyValueParser): + """Composite argument parser for POSIX SSH host key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + python=PythonParser(versions=list(SUPPORTED_PYTHON_VERSIONS), allow_venv=False, allow_default=False), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=False) + + section_name = 'ssh options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' python={python_parser.document(state)}', + ]) + + return f'{{{section_name}}}' + + +class EmptyKeyValueParser(KeyValueParser): + """Composite argument parser when a key/value parser is required but there are no keys available.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return {} diff --git a/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py new file mode 100644 index 0000000..9453b76 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py @@ -0,0 +1,179 @@ +"""Composite argument value parsers used by other parsers.""" +from __future__ import annotations + +import collections.abc as c +import typing as t + +from ...host_configs import ( + NativePythonConfig, + PythonConfig, + VirtualPythonConfig, +) + +from ..argparsing.parsers import ( + AbsolutePathParser, + AnyParser, + ChoicesParser, + DocumentationState, + IntegerParser, + MatchConditions, + Parser, + ParserError, + ParserState, + ParserBoundary, +) + + +class PythonParser(Parser): + """ + Composite argument parser for Python versions, with support for specifying paths and using virtual environments. + + Allowed formats: + + {version} + venv/{version} + venv/system-site-packages/{version} + + The `{version}` has two possible formats: + + X.Y + X.Y@{path} + + Where `X.Y` is the Python major and minor version number and `{path}` is an absolute path with one of the following formats: + + /path/to/python + /path/to/python/directory/ + + When a trailing slash is present, it is considered a directory, and `python{version}` will be appended to it automatically. + + The default path depends on the context: + + - Known docker/remote environments can declare their own path. + - The origin host uses `sys.executable` if `{version}` matches the current version in `sys.version_info`. + - The origin host (as a controller or target) use the `$PATH` environment variable to find `python{version}`. + - As a fallback/default, the path `/usr/bin/python{version}` is used. + + NOTE: The Python path determines where to find the Python interpreter. + In the case of an ansible-test managed virtual environment, that Python interpreter will be used to create the virtual environment. + So the path given will not be the one actually used for the controller or target. + + Known docker/remote environments limit the available Python versions to configured values known to be valid. + The origin host and unknown environments assume all relevant Python versions are available. + """ + def __init__(self, + versions: c.Sequence[str], + *, + allow_default: bool, + allow_venv: bool, + ): + version_choices = list(versions) + + if allow_default: + version_choices.append('default') + + first_choices = list(version_choices) + + if allow_venv: + first_choices.append('venv/') + + venv_choices = list(version_choices) + ['system-site-packages/'] + + self.versions = versions + self.allow_default = allow_default + self.allow_venv = allow_venv + self.version_choices = version_choices + self.first_choices = first_choices + self.venv_choices = venv_choices + self.venv_choices = venv_choices + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + boundary: ParserBoundary + + with state.delimit('@/', required=False) as boundary: + version = ChoicesParser(self.first_choices).parse(state) + + python: PythonConfig + + if version == 'venv': + with state.delimit('@/', required=False) as boundary: + version = ChoicesParser(self.venv_choices).parse(state) + + if version == 'system-site-packages': + system_site_packages = True + + with state.delimit('@', required=False) as boundary: + version = ChoicesParser(self.version_choices).parse(state) + else: + system_site_packages = False + + python = VirtualPythonConfig(version=version, system_site_packages=system_site_packages) + else: + python = NativePythonConfig(version=version) + + if boundary.match == '@': + # FUTURE: For OriginConfig or ControllerConfig->OriginConfig the path could be validated with an absolute path parser (file or directory). + python.path = AbsolutePathParser().parse(state) + + return python + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + + docs = '[venv/[system-site-packages/]]' if self.allow_venv else '' + + if self.versions: + docs += '|'.join(self.version_choices) + else: + docs += '{X.Y}' + + docs += '[@{path|dir/}]' + + return docs + + +class PlatformParser(ChoicesParser): + """Composite argument parser for "{platform}/{version}" formatted choices.""" + def __init__(self, choices: list[str]) -> None: + super().__init__(choices, conditions=MatchConditions.CHOICE | MatchConditions.ANY) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + + if len(value.split('/')) != 2: + raise ParserError(f'invalid platform format: {value}') + + return value + + +class SshConnectionParser(Parser): + """ + Composite argument parser for connecting to a host using SSH. + Format: user@host[:port] + """ + EXPECTED_FORMAT = '{user}@{host}[:{port}]' + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + + with state.delimit('@'): + user = AnyParser(no_match_message=f'Expected {{user}} from: {self.EXPECTED_FORMAT}').parse(state) + + setattr(namespace, 'user', user) + + with state.delimit(':', required=False) as colon: # type: ParserBoundary + host = AnyParser(no_match_message=f'Expected {{host}} from: {self.EXPECTED_FORMAT}').parse(state) + + setattr(namespace, 'host', host) + + if colon.match: + port = IntegerParser(65535).parse(state) + setattr(namespace, 'port', port) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return self.EXPECTED_FORMAT |