summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/cli/parsers
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/parsers')
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/__init__.py303
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py73
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/helpers.py57
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py310
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py239
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/value_parsers.py179
6 files changed, 1161 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/parsers/__init__.py b/test/lib/ansible_test/_internal/cli/parsers/__init__.py
new file mode 100644
index 0000000..1aedf63
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/__init__.py
@@ -0,0 +1,303 @@
+"""Composite argument parsers for ansible-test specific command-line arguments."""
+from __future__ import annotations
+
+import typing as t
+
+from ...constants import (
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ...ci import (
+ get_ci_provider,
+)
+
+from ...host_configs import (
+ ControllerConfig,
+ NetworkConfig,
+ NetworkInventoryConfig,
+ PosixConfig,
+ WindowsConfig,
+ WindowsInventoryConfig,
+)
+
+from ..argparsing.parsers import (
+ DocumentationState,
+ Parser,
+ ParserState,
+ TypeParser,
+)
+
+from .value_parsers import (
+ PythonParser,
+)
+
+from .host_config_parsers import (
+ ControllerParser,
+ DockerParser,
+ NetworkInventoryParser,
+ NetworkRemoteParser,
+ OriginParser,
+ PosixRemoteParser,
+ PosixSshParser,
+ WindowsInventoryParser,
+ WindowsRemoteParser,
+)
+
+
+from .base_argument_parsers import (
+ ControllerNamespaceParser,
+ TargetNamespaceParser,
+ TargetsNamespaceParser,
+)
+
+
+class OriginControllerParser(ControllerNamespaceParser, TypeParser):
+ """Composite argument parser for the controller when delegation is not supported."""
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return dict(
+ origin=OriginParser(),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = '--controller options:'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class DelegatedControllerParser(ControllerNamespaceParser, TypeParser):
+ """Composite argument parser for the controller when delegation is supported."""
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = dict(
+ origin=OriginParser(),
+ docker=DockerParser(controller=True),
+ )
+
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=PosixRemoteParser(controller=True),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = '--controller options:'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class PosixTargetParser(TargetNamespaceParser, TypeParser):
+ """Composite argument parser for a POSIX target."""
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = dict(
+ controller=ControllerParser(),
+ docker=DockerParser(controller=False),
+ )
+
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=PosixRemoteParser(controller=False),
+ )
+
+ parsers.update(
+ ssh=PosixSshParser(),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class WindowsTargetParser(TargetsNamespaceParser, TypeParser):
+ """Composite argument parser for a Windows target."""
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return True
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers(state.root_namespace.targets)
+
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers([])
+
+ def get_internal_parsers(self, targets: list[WindowsConfig]) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = {}
+
+ if self.allow_inventory and not targets:
+ parsers.update(
+ inventory=WindowsInventoryParser(),
+ )
+
+ if not targets or not any(isinstance(target, WindowsInventoryConfig) for target in targets):
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=WindowsRemoteParser(),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class NetworkTargetParser(TargetsNamespaceParser, TypeParser):
+ """Composite argument parser for a network target."""
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return True
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers(state.root_namespace.targets)
+
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers([])
+
+ def get_internal_parsers(self, targets: list[NetworkConfig]) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = {}
+
+ if self.allow_inventory and not targets:
+ parsers.update(
+ inventory=NetworkInventoryParser(),
+ )
+
+ if not targets or not any(isinstance(target, NetworkInventoryConfig) for target in targets):
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=NetworkRemoteParser(),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class PythonTargetParser(TargetsNamespaceParser, Parser):
+ """Composite argument parser for a Python target."""
+ def __init__(self, allow_venv: bool) -> None:
+ super().__init__()
+
+ self.allow_venv = allow_venv
+
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-python'
+
+ def get_value(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result, without storing the result in the namespace."""
+ versions = list(SUPPORTED_PYTHON_VERSIONS)
+
+ for target in state.root_namespace.targets or []: # type: PosixConfig
+ versions.remove(target.python.version)
+
+ parser = PythonParser(versions, allow_venv=self.allow_venv, allow_default=True)
+ python = parser.parse(state)
+
+ value = ControllerConfig(python=python)
+
+ return value
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '\n'.join([
+ f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller',
+ f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller',
+ ])
+
+ return None
+
+
+class SanityPythonTargetParser(PythonTargetParser):
+ """Composite argument parser for a sanity Python target."""
+ def __init__(self) -> None:
+ super().__init__(allow_venv=False)
+
+
+class UnitsPythonTargetParser(PythonTargetParser):
+ """Composite argument parser for a units Python target."""
+ def __init__(self) -> None:
+ super().__init__(allow_venv=True)
+
+
+class PosixSshTargetParser(PosixTargetParser):
+ """Composite argument parser for a POSIX SSH target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-posix'
+
+
+class WindowsSshTargetParser(WindowsTargetParser):
+ """Composite argument parser for a Windows SSH target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-windows'
+
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return False
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return True
+
+
+class NetworkSshTargetParser(NetworkTargetParser):
+ """Composite argument parser for a network SSH target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-network'
+
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return False
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return True
diff --git a/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py
new file mode 100644
index 0000000..aac7a69
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py
@@ -0,0 +1,73 @@
+"""Base classes for the primary parsers for composite command line arguments."""
+from __future__ import annotations
+
+import abc
+import typing as t
+
+from ..argparsing.parsers import (
+ CompletionError,
+ NamespaceParser,
+ ParserState,
+)
+
+
+class ControllerNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
+ """Base class for controller namespace parsers."""
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return 'controller'
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ if state.root_namespace.targets:
+ raise ControllerRequiredFirstError()
+
+ return super().parse(state)
+
+
+class TargetNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
+ """Base class for target namespace parsers involving a single target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target'
+
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return 'targets'
+
+ @property
+ def use_list(self) -> bool:
+ """True if the destination is a list, otherwise False."""
+ return True
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return True
+
+
+class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
+ """Base class for controller namespace parsers involving multiple targets."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target'
+
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return 'targets'
+
+ @property
+ def use_list(self) -> bool:
+ """True if the destination is a list, otherwise False."""
+ return True
+
+
+class ControllerRequiredFirstError(CompletionError):
+ """Exception raised when controller and target options are specified out-of-order."""
+ def __init__(self) -> None:
+ super().__init__('The `--controller` option must be specified before `--target` option(s).')
diff --git a/test/lib/ansible_test/_internal/cli/parsers/helpers.py b/test/lib/ansible_test/_internal/cli/parsers/helpers.py
new file mode 100644
index 0000000..836a893
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/helpers.py
@@ -0,0 +1,57 @@
+"""Helper functions for composite parsers."""
+from __future__ import annotations
+
+from ...constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ...completion import (
+ docker_completion,
+ remote_completion,
+ filter_completion,
+)
+
+from ...host_configs import (
+ DockerConfig,
+ HostConfig,
+ PosixRemoteConfig,
+)
+
+
+def get_docker_pythons(name: str, controller: bool, strict: bool) -> list[str]:
+ """Return a list of docker instance Python versions supported by the specified host config."""
+ image_config = filter_completion(docker_completion()).get(name)
+ available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS
+
+ if not image_config:
+ return [] if strict else list(available_pythons)
+
+ supported_pythons = [python for python in image_config.supported_pythons if python in available_pythons]
+
+ return supported_pythons
+
+
+def get_remote_pythons(name: str, controller: bool, strict: bool) -> list[str]:
+ """Return a list of remote instance Python versions supported by the specified host config."""
+ platform_config = filter_completion(remote_completion()).get(name)
+ available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS
+
+ if not platform_config:
+ return [] if strict else list(available_pythons)
+
+ supported_pythons = [python for python in platform_config.supported_pythons if python in available_pythons]
+
+ return supported_pythons
+
+
+def get_controller_pythons(controller_config: HostConfig, strict: bool) -> list[str]:
+ """Return a list of controller Python versions supported by the specified host config."""
+ if isinstance(controller_config, DockerConfig):
+ pythons = get_docker_pythons(controller_config.name, False, strict)
+ elif isinstance(controller_config, PosixRemoteConfig):
+ pythons = get_remote_pythons(controller_config.name, False, strict)
+ else:
+ pythons = list(SUPPORTED_PYTHON_VERSIONS)
+
+ return pythons
diff --git a/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py
new file mode 100644
index 0000000..ee6f146
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py
@@ -0,0 +1,310 @@
+"""Composite parsers for the various types of hosts."""
+from __future__ import annotations
+
+import typing as t
+
+from ...completion import (
+ docker_completion,
+ network_completion,
+ remote_completion,
+ windows_completion,
+ filter_completion,
+)
+
+from ...host_configs import (
+ ControllerConfig,
+ DockerConfig,
+ NetworkInventoryConfig,
+ NetworkRemoteConfig,
+ OriginConfig,
+ PosixRemoteConfig,
+ PosixSshConfig,
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from ..compat import (
+ get_fallback_remote_controller,
+)
+
+from ..argparsing.parsers import (
+ ChoicesParser,
+ DocumentationState,
+ FileParser,
+ MatchConditions,
+ NamespaceWrappedParser,
+ PairParser,
+ Parser,
+ ParserError,
+ ParserState,
+)
+
+from .value_parsers import (
+ PlatformParser,
+ SshConnectionParser,
+)
+
+from .key_value_parsers import (
+ ControllerKeyValueParser,
+ DockerKeyValueParser,
+ EmptyKeyValueParser,
+ NetworkRemoteKeyValueParser,
+ OriginKeyValueParser,
+ PosixRemoteKeyValueParser,
+ PosixSshKeyValueParser,
+ WindowsRemoteKeyValueParser,
+)
+
+from .helpers import (
+ get_docker_pythons,
+ get_remote_pythons,
+)
+
+
+class OriginParser(Parser):
+ """Composite argument parser for the origin."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = OriginConfig()
+
+ state.set_namespace(namespace)
+
+ parser = OriginKeyValueParser()
+ parser.parse(state)
+
+ return namespace
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return OriginKeyValueParser().document(state)
+
+
+class ControllerParser(Parser):
+ """Composite argument parser for the controller."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = ControllerConfig()
+
+ state.set_namespace(namespace)
+
+ parser = ControllerKeyValueParser()
+ parser.parse(state)
+
+ return namespace
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return ControllerKeyValueParser().document(state)
+
+
+class DockerParser(PairParser):
+ """Composite argument parser for a docker host."""
+ def __init__(self, controller: bool) -> None:
+ self.controller = controller
+
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return DockerConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('name', ChoicesParser(list(filter_completion(docker_completion(), controller_only=self.controller)),
+ conditions=MatchConditions.CHOICE | MatchConditions.ANY))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return DockerKeyValueParser(choice, self.controller)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value: DockerConfig = super().parse(state)
+
+ if not value.python and not get_docker_pythons(value.name, self.controller, True):
+ raise ParserError(f'Python version required for docker image: {value.name}')
+
+ return value
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ default = 'default'
+ content = '\n'.join([f' {image} ({", ".join(get_docker_pythons(image, self.controller, False))})'
+ for image, item in filter_completion(docker_completion(), controller_only=self.controller).items()])
+
+ content += '\n'.join([
+ '',
+ ' {image} # python must be specified for custom images',
+ ])
+
+ state.sections[f'{"controller" if self.controller else "target"} docker images and supported python version (choose one):'] = content
+
+ return f'{{image}}[,{DockerKeyValueParser(default, self.controller).document(state)}]'
+
+
+class PosixRemoteParser(PairParser):
+ """Composite argument parser for a POSIX remote host."""
+ def __init__(self, controller: bool) -> None:
+ self.controller = controller
+
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return PosixRemoteConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('name', PlatformParser(list(filter_completion(remote_completion(), controller_only=self.controller))))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return PosixRemoteKeyValueParser(choice, self.controller)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value: PosixRemoteConfig = super().parse(state)
+
+ if not value.python and not get_remote_pythons(value.name, self.controller, True):
+ raise ParserError(f'Python version required for remote: {value.name}')
+
+ return value
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ default = get_fallback_remote_controller()
+ content = '\n'.join([f' {name} ({", ".join(get_remote_pythons(name, self.controller, False))})'
+ for name, item in filter_completion(remote_completion(), controller_only=self.controller).items()])
+
+ content += '\n'.join([
+ '',
+ ' {platform}/{version} # python must be specified for unknown systems',
+ ])
+
+ state.sections[f'{"controller" if self.controller else "target"} remote systems and supported python versions (choose one):'] = content
+
+ return f'{{system}}[,{PosixRemoteKeyValueParser(default, self.controller).document(state)}]'
+
+
+class WindowsRemoteParser(PairParser):
+ """Composite argument parser for a Windows remote host."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return WindowsRemoteConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ names = list(filter_completion(windows_completion()))
+
+ for target in state.root_namespace.targets or []: # type: WindowsRemoteConfig
+ names.remove(target.name)
+
+ return NamespaceWrappedParser('name', PlatformParser(names))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return WindowsRemoteKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ content = '\n'.join([f' {name}' for name, item in filter_completion(windows_completion()).items()])
+
+ content += '\n'.join([
+ '',
+ ' windows/{version} # use an unknown windows version',
+ ])
+
+ state.sections['target remote systems (choose one):'] = content
+
+ return f'{{system}}[,{WindowsRemoteKeyValueParser().document(state)}]'
+
+
+class NetworkRemoteParser(PairParser):
+ """Composite argument parser for a network remote host."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return NetworkRemoteConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ names = list(filter_completion(network_completion()))
+
+ for target in state.root_namespace.targets or []: # type: NetworkRemoteConfig
+ names.remove(target.name)
+
+ return NamespaceWrappedParser('name', PlatformParser(names))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return NetworkRemoteKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ content = '\n'.join([f' {name}' for name, item in filter_completion(network_completion()).items()])
+
+ content += '\n'.join([
+ '',
+ ' {platform}/{version} # use an unknown platform and version',
+ ])
+
+ state.sections['target remote systems (choose one):'] = content
+
+ return f'{{system}}[,{NetworkRemoteKeyValueParser().document(state)}]'
+
+
+class WindowsInventoryParser(PairParser):
+ """Composite argument parser for a Windows inventory."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return WindowsInventoryConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('path', FileParser())
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return EmptyKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '{path} # INI format inventory file'
+
+
+class NetworkInventoryParser(PairParser):
+ """Composite argument parser for a network inventory."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return NetworkInventoryConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('path', FileParser())
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return EmptyKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '{path} # INI format inventory file'
+
+
+class PosixSshParser(PairParser):
+ """Composite argument parser for a POSIX SSH host."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return PosixSshConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return SshConnectionParser()
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return PosixSshKeyValueParser()
+
+ @property
+ def required(self) -> bool:
+ """True if the delimiter (and thus right parser) is required, otherwise False."""
+ return True
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return f'{SshConnectionParser().document(state)}[,{PosixSshKeyValueParser().document(state)}]'
diff --git a/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py
new file mode 100644
index 0000000..049b71e
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py
@@ -0,0 +1,239 @@
+"""Composite argument key-value parsers used by other parsers."""
+from __future__ import annotations
+
+import typing as t
+
+from ...constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ REMOTE_PROVIDERS,
+ SECCOMP_CHOICES,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ...completion import (
+ AuditMode,
+ CGroupVersion,
+)
+
+from ...util import (
+ REMOTE_ARCHITECTURES,
+)
+
+from ...host_configs import (
+ OriginConfig,
+)
+
+from ...become import (
+ SUPPORTED_BECOME_METHODS,
+)
+
+from ..argparsing.parsers import (
+ AnyParser,
+ BooleanParser,
+ ChoicesParser,
+ DocumentationState,
+ EnumValueChoicesParser,
+ IntegerParser,
+ KeyValueParser,
+ Parser,
+ ParserState,
+)
+
+from .value_parsers import (
+ PythonParser,
+)
+
+from .helpers import (
+ get_controller_pythons,
+ get_remote_pythons,
+ get_docker_pythons,
+)
+
+
+class OriginKeyValueParser(KeyValueParser):
+ """Composite argument parser for origin key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ versions = CONTROLLER_PYTHON_VERSIONS
+
+ return dict(
+ python=PythonParser(versions=versions, allow_venv=True, allow_default=True),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=CONTROLLER_PYTHON_VERSIONS, allow_venv=True, allow_default=True)
+
+ section_name = 'origin options'
+
+ state.sections[f'controller {section_name} (comma separated):'] = '\n'.join([
+ f' python={python_parser.document(state)}',
+ ])
+
+ return f'{{{section_name}}} # default'
+
+
+class ControllerKeyValueParser(KeyValueParser):
+ """Composite argument parser for controller key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ versions = get_controller_pythons(state.root_namespace.controller, False)
+ allow_default = bool(get_controller_pythons(state.root_namespace.controller, True))
+ allow_venv = isinstance(state.root_namespace.controller, OriginConfig) or not state.root_namespace.controller
+
+ return dict(
+ python=PythonParser(versions=versions, allow_venv=allow_venv, allow_default=allow_default),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section_name = 'controller options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller',
+ f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller',
+ ])
+
+ return f'{{{section_name}}} # default'
+
+
+class DockerKeyValueParser(KeyValueParser):
+ """Composite argument parser for docker key/value pairs."""
+ def __init__(self, image: str, controller: bool) -> None:
+ self.controller = controller
+ self.versions = get_docker_pythons(image, controller, False)
+ self.allow_default = bool(get_docker_pythons(image, controller, True))
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default),
+ seccomp=ChoicesParser(SECCOMP_CHOICES),
+ cgroup=EnumValueChoicesParser(CGroupVersion),
+ audit=EnumValueChoicesParser(AuditMode),
+ privileged=BooleanParser(),
+ memory=IntegerParser(),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default)
+
+ section_name = 'docker options'
+
+ state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([
+ f' python={python_parser.document(state)}',
+ f' seccomp={ChoicesParser(SECCOMP_CHOICES).document(state)}',
+ f' cgroup={EnumValueChoicesParser(CGroupVersion).document(state)}',
+ f' audit={EnumValueChoicesParser(AuditMode).document(state)}',
+ f' privileged={BooleanParser().document(state)}',
+ f' memory={IntegerParser().document(state)} # bytes',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class PosixRemoteKeyValueParser(KeyValueParser):
+ """Composite argument parser for POSIX remote key/value pairs."""
+ def __init__(self, name: str, controller: bool) -> None:
+ self.controller = controller
+ self.versions = get_remote_pythons(name, controller, False)
+ self.allow_default = bool(get_remote_pythons(name, controller, True))
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ become=ChoicesParser(list(SUPPORTED_BECOME_METHODS)),
+ provider=ChoicesParser(REMOTE_PROVIDERS),
+ arch=ChoicesParser(REMOTE_ARCHITECTURES),
+ python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default)
+
+ section_name = 'remote options'
+
+ state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([
+ f' become={ChoicesParser(list(SUPPORTED_BECOME_METHODS)).document(state)}',
+ f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}',
+ f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}',
+ f' python={python_parser.document(state)}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class WindowsRemoteKeyValueParser(KeyValueParser):
+ """Composite argument parser for Windows remote key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ provider=ChoicesParser(REMOTE_PROVIDERS),
+ arch=ChoicesParser(REMOTE_ARCHITECTURES),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section_name = 'remote options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}',
+ f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class NetworkRemoteKeyValueParser(KeyValueParser):
+ """Composite argument parser for network remote key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ provider=ChoicesParser(REMOTE_PROVIDERS),
+ arch=ChoicesParser(REMOTE_ARCHITECTURES),
+ collection=AnyParser(),
+ connection=AnyParser(),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section_name = 'remote options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}',
+ f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}',
+ ' collection={collection}',
+ ' connection={connection}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class PosixSshKeyValueParser(KeyValueParser):
+ """Composite argument parser for POSIX SSH host key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ python=PythonParser(versions=list(SUPPORTED_PYTHON_VERSIONS), allow_venv=False, allow_default=False),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=False)
+
+ section_name = 'ssh options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' python={python_parser.document(state)}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class EmptyKeyValueParser(KeyValueParser):
+ """Composite argument parser when a key/value parser is required but there are no keys available."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return {}
diff --git a/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py
new file mode 100644
index 0000000..9453b76
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py
@@ -0,0 +1,179 @@
+"""Composite argument value parsers used by other parsers."""
+from __future__ import annotations
+
+import collections.abc as c
+import typing as t
+
+from ...host_configs import (
+ NativePythonConfig,
+ PythonConfig,
+ VirtualPythonConfig,
+)
+
+from ..argparsing.parsers import (
+ AbsolutePathParser,
+ AnyParser,
+ ChoicesParser,
+ DocumentationState,
+ IntegerParser,
+ MatchConditions,
+ Parser,
+ ParserError,
+ ParserState,
+ ParserBoundary,
+)
+
+
+class PythonParser(Parser):
+ """
+ Composite argument parser for Python versions, with support for specifying paths and using virtual environments.
+
+ Allowed formats:
+
+ {version}
+ venv/{version}
+ venv/system-site-packages/{version}
+
+ The `{version}` has two possible formats:
+
+ X.Y
+ X.Y@{path}
+
+ Where `X.Y` is the Python major and minor version number and `{path}` is an absolute path with one of the following formats:
+
+ /path/to/python
+ /path/to/python/directory/
+
+ When a trailing slash is present, it is considered a directory, and `python{version}` will be appended to it automatically.
+
+ The default path depends on the context:
+
+ - Known docker/remote environments can declare their own path.
+ - The origin host uses `sys.executable` if `{version}` matches the current version in `sys.version_info`.
+ - The origin host (as a controller or target) use the `$PATH` environment variable to find `python{version}`.
+ - As a fallback/default, the path `/usr/bin/python{version}` is used.
+
+ NOTE: The Python path determines where to find the Python interpreter.
+ In the case of an ansible-test managed virtual environment, that Python interpreter will be used to create the virtual environment.
+ So the path given will not be the one actually used for the controller or target.
+
+ Known docker/remote environments limit the available Python versions to configured values known to be valid.
+ The origin host and unknown environments assume all relevant Python versions are available.
+ """
+ def __init__(self,
+ versions: c.Sequence[str],
+ *,
+ allow_default: bool,
+ allow_venv: bool,
+ ):
+ version_choices = list(versions)
+
+ if allow_default:
+ version_choices.append('default')
+
+ first_choices = list(version_choices)
+
+ if allow_venv:
+ first_choices.append('venv/')
+
+ venv_choices = list(version_choices) + ['system-site-packages/']
+
+ self.versions = versions
+ self.allow_default = allow_default
+ self.allow_venv = allow_venv
+ self.version_choices = version_choices
+ self.first_choices = first_choices
+ self.venv_choices = venv_choices
+ self.venv_choices = venv_choices
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ boundary: ParserBoundary
+
+ with state.delimit('@/', required=False) as boundary:
+ version = ChoicesParser(self.first_choices).parse(state)
+
+ python: PythonConfig
+
+ if version == 'venv':
+ with state.delimit('@/', required=False) as boundary:
+ version = ChoicesParser(self.venv_choices).parse(state)
+
+ if version == 'system-site-packages':
+ system_site_packages = True
+
+ with state.delimit('@', required=False) as boundary:
+ version = ChoicesParser(self.version_choices).parse(state)
+ else:
+ system_site_packages = False
+
+ python = VirtualPythonConfig(version=version, system_site_packages=system_site_packages)
+ else:
+ python = NativePythonConfig(version=version)
+
+ if boundary.match == '@':
+ # FUTURE: For OriginConfig or ControllerConfig->OriginConfig the path could be validated with an absolute path parser (file or directory).
+ python.path = AbsolutePathParser().parse(state)
+
+ return python
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+
+ docs = '[venv/[system-site-packages/]]' if self.allow_venv else ''
+
+ if self.versions:
+ docs += '|'.join(self.version_choices)
+ else:
+ docs += '{X.Y}'
+
+ docs += '[@{path|dir/}]'
+
+ return docs
+
+
+class PlatformParser(ChoicesParser):
+ """Composite argument parser for "{platform}/{version}" formatted choices."""
+ def __init__(self, choices: list[str]) -> None:
+ super().__init__(choices, conditions=MatchConditions.CHOICE | MatchConditions.ANY)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+
+ if len(value.split('/')) != 2:
+ raise ParserError(f'invalid platform format: {value}')
+
+ return value
+
+
+class SshConnectionParser(Parser):
+ """
+ Composite argument parser for connecting to a host using SSH.
+ Format: user@host[:port]
+ """
+ EXPECTED_FORMAT = '{user}@{host}[:{port}]'
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = state.current_namespace
+
+ with state.delimit('@'):
+ user = AnyParser(no_match_message=f'Expected {{user}} from: {self.EXPECTED_FORMAT}').parse(state)
+
+ setattr(namespace, 'user', user)
+
+ with state.delimit(':', required=False) as colon: # type: ParserBoundary
+ host = AnyParser(no_match_message=f'Expected {{host}} from: {self.EXPECTED_FORMAT}').parse(state)
+
+ setattr(namespace, 'host', host)
+
+ if colon.match:
+ port = IntegerParser(65535).parse(state)
+ setattr(namespace, 'port', port)
+
+ return namespace
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return self.EXPECTED_FORMAT