summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/cli
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/cli')
-rw-r--r--test/lib/ansible_test/_internal/cli/__init__.py63
-rw-r--r--test/lib/ansible_test/_internal/cli/actions.py90
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/__init__.py265
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/actions.py18
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py124
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/parsers.py597
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/__init__.py241
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py85
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py28
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py48
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py49
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py48
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py76
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py49
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py65
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/combine.py49
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/erase.py36
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/html.py43
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/report.py61
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/coverage/xml.py43
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/env.py63
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/integration/__init__.py162
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/integration/network.py86
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/integration/posix.py51
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/integration/windows.py51
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/sanity.py119
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/shell.py59
-rw-r--r--test/lib/ansible_test/_internal/cli/commands/units.py65
-rw-r--r--test/lib/ansible_test/_internal/cli/compat.py505
-rw-r--r--test/lib/ansible_test/_internal/cli/completers.py30
-rw-r--r--test/lib/ansible_test/_internal/cli/converters.py19
-rw-r--r--test/lib/ansible_test/_internal/cli/environments.py612
-rw-r--r--test/lib/ansible_test/_internal/cli/epilog.py23
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/__init__.py303
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py73
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/helpers.py57
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py310
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py239
-rw-r--r--test/lib/ansible_test/_internal/cli/parsers/value_parsers.py179
39 files changed, 5084 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/__init__.py b/test/lib/ansible_test/_internal/cli/__init__.py
new file mode 100644
index 0000000..3171639
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/__init__.py
@@ -0,0 +1,63 @@
+"""Command line parsing."""
+from __future__ import annotations
+
+import argparse
+import os
+import sys
+import typing as t
+
+from .argparsing import (
+ CompositeActionCompletionFinder,
+)
+
+from .commands import (
+ do_commands,
+)
+
+from .epilog import (
+ get_epilog,
+)
+
+from .compat import (
+ HostSettings,
+ convert_legacy_args,
+)
+
+from ..util import (
+ get_ansible_version,
+)
+
+
+def parse_args(argv: t.Optional[list[str]] = None) -> argparse.Namespace:
+ """Parse command line arguments."""
+ completer = CompositeActionCompletionFinder()
+
+ parser = argparse.ArgumentParser(prog='ansible-test', epilog=get_epilog(completer), formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument('--version', action='version', version=f'%(prog)s version {get_ansible_version()}')
+
+ do_commands(parser, completer)
+
+ completer(
+ parser,
+ always_complete_options=False,
+ )
+
+ if argv is None:
+ argv = sys.argv[1:]
+ else:
+ argv = argv[1:]
+
+ args = parser.parse_args(argv)
+
+ if args.explain and not args.verbosity:
+ args.verbosity = 1
+
+ if args.no_environment:
+ pass
+ elif args.host_path:
+ args.host_settings = HostSettings.deserialize(os.path.join(args.host_path, 'settings.dat'))
+ else:
+ args.host_settings = convert_legacy_args(argv, args, args.target_mode)
+ args.host_settings.apply_defaults()
+
+ return args
diff --git a/test/lib/ansible_test/_internal/cli/actions.py b/test/lib/ansible_test/_internal/cli/actions.py
new file mode 100644
index 0000000..3359a84
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/actions.py
@@ -0,0 +1,90 @@
+"""Actions for handling composite arguments with argparse."""
+from __future__ import annotations
+
+from .argparsing import (
+ CompositeAction,
+ NamespaceParser,
+)
+
+from .parsers import (
+ DelegatedControllerParser,
+ NetworkSshTargetParser,
+ NetworkTargetParser,
+ OriginControllerParser,
+ PosixSshTargetParser,
+ PosixTargetParser,
+ SanityPythonTargetParser,
+ UnitsPythonTargetParser,
+ WindowsSshTargetParser,
+ WindowsTargetParser,
+)
+
+
+class OriginControllerAction(CompositeAction):
+ """Composite action parser for the controller when the only option is `origin`."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return OriginControllerParser()
+
+
+class DelegatedControllerAction(CompositeAction):
+ """Composite action parser for the controller when delegation is supported."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return DelegatedControllerParser()
+
+
+class PosixTargetAction(CompositeAction):
+ """Composite action parser for a POSIX target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return PosixTargetParser()
+
+
+class WindowsTargetAction(CompositeAction):
+ """Composite action parser for a Windows target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return WindowsTargetParser()
+
+
+class NetworkTargetAction(CompositeAction):
+ """Composite action parser for a network target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return NetworkTargetParser()
+
+
+class SanityPythonTargetAction(CompositeAction):
+ """Composite action parser for a sanity target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return SanityPythonTargetParser()
+
+
+class UnitsPythonTargetAction(CompositeAction):
+ """Composite action parser for a units target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return UnitsPythonTargetParser()
+
+
+class PosixSshTargetAction(CompositeAction):
+ """Composite action parser for a POSIX SSH target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return PosixSshTargetParser()
+
+
+class WindowsSshTargetAction(CompositeAction):
+ """Composite action parser for a Windows SSH target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return WindowsSshTargetParser()
+
+
+class NetworkSshTargetAction(CompositeAction):
+ """Composite action parser for a network SSH target."""
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+ return NetworkSshTargetParser()
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/__init__.py b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py
new file mode 100644
index 0000000..540cf55
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py
@@ -0,0 +1,265 @@
+"""Completion finder which brings together custom options and completion logic."""
+from __future__ import annotations
+
+import abc
+import argparse
+import os
+import re
+import typing as t
+
+from .argcompletion import (
+ OptionCompletionFinder,
+ get_comp_type,
+ register_safe_action,
+ warn,
+)
+
+from .parsers import (
+ Completion,
+ CompletionError,
+ CompletionSuccess,
+ CompletionUnavailable,
+ DocumentationState,
+ NamespaceParser,
+ Parser,
+ ParserError,
+ ParserMode,
+ ParserState,
+)
+
+
+class RegisteredCompletionFinder(OptionCompletionFinder):
+ """
+ Custom option completion finder for argcomplete which allows completion results to be registered.
+ These registered completions, if provided, are used to filter the final completion results.
+ This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221
+ """
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ self.registered_completions: t.Optional[list[str]] = None
+
+ def completer(
+ self,
+ prefix: str,
+ action: argparse.Action,
+ parsed_args: argparse.Namespace,
+ **kwargs,
+ ) -> list[str]:
+ """
+ Return a list of completions for the specified prefix and action.
+ Use this as the completer function for argcomplete.
+ """
+ kwargs.clear()
+ del kwargs
+
+ completions = self.get_completions(prefix, action, parsed_args)
+
+ if action.nargs and not isinstance(action.nargs, int):
+ # prevent argcomplete from including unrelated arguments in the completion results
+ self.registered_completions = completions
+
+ return completions
+
+ @abc.abstractmethod
+ def get_completions(
+ self,
+ prefix: str,
+ action: argparse.Action,
+ parsed_args: argparse.Namespace,
+ ) -> list[str]:
+ """
+ Return a list of completions for the specified prefix and action.
+ Called by the complete function.
+ """
+
+ def quote_completions(self, completions, cword_prequote, last_wordbreak_pos):
+ """Modify completion results before returning them."""
+ if self.registered_completions is not None:
+ # If one of the completion handlers registered their results, only allow those exact results to be returned.
+ # This prevents argcomplete from adding results from other completers when they are known to be invalid.
+ allowed_completions = set(self.registered_completions)
+ completions = [completion for completion in completions if completion in allowed_completions]
+
+ return super().quote_completions(completions, cword_prequote, last_wordbreak_pos)
+
+
+class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
+ """Base class for actions that parse composite arguments."""
+ documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {}
+
+ def __init__(
+ self,
+ *args,
+ **kwargs,
+ ):
+ self.definition = self.create_parser()
+ self.documentation_state[type(self)] = documentation_state = DocumentationState()
+ self.definition.document(documentation_state)
+
+ kwargs.update(dest=self.definition.dest)
+
+ super().__init__(*args, **kwargs)
+
+ register_safe_action(type(self))
+
+ @abc.abstractmethod
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+
+ def __call__(
+ self,
+ parser,
+ namespace,
+ values,
+ option_string=None,
+ ):
+ state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values)
+
+ try:
+ self.definition.parse(state)
+ except ParserError as ex:
+ error = str(ex)
+ except CompletionError as ex:
+ error = ex.message
+ else:
+ return
+
+ if get_comp_type():
+ # FUTURE: It may be possible to enhance error handling by surfacing this error message during downstream completion.
+ return # ignore parse errors during completion to avoid breaking downstream completion
+
+ raise argparse.ArgumentError(self, error)
+
+
+class CompositeActionCompletionFinder(RegisteredCompletionFinder):
+ """Completion finder with support for composite argument parsing."""
+ def get_completions(
+ self,
+ prefix: str,
+ action: argparse.Action,
+ parsed_args: argparse.Namespace,
+ ) -> list[str]:
+ """Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed."""
+ assert isinstance(action, CompositeAction)
+
+ state = ParserState(
+ mode=ParserMode.LIST if self.list_mode else ParserMode.COMPLETE,
+ remainder=prefix,
+ namespaces=[parsed_args],
+ )
+
+ answer = complete(action.definition, state)
+
+ completions = []
+
+ if isinstance(answer, CompletionSuccess):
+ self.disable_completion_mangling = answer.preserve
+ completions = answer.completions
+
+ if isinstance(answer, CompletionError):
+ warn(answer.message)
+
+ return completions
+
+
+def detect_file_listing(value: str, mode: ParserMode) -> bool:
+ """
+ Return True if Bash will show a file listing and redraw the prompt, otherwise return False.
+
+ If there are no list results, a file listing will be shown if the value after the last `=` or `:` character:
+
+ - is empty
+ - matches a full path
+ - matches a partial path
+
+ Otherwise Bash will play the bell sound and display nothing.
+
+ see: https://github.com/kislyuk/argcomplete/issues/328
+ see: https://github.com/kislyuk/argcomplete/pull/284
+ """
+ listing = False
+
+ if mode == ParserMode.LIST:
+ right = re.split('[=:]', value)[-1]
+ listing = not right or os.path.exists(right)
+
+ if not listing:
+ directory = os.path.dirname(right)
+
+ # noinspection PyBroadException
+ try:
+ filenames = os.listdir(directory or '.')
+ except Exception: # pylint: disable=broad-except
+ pass
+ else:
+ listing = any(filename.startswith(right) for filename in filenames)
+
+ return listing
+
+
+def detect_false_file_completion(value: str, mode: ParserMode) -> bool:
+ """
+ Return True if Bash will provide an incorrect file completion, otherwise return False.
+
+ If there are no completion results, a filename will be automatically completed if the value after the last `=` or `:` character:
+
+ - matches exactly one partial path
+
+ Otherwise Bash will play the bell sound and display nothing.
+
+ see: https://github.com/kislyuk/argcomplete/issues/328
+ see: https://github.com/kislyuk/argcomplete/pull/284
+ """
+ completion = False
+
+ if mode == ParserMode.COMPLETE:
+ completion = True
+
+ right = re.split('[=:]', value)[-1]
+ directory, prefix = os.path.split(right)
+
+ # noinspection PyBroadException
+ try:
+ filenames = os.listdir(directory or '.')
+ except Exception: # pylint: disable=broad-except
+ pass
+ else:
+ matches = [filename for filename in filenames if filename.startswith(prefix)]
+ completion = len(matches) == 1
+
+ return completion
+
+
+def complete(
+ completer: Parser,
+ state: ParserState,
+) -> Completion:
+ """Perform argument completion using the given completer and return the completion result."""
+ value = state.remainder
+
+ answer: Completion
+
+ try:
+ completer.parse(state)
+ raise ParserError('completion expected')
+ except CompletionUnavailable as ex:
+ if detect_file_listing(value, state.mode):
+ # Displaying a warning before the file listing informs the user it is invalid. Bash will redraw the prompt after the list.
+ # If the file listing is not shown, a warning could be helpful, but would introduce noise on the terminal since the prompt is not redrawn.
+ answer = CompletionError(ex.message)
+ elif detect_false_file_completion(value, state.mode):
+ # When the current prefix provides no matches, but matches files a single file on disk, Bash will perform an incorrect completion.
+ # Returning multiple invalid matches instead of no matches will prevent Bash from using its own completion logic in this case.
+ answer = CompletionSuccess(
+ list_mode=True, # abuse list mode to enable preservation of the literal results
+ consumed='',
+ continuation='',
+ matches=['completion', 'invalid']
+ )
+ else:
+ answer = ex
+ except Completion as ex:
+ answer = ex
+
+ return answer
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/actions.py b/test/lib/ansible_test/_internal/cli/argparsing/actions.py
new file mode 100644
index 0000000..2bcf982
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/actions.py
@@ -0,0 +1,18 @@
+"""Actions for argparse."""
+from __future__ import annotations
+
+import argparse
+import enum
+import typing as t
+
+
+class EnumAction(argparse.Action):
+ """Parse an enum using the lowercase enum names."""
+ def __init__(self, **kwargs: t.Any) -> None:
+ self.enum_type: t.Type[enum.Enum] = kwargs.pop('type', None)
+ kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type))
+ super().__init__(**kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ value = self.enum_type[values.upper()]
+ setattr(namespace, self.dest, value)
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py
new file mode 100644
index 0000000..cf5776d
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py
@@ -0,0 +1,124 @@
+"""Wrapper around argcomplete providing bug fixes and additional features."""
+from __future__ import annotations
+
+import argparse
+import enum
+import os
+import typing as t
+
+
+class Substitute:
+ """Substitute for missing class which accepts all arguments."""
+ def __init__(self, *args, **kwargs) -> None:
+ pass
+
+
+try:
+ import argcomplete
+
+ from argcomplete import (
+ CompletionFinder,
+ default_validator,
+ )
+
+ warn = argcomplete.warn # pylint: disable=invalid-name
+except ImportError:
+ argcomplete = None
+
+ CompletionFinder = Substitute
+ default_validator = Substitute # pylint: disable=invalid-name
+ warn = Substitute # pylint: disable=invalid-name
+
+
+class CompType(enum.Enum):
+ """
+ Bash COMP_TYPE argument completion types.
+ For documentation, see: https://www.gnu.org/software/bash/manual/html_node/Bash-Variables.html#index-COMP_005fTYPE
+ """
+ COMPLETION = '\t'
+ """
+ Standard completion, typically triggered by a single tab.
+ """
+ MENU_COMPLETION = '%'
+ """
+ Menu completion, which cycles through each completion instead of showing a list.
+ For help using this feature, see: https://stackoverflow.com/questions/12044574/getting-complete-and-menu-complete-to-work-together
+ """
+ LIST = '?'
+ """
+ Standard list, typically triggered by a double tab.
+ """
+ LIST_AMBIGUOUS = '!'
+ """
+ Listing with `show-all-if-ambiguous` set.
+ For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dambiguous
+ For additional details, see: https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type
+ """
+ LIST_UNMODIFIED = '@'
+ """
+ Listing with `show-all-if-unmodified` set.
+ For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dunmodified
+ For additional details, see: : https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type
+ """
+
+ @property
+ def list_mode(self) -> bool:
+ """True if completion is running in list mode, otherwise False."""
+ return self in (CompType.LIST, CompType.LIST_AMBIGUOUS, CompType.LIST_UNMODIFIED)
+
+
+def register_safe_action(action_type: t.Type[argparse.Action]) -> None:
+ """Register the given action as a safe action for argcomplete to use during completion if it is not already registered."""
+ if argcomplete and action_type not in argcomplete.safe_actions:
+ argcomplete.safe_actions += (action_type,)
+
+
+def get_comp_type() -> t.Optional[CompType]:
+ """Parse the COMP_TYPE environment variable (if present) and return the associated CompType enum value."""
+ value = os.environ.get('COMP_TYPE')
+ comp_type = CompType(chr(int(value))) if value else None
+ return comp_type
+
+
+class OptionCompletionFinder(CompletionFinder):
+ """
+ Custom completion finder for argcomplete.
+ It provides support for running completion in list mode, which argcomplete natively handles the same as standard completion.
+ """
+ enabled = bool(argcomplete)
+
+ def __init__(self, *args, validator=None, **kwargs) -> None:
+ if validator:
+ raise ValueError()
+
+ self.comp_type = get_comp_type()
+ self.list_mode = self.comp_type.list_mode if self.comp_type else False
+ self.disable_completion_mangling = False
+
+ finder = self
+
+ def custom_validator(completion, prefix):
+ """Completion validator used to optionally bypass validation."""
+ if finder.disable_completion_mangling:
+ return True
+
+ return default_validator(completion, prefix)
+
+ super().__init__(
+ *args,
+ validator=custom_validator,
+ **kwargs,
+ )
+
+ def __call__(self, *args, **kwargs):
+ if self.enabled:
+ super().__call__(*args, **kwargs)
+
+ def quote_completions(self, completions, cword_prequote, last_wordbreak_pos):
+ """Intercept default quoting behavior to optionally block mangling of completion entries."""
+ if self.disable_completion_mangling:
+ # Word breaks have already been handled when generating completions, don't mangle them further.
+ # This is needed in many cases when returning completion lists which lack the existing completion prefix.
+ last_wordbreak_pos = None
+
+ return super().quote_completions(completions, cword_prequote, last_wordbreak_pos)
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/parsers.py b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py
new file mode 100644
index 0000000..d07e03c
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py
@@ -0,0 +1,597 @@
+"""General purpose composite argument parsing and completion."""
+from __future__ import annotations
+
+import abc
+import collections.abc as c
+import contextlib
+import dataclasses
+import enum
+import os
+import re
+import typing as t
+
+# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior.
+#
+# Recommended characters for assignment and/or continuation: `/` `:` `=`
+#
+# The recommended assignment_character list is due to how argcomplete handles continuation characters.
+# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558
+
+PAIR_DELIMITER = ','
+ASSIGNMENT_DELIMITER = '='
+PATH_DELIMITER = '/'
+
+
+# This class was originally frozen. However, that causes issues when running under Python 3.11.
+# See: https://github.com/python/cpython/issues/99856
+@dataclasses.dataclass
+class Completion(Exception):
+ """Base class for argument completion results."""
+
+
+@dataclasses.dataclass
+class CompletionUnavailable(Completion):
+ """Argument completion unavailable."""
+ message: str = 'No completions available.'
+
+
+@dataclasses.dataclass
+class CompletionError(Completion):
+ """Argument completion error."""
+ message: t.Optional[str] = None
+
+
+@dataclasses.dataclass
+class CompletionSuccess(Completion):
+ """Successful argument completion result."""
+ list_mode: bool
+ consumed: str
+ continuation: str
+ matches: list[str] = dataclasses.field(default_factory=list)
+
+ @property
+ def preserve(self) -> bool:
+ """
+ True if argcomplete should not mangle completion values, otherwise False.
+ Only used when more than one completion exists to avoid overwriting the word undergoing completion.
+ """
+ return len(self.matches) > 1 and self.list_mode
+
+ @property
+ def completions(self) -> list[str]:
+ """List of completion values to return to argcomplete."""
+ completions = self.matches
+ continuation = '' if self.list_mode else self.continuation
+
+ if not self.preserve:
+ # include the existing prefix to avoid rewriting the word undergoing completion
+ completions = [f'{self.consumed}{completion}{continuation}' for completion in completions]
+
+ return completions
+
+
+class ParserMode(enum.Enum):
+ """Mode the parser is operating in."""
+ PARSE = enum.auto()
+ COMPLETE = enum.auto()
+ LIST = enum.auto()
+
+
+class ParserError(Exception):
+ """Base class for all parsing exceptions."""
+
+
+@dataclasses.dataclass
+class ParserBoundary:
+ """Boundary details for parsing composite input."""
+ delimiters: str
+ required: bool
+ match: t.Optional[str] = None
+ ready: bool = True
+
+
+@dataclasses.dataclass
+class ParserState:
+ """State of the composite argument parser."""
+ mode: ParserMode
+ remainder: str = ''
+ consumed: str = ''
+ boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list)
+ namespaces: list[t.Any] = dataclasses.field(default_factory=list)
+ parts: list[str] = dataclasses.field(default_factory=list)
+
+ @property
+ def incomplete(self) -> bool:
+ """True if parsing is incomplete (unparsed input remains), otherwise False."""
+ return self.remainder is not None
+
+ def match(self, value: str, choices: list[str]) -> bool:
+ """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False."""
+ if self.current_boundary:
+ delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match
+ else:
+ delimiters, delimiter = '', None
+
+ for choice in choices:
+ if choice.rstrip(delimiters) == choice:
+ # choice is not delimited
+ if value == choice:
+ return True # value matched
+ else:
+ # choice is delimited
+ if f'{value}{delimiter}' == choice:
+ return True # value and delimiter matched
+
+ return False
+
+ def read(self) -> str:
+ """Read and return the next input segment, taking into account parsing boundaries."""
+ delimiters = "".join(boundary.delimiters for boundary in self.boundaries)
+
+ if delimiters:
+ pattern = '([' + re.escape(delimiters) + '])'
+ regex = re.compile(pattern)
+ parts = regex.split(self.remainder, 1)
+ else:
+ parts = [self.remainder]
+
+ if len(parts) > 1:
+ value, delimiter, remainder = parts
+ else:
+ value, delimiter, remainder = parts[0], None, None
+
+ for boundary in reversed(self.boundaries):
+ if delimiter and delimiter in boundary.delimiters:
+ boundary.match = delimiter
+ self.consumed += value + delimiter
+ break
+
+ boundary.match = None
+ boundary.ready = False
+
+ if boundary.required:
+ break
+
+ self.remainder = remainder
+
+ return value
+
+ @property
+ def root_namespace(self) -> t.Any:
+ """THe root namespace."""
+ return self.namespaces[0]
+
+ @property
+ def current_namespace(self) -> t.Any:
+ """The current namespace."""
+ return self.namespaces[-1]
+
+ @property
+ def current_boundary(self) -> t.Optional[ParserBoundary]:
+ """The current parser boundary, if any, otherwise None."""
+ return self.boundaries[-1] if self.boundaries else None
+
+ def set_namespace(self, namespace: t.Any) -> None:
+ """Set the current namespace."""
+ self.namespaces.append(namespace)
+
+ @contextlib.contextmanager
+ def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]:
+ """Context manager for delimiting parsing of input."""
+ boundary = ParserBoundary(delimiters=delimiters, required=required)
+
+ self.boundaries.append(boundary)
+
+ try:
+ yield boundary
+ finally:
+ self.boundaries.pop()
+
+ if boundary.required and not boundary.match:
+ raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead')
+
+
+@dataclasses.dataclass
+class DocumentationState:
+ """State of the composite argument parser's generated documentation."""
+ sections: dict[str, str] = dataclasses.field(default_factory=dict)
+
+
+class Parser(metaclass=abc.ABCMeta):
+ """Base class for all composite argument parsers."""
+ @abc.abstractmethod
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ raise Exception(f'Undocumented parser: {type(self)}')
+
+
+class MatchConditions(enum.Flag):
+ """Acceptable condition(s) for matching user input to available choices."""
+ CHOICE = enum.auto()
+ """Match any choice."""
+ ANY = enum.auto()
+ """Match any non-empty string."""
+ NOTHING = enum.auto()
+ """Match an empty string which is not followed by a boundary match."""
+
+
+class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers which use a list of choices that can be generated during completion."""
+ def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None:
+ self.conditions = conditions
+
+ @abc.abstractmethod
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+
+ def no_completion_match(self, value: str) -> CompletionUnavailable: # pylint: disable=unused-argument
+ """Return an instance of CompletionUnavailable when no match was found for the given value."""
+ return CompletionUnavailable()
+
+ def no_choices_available(self, value: str) -> ParserError: # pylint: disable=unused-argument
+ """Return an instance of ParserError when parsing fails and no choices are available."""
+ return ParserError('No choices available.')
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = state.read()
+ choices = self.get_choices(value)
+
+ if state.mode == ParserMode.PARSE or state.incomplete:
+ if self.conditions & MatchConditions.CHOICE and state.match(value, choices):
+ return value
+
+ if self.conditions & MatchConditions.ANY and value:
+ return value
+
+ if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match:
+ return value
+
+ if state.mode == ParserMode.PARSE:
+ if choices:
+ raise ParserError(f'"{value}" not in: {", ".join(choices)}')
+
+ raise self.no_choices_available(value)
+
+ raise CompletionUnavailable()
+
+ matches = [choice for choice in choices if choice.startswith(value)]
+
+ if not matches:
+ raise self.no_completion_match(value)
+
+ continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else ''
+
+ raise CompletionSuccess(
+ list_mode=state.mode == ParserMode.LIST,
+ consumed=state.consumed,
+ continuation=continuation,
+ matches=matches,
+ )
+
+
+class ChoicesParser(DynamicChoicesParser):
+ """Composite argument parser which relies on a static list of choices."""
+ def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
+ self.choices = choices
+
+ super().__init__(conditions=conditions)
+
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+ return self.choices
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '|'.join(self.choices)
+
+
+class EnumValueChoicesParser(ChoicesParser):
+ """Composite argument parser which relies on a static list of choices derived from the values of an enum."""
+ def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
+ self.enum_type = enum_type
+
+ super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+ return self.enum_type(value)
+
+
+class IntegerParser(DynamicChoicesParser):
+ """Composite argument parser for integers."""
+ PATTERN = re.compile('^[1-9][0-9]*$')
+
+ def __init__(self, maximum: t.Optional[int] = None) -> None:
+ self.maximum = maximum
+
+ super().__init__()
+
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+ if not value:
+ numbers = list(range(1, 10))
+ elif self.PATTERN.search(value):
+ int_prefix = int(value)
+ base = int_prefix * 10
+ numbers = [int_prefix] + [base + i for i in range(0, 10)]
+ else:
+ numbers = []
+
+ # NOTE: the minimum is currently fixed at 1
+
+ if self.maximum is not None:
+ numbers = [n for n in numbers if n <= self.maximum]
+
+ return [str(n) for n in numbers]
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+ return int(value)
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '{integer}'
+
+
+class BooleanParser(ChoicesParser):
+ """Composite argument parser for boolean (yes/no) values."""
+ def __init__(self) -> None:
+ super().__init__(['yes', 'no'])
+
+ def parse(self, state: ParserState) -> bool:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+ return value == 'yes'
+
+
+class AnyParser(ChoicesParser):
+ """Composite argument parser which accepts any input value."""
+ def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None:
+ self.no_match_message = no_match_message
+
+ conditions = MatchConditions.ANY
+
+ if nothing:
+ conditions |= MatchConditions.NOTHING
+
+ super().__init__([], conditions=conditions)
+
+ def no_completion_match(self, value: str) -> CompletionUnavailable:
+ """Return an instance of CompletionUnavailable when no match was found for the given value."""
+ if self.no_match_message:
+ return CompletionUnavailable(message=self.no_match_message)
+
+ return super().no_completion_match(value)
+
+ def no_choices_available(self, value: str) -> ParserError:
+ """Return an instance of ParserError when parsing fails and no choices are available."""
+ if self.no_match_message:
+ return ParserError(self.no_match_message)
+
+ return super().no_choices_available(value)
+
+
+class RelativePathNameParser(DynamicChoicesParser):
+ """Composite argument parser for relative path names."""
+ RELATIVE_NAMES = ['.', '..']
+
+ def __init__(self, choices: list[str]) -> None:
+ self.choices = choices
+
+ super().__init__()
+
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+ choices = list(self.choices)
+
+ if value in self.RELATIVE_NAMES:
+ # complete relative names, but avoid suggesting them unless the current name is relative
+ # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../")
+ choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES)
+
+ return choices
+
+
+class FileParser(Parser):
+ """Composite argument parser for absolute or relative file paths."""
+ def parse(self, state: ParserState) -> str:
+ """Parse the input from the given state and return the result."""
+ if state.mode == ParserMode.PARSE:
+ path = AnyParser().parse(state)
+
+ if not os.path.isfile(path):
+ raise ParserError(f'Not a file: {path}')
+ else:
+ path = ''
+
+ with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary
+ while boundary.ready:
+ directory = path or '.'
+
+ try:
+ with os.scandir(directory) as scan: # type: c.Iterator[os.DirEntry]
+ choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan]
+ except OSError:
+ choices = []
+
+ if not path:
+ choices.append(PATH_DELIMITER) # allow absolute paths
+ choices.append('../') # suggest relative paths
+
+ part = RelativePathNameParser(choices).parse(state)
+ path += f'{part}{boundary.match or ""}'
+
+ return path
+
+
+class AbsolutePathParser(Parser):
+ """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ path = ''
+
+ with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary
+ while boundary.ready:
+ if path:
+ path += AnyParser(nothing=True).parse(state)
+ else:
+ path += ChoicesParser([PATH_DELIMITER]).parse(state)
+
+ path += (boundary.match or '')
+
+ return path
+
+
+class NamespaceParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers that store their results in a namespace."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = state.current_namespace
+ current = getattr(namespace, self.dest)
+
+ if current and self.limit_one:
+ if state.mode == ParserMode.PARSE:
+ raise ParserError('Option cannot be specified more than once.')
+
+ raise CompletionError('Option cannot be specified more than once.')
+
+ value = self.get_value(state)
+
+ if self.use_list:
+ if not current:
+ current = []
+ setattr(namespace, self.dest, current)
+
+ current.append(value)
+ else:
+ setattr(namespace, self.dest, value)
+
+ return value
+
+ def get_value(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result, without storing the result in the namespace."""
+ return super().parse(state)
+
+ @property
+ def use_list(self) -> bool:
+ """True if the destination is a list, otherwise False."""
+ return False
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return not self.use_list
+
+ @property
+ @abc.abstractmethod
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+
+
+class NamespaceWrappedParser(NamespaceParser):
+ """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace."""
+ def __init__(self, dest: str, parser: Parser) -> None:
+ self._dest = dest
+ self.parser = parser
+
+ def get_value(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result, without storing the result in the namespace."""
+ return self.parser.parse(state)
+
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return self._dest
+
+
+class KeyValueParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for key/value composite argument parsers."""
+ @abc.abstractmethod
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = state.current_namespace
+ parsers = self.get_parsers(state)
+ keys = list(parsers)
+
+ with state.delimit(PAIR_DELIMITER, required=False) as pair: # type: ParserBoundary
+ while pair.ready:
+ with state.delimit(ASSIGNMENT_DELIMITER):
+ key = ChoicesParser(keys).parse(state)
+
+ value = parsers[key].parse(state)
+
+ setattr(namespace, key, value)
+
+ keys.remove(key)
+
+ return namespace
+
+
+class PairParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = self.create_namespace()
+
+ state.set_namespace(namespace)
+
+ with state.delimit(self.delimiter, self.required) as boundary: # type: ParserBoundary
+ choice = self.get_left_parser(state).parse(state)
+
+ if boundary.match:
+ self.get_right_parser(choice).parse(state)
+
+ return namespace
+
+ @property
+ def required(self) -> bool:
+ """True if the delimiter (and thus right parser) is required, otherwise False."""
+ return False
+
+ @property
+ def delimiter(self) -> str:
+ """The delimiter to use between the left and right parser."""
+ return PAIR_DELIMITER
+
+ @abc.abstractmethod
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+
+ @abc.abstractmethod
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+
+ @abc.abstractmethod
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+
+
+class TypeParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]: # pylint: disable=unused-argument
+ """Return a dictionary of type names and type parsers."""
+ return self.get_stateless_parsers()
+
+ @abc.abstractmethod
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ parsers = self.get_parsers(state)
+
+ with state.delimit(':'):
+ key = ChoicesParser(list(parsers)).parse(state)
+
+ value = parsers[key].parse(state)
+
+ return value
diff --git a/test/lib/ansible_test/_internal/cli/commands/__init__.py b/test/lib/ansible_test/_internal/cli/commands/__init__.py
new file mode 100644
index 0000000..2eb14ab
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/__init__.py
@@ -0,0 +1,241 @@
+"""Command line parsing for all commands."""
+from __future__ import annotations
+
+import argparse
+import functools
+import sys
+
+from ...util import (
+ display,
+)
+
+from ..completers import (
+ complete_target,
+ register_completer,
+)
+
+from ..environments import (
+ CompositeActionCompletionFinder,
+)
+
+from .coverage import (
+ do_coverage,
+)
+
+from .env import (
+ do_env,
+)
+
+from .integration import (
+ do_integration,
+)
+
+from .sanity import (
+ do_sanity,
+)
+
+from .shell import (
+ do_shell,
+)
+
+from .units import (
+ do_units,
+)
+
+
+def do_commands(
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for all commands."""
+ common = argparse.ArgumentParser(add_help=False)
+
+ common.add_argument(
+ '-e',
+ '--explain',
+ action='store_true',
+ help='explain commands that would be executed',
+ )
+
+ common.add_argument(
+ '-v',
+ '--verbose',
+ dest='verbosity',
+ action='count',
+ default=0,
+ help='display more output',
+ )
+
+ common.add_argument(
+ '--color',
+ metavar='COLOR',
+ nargs='?',
+ help='generate color output: yes, no, auto',
+ const='yes',
+ default='auto',
+ type=color,
+ )
+
+ common.add_argument(
+ '--debug',
+ action='store_true',
+ help='run ansible commands in debug mode',
+ )
+
+ common.add_argument(
+ '--truncate',
+ dest='truncate',
+ metavar='COLUMNS',
+ type=int,
+ default=display.columns,
+ help='truncate some long output (0=disabled) (default: auto)',
+ )
+
+ common.add_argument(
+ '--redact',
+ dest='redact',
+ action='store_true',
+ default=True,
+ help=argparse.SUPPRESS, # kept for backwards compatibility, but no point in advertising since it's the default
+ )
+
+ common.add_argument(
+ '--no-redact',
+ dest='redact',
+ action='store_false',
+ default=False,
+ help='show sensitive values in output',
+ )
+
+ test = argparse.ArgumentParser(add_help=False, parents=[common])
+
+ testing = test.add_argument_group(title='common testing arguments')
+
+ register_completer(testing.add_argument(
+ 'include',
+ metavar='TARGET',
+ nargs='*',
+ help='test the specified target',
+ ), functools.partial(complete_target, completer))
+
+ register_completer(testing.add_argument(
+ '--include',
+ metavar='TARGET',
+ action='append',
+ help='include the specified target',
+ ), functools.partial(complete_target, completer))
+
+ register_completer(testing.add_argument(
+ '--exclude',
+ metavar='TARGET',
+ action='append',
+ help='exclude the specified target',
+ ), functools.partial(complete_target, completer))
+
+ register_completer(testing.add_argument(
+ '--require',
+ metavar='TARGET',
+ action='append',
+ help='require the specified target',
+ ), functools.partial(complete_target, completer))
+
+ testing.add_argument(
+ '--coverage',
+ action='store_true',
+ help='analyze code coverage when running tests',
+ )
+
+ testing.add_argument(
+ '--coverage-check',
+ action='store_true',
+ help='only verify code coverage can be enabled',
+ )
+
+ testing.add_argument(
+ '--metadata',
+ help=argparse.SUPPRESS,
+ )
+
+ testing.add_argument(
+ '--base-branch',
+ metavar='BRANCH',
+ help='base branch used for change detection',
+ )
+
+ testing.add_argument(
+ '--changed',
+ action='store_true',
+ help='limit targets based on changes',
+ )
+
+ changes = test.add_argument_group(title='change detection arguments')
+
+ changes.add_argument(
+ '--tracked',
+ action='store_true',
+ help=argparse.SUPPRESS,
+ )
+
+ changes.add_argument(
+ '--untracked',
+ action='store_true',
+ help='include untracked files',
+ )
+
+ changes.add_argument(
+ '--ignore-committed',
+ dest='committed',
+ action='store_false',
+ help='exclude committed files',
+ )
+
+ changes.add_argument(
+ '--ignore-staged',
+ dest='staged',
+ action='store_false',
+ help='exclude staged files',
+ )
+
+ changes.add_argument(
+ '--ignore-unstaged',
+ dest='unstaged',
+ action='store_false',
+ help='exclude unstaged files',
+ )
+
+ changes.add_argument(
+ '--changed-from',
+ metavar='PATH',
+ help=argparse.SUPPRESS,
+ )
+
+ changes.add_argument(
+ '--changed-path',
+ metavar='PATH',
+ action='append',
+ help=argparse.SUPPRESS,
+ )
+
+ subparsers = parent.add_subparsers(metavar='COMMAND', required=True)
+
+ do_coverage(subparsers, common, completer)
+ do_env(subparsers, common, completer)
+ do_shell(subparsers, common, completer)
+
+ do_integration(subparsers, test, completer)
+ do_sanity(subparsers, test, completer)
+ do_units(subparsers, test, completer)
+
+
+def color(value: str) -> bool:
+ """Strict converter for color option."""
+ if value == 'yes':
+ return True
+
+ if value == 'no':
+ return False
+
+ if value == 'auto':
+ return sys.stdout.isatty()
+
+ raise argparse.ArgumentTypeError(f"invalid choice: '{value}' (choose from 'yes', 'no', 'auto')")
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py b/test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py
new file mode 100644
index 0000000..28e6770
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py
@@ -0,0 +1,85 @@
+"""Command line parsing for all `coverage` commands."""
+from __future__ import annotations
+
+import argparse
+
+from ....commands.coverage import (
+ COVERAGE_GROUPS,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+)
+
+from .analyze import (
+ do_analyze,
+)
+
+from .combine import (
+ do_combine,
+)
+
+from .erase import (
+ do_erase,
+)
+
+from .html import (
+ do_html,
+)
+
+from .report import (
+ do_report,
+)
+
+from .xml import (
+ do_xml,
+)
+
+
+def do_coverage(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for all `coverage` commands."""
+ coverage_common = argparse.ArgumentParser(add_help=False, parents=[parent])
+
+ parser = subparsers.add_parser(
+ 'coverage',
+ help='code coverage management and reporting',
+ )
+
+ coverage_subparsers = parser.add_subparsers(metavar='COMMAND', required=True)
+
+ do_analyze(coverage_subparsers, coverage_common, completer)
+ do_erase(coverage_subparsers, coverage_common, completer)
+
+ do_combine(coverage_subparsers, parent, add_coverage_common, completer)
+ do_report(coverage_subparsers, parent, add_coverage_common, completer)
+ do_html(coverage_subparsers, parent, add_coverage_common, completer)
+ do_xml(coverage_subparsers, parent, add_coverage_common, completer)
+
+
+def add_coverage_common(
+ parser: argparse.ArgumentParser,
+):
+ """Add common coverage arguments."""
+ parser.add_argument(
+ '--group-by',
+ metavar='GROUP',
+ action='append',
+ choices=COVERAGE_GROUPS,
+ help='group output by: %s' % ', '.join(COVERAGE_GROUPS),
+ )
+
+ parser.add_argument(
+ '--all',
+ action='store_true',
+ help='include all python/powershell source files',
+ )
+
+ parser.add_argument(
+ '--stub',
+ action='store_true',
+ help='generate empty report of all python/powershell source files',
+ )
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py
new file mode 100644
index 0000000..05fbd23
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py
@@ -0,0 +1,28 @@
+"""Command line parsing for all `coverage analyze` commands."""
+from __future__ import annotations
+
+import argparse
+
+from .targets import (
+ do_targets,
+)
+
+from ....environments import (
+ CompositeActionCompletionFinder,
+)
+
+
+def do_analyze(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for all `coverage analyze` commands."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'analyze',
+ help='analyze collected coverage data',
+ )
+
+ analyze_subparsers = parser.add_subparsers(metavar='COMMAND', required=True)
+
+ do_targets(analyze_subparsers, parent, completer)
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py
new file mode 100644
index 0000000..7b6ea3e
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py
@@ -0,0 +1,48 @@
+"""Command line parsing for all `coverage analyze targets` commands."""
+from __future__ import annotations
+
+import argparse
+
+from .....environments import (
+ CompositeActionCompletionFinder,
+)
+
+from .combine import (
+ do_combine,
+)
+
+from .expand import (
+ do_expand,
+)
+
+from .filter import (
+ do_filter,
+)
+
+from .generate import (
+ do_generate,
+)
+
+from .missing import (
+ do_missing,
+)
+
+
+def do_targets(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for all `coverage analyze targets` commands."""
+ targets = subparsers.add_parser(
+ 'targets',
+ help='analyze integration test target coverage',
+ )
+
+ targets_subparsers = targets.add_subparsers(metavar='COMMAND', required=True)
+
+ do_generate(targets_subparsers, parent, completer)
+ do_expand(targets_subparsers, parent, completer)
+ do_filter(targets_subparsers, parent, completer)
+ do_combine(targets_subparsers, parent, completer)
+ do_missing(targets_subparsers, parent, completer)
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py
new file mode 100644
index 0000000..7fa49bf
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py
@@ -0,0 +1,49 @@
+"""Command line parsing for the `coverage analyze targets combine` command."""
+from __future__ import annotations
+
+import argparse
+
+from ......commands.coverage.analyze.targets.combine import (
+ command_coverage_analyze_targets_combine,
+ CoverageAnalyzeTargetsCombineConfig,
+)
+
+from .....environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_combine(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `coverage analyze targets combine` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'combine',
+ parents=[parent],
+ help='combine multiple aggregated coverage files',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_analyze_targets_combine,
+ config=CoverageAnalyzeTargetsCombineConfig,
+ )
+
+ targets_combine = parser.add_argument_group('coverage arguments')
+
+ targets_combine.add_argument(
+ 'input_file',
+ nargs='+',
+ help='input file to read aggregated coverage from',
+ )
+
+ targets_combine.add_argument(
+ 'output_file',
+ help='output file to write aggregated coverage to',
+ )
+
+ add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets combine
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py
new file mode 100644
index 0000000..f5f020f
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py
@@ -0,0 +1,48 @@
+"""Command line parsing for the `coverage analyze targets expand` command."""
+from __future__ import annotations
+
+import argparse
+
+from ......commands.coverage.analyze.targets.expand import (
+ command_coverage_analyze_targets_expand,
+ CoverageAnalyzeTargetsExpandConfig,
+)
+
+from .....environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_expand(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `coverage analyze targets expand` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'expand',
+ parents=[parent],
+ help='expand target names from integers in aggregated coverage',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_analyze_targets_expand,
+ config=CoverageAnalyzeTargetsExpandConfig,
+ )
+
+ targets_expand = parser.add_argument_group(title='coverage arguments')
+
+ targets_expand.add_argument(
+ 'input_file',
+ help='input file to read aggregated coverage from',
+ )
+
+ targets_expand.add_argument(
+ 'output_file',
+ help='output file to write expanded coverage to',
+ )
+
+ add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets expand
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py
new file mode 100644
index 0000000..afcb828
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py
@@ -0,0 +1,76 @@
+"""Command line parsing for the `coverage analyze targets filter` command."""
+from __future__ import annotations
+
+import argparse
+
+from ......commands.coverage.analyze.targets.filter import (
+ command_coverage_analyze_targets_filter,
+ CoverageAnalyzeTargetsFilterConfig,
+)
+
+from .....environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_filter(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `coverage analyze targets filter` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'filter',
+ parents=[parent],
+ help='filter aggregated coverage data',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_analyze_targets_filter,
+ config=CoverageAnalyzeTargetsFilterConfig,
+ )
+
+ targets_filter = parser.add_argument_group(title='coverage arguments')
+
+ targets_filter.add_argument(
+ 'input_file',
+ help='input file to read aggregated coverage from',
+ )
+
+ targets_filter.add_argument(
+ 'output_file',
+ help='output file to write expanded coverage to',
+ )
+
+ targets_filter.add_argument(
+ '--include-target',
+ metavar='TGT',
+ dest='include_targets',
+ action='append',
+ help='include the specified targets',
+ )
+
+ targets_filter.add_argument(
+ '--exclude-target',
+ metavar='TGT',
+ dest='exclude_targets',
+ action='append',
+ help='exclude the specified targets',
+ )
+
+ targets_filter.add_argument(
+ '--include-path',
+ metavar='REGEX',
+ help='include paths matching the given regex',
+ )
+
+ targets_filter.add_argument(
+ '--exclude-path',
+ metavar='REGEX',
+ help='exclude paths matching the given regex',
+ )
+
+ add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets filter
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py
new file mode 100644
index 0000000..0d13933
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py
@@ -0,0 +1,49 @@
+"""Command line parsing for the `coverage analyze targets generate` command."""
+from __future__ import annotations
+
+import argparse
+
+from ......commands.coverage.analyze.targets.generate import (
+ command_coverage_analyze_targets_generate,
+ CoverageAnalyzeTargetsGenerateConfig,
+)
+
+from .....environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_generate(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `coverage analyze targets generate` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'generate',
+ parents=[parent],
+ help='aggregate coverage by integration test target',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_analyze_targets_generate,
+ config=CoverageAnalyzeTargetsGenerateConfig,
+ )
+
+ targets_generate = parser.add_argument_group(title='coverage arguments')
+
+ targets_generate.add_argument(
+ 'input_dir',
+ nargs='?',
+ help='directory to read coverage from',
+ )
+
+ targets_generate.add_argument(
+ 'output_file',
+ help='output file for aggregated coverage',
+ )
+
+ add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets generate
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py
new file mode 100644
index 0000000..8af236f
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py
@@ -0,0 +1,65 @@
+"""Command line parsing for the `coverage analyze targets missing` command."""
+from __future__ import annotations
+
+import argparse
+
+from ......commands.coverage.analyze.targets.missing import (
+ command_coverage_analyze_targets_missing,
+ CoverageAnalyzeTargetsMissingConfig,
+)
+
+from .....environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_missing(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `coverage analyze targets missing` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'missing',
+ parents=[parent],
+ help='identify coverage in one file missing in another',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_analyze_targets_missing,
+ config=CoverageAnalyzeTargetsMissingConfig,
+ )
+
+ targets_missing = parser.add_argument_group(title='coverage arguments')
+
+ targets_missing.add_argument(
+ 'from_file',
+ help='input file containing aggregated coverage',
+ )
+
+ targets_missing.add_argument(
+ 'to_file',
+ help='input file containing aggregated coverage',
+ )
+
+ targets_missing.add_argument(
+ 'output_file',
+ help='output file to write aggregated coverage to',
+ )
+
+ targets_missing.add_argument(
+ '--only-gaps',
+ action='store_true',
+ help='report only arcs/lines not hit by any target',
+ )
+
+ targets_missing.add_argument(
+ '--only-exists',
+ action='store_true',
+ help='limit results to files that exist',
+ )
+
+ add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets missing
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/combine.py b/test/lib/ansible_test/_internal/cli/commands/coverage/combine.py
new file mode 100644
index 0000000..9b6d34a
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/combine.py
@@ -0,0 +1,49 @@
+"""Command line parsing for the `coverage combine` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import typing as t
+
+from ....commands.coverage.combine import (
+ command_coverage_combine,
+ CoverageCombineConfig,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_combine(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for the `coverage combine` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'combine',
+ parents=[parent],
+ help='combine coverage data and rewrite remote paths',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_combine,
+ config=CoverageCombineConfig,
+ )
+
+ coverage_combine = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='coverage arguments'))
+
+ add_coverage_common(coverage_combine)
+
+ coverage_combine.add_argument(
+ '--export',
+ metavar='DIR',
+ help='directory to export combined coverage files to',
+ )
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage combine
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/erase.py b/test/lib/ansible_test/_internal/cli/commands/coverage/erase.py
new file mode 100644
index 0000000..ef356f0
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/erase.py
@@ -0,0 +1,36 @@
+"""Command line parsing for the `coverage erase` command."""
+from __future__ import annotations
+
+import argparse
+
+from ....commands.coverage.erase import (
+ command_coverage_erase,
+ CoverageEraseConfig,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_erase(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for the `coverage erase` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'erase',
+ parents=[parent],
+ help='erase coverage data files',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_erase,
+ config=CoverageEraseConfig,
+ )
+
+ add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage erase
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/html.py b/test/lib/ansible_test/_internal/cli/commands/coverage/html.py
new file mode 100644
index 0000000..5f719de
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/html.py
@@ -0,0 +1,43 @@
+"""Command line parsing for the `coverage html` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import typing as t
+
+from ....commands.coverage.html import (
+ command_coverage_html,
+ CoverageHtmlConfig,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_html(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for the `coverage html` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'html',
+ parents=[parent],
+ help='generate html coverage report',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_html,
+ config=CoverageHtmlConfig,
+ )
+
+ coverage_combine = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='coverage arguments'))
+
+ add_coverage_common(coverage_combine)
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage html
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/report.py b/test/lib/ansible_test/_internal/cli/commands/coverage/report.py
new file mode 100644
index 0000000..e6a6e80
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/report.py
@@ -0,0 +1,61 @@
+"""Command line parsing for the `coverage report` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import typing as t
+
+from ....commands.coverage.report import (
+ command_coverage_report,
+ CoverageReportConfig,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_report(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for the `coverage report` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'report',
+ parents=[parent],
+ help='generate console coverage report',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_report,
+ config=CoverageReportConfig,
+ )
+
+ coverage_report = t.cast(argparse.ArgumentParser, parser.add_argument_group('coverage arguments'))
+
+ add_coverage_common(coverage_report)
+
+ coverage_report.add_argument(
+ '--show-missing',
+ action='store_true',
+ help='show line numbers of statements not executed',
+ )
+
+ coverage_report.add_argument(
+ '--include',
+ metavar='PAT[,...]',
+ help='only include paths that match a pattern (accepts quoted shell wildcards)',
+ )
+
+ coverage_report.add_argument(
+ '--omit',
+ metavar='PAT[,...]',
+ help='omit paths that match a pattern (accepts quoted shell wildcards)',
+ )
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage report
diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/xml.py b/test/lib/ansible_test/_internal/cli/commands/coverage/xml.py
new file mode 100644
index 0000000..e7b03ca
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/coverage/xml.py
@@ -0,0 +1,43 @@
+"""Command line parsing for the `coverage xml` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import typing as t
+
+from ....commands.coverage.xml import (
+ command_coverage_xml,
+ CoverageXmlConfig,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_xml(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+) -> None:
+ """Command line parsing for the `coverage xml` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'xml',
+ parents=[parent],
+ help='generate xml coverage report',
+ )
+
+ parser.set_defaults(
+ func=command_coverage_xml,
+ config=CoverageXmlConfig,
+ )
+
+ coverage_combine = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='coverage arguments'))
+
+ add_coverage_common(coverage_combine)
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage xml
diff --git a/test/lib/ansible_test/_internal/cli/commands/env.py b/test/lib/ansible_test/_internal/cli/commands/env.py
new file mode 100644
index 0000000..0cd2114
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/env.py
@@ -0,0 +1,63 @@
+"""Command line parsing for the `env` command."""
+from __future__ import annotations
+
+import argparse
+
+from ...commands.env import (
+ EnvConfig,
+ command_env,
+)
+
+from ..environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_env(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `env` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'env',
+ parents=[parent],
+ help='show information about the test environment',
+ )
+
+ parser.set_defaults(
+ func=command_env,
+ config=EnvConfig,
+ )
+
+ env = parser.add_argument_group(title='env arguments')
+
+ env.add_argument(
+ '--show',
+ action='store_true',
+ help='show environment on stdout',
+ )
+
+ env.add_argument(
+ '--dump',
+ action='store_true',
+ help='dump environment to disk',
+ )
+
+ env.add_argument(
+ '--list-files',
+ action='store_true',
+ help='list files on stdout',
+ )
+
+ env.add_argument(
+ '--timeout',
+ type=int,
+ metavar='MINUTES',
+ help='timeout for future ansible-test commands (0 clears)',
+ )
+
+ add_environments(parser, completer, ControllerMode.NO_DELEGATION, TargetMode.NO_TARGETS) # env
diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/__init__.py b/test/lib/ansible_test/_internal/cli/commands/integration/__init__.py
new file mode 100644
index 0000000..dfdefb1
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/integration/__init__.py
@@ -0,0 +1,162 @@
+"""Command line parsing for all integration commands."""
+from __future__ import annotations
+
+import argparse
+
+from ...completers import (
+ complete_target,
+ register_completer,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+)
+
+from .network import (
+ do_network_integration,
+)
+
+from .posix import (
+ do_posix_integration,
+)
+
+from .windows import (
+ do_windows_integration,
+)
+
+
+def do_integration(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for all integration commands."""
+ parser = argparse.ArgumentParser(
+ add_help=False,
+ parents=[parent],
+ )
+
+ do_posix_integration(subparsers, parser, add_integration_common, completer)
+ do_network_integration(subparsers, parser, add_integration_common, completer)
+ do_windows_integration(subparsers, parser, add_integration_common, completer)
+
+
+def add_integration_common(
+ parser: argparse.ArgumentParser,
+):
+ """Add common integration arguments."""
+ register_completer(parser.add_argument(
+ '--start-at',
+ metavar='TARGET',
+ help='start at the specified target',
+ ), complete_target)
+
+ parser.add_argument(
+ '--start-at-task',
+ metavar='TASK',
+ help='start at the specified task',
+ )
+
+ parser.add_argument(
+ '--tags',
+ metavar='TAGS',
+ help='only run plays and tasks tagged with these values',
+ )
+
+ parser.add_argument(
+ '--skip-tags',
+ metavar='TAGS',
+ help='only run plays and tasks whose tags do not match these values',
+ )
+
+ parser.add_argument(
+ '--diff',
+ action='store_true',
+ help='show diff output',
+ )
+
+ parser.add_argument(
+ '--allow-destructive',
+ action='store_true',
+ help='allow destructive tests',
+ )
+
+ parser.add_argument(
+ '--allow-root',
+ action='store_true',
+ help='allow tests requiring root when not root',
+ )
+
+ parser.add_argument(
+ '--allow-disabled',
+ action='store_true',
+ help='allow tests which have been marked as disabled',
+ )
+
+ parser.add_argument(
+ '--allow-unstable',
+ action='store_true',
+ help='allow tests which have been marked as unstable',
+ )
+
+ parser.add_argument(
+ '--allow-unstable-changed',
+ action='store_true',
+ help='allow tests which have been marked as unstable when focused changes are detected',
+ )
+
+ parser.add_argument(
+ '--allow-unsupported',
+ action='store_true',
+ help='allow tests which have been marked as unsupported',
+ )
+
+ parser.add_argument(
+ '--retry-on-error',
+ action='store_true',
+ help='retry failed test with increased verbosity',
+ )
+
+ parser.add_argument(
+ '--continue-on-error',
+ action='store_true',
+ help='continue after failed test',
+ )
+
+ parser.add_argument(
+ '--debug-strategy',
+ action='store_true',
+ help='run test playbooks using the debug strategy',
+ )
+
+ parser.add_argument(
+ '--changed-all-target',
+ metavar='TARGET',
+ default='all',
+ help='target to run when all tests are needed',
+ )
+
+ parser.add_argument(
+ '--changed-all-mode',
+ metavar='MODE',
+ choices=('default', 'include', 'exclude'),
+ help='include/exclude behavior with --changed-all-target: %(choices)s',
+ )
+
+ parser.add_argument(
+ '--list-targets',
+ action='store_true',
+ help='list matching targets instead of running tests',
+ )
+
+ parser.add_argument(
+ '--no-temp-workdir',
+ action='store_true',
+ help='do not run tests from a temporary directory (use only for verifying broken tests)',
+ )
+
+ parser.add_argument(
+ '--no-temp-unicode',
+ action='store_true',
+ help='avoid unicode characters in temporary directory (use only for verifying broken tests)',
+ )
diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/network.py b/test/lib/ansible_test/_internal/cli/commands/integration/network.py
new file mode 100644
index 0000000..a05985b
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/integration/network.py
@@ -0,0 +1,86 @@
+"""Command line parsing for the `network-integration` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import os
+import typing as t
+
+from ....commands.integration.network import (
+ command_network_integration,
+)
+
+from ....config import (
+ NetworkIntegrationConfig,
+)
+
+from ....target import (
+ walk_network_integration_targets,
+)
+
+from ....data import (
+ data_context,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+from ...completers import (
+ register_completer,
+)
+
+
+def do_network_integration(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_integration_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `network-integration` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'network-integration',
+ parents=[parent],
+ help='network integration tests',
+ )
+
+ parser.set_defaults(
+ func=command_network_integration,
+ targets_func=walk_network_integration_targets,
+ config=NetworkIntegrationConfig)
+
+ network_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='network integration test arguments'))
+
+ add_integration_common(network_integration)
+
+ register_completer(network_integration.add_argument(
+ '--testcase',
+ metavar='TESTCASE',
+ help='limit a test to a specified testcase',
+ ), complete_network_testcase)
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NETWORK_INTEGRATION) # network-integration
+
+
+def complete_network_testcase(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]:
+ """Return a list of test cases matching the given prefix if only one target was parsed from the command line, otherwise return an empty list."""
+ testcases = []
+
+ # since testcases are module specific, don't autocomplete if more than one
+ # module is specified
+ if len(parsed_args.include) != 1:
+ return []
+
+ target = parsed_args.include[0]
+ test_dir = os.path.join(data_context().content.integration_targets_path, target, 'tests')
+ connection_dirs = data_context().content.get_dirs(test_dir)
+
+ for connection_dir in connection_dirs:
+ for testcase in [os.path.basename(path) for path in data_context().content.get_files(connection_dir)]:
+ if testcase.startswith(prefix):
+ testcases.append(testcase.split('.', 1)[0])
+
+ return testcases
diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/posix.py b/test/lib/ansible_test/_internal/cli/commands/integration/posix.py
new file mode 100644
index 0000000..78d6165
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/integration/posix.py
@@ -0,0 +1,51 @@
+"""Command line parsing for the `integration` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import typing as t
+
+from ....commands.integration.posix import (
+ command_posix_integration,
+)
+
+from ....config import (
+ PosixIntegrationConfig,
+)
+
+from ....target import (
+ walk_posix_integration_targets,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_posix_integration(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_integration_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `integration` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'integration',
+ parents=[parent],
+ help='posix integration tests',
+ )
+
+ parser.set_defaults(
+ func=command_posix_integration,
+ targets_func=walk_posix_integration_targets,
+ config=PosixIntegrationConfig,
+ )
+
+ posix_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='integration test arguments'))
+
+ add_integration_common(posix_integration)
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.POSIX_INTEGRATION) # integration
diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/windows.py b/test/lib/ansible_test/_internal/cli/commands/integration/windows.py
new file mode 100644
index 0000000..ab022e3
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/integration/windows.py
@@ -0,0 +1,51 @@
+"""Command line parsing for the `windows-integration` command."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import typing as t
+
+from ....commands.integration.windows import (
+ command_windows_integration,
+)
+
+from ....config import (
+ WindowsIntegrationConfig,
+)
+
+from ....target import (
+ walk_windows_integration_targets,
+)
+
+from ...environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_windows_integration(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ add_integration_common: c.Callable[[argparse.ArgumentParser], None],
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `windows-integration` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'windows-integration',
+ parents=[parent],
+ help='windows integration tests',
+ )
+
+ parser.set_defaults(
+ func=command_windows_integration,
+ targets_func=walk_windows_integration_targets,
+ config=WindowsIntegrationConfig,
+ )
+
+ windows_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='windows integration test arguments'))
+
+ add_integration_common(windows_integration)
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.WINDOWS_INTEGRATION) # windows-integration
diff --git a/test/lib/ansible_test/_internal/cli/commands/sanity.py b/test/lib/ansible_test/_internal/cli/commands/sanity.py
new file mode 100644
index 0000000..8b4a9ae
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/sanity.py
@@ -0,0 +1,119 @@
+"""Command line parsing for the `sanity` command."""
+from __future__ import annotations
+
+import argparse
+
+from ...config import (
+ SanityConfig,
+)
+
+from ...commands.sanity import (
+ command_sanity,
+ sanity_get_tests,
+)
+
+from ...target import (
+ walk_sanity_targets,
+)
+
+from ...data import (
+ data_context,
+)
+
+from ..environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_sanity(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `sanity` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'sanity',
+ parents=[parent],
+ help='sanity tests',
+ )
+
+ parser.set_defaults(
+ func=command_sanity,
+ targets_func=walk_sanity_targets,
+ config=SanityConfig)
+
+ sanity = parser.add_argument_group(title='sanity test arguments')
+
+ sanity.add_argument(
+ '--test',
+ metavar='TEST',
+ action='append',
+ choices=[test.name for test in sanity_get_tests()],
+ help='tests to run',
+ )
+
+ sanity.add_argument(
+ '--skip-test',
+ metavar='TEST',
+ action='append',
+ choices=[test.name for test in sanity_get_tests()],
+ help='tests to skip',
+ )
+
+ sanity.add_argument(
+ '--allow-disabled',
+ action='store_true',
+ help='allow tests to run which are disabled by default',
+ )
+
+ sanity.add_argument(
+ '--list-tests',
+ action='store_true',
+ help='list available tests',
+ )
+
+ sanity.add_argument(
+ '--enable-optional-errors',
+ action='store_true',
+ help='enable optional errors',
+ )
+
+ if data_context().content.is_ansible:
+ sanity.add_argument(
+ '--keep-git',
+ action='store_true',
+ help='transfer git related files to the remote host/container',
+ )
+ else:
+ sanity.set_defaults(
+ keep_git=False,
+ )
+
+ sanity.add_argument(
+ '--lint',
+ action='store_true',
+ help='write lint output to stdout, everything else stderr',
+ )
+
+ sanity.add_argument(
+ '--junit',
+ action='store_true',
+ help='write test failures to junit xml files',
+ )
+
+ sanity.add_argument(
+ '--failure-ok',
+ action='store_true',
+ help='exit successfully on failed tests after saving results',
+ )
+
+ sanity.add_argument(
+ '--prime-venvs',
+ action='store_true',
+ help='prepare virtual environments without running tests'
+ )
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.SANITY) # sanity
diff --git a/test/lib/ansible_test/_internal/cli/commands/shell.py b/test/lib/ansible_test/_internal/cli/commands/shell.py
new file mode 100644
index 0000000..1baffc6
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/shell.py
@@ -0,0 +1,59 @@
+"""Command line parsing for the `shell` command."""
+from __future__ import annotations
+
+import argparse
+
+from ...commands.shell import (
+ command_shell,
+)
+
+from ...config import (
+ ShellConfig,
+)
+
+from ..environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_shell(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `shell` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'shell',
+ parents=[parent],
+ help='open an interactive shell',
+ )
+
+ parser.set_defaults(
+ func=command_shell,
+ config=ShellConfig,
+ )
+
+ shell = parser.add_argument_group(title='shell arguments')
+
+ shell.add_argument(
+ 'cmd',
+ nargs='*',
+ help='run the specified command',
+ )
+
+ shell.add_argument(
+ '--raw',
+ action='store_true',
+ help='direct to shell with no setup',
+ )
+
+ shell.add_argument(
+ '--export',
+ metavar='PATH',
+ help='export inventory instead of opening a shell',
+ )
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.SHELL) # shell
diff --git a/test/lib/ansible_test/_internal/cli/commands/units.py b/test/lib/ansible_test/_internal/cli/commands/units.py
new file mode 100644
index 0000000..c541a87
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/commands/units.py
@@ -0,0 +1,65 @@
+"""Command line parsing for the `units` command."""
+from __future__ import annotations
+
+import argparse
+
+from ...config import (
+ UnitsConfig,
+)
+
+from ...commands.units import (
+ command_units,
+)
+
+from ...target import (
+ walk_units_targets,
+)
+
+from ..environments import (
+ CompositeActionCompletionFinder,
+ ControllerMode,
+ TargetMode,
+ add_environments,
+)
+
+
+def do_units(
+ subparsers,
+ parent: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+):
+ """Command line parsing for the `units` command."""
+ parser: argparse.ArgumentParser = subparsers.add_parser(
+ 'units',
+ parents=[parent],
+ help='unit tests',
+ )
+
+ parser.set_defaults(
+ func=command_units,
+ targets_func=walk_units_targets,
+ config=UnitsConfig,
+ )
+
+ units = parser.add_argument_group(title='unit test arguments')
+
+ units.add_argument(
+ '--collect-only',
+ action='store_true',
+ help='collect tests but do not execute them',
+ )
+
+ units.add_argument(
+ '--num-workers',
+ metavar='INT',
+ type=int,
+ help='number of workers to use (default: auto)',
+ )
+
+ units.add_argument(
+ '--requirements-mode',
+ choices=('only', 'skip'),
+ help=argparse.SUPPRESS,
+ )
+
+ add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.UNITS) # units
diff --git a/test/lib/ansible_test/_internal/cli/compat.py b/test/lib/ansible_test/_internal/cli/compat.py
new file mode 100644
index 0000000..93006d5
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/compat.py
@@ -0,0 +1,505 @@
+"""Provides compatibility with first-generation host delegation options in ansible-test."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import dataclasses
+import enum
+import os
+import types
+import typing as t
+
+from ..constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ..util import (
+ ApplicationError,
+ display,
+ filter_args,
+ sorted_versions,
+ str_to_version,
+)
+
+from ..docker_util import (
+ docker_available,
+)
+
+from ..completion import (
+ docker_completion,
+ remote_completion,
+ filter_completion,
+)
+
+from ..host_configs import (
+ ControllerConfig,
+ ControllerHostConfig,
+ DockerConfig,
+ FallbackDetail,
+ FallbackReason,
+ HostConfig,
+ HostContext,
+ HostSettings,
+ NativePythonConfig,
+ NetworkInventoryConfig,
+ NetworkRemoteConfig,
+ OriginConfig,
+ PosixRemoteConfig,
+ VirtualPythonConfig,
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from ..data import (
+ data_context,
+)
+
+
+def filter_python(version: t.Optional[str], versions: t.Optional[c.Sequence[str]]) -> t.Optional[str]:
+ """If a Python version is given and is in the given version list, return that Python version, otherwise return None."""
+ return version if version in versions else None
+
+
+def controller_python(version: t.Optional[str]) -> t.Optional[str]:
+ """If a Python version is given and is supported by the controller, return that Python version, otherwise return None."""
+ return filter_python(version, CONTROLLER_PYTHON_VERSIONS)
+
+
+def get_fallback_remote_controller() -> str:
+ """Return the remote fallback platform for the controller."""
+ platform = 'freebsd' # lower cost than RHEL and macOS
+ candidates = [item for item in filter_completion(remote_completion()).values() if item.controller_supported and item.platform == platform]
+ fallback = sorted(candidates, key=lambda value: str_to_version(value.version), reverse=True)[0]
+ return fallback.name
+
+
+def get_option_name(name: str) -> str:
+ """Return a command-line option name from the given option name."""
+ if name == 'targets':
+ name = 'target'
+
+ return f'--{name.replace("_", "-")}'
+
+
+class PythonVersionUnsupportedError(ApplicationError):
+ """A Python version was requested for a context which does not support that version."""
+ def __init__(self, context: str, version: str, versions: c.Iterable[str]) -> None:
+ super().__init__(f'Python {version} is not supported by environment `{context}`. Supported Python version(s) are: {", ".join(versions)}')
+
+
+class PythonVersionUnspecifiedError(ApplicationError):
+ """A Python version was not specified for a context which is unknown, thus the Python version is unknown."""
+ def __init__(self, context: str) -> None:
+ super().__init__(f'A Python version was not specified for environment `{context}`. Use the `--python` option to specify a Python version.')
+
+
+class ControllerNotSupportedError(ApplicationError):
+ """Option(s) were specified which do not provide support for the controller and would be ignored because they are irrelevant for the target."""
+ def __init__(self, context: str) -> None:
+ super().__init__(f'Environment `{context}` does not provide a Python version supported by the controller.')
+
+
+class OptionsConflictError(ApplicationError):
+ """Option(s) were specified which conflict with other options."""
+ def __init__(self, first: c.Iterable[str], second: c.Iterable[str]) -> None:
+ super().__init__(f'Options `{" ".join(first)}` cannot be combined with options `{" ".join(second)}`.')
+
+
+@dataclasses.dataclass(frozen=True)
+class LegacyHostOptions:
+ """Legacy host options used prior to the availability of separate controller and target host configuration."""
+ python: t.Optional[str] = None
+ python_interpreter: t.Optional[str] = None
+ local: t.Optional[bool] = None
+ venv: t.Optional[bool] = None
+ venv_system_site_packages: t.Optional[bool] = None
+ remote: t.Optional[str] = None
+ remote_provider: t.Optional[str] = None
+ remote_arch: t.Optional[str] = None
+ docker: t.Optional[str] = None
+ docker_privileged: t.Optional[bool] = None
+ docker_seccomp: t.Optional[str] = None
+ docker_memory: t.Optional[int] = None
+ windows: t.Optional[list[str]] = None
+ platform: t.Optional[list[str]] = None
+ platform_collection: t.Optional[list[tuple[str, str]]] = None
+ platform_connection: t.Optional[list[tuple[str, str]]] = None
+ inventory: t.Optional[str] = None
+
+ @staticmethod
+ def create(namespace: t.Union[argparse.Namespace, types.SimpleNamespace]) -> LegacyHostOptions:
+ """Create legacy host options from the given namespace."""
+ kwargs = {field.name: getattr(namespace, field.name, None) for field in dataclasses.fields(LegacyHostOptions)}
+
+ if kwargs['python'] == 'default':
+ kwargs['python'] = None
+
+ return LegacyHostOptions(**kwargs)
+
+ @staticmethod
+ def purge_namespace(namespace: t.Union[argparse.Namespace, types.SimpleNamespace]) -> None:
+ """Purge legacy host options fields from the given namespace."""
+ for field in dataclasses.fields(LegacyHostOptions):
+ if hasattr(namespace, field.name):
+ delattr(namespace, field.name)
+
+ @staticmethod
+ def purge_args(args: list[str]) -> list[str]:
+ """Purge legacy host options from the given command line arguments."""
+ fields: tuple[dataclasses.Field, ...] = dataclasses.fields(LegacyHostOptions)
+ filters: dict[str, int] = {get_option_name(field.name): 0 if field.type is t.Optional[bool] else 1 for field in fields}
+
+ return filter_args(args, filters)
+
+ def get_options_used(self) -> tuple[str, ...]:
+ """Return a tuple of the command line options used."""
+ fields: tuple[dataclasses.Field, ...] = dataclasses.fields(self)
+ options = tuple(sorted(get_option_name(field.name) for field in fields if getattr(self, field.name)))
+ return options
+
+
+class TargetMode(enum.Enum):
+ """Type of provisioning to use for the targets."""
+ WINDOWS_INTEGRATION = enum.auto() # windows-integration
+ NETWORK_INTEGRATION = enum.auto() # network-integration
+ POSIX_INTEGRATION = enum.auto() # integration
+ SANITY = enum.auto() # sanity
+ UNITS = enum.auto() # units
+ SHELL = enum.auto() # shell
+ NO_TARGETS = enum.auto() # coverage
+
+ @property
+ def one_host(self) -> bool:
+ """Return True if only one host (the controller) should be used, otherwise return False."""
+ return self in (TargetMode.SANITY, TargetMode.UNITS, TargetMode.NO_TARGETS)
+
+ @property
+ def no_fallback(self) -> bool:
+ """Return True if no fallback is acceptable for the controller (due to options not applying to the target), otherwise return False."""
+ return self in (TargetMode.WINDOWS_INTEGRATION, TargetMode.NETWORK_INTEGRATION, TargetMode.NO_TARGETS)
+
+ @property
+ def multiple_pythons(self) -> bool:
+ """Return True if multiple Python versions are allowed, otherwise False."""
+ return self in (TargetMode.SANITY, TargetMode.UNITS)
+
+ @property
+ def has_python(self) -> bool:
+ """Return True if this mode uses Python, otherwise False."""
+ return self in (TargetMode.POSIX_INTEGRATION, TargetMode.SANITY, TargetMode.UNITS, TargetMode.SHELL)
+
+
+def convert_legacy_args(
+ argv: list[str],
+ args: t.Union[argparse.Namespace, types.SimpleNamespace],
+ mode: TargetMode,
+) -> HostSettings:
+ """Convert pre-split host arguments in the given namespace to their split counterparts."""
+ old_options = LegacyHostOptions.create(args)
+ old_options.purge_namespace(args)
+
+ new_options = [
+ '--controller',
+ '--target',
+ '--target-python',
+ '--target-posix',
+ '--target-windows',
+ '--target-network',
+ ]
+
+ used_old_options = old_options.get_options_used()
+ used_new_options = [name for name in new_options if name in argv]
+
+ if used_old_options:
+ if used_new_options:
+ raise OptionsConflictError(used_old_options, used_new_options)
+
+ controller, targets, controller_fallback = get_legacy_host_config(mode, old_options)
+
+ if controller_fallback:
+ if mode.one_host:
+ display.info(controller_fallback.message, verbosity=1)
+ else:
+ display.warning(controller_fallback.message)
+
+ used_default_pythons = mode in (TargetMode.SANITY, TargetMode.UNITS) and not native_python(old_options)
+ else:
+ controller = args.controller or OriginConfig()
+ controller_fallback = None
+
+ if mode == TargetMode.NO_TARGETS:
+ targets = []
+ used_default_pythons = False
+ elif args.targets:
+ targets = args.targets
+ used_default_pythons = False
+ else:
+ targets = default_targets(mode, controller)
+ used_default_pythons = mode in (TargetMode.SANITY, TargetMode.UNITS)
+
+ args.controller = controller
+ args.targets = targets
+
+ if used_default_pythons:
+ control_targets = t.cast(list[ControllerConfig], targets)
+ skipped_python_versions = sorted_versions(list(set(SUPPORTED_PYTHON_VERSIONS) - {target.python.version for target in control_targets}))
+ else:
+ skipped_python_versions = []
+
+ filtered_args = old_options.purge_args(argv)
+ filtered_args = filter_args(filtered_args, {name: 1 for name in new_options})
+
+ host_settings = HostSettings(
+ controller=controller,
+ targets=targets,
+ skipped_python_versions=skipped_python_versions,
+ filtered_args=filtered_args,
+ controller_fallback=controller_fallback,
+ )
+
+ return host_settings
+
+
+def controller_targets(
+ mode: TargetMode,
+ options: LegacyHostOptions,
+ controller: ControllerHostConfig,
+) -> list[HostConfig]:
+ """Return the configuration for controller targets."""
+ python = native_python(options)
+
+ targets: list[HostConfig]
+
+ if python:
+ targets = [ControllerConfig(python=python)]
+ else:
+ targets = default_targets(mode, controller)
+
+ return targets
+
+
+def native_python(options: LegacyHostOptions) -> t.Optional[NativePythonConfig]:
+ """Return a NativePythonConfig for the given version if it is not None, otherwise return None."""
+ if not options.python and not options.python_interpreter:
+ return None
+
+ return NativePythonConfig(version=options.python, path=options.python_interpreter)
+
+
+def get_legacy_host_config(
+ mode: TargetMode,
+ options: LegacyHostOptions,
+) -> tuple[ControllerHostConfig, list[HostConfig], t.Optional[FallbackDetail]]:
+ """
+ Returns controller and target host configs derived from the provided legacy host options.
+ The goal is to match the original behavior, by using non-split testing whenever possible.
+ When the options support the controller, use the options for the controller and use ControllerConfig for the targets.
+ When the options do not support the controller, use the options for the targets and use a default controller config influenced by the options.
+ """
+ venv_fallback = 'venv/default'
+ docker_fallback = 'default'
+ remote_fallback = get_fallback_remote_controller()
+
+ controller_fallback: t.Optional[tuple[str, str, FallbackReason]] = None
+
+ controller: t.Optional[ControllerHostConfig]
+ targets: list[HostConfig]
+
+ if options.venv:
+ if controller_python(options.python) or not options.python:
+ controller = OriginConfig(python=VirtualPythonConfig(version=options.python or 'default', system_site_packages=options.venv_system_site_packages))
+ else:
+ controller_fallback = f'origin:python={venv_fallback}', f'--venv --python {options.python}', FallbackReason.PYTHON
+ controller = OriginConfig(python=VirtualPythonConfig(version='default', system_site_packages=options.venv_system_site_packages))
+
+ if mode in (TargetMode.SANITY, TargetMode.UNITS):
+ python = native_python(options)
+
+ if python:
+ control_targets = [ControllerConfig(python=python)]
+ else:
+ control_targets = controller.get_default_targets(HostContext(controller_config=controller))
+
+ # Target sanity tests either have no Python requirements or manage their own virtual environments.
+ # Thus, there is no point in setting up virtual environments ahead of time for them.
+
+ if mode == TargetMode.UNITS:
+ targets = [ControllerConfig(python=VirtualPythonConfig(version=target.python.version, path=target.python.path,
+ system_site_packages=options.venv_system_site_packages)) for target in control_targets]
+ else:
+ targets = t.cast(list[HostConfig], control_targets)
+ else:
+ targets = [ControllerConfig(python=VirtualPythonConfig(version=options.python or 'default',
+ system_site_packages=options.venv_system_site_packages))]
+ elif options.docker:
+ docker_config = filter_completion(docker_completion()).get(options.docker)
+
+ if docker_config:
+ if options.python and options.python not in docker_config.supported_pythons:
+ raise PythonVersionUnsupportedError(f'--docker {options.docker}', options.python, docker_config.supported_pythons)
+
+ if docker_config.controller_supported:
+ if controller_python(options.python) or not options.python:
+ controller = DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'docker:{options.docker}', f'--docker {options.docker} --python {options.python}', FallbackReason.PYTHON
+ controller = DockerConfig(name=options.docker)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'docker:{docker_fallback}', f'--docker {options.docker}', FallbackReason.ENVIRONMENT
+ controller = DockerConfig(name=docker_fallback)
+ targets = [DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)]
+ else:
+ if not options.python:
+ raise PythonVersionUnspecifiedError(f'--docker {options.docker}')
+
+ if controller_python(options.python):
+ controller = DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'docker:{docker_fallback}', f'--docker {options.docker} --python {options.python}', FallbackReason.PYTHON
+ controller = DockerConfig(name=docker_fallback)
+ targets = [DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)]
+ elif options.remote:
+ remote_config = filter_completion(remote_completion()).get(options.remote)
+ context, reason = None, None
+
+ if remote_config:
+ if options.python and options.python not in remote_config.supported_pythons:
+ raise PythonVersionUnsupportedError(f'--remote {options.remote}', options.python, remote_config.supported_pythons)
+
+ if remote_config.controller_supported:
+ if controller_python(options.python) or not options.python:
+ controller = PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider,
+ arch=options.remote_arch)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'remote:{options.remote}', f'--remote {options.remote} --python {options.python}', FallbackReason.PYTHON
+ controller = PosixRemoteConfig(name=options.remote, provider=options.remote_provider, arch=options.remote_arch)
+ targets = controller_targets(mode, options, controller)
+ else:
+ context, reason = f'--remote {options.remote}', FallbackReason.ENVIRONMENT
+ controller = None
+ targets = [PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)]
+ elif mode == TargetMode.SHELL and options.remote.startswith('windows/'):
+ if options.python and options.python not in CONTROLLER_PYTHON_VERSIONS:
+ raise ControllerNotSupportedError(f'--python {options.python}')
+
+ controller = OriginConfig(python=native_python(options))
+ targets = [WindowsRemoteConfig(name=options.remote, provider=options.remote_provider, arch=options.remote_arch)]
+ else:
+ if not options.python:
+ raise PythonVersionUnspecifiedError(f'--remote {options.remote}')
+
+ if controller_python(options.python):
+ controller = PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)
+ targets = controller_targets(mode, options, controller)
+ else:
+ context, reason = f'--remote {options.remote} --python {options.python}', FallbackReason.PYTHON
+ controller = None
+ targets = [PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)]
+
+ if not controller:
+ if docker_available():
+ controller_fallback = f'docker:{docker_fallback}', context, reason
+ controller = DockerConfig(name=docker_fallback)
+ else:
+ controller_fallback = f'remote:{remote_fallback}', context, reason
+ controller = PosixRemoteConfig(name=remote_fallback)
+ else: # local/unspecified
+ # There are several changes in behavior from the legacy implementation when using no delegation (or the `--local` option).
+ # These changes are due to ansible-test now maintaining consistency between its own Python and that of controller Python subprocesses.
+ #
+ # 1) The `--python-interpreter` option (if different from sys.executable) now affects controller subprocesses and triggers re-execution of ansible-test.
+ # Previously this option was completely ignored except when used with the `--docker` or `--remote` options.
+ # 2) The `--python` option now triggers re-execution of ansible-test if it differs from sys.version_info.
+ # Previously it affected Python subprocesses, but not ansible-test itself.
+
+ if controller_python(options.python) or not options.python:
+ controller = OriginConfig(python=native_python(options))
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = 'origin:python=default', f'--python {options.python}', FallbackReason.PYTHON
+ controller = OriginConfig()
+ targets = controller_targets(mode, options, controller)
+
+ if controller_fallback:
+ controller_option, context, reason = controller_fallback
+
+ if mode.no_fallback:
+ raise ControllerNotSupportedError(context)
+
+ fallback_detail = FallbackDetail(
+ reason=reason,
+ message=f'Using `--controller {controller_option}` since `{context}` does not support the controller.',
+ )
+ else:
+ fallback_detail = None
+
+ if mode.one_host and any(not isinstance(target, ControllerConfig) for target in targets):
+ raise ControllerNotSupportedError(controller_fallback[1])
+
+ if mode == TargetMode.NO_TARGETS:
+ targets = []
+ else:
+ targets = handle_non_posix_targets(mode, options, targets)
+
+ return controller, targets, fallback_detail
+
+
+def handle_non_posix_targets(
+ mode: TargetMode,
+ options: LegacyHostOptions,
+ targets: list[HostConfig],
+) -> list[HostConfig]:
+ """Return a list of non-POSIX targets if the target mode is non-POSIX."""
+ if mode == TargetMode.WINDOWS_INTEGRATION:
+ if options.windows:
+ targets = [WindowsRemoteConfig(name=f'windows/{version}', provider=options.remote_provider, arch=options.remote_arch)
+ for version in options.windows]
+ else:
+ targets = [WindowsInventoryConfig(path=options.inventory)]
+ elif mode == TargetMode.NETWORK_INTEGRATION:
+ if options.platform:
+ network_targets = [NetworkRemoteConfig(name=platform, provider=options.remote_provider, arch=options.remote_arch) for platform in options.platform]
+
+ for platform, collection in options.platform_collection or []:
+ for entry in network_targets:
+ if entry.platform == platform:
+ entry.collection = collection
+
+ for platform, connection in options.platform_connection or []:
+ for entry in network_targets:
+ if entry.platform == platform:
+ entry.connection = connection
+
+ targets = t.cast(list[HostConfig], network_targets)
+ else:
+ targets = [NetworkInventoryConfig(path=options.inventory)]
+
+ return targets
+
+
+def default_targets(
+ mode: TargetMode,
+ controller: ControllerHostConfig,
+) -> list[HostConfig]:
+ """Return a list of default targets for the given target mode."""
+ targets: list[HostConfig]
+
+ if mode == TargetMode.WINDOWS_INTEGRATION:
+ targets = [WindowsInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.winrm')))]
+ elif mode == TargetMode.NETWORK_INTEGRATION:
+ targets = [NetworkInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.networking')))]
+ elif mode.multiple_pythons:
+ targets = t.cast(list[HostConfig], controller.get_default_targets(HostContext(controller_config=controller)))
+ else:
+ targets = [ControllerConfig()]
+
+ return targets
diff --git a/test/lib/ansible_test/_internal/cli/completers.py b/test/lib/ansible_test/_internal/cli/completers.py
new file mode 100644
index 0000000..903b69b
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/completers.py
@@ -0,0 +1,30 @@
+"""Completers for use with argcomplete."""
+from __future__ import annotations
+
+import argparse
+
+from ..target import (
+ find_target_completion,
+)
+
+from .argparsing.argcompletion import (
+ OptionCompletionFinder,
+)
+
+
+def complete_target(completer: OptionCompletionFinder, prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]:
+ """Perform completion for the targets configured for the command being parsed."""
+ matches = find_target_completion(parsed_args.targets_func, prefix, completer.list_mode)
+ completer.disable_completion_mangling = completer.list_mode and len(matches) > 1
+ return matches
+
+
+def complete_choices(choices: list[str], prefix: str, **_) -> list[str]:
+ """Perform completion using the provided choices."""
+ matches = [choice for choice in choices if choice.startswith(prefix)]
+ return matches
+
+
+def register_completer(action: argparse.Action, completer) -> None:
+ """Register the given completer with the specified action."""
+ action.completer = completer # type: ignore[attr-defined] # intentionally using an attribute that does not exist
diff --git a/test/lib/ansible_test/_internal/cli/converters.py b/test/lib/ansible_test/_internal/cli/converters.py
new file mode 100644
index 0000000..71e0dae
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/converters.py
@@ -0,0 +1,19 @@
+"""Converters for use as the type argument for arparse's add_argument method."""
+from __future__ import annotations
+
+import argparse
+
+
+def key_value_type(value: str) -> tuple[str, str]:
+ """Wrapper around key_value."""
+ return key_value(value)
+
+
+def key_value(value: str) -> tuple[str, str]:
+ """Type parsing and validation for argparse key/value pairs separated by an '=' character."""
+ parts = value.split('=')
+
+ if len(parts) != 2:
+ raise argparse.ArgumentTypeError('"%s" must be in the format "key=value"' % value)
+
+ return parts[0], parts[1]
diff --git a/test/lib/ansible_test/_internal/cli/environments.py b/test/lib/ansible_test/_internal/cli/environments.py
new file mode 100644
index 0000000..5063715
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/environments.py
@@ -0,0 +1,612 @@
+"""Command line parsing for test environments."""
+from __future__ import annotations
+
+import argparse
+import enum
+import functools
+import typing as t
+
+from ..constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ REMOTE_PROVIDERS,
+ SECCOMP_CHOICES,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ..util import (
+ REMOTE_ARCHITECTURES,
+)
+
+from ..completion import (
+ docker_completion,
+ network_completion,
+ remote_completion,
+ windows_completion,
+ filter_completion,
+)
+
+from ..cli.argparsing import (
+ CompositeAction,
+ CompositeActionCompletionFinder,
+)
+
+from ..cli.argparsing.actions import (
+ EnumAction,
+)
+
+from ..cli.actions import (
+ DelegatedControllerAction,
+ NetworkSshTargetAction,
+ NetworkTargetAction,
+ OriginControllerAction,
+ PosixSshTargetAction,
+ PosixTargetAction,
+ SanityPythonTargetAction,
+ UnitsPythonTargetAction,
+ WindowsSshTargetAction,
+ WindowsTargetAction,
+)
+
+from ..cli.compat import (
+ TargetMode,
+)
+
+from ..config import (
+ TerminateMode,
+)
+
+from .completers import (
+ complete_choices,
+ register_completer,
+)
+
+from .converters import (
+ key_value_type,
+)
+
+from .epilog import (
+ get_epilog,
+)
+
+from ..ci import (
+ get_ci_provider,
+)
+
+
+class ControllerMode(enum.Enum):
+ """Type of provisioning to use for the controller."""
+ NO_DELEGATION = enum.auto()
+ ORIGIN = enum.auto()
+ DELEGATED = enum.auto()
+
+
+def add_environments(
+ parser: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+ controller_mode: ControllerMode,
+ target_mode: TargetMode,
+) -> None:
+ """Add arguments for the environments used to run ansible-test and commands it invokes."""
+ no_environment = controller_mode == ControllerMode.NO_DELEGATION and target_mode == TargetMode.NO_TARGETS
+
+ parser.set_defaults(no_environment=no_environment)
+
+ if no_environment:
+ return
+
+ parser.set_defaults(target_mode=target_mode)
+
+ add_global_options(parser, controller_mode)
+ add_legacy_environment_options(parser, controller_mode, target_mode)
+ action_types = add_composite_environment_options(parser, completer, controller_mode, target_mode)
+
+ sections = [f'{heading}\n{content}'
+ for action_type, documentation_state in CompositeAction.documentation_state.items() if action_type in action_types
+ for heading, content in documentation_state.sections.items()]
+
+ if not get_ci_provider().supports_core_ci_auth():
+ sections.append('Remote provisioning options have been hidden since no Ansible Core CI API key was found.')
+
+ sections.append(get_epilog(completer))
+
+ parser.formatter_class = argparse.RawDescriptionHelpFormatter
+ parser.epilog = '\n\n'.join(sections)
+
+
+def add_global_options(
+ parser: argparse.ArgumentParser,
+ controller_mode: ControllerMode,
+):
+ """Add global options for controlling the test environment that work with both the legacy and composite options."""
+ global_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='global environment arguments'))
+
+ global_parser.add_argument(
+ '--containers',
+ metavar='JSON',
+ help=argparse.SUPPRESS,
+ )
+
+ global_parser.add_argument(
+ '--pypi-proxy',
+ action='store_true',
+ help=argparse.SUPPRESS,
+ )
+
+ global_parser.add_argument(
+ '--pypi-endpoint',
+ metavar='URI',
+ help=argparse.SUPPRESS,
+ )
+
+ global_parser.add_argument(
+ '--requirements',
+ action='store_true',
+ default=False,
+ help='install command requirements',
+ )
+
+ global_parser.add_argument(
+ '--no-pip-check',
+ action='store_true',
+ help=argparse.SUPPRESS, # deprecated, kept for now (with a warning) for backwards compatibility
+ )
+
+ add_global_remote(global_parser, controller_mode)
+ add_global_docker(global_parser, controller_mode)
+
+
+def add_composite_environment_options(
+ parser: argparse.ArgumentParser,
+ completer: CompositeActionCompletionFinder,
+ controller_mode: ControllerMode,
+ target_mode: TargetMode,
+) -> list[t.Type[CompositeAction]]:
+ """Add composite options for controlling the test environment."""
+ composite_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(
+ title='composite environment arguments (mutually exclusive with "environment arguments" above)'))
+
+ composite_parser.add_argument(
+ '--host-path',
+ help=argparse.SUPPRESS,
+ )
+
+ action_types: list[t.Type[CompositeAction]] = []
+
+ def register_action_type(action_type: t.Type[CompositeAction]) -> t.Type[CompositeAction]:
+ """Register the provided composite action type and return it."""
+ action_types.append(action_type)
+ return action_type
+
+ if controller_mode == ControllerMode.NO_DELEGATION:
+ composite_parser.set_defaults(controller=None)
+ else:
+ register_completer(composite_parser.add_argument(
+ '--controller',
+ metavar='OPT',
+ action=register_action_type(DelegatedControllerAction if controller_mode == ControllerMode.DELEGATED else OriginControllerAction),
+ help='configuration for the controller',
+ ), completer.completer)
+
+ if target_mode == TargetMode.NO_TARGETS:
+ composite_parser.set_defaults(targets=[])
+ elif target_mode == TargetMode.SHELL:
+ group = composite_parser.add_mutually_exclusive_group()
+
+ register_completer(group.add_argument(
+ '--target-posix',
+ metavar='OPT',
+ action=register_action_type(PosixSshTargetAction),
+ help='configuration for the target',
+ ), completer.completer)
+
+ suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
+
+ register_completer(group.add_argument(
+ '--target-windows',
+ metavar='OPT',
+ action=WindowsSshTargetAction if suppress else register_action_type(WindowsSshTargetAction),
+ help=suppress or 'configuration for the target',
+ ), completer.completer)
+
+ register_completer(group.add_argument(
+ '--target-network',
+ metavar='OPT',
+ action=NetworkSshTargetAction if suppress else register_action_type(NetworkSshTargetAction),
+ help=suppress or 'configuration for the target',
+ ), completer.completer)
+ else:
+ if target_mode.multiple_pythons:
+ target_option = '--target-python'
+ target_help = 'configuration for the target python interpreter(s)'
+ elif target_mode == TargetMode.POSIX_INTEGRATION:
+ target_option = '--target'
+ target_help = 'configuration for the target'
+ else:
+ target_option = '--target'
+ target_help = 'configuration for the target(s)'
+
+ target_actions = {
+ TargetMode.POSIX_INTEGRATION: PosixTargetAction,
+ TargetMode.WINDOWS_INTEGRATION: WindowsTargetAction,
+ TargetMode.NETWORK_INTEGRATION: NetworkTargetAction,
+ TargetMode.SANITY: SanityPythonTargetAction,
+ TargetMode.UNITS: UnitsPythonTargetAction,
+ }
+
+ target_action = target_actions[target_mode]
+
+ register_completer(composite_parser.add_argument(
+ target_option,
+ metavar='OPT',
+ action=register_action_type(target_action),
+ help=target_help,
+ ), completer.completer)
+
+ return action_types
+
+
+def add_legacy_environment_options(
+ parser: argparse.ArgumentParser,
+ controller_mode: ControllerMode,
+ target_mode: TargetMode,
+):
+ """Add legacy options for controlling the test environment."""
+ environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private
+ title='environment arguments (mutually exclusive with "composite environment arguments" below)')
+
+ add_environments_python(environment, target_mode)
+ add_environments_host(environment, controller_mode, target_mode)
+
+
+def add_environments_python(
+ environments_parser: argparse.ArgumentParser,
+ target_mode: TargetMode,
+) -> None:
+ """Add environment arguments to control the Python version(s) used."""
+ python_versions: tuple[str, ...]
+
+ if target_mode.has_python:
+ python_versions = SUPPORTED_PYTHON_VERSIONS
+ else:
+ python_versions = CONTROLLER_PYTHON_VERSIONS
+
+ environments_parser.add_argument(
+ '--python',
+ metavar='X.Y',
+ choices=python_versions + ('default',),
+ help='python version: %s' % ', '.join(python_versions),
+ )
+
+ environments_parser.add_argument(
+ '--python-interpreter',
+ metavar='PATH',
+ help='path to the python interpreter',
+ )
+
+
+def add_environments_host(
+ environments_parser: argparse.ArgumentParser,
+ controller_mode: ControllerMode,
+ target_mode: TargetMode,
+) -> None:
+ """Add environment arguments for the given host and argument modes."""
+ environments_exclusive_group: argparse.ArgumentParser = environments_parser.add_mutually_exclusive_group() # type: ignore[assignment] # real type private
+
+ add_environment_local(environments_exclusive_group)
+ add_environment_venv(environments_exclusive_group, environments_parser)
+
+ if controller_mode == ControllerMode.DELEGATED:
+ add_environment_remote(environments_exclusive_group, environments_parser, target_mode)
+ add_environment_docker(environments_exclusive_group, environments_parser, target_mode)
+
+ if target_mode == TargetMode.WINDOWS_INTEGRATION:
+ add_environment_windows(environments_parser)
+
+ if target_mode == TargetMode.NETWORK_INTEGRATION:
+ add_environment_network(environments_parser)
+
+
+def add_environment_network(
+ environments_parser: argparse.ArgumentParser,
+) -> None:
+ """Add environment arguments for running on a windows host."""
+ register_completer(environments_parser.add_argument(
+ '--platform',
+ metavar='PLATFORM',
+ action='append',
+ help='network platform/version',
+ ), complete_network_platform)
+
+ register_completer(environments_parser.add_argument(
+ '--platform-collection',
+ type=key_value_type,
+ metavar='PLATFORM=COLLECTION',
+ action='append',
+ help='collection used to test platform',
+ ), complete_network_platform_collection)
+
+ register_completer(environments_parser.add_argument(
+ '--platform-connection',
+ type=key_value_type,
+ metavar='PLATFORM=CONNECTION',
+ action='append',
+ help='connection used to test platform',
+ ), complete_network_platform_connection)
+
+ environments_parser.add_argument(
+ '--inventory',
+ metavar='PATH',
+ help='path to inventory used for tests',
+ )
+
+
+def add_environment_windows(
+ environments_parser: argparse.ArgumentParser,
+) -> None:
+ """Add environment arguments for running on a windows host."""
+ register_completer(environments_parser.add_argument(
+ '--windows',
+ metavar='VERSION',
+ action='append',
+ help='windows version',
+ ), complete_windows)
+
+ environments_parser.add_argument(
+ '--inventory',
+ metavar='PATH',
+ help='path to inventory used for tests',
+ )
+
+
+def add_environment_local(
+ exclusive_parser: argparse.ArgumentParser,
+) -> None:
+ """Add environment arguments for running on the local (origin) host."""
+ exclusive_parser.add_argument(
+ '--local',
+ action='store_true',
+ help='run from the local environment',
+ )
+
+
+def add_environment_venv(
+ exclusive_parser: argparse.ArgumentParser,
+ environments_parser: argparse.ArgumentParser,
+) -> None:
+ """Add environment arguments for running in ansible-test managed virtual environments."""
+ exclusive_parser.add_argument(
+ '--venv',
+ action='store_true',
+ help='run from a virtual environment',
+ )
+
+ environments_parser.add_argument(
+ '--venv-system-site-packages',
+ action='store_true',
+ help='enable system site packages')
+
+
+def add_global_docker(
+ parser: argparse.ArgumentParser,
+ controller_mode: ControllerMode,
+) -> None:
+ """Add global options for Docker."""
+ if controller_mode != ControllerMode.DELEGATED:
+ parser.set_defaults(
+ docker_no_pull=False,
+ docker_network=None,
+ docker_terminate=None,
+ prime_containers=False,
+ dev_systemd_debug=False,
+ dev_probe_cgroups=None,
+ )
+
+ return
+
+ parser.add_argument(
+ '--docker-no-pull',
+ action='store_true',
+ help=argparse.SUPPRESS, # deprecated, kept for now (with a warning) for backwards compatibility
+ )
+
+ parser.add_argument(
+ '--docker-network',
+ metavar='NET',
+ help='run using the specified network',
+ )
+
+ parser.add_argument(
+ '--docker-terminate',
+ metavar='T',
+ default=TerminateMode.ALWAYS,
+ type=TerminateMode,
+ action=EnumAction,
+ help='terminate the container: %(choices)s (default: %(default)s)',
+ )
+
+ parser.add_argument(
+ '--prime-containers',
+ action='store_true',
+ help='download containers without running tests',
+ )
+
+ # Docker support isn't related to ansible-core-ci.
+ # However, ansible-core-ci support is a reasonable indicator that the user may need the `--dev-*` options.
+ suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
+
+ parser.add_argument(
+ '--dev-systemd-debug',
+ action='store_true',
+ help=suppress or 'enable systemd debugging in containers',
+ )
+
+ parser.add_argument(
+ '--dev-probe-cgroups',
+ metavar='DIR',
+ nargs='?',
+ const='',
+ help=suppress or 'probe container cgroups, with optional log dir',
+ )
+
+
+def add_environment_docker(
+ exclusive_parser: argparse.ArgumentParser,
+ environments_parser: argparse.ArgumentParser,
+ target_mode: TargetMode,
+) -> None:
+ """Add environment arguments for running in docker containers."""
+ if target_mode in (TargetMode.POSIX_INTEGRATION, TargetMode.SHELL):
+ docker_images = sorted(filter_completion(docker_completion()))
+ else:
+ docker_images = sorted(filter_completion(docker_completion(), controller_only=True))
+
+ register_completer(exclusive_parser.add_argument(
+ '--docker',
+ metavar='IMAGE',
+ nargs='?',
+ const='default',
+ help='run from a docker container',
+ ), functools.partial(complete_choices, docker_images))
+
+ environments_parser.add_argument(
+ '--docker-privileged',
+ action='store_true',
+ help='run docker container in privileged mode',
+ )
+
+ environments_parser.add_argument(
+ '--docker-seccomp',
+ metavar='SC',
+ choices=SECCOMP_CHOICES,
+ help='set seccomp confinement for the test container: %(choices)s',
+ )
+
+ environments_parser.add_argument(
+ '--docker-memory',
+ metavar='INT',
+ type=int,
+ help='memory limit for docker in bytes',
+ )
+
+
+def add_global_remote(
+ parser: argparse.ArgumentParser,
+ controller_mode: ControllerMode,
+) -> None:
+ """Add global options for remote instances."""
+ if controller_mode != ControllerMode.DELEGATED:
+ parser.set_defaults(
+ remote_stage=None,
+ remote_endpoint=None,
+ remote_terminate=None,
+ )
+
+ return
+
+ suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
+
+ register_completer(parser.add_argument(
+ '--remote-stage',
+ metavar='STAGE',
+ default='prod',
+ help=suppress or 'remote stage to use: prod, dev',
+ ), complete_remote_stage)
+
+ parser.add_argument(
+ '--remote-endpoint',
+ metavar='EP',
+ help=suppress or 'remote provisioning endpoint to use',
+ )
+
+ parser.add_argument(
+ '--remote-terminate',
+ metavar='T',
+ default=TerminateMode.NEVER,
+ type=TerminateMode,
+ action=EnumAction,
+ help=suppress or 'terminate the remote instance: %(choices)s (default: %(default)s)',
+ )
+
+
+def add_environment_remote(
+ exclusive_parser: argparse.ArgumentParser,
+ environments_parser: argparse.ArgumentParser,
+ target_mode: TargetMode,
+) -> None:
+ """Add environment arguments for running in ansible-core-ci provisioned remote virtual machines."""
+ if target_mode == TargetMode.POSIX_INTEGRATION:
+ remote_platforms = get_remote_platform_choices()
+ elif target_mode == TargetMode.SHELL:
+ remote_platforms = sorted(set(get_remote_platform_choices()) | set(get_windows_platform_choices()))
+ else:
+ remote_platforms = get_remote_platform_choices(True)
+
+ suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
+
+ register_completer(exclusive_parser.add_argument(
+ '--remote',
+ metavar='NAME',
+ help=suppress or 'run from a remote instance',
+ ), functools.partial(complete_choices, remote_platforms))
+
+ environments_parser.add_argument(
+ '--remote-provider',
+ metavar='PR',
+ choices=REMOTE_PROVIDERS,
+ help=suppress or 'remote provider to use: %(choices)s',
+ )
+
+ environments_parser.add_argument(
+ '--remote-arch',
+ metavar='ARCH',
+ choices=REMOTE_ARCHITECTURES,
+ help=suppress or 'remote arch to use: %(choices)s',
+ )
+
+
+def complete_remote_stage(prefix: str, **_) -> list[str]:
+ """Return a list of supported stages matching the given prefix."""
+ return [stage for stage in ('prod', 'dev') if stage.startswith(prefix)]
+
+
+def complete_windows(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]:
+ """Return a list of supported Windows versions matching the given prefix, excluding versions already parsed from the command line."""
+ return [i for i in get_windows_version_choices() if i.startswith(prefix) and (not parsed_args.windows or i not in parsed_args.windows)]
+
+
+def complete_network_platform(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]:
+ """Return a list of supported network platforms matching the given prefix, excluding platforms already parsed from the command line."""
+ images = sorted(filter_completion(network_completion()))
+
+ return [i for i in images if i.startswith(prefix) and (not parsed_args.platform or i not in parsed_args.platform)]
+
+
+def complete_network_platform_collection(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]:
+ """Return a list of supported network platforms matching the given prefix, excluding collection platforms already parsed from the command line."""
+ left = prefix.split('=')[0]
+ images = sorted(set(image.platform for image in filter_completion(network_completion()).values()))
+
+ return [i + '=' for i in images if i.startswith(left) and (not parsed_args.platform_collection or i not in [x[0] for x in parsed_args.platform_collection])]
+
+
+def complete_network_platform_connection(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]:
+ """Return a list of supported network platforms matching the given prefix, excluding connection platforms already parsed from the command line."""
+ left = prefix.split('=')[0]
+ images = sorted(set(image.platform for image in filter_completion(network_completion()).values()))
+
+ return [i + '=' for i in images if i.startswith(left) and (not parsed_args.platform_connection or i not in [x[0] for x in parsed_args.platform_connection])]
+
+
+def get_remote_platform_choices(controller: bool = False) -> list[str]:
+ """Return a list of supported remote platforms matching the given prefix."""
+ return sorted(filter_completion(remote_completion(), controller_only=controller))
+
+
+def get_windows_platform_choices() -> list[str]:
+ """Return a list of supported Windows versions matching the given prefix."""
+ return sorted(f'windows/{windows.version}' for windows in filter_completion(windows_completion()).values())
+
+
+def get_windows_version_choices() -> list[str]:
+ """Return a list of supported Windows versions."""
+ return sorted(windows.version for windows in filter_completion(windows_completion()).values())
diff --git a/test/lib/ansible_test/_internal/cli/epilog.py b/test/lib/ansible_test/_internal/cli/epilog.py
new file mode 100644
index 0000000..3800ff1
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/epilog.py
@@ -0,0 +1,23 @@
+"""Argument parsing epilog generation."""
+from __future__ import annotations
+
+from .argparsing import (
+ CompositeActionCompletionFinder,
+)
+
+from ..data import (
+ data_context,
+)
+
+
+def get_epilog(completer: CompositeActionCompletionFinder) -> str:
+ """Generate and return the epilog to use for help output."""
+ if completer.enabled:
+ epilog = 'Tab completion available using the "argcomplete" python package.'
+ else:
+ epilog = 'Install the "argcomplete" python package to enable tab completion.'
+
+ if data_context().content.unsupported:
+ epilog += '\n\n' + data_context().explain_working_directory()
+
+ return epilog
diff --git a/test/lib/ansible_test/_internal/cli/parsers/__init__.py b/test/lib/ansible_test/_internal/cli/parsers/__init__.py
new file mode 100644
index 0000000..1aedf63
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/__init__.py
@@ -0,0 +1,303 @@
+"""Composite argument parsers for ansible-test specific command-line arguments."""
+from __future__ import annotations
+
+import typing as t
+
+from ...constants import (
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ...ci import (
+ get_ci_provider,
+)
+
+from ...host_configs import (
+ ControllerConfig,
+ NetworkConfig,
+ NetworkInventoryConfig,
+ PosixConfig,
+ WindowsConfig,
+ WindowsInventoryConfig,
+)
+
+from ..argparsing.parsers import (
+ DocumentationState,
+ Parser,
+ ParserState,
+ TypeParser,
+)
+
+from .value_parsers import (
+ PythonParser,
+)
+
+from .host_config_parsers import (
+ ControllerParser,
+ DockerParser,
+ NetworkInventoryParser,
+ NetworkRemoteParser,
+ OriginParser,
+ PosixRemoteParser,
+ PosixSshParser,
+ WindowsInventoryParser,
+ WindowsRemoteParser,
+)
+
+
+from .base_argument_parsers import (
+ ControllerNamespaceParser,
+ TargetNamespaceParser,
+ TargetsNamespaceParser,
+)
+
+
+class OriginControllerParser(ControllerNamespaceParser, TypeParser):
+ """Composite argument parser for the controller when delegation is not supported."""
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return dict(
+ origin=OriginParser(),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = '--controller options:'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class DelegatedControllerParser(ControllerNamespaceParser, TypeParser):
+ """Composite argument parser for the controller when delegation is supported."""
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = dict(
+ origin=OriginParser(),
+ docker=DockerParser(controller=True),
+ )
+
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=PosixRemoteParser(controller=True),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = '--controller options:'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class PosixTargetParser(TargetNamespaceParser, TypeParser):
+ """Composite argument parser for a POSIX target."""
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = dict(
+ controller=ControllerParser(),
+ docker=DockerParser(controller=False),
+ )
+
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=PosixRemoteParser(controller=False),
+ )
+
+ parsers.update(
+ ssh=PosixSshParser(),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class WindowsTargetParser(TargetsNamespaceParser, TypeParser):
+ """Composite argument parser for a Windows target."""
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return True
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers(state.root_namespace.targets)
+
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers([])
+
+ def get_internal_parsers(self, targets: list[WindowsConfig]) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = {}
+
+ if self.allow_inventory and not targets:
+ parsers.update(
+ inventory=WindowsInventoryParser(),
+ )
+
+ if not targets or not any(isinstance(target, WindowsInventoryConfig) for target in targets):
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=WindowsRemoteParser(),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class NetworkTargetParser(TargetsNamespaceParser, TypeParser):
+ """Composite argument parser for a network target."""
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return True
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers(state.root_namespace.targets)
+
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ return self.get_internal_parsers([])
+
+ def get_internal_parsers(self, targets: list[NetworkConfig]) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+ parsers: dict[str, Parser] = {}
+
+ if self.allow_inventory and not targets:
+ parsers.update(
+ inventory=NetworkInventoryParser(),
+ )
+
+ if not targets or not any(isinstance(target, NetworkInventoryConfig) for target in targets):
+ if get_ci_provider().supports_core_ci_auth():
+ parsers.update(
+ remote=NetworkRemoteParser(),
+ )
+
+ return parsers
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '' # place this section before the sections created by the parsers below
+ state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()])
+
+ return None
+
+
+class PythonTargetParser(TargetsNamespaceParser, Parser):
+ """Composite argument parser for a Python target."""
+ def __init__(self, allow_venv: bool) -> None:
+ super().__init__()
+
+ self.allow_venv = allow_venv
+
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-python'
+
+ def get_value(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result, without storing the result in the namespace."""
+ versions = list(SUPPORTED_PYTHON_VERSIONS)
+
+ for target in state.root_namespace.targets or []: # type: PosixConfig
+ versions.remove(target.python.version)
+
+ parser = PythonParser(versions, allow_venv=self.allow_venv, allow_default=True)
+ python = parser.parse(state)
+
+ value = ControllerConfig(python=python)
+
+ return value
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section = f'{self.option_name} options (choose one):'
+
+ state.sections[section] = '\n'.join([
+ f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller',
+ f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller',
+ ])
+
+ return None
+
+
+class SanityPythonTargetParser(PythonTargetParser):
+ """Composite argument parser for a sanity Python target."""
+ def __init__(self) -> None:
+ super().__init__(allow_venv=False)
+
+
+class UnitsPythonTargetParser(PythonTargetParser):
+ """Composite argument parser for a units Python target."""
+ def __init__(self) -> None:
+ super().__init__(allow_venv=True)
+
+
+class PosixSshTargetParser(PosixTargetParser):
+ """Composite argument parser for a POSIX SSH target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-posix'
+
+
+class WindowsSshTargetParser(WindowsTargetParser):
+ """Composite argument parser for a Windows SSH target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-windows'
+
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return False
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return True
+
+
+class NetworkSshTargetParser(NetworkTargetParser):
+ """Composite argument parser for a network SSH target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target-network'
+
+ @property
+ def allow_inventory(self) -> bool:
+ """True if inventory is allowed, otherwise False."""
+ return False
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return True
diff --git a/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py
new file mode 100644
index 0000000..aac7a69
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py
@@ -0,0 +1,73 @@
+"""Base classes for the primary parsers for composite command line arguments."""
+from __future__ import annotations
+
+import abc
+import typing as t
+
+from ..argparsing.parsers import (
+ CompletionError,
+ NamespaceParser,
+ ParserState,
+)
+
+
+class ControllerNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
+ """Base class for controller namespace parsers."""
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return 'controller'
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ if state.root_namespace.targets:
+ raise ControllerRequiredFirstError()
+
+ return super().parse(state)
+
+
+class TargetNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
+ """Base class for target namespace parsers involving a single target."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target'
+
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return 'targets'
+
+ @property
+ def use_list(self) -> bool:
+ """True if the destination is a list, otherwise False."""
+ return True
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return True
+
+
+class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
+ """Base class for controller namespace parsers involving multiple targets."""
+ @property
+ def option_name(self) -> str:
+ """The option name used for this parser."""
+ return '--target'
+
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return 'targets'
+
+ @property
+ def use_list(self) -> bool:
+ """True if the destination is a list, otherwise False."""
+ return True
+
+
+class ControllerRequiredFirstError(CompletionError):
+ """Exception raised when controller and target options are specified out-of-order."""
+ def __init__(self) -> None:
+ super().__init__('The `--controller` option must be specified before `--target` option(s).')
diff --git a/test/lib/ansible_test/_internal/cli/parsers/helpers.py b/test/lib/ansible_test/_internal/cli/parsers/helpers.py
new file mode 100644
index 0000000..836a893
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/helpers.py
@@ -0,0 +1,57 @@
+"""Helper functions for composite parsers."""
+from __future__ import annotations
+
+from ...constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ...completion import (
+ docker_completion,
+ remote_completion,
+ filter_completion,
+)
+
+from ...host_configs import (
+ DockerConfig,
+ HostConfig,
+ PosixRemoteConfig,
+)
+
+
+def get_docker_pythons(name: str, controller: bool, strict: bool) -> list[str]:
+ """Return a list of docker instance Python versions supported by the specified host config."""
+ image_config = filter_completion(docker_completion()).get(name)
+ available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS
+
+ if not image_config:
+ return [] if strict else list(available_pythons)
+
+ supported_pythons = [python for python in image_config.supported_pythons if python in available_pythons]
+
+ return supported_pythons
+
+
+def get_remote_pythons(name: str, controller: bool, strict: bool) -> list[str]:
+ """Return a list of remote instance Python versions supported by the specified host config."""
+ platform_config = filter_completion(remote_completion()).get(name)
+ available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS
+
+ if not platform_config:
+ return [] if strict else list(available_pythons)
+
+ supported_pythons = [python for python in platform_config.supported_pythons if python in available_pythons]
+
+ return supported_pythons
+
+
+def get_controller_pythons(controller_config: HostConfig, strict: bool) -> list[str]:
+ """Return a list of controller Python versions supported by the specified host config."""
+ if isinstance(controller_config, DockerConfig):
+ pythons = get_docker_pythons(controller_config.name, False, strict)
+ elif isinstance(controller_config, PosixRemoteConfig):
+ pythons = get_remote_pythons(controller_config.name, False, strict)
+ else:
+ pythons = list(SUPPORTED_PYTHON_VERSIONS)
+
+ return pythons
diff --git a/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py
new file mode 100644
index 0000000..ee6f146
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py
@@ -0,0 +1,310 @@
+"""Composite parsers for the various types of hosts."""
+from __future__ import annotations
+
+import typing as t
+
+from ...completion import (
+ docker_completion,
+ network_completion,
+ remote_completion,
+ windows_completion,
+ filter_completion,
+)
+
+from ...host_configs import (
+ ControllerConfig,
+ DockerConfig,
+ NetworkInventoryConfig,
+ NetworkRemoteConfig,
+ OriginConfig,
+ PosixRemoteConfig,
+ PosixSshConfig,
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from ..compat import (
+ get_fallback_remote_controller,
+)
+
+from ..argparsing.parsers import (
+ ChoicesParser,
+ DocumentationState,
+ FileParser,
+ MatchConditions,
+ NamespaceWrappedParser,
+ PairParser,
+ Parser,
+ ParserError,
+ ParserState,
+)
+
+from .value_parsers import (
+ PlatformParser,
+ SshConnectionParser,
+)
+
+from .key_value_parsers import (
+ ControllerKeyValueParser,
+ DockerKeyValueParser,
+ EmptyKeyValueParser,
+ NetworkRemoteKeyValueParser,
+ OriginKeyValueParser,
+ PosixRemoteKeyValueParser,
+ PosixSshKeyValueParser,
+ WindowsRemoteKeyValueParser,
+)
+
+from .helpers import (
+ get_docker_pythons,
+ get_remote_pythons,
+)
+
+
+class OriginParser(Parser):
+ """Composite argument parser for the origin."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = OriginConfig()
+
+ state.set_namespace(namespace)
+
+ parser = OriginKeyValueParser()
+ parser.parse(state)
+
+ return namespace
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return OriginKeyValueParser().document(state)
+
+
+class ControllerParser(Parser):
+ """Composite argument parser for the controller."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = ControllerConfig()
+
+ state.set_namespace(namespace)
+
+ parser = ControllerKeyValueParser()
+ parser.parse(state)
+
+ return namespace
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return ControllerKeyValueParser().document(state)
+
+
+class DockerParser(PairParser):
+ """Composite argument parser for a docker host."""
+ def __init__(self, controller: bool) -> None:
+ self.controller = controller
+
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return DockerConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('name', ChoicesParser(list(filter_completion(docker_completion(), controller_only=self.controller)),
+ conditions=MatchConditions.CHOICE | MatchConditions.ANY))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return DockerKeyValueParser(choice, self.controller)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value: DockerConfig = super().parse(state)
+
+ if not value.python and not get_docker_pythons(value.name, self.controller, True):
+ raise ParserError(f'Python version required for docker image: {value.name}')
+
+ return value
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ default = 'default'
+ content = '\n'.join([f' {image} ({", ".join(get_docker_pythons(image, self.controller, False))})'
+ for image, item in filter_completion(docker_completion(), controller_only=self.controller).items()])
+
+ content += '\n'.join([
+ '',
+ ' {image} # python must be specified for custom images',
+ ])
+
+ state.sections[f'{"controller" if self.controller else "target"} docker images and supported python version (choose one):'] = content
+
+ return f'{{image}}[,{DockerKeyValueParser(default, self.controller).document(state)}]'
+
+
+class PosixRemoteParser(PairParser):
+ """Composite argument parser for a POSIX remote host."""
+ def __init__(self, controller: bool) -> None:
+ self.controller = controller
+
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return PosixRemoteConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('name', PlatformParser(list(filter_completion(remote_completion(), controller_only=self.controller))))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return PosixRemoteKeyValueParser(choice, self.controller)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value: PosixRemoteConfig = super().parse(state)
+
+ if not value.python and not get_remote_pythons(value.name, self.controller, True):
+ raise ParserError(f'Python version required for remote: {value.name}')
+
+ return value
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ default = get_fallback_remote_controller()
+ content = '\n'.join([f' {name} ({", ".join(get_remote_pythons(name, self.controller, False))})'
+ for name, item in filter_completion(remote_completion(), controller_only=self.controller).items()])
+
+ content += '\n'.join([
+ '',
+ ' {platform}/{version} # python must be specified for unknown systems',
+ ])
+
+ state.sections[f'{"controller" if self.controller else "target"} remote systems and supported python versions (choose one):'] = content
+
+ return f'{{system}}[,{PosixRemoteKeyValueParser(default, self.controller).document(state)}]'
+
+
+class WindowsRemoteParser(PairParser):
+ """Composite argument parser for a Windows remote host."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return WindowsRemoteConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ names = list(filter_completion(windows_completion()))
+
+ for target in state.root_namespace.targets or []: # type: WindowsRemoteConfig
+ names.remove(target.name)
+
+ return NamespaceWrappedParser('name', PlatformParser(names))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return WindowsRemoteKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ content = '\n'.join([f' {name}' for name, item in filter_completion(windows_completion()).items()])
+
+ content += '\n'.join([
+ '',
+ ' windows/{version} # use an unknown windows version',
+ ])
+
+ state.sections['target remote systems (choose one):'] = content
+
+ return f'{{system}}[,{WindowsRemoteKeyValueParser().document(state)}]'
+
+
+class NetworkRemoteParser(PairParser):
+ """Composite argument parser for a network remote host."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return NetworkRemoteConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ names = list(filter_completion(network_completion()))
+
+ for target in state.root_namespace.targets or []: # type: NetworkRemoteConfig
+ names.remove(target.name)
+
+ return NamespaceWrappedParser('name', PlatformParser(names))
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return NetworkRemoteKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ content = '\n'.join([f' {name}' for name, item in filter_completion(network_completion()).items()])
+
+ content += '\n'.join([
+ '',
+ ' {platform}/{version} # use an unknown platform and version',
+ ])
+
+ state.sections['target remote systems (choose one):'] = content
+
+ return f'{{system}}[,{NetworkRemoteKeyValueParser().document(state)}]'
+
+
+class WindowsInventoryParser(PairParser):
+ """Composite argument parser for a Windows inventory."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return WindowsInventoryConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('path', FileParser())
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return EmptyKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '{path} # INI format inventory file'
+
+
+class NetworkInventoryParser(PairParser):
+ """Composite argument parser for a network inventory."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return NetworkInventoryConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return NamespaceWrappedParser('path', FileParser())
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return EmptyKeyValueParser()
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '{path} # INI format inventory file'
+
+
+class PosixSshParser(PairParser):
+ """Composite argument parser for a POSIX SSH host."""
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+ return PosixSshConfig()
+
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+ return SshConnectionParser()
+
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+ return PosixSshKeyValueParser()
+
+ @property
+ def required(self) -> bool:
+ """True if the delimiter (and thus right parser) is required, otherwise False."""
+ return True
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return f'{SshConnectionParser().document(state)}[,{PosixSshKeyValueParser().document(state)}]'
diff --git a/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py
new file mode 100644
index 0000000..049b71e
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py
@@ -0,0 +1,239 @@
+"""Composite argument key-value parsers used by other parsers."""
+from __future__ import annotations
+
+import typing as t
+
+from ...constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ REMOTE_PROVIDERS,
+ SECCOMP_CHOICES,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ...completion import (
+ AuditMode,
+ CGroupVersion,
+)
+
+from ...util import (
+ REMOTE_ARCHITECTURES,
+)
+
+from ...host_configs import (
+ OriginConfig,
+)
+
+from ...become import (
+ SUPPORTED_BECOME_METHODS,
+)
+
+from ..argparsing.parsers import (
+ AnyParser,
+ BooleanParser,
+ ChoicesParser,
+ DocumentationState,
+ EnumValueChoicesParser,
+ IntegerParser,
+ KeyValueParser,
+ Parser,
+ ParserState,
+)
+
+from .value_parsers import (
+ PythonParser,
+)
+
+from .helpers import (
+ get_controller_pythons,
+ get_remote_pythons,
+ get_docker_pythons,
+)
+
+
+class OriginKeyValueParser(KeyValueParser):
+ """Composite argument parser for origin key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ versions = CONTROLLER_PYTHON_VERSIONS
+
+ return dict(
+ python=PythonParser(versions=versions, allow_venv=True, allow_default=True),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=CONTROLLER_PYTHON_VERSIONS, allow_venv=True, allow_default=True)
+
+ section_name = 'origin options'
+
+ state.sections[f'controller {section_name} (comma separated):'] = '\n'.join([
+ f' python={python_parser.document(state)}',
+ ])
+
+ return f'{{{section_name}}} # default'
+
+
+class ControllerKeyValueParser(KeyValueParser):
+ """Composite argument parser for controller key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ versions = get_controller_pythons(state.root_namespace.controller, False)
+ allow_default = bool(get_controller_pythons(state.root_namespace.controller, True))
+ allow_venv = isinstance(state.root_namespace.controller, OriginConfig) or not state.root_namespace.controller
+
+ return dict(
+ python=PythonParser(versions=versions, allow_venv=allow_venv, allow_default=allow_default),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section_name = 'controller options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller',
+ f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller',
+ ])
+
+ return f'{{{section_name}}} # default'
+
+
+class DockerKeyValueParser(KeyValueParser):
+ """Composite argument parser for docker key/value pairs."""
+ def __init__(self, image: str, controller: bool) -> None:
+ self.controller = controller
+ self.versions = get_docker_pythons(image, controller, False)
+ self.allow_default = bool(get_docker_pythons(image, controller, True))
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default),
+ seccomp=ChoicesParser(SECCOMP_CHOICES),
+ cgroup=EnumValueChoicesParser(CGroupVersion),
+ audit=EnumValueChoicesParser(AuditMode),
+ privileged=BooleanParser(),
+ memory=IntegerParser(),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default)
+
+ section_name = 'docker options'
+
+ state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([
+ f' python={python_parser.document(state)}',
+ f' seccomp={ChoicesParser(SECCOMP_CHOICES).document(state)}',
+ f' cgroup={EnumValueChoicesParser(CGroupVersion).document(state)}',
+ f' audit={EnumValueChoicesParser(AuditMode).document(state)}',
+ f' privileged={BooleanParser().document(state)}',
+ f' memory={IntegerParser().document(state)} # bytes',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class PosixRemoteKeyValueParser(KeyValueParser):
+ """Composite argument parser for POSIX remote key/value pairs."""
+ def __init__(self, name: str, controller: bool) -> None:
+ self.controller = controller
+ self.versions = get_remote_pythons(name, controller, False)
+ self.allow_default = bool(get_remote_pythons(name, controller, True))
+
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ become=ChoicesParser(list(SUPPORTED_BECOME_METHODS)),
+ provider=ChoicesParser(REMOTE_PROVIDERS),
+ arch=ChoicesParser(REMOTE_ARCHITECTURES),
+ python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default)
+
+ section_name = 'remote options'
+
+ state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([
+ f' become={ChoicesParser(list(SUPPORTED_BECOME_METHODS)).document(state)}',
+ f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}',
+ f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}',
+ f' python={python_parser.document(state)}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class WindowsRemoteKeyValueParser(KeyValueParser):
+ """Composite argument parser for Windows remote key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ provider=ChoicesParser(REMOTE_PROVIDERS),
+ arch=ChoicesParser(REMOTE_ARCHITECTURES),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section_name = 'remote options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}',
+ f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class NetworkRemoteKeyValueParser(KeyValueParser):
+ """Composite argument parser for network remote key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ provider=ChoicesParser(REMOTE_PROVIDERS),
+ arch=ChoicesParser(REMOTE_ARCHITECTURES),
+ collection=AnyParser(),
+ connection=AnyParser(),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ section_name = 'remote options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}',
+ f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}',
+ ' collection={collection}',
+ ' connection={connection}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class PosixSshKeyValueParser(KeyValueParser):
+ """Composite argument parser for POSIX SSH host key/value pairs."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return dict(
+ python=PythonParser(versions=list(SUPPORTED_PYTHON_VERSIONS), allow_venv=False, allow_default=False),
+ )
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ python_parser = PythonParser(versions=SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=False)
+
+ section_name = 'ssh options'
+
+ state.sections[f'target {section_name} (comma separated):'] = '\n'.join([
+ f' python={python_parser.document(state)}',
+ ])
+
+ return f'{{{section_name}}}'
+
+
+class EmptyKeyValueParser(KeyValueParser):
+ """Composite argument parser when a key/value parser is required but there are no keys available."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+ return {}
diff --git a/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py
new file mode 100644
index 0000000..9453b76
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py
@@ -0,0 +1,179 @@
+"""Composite argument value parsers used by other parsers."""
+from __future__ import annotations
+
+import collections.abc as c
+import typing as t
+
+from ...host_configs import (
+ NativePythonConfig,
+ PythonConfig,
+ VirtualPythonConfig,
+)
+
+from ..argparsing.parsers import (
+ AbsolutePathParser,
+ AnyParser,
+ ChoicesParser,
+ DocumentationState,
+ IntegerParser,
+ MatchConditions,
+ Parser,
+ ParserError,
+ ParserState,
+ ParserBoundary,
+)
+
+
+class PythonParser(Parser):
+ """
+ Composite argument parser for Python versions, with support for specifying paths and using virtual environments.
+
+ Allowed formats:
+
+ {version}
+ venv/{version}
+ venv/system-site-packages/{version}
+
+ The `{version}` has two possible formats:
+
+ X.Y
+ X.Y@{path}
+
+ Where `X.Y` is the Python major and minor version number and `{path}` is an absolute path with one of the following formats:
+
+ /path/to/python
+ /path/to/python/directory/
+
+ When a trailing slash is present, it is considered a directory, and `python{version}` will be appended to it automatically.
+
+ The default path depends on the context:
+
+ - Known docker/remote environments can declare their own path.
+ - The origin host uses `sys.executable` if `{version}` matches the current version in `sys.version_info`.
+ - The origin host (as a controller or target) use the `$PATH` environment variable to find `python{version}`.
+ - As a fallback/default, the path `/usr/bin/python{version}` is used.
+
+ NOTE: The Python path determines where to find the Python interpreter.
+ In the case of an ansible-test managed virtual environment, that Python interpreter will be used to create the virtual environment.
+ So the path given will not be the one actually used for the controller or target.
+
+ Known docker/remote environments limit the available Python versions to configured values known to be valid.
+ The origin host and unknown environments assume all relevant Python versions are available.
+ """
+ def __init__(self,
+ versions: c.Sequence[str],
+ *,
+ allow_default: bool,
+ allow_venv: bool,
+ ):
+ version_choices = list(versions)
+
+ if allow_default:
+ version_choices.append('default')
+
+ first_choices = list(version_choices)
+
+ if allow_venv:
+ first_choices.append('venv/')
+
+ venv_choices = list(version_choices) + ['system-site-packages/']
+
+ self.versions = versions
+ self.allow_default = allow_default
+ self.allow_venv = allow_venv
+ self.version_choices = version_choices
+ self.first_choices = first_choices
+ self.venv_choices = venv_choices
+ self.venv_choices = venv_choices
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ boundary: ParserBoundary
+
+ with state.delimit('@/', required=False) as boundary:
+ version = ChoicesParser(self.first_choices).parse(state)
+
+ python: PythonConfig
+
+ if version == 'venv':
+ with state.delimit('@/', required=False) as boundary:
+ version = ChoicesParser(self.venv_choices).parse(state)
+
+ if version == 'system-site-packages':
+ system_site_packages = True
+
+ with state.delimit('@', required=False) as boundary:
+ version = ChoicesParser(self.version_choices).parse(state)
+ else:
+ system_site_packages = False
+
+ python = VirtualPythonConfig(version=version, system_site_packages=system_site_packages)
+ else:
+ python = NativePythonConfig(version=version)
+
+ if boundary.match == '@':
+ # FUTURE: For OriginConfig or ControllerConfig->OriginConfig the path could be validated with an absolute path parser (file or directory).
+ python.path = AbsolutePathParser().parse(state)
+
+ return python
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+
+ docs = '[venv/[system-site-packages/]]' if self.allow_venv else ''
+
+ if self.versions:
+ docs += '|'.join(self.version_choices)
+ else:
+ docs += '{X.Y}'
+
+ docs += '[@{path|dir/}]'
+
+ return docs
+
+
+class PlatformParser(ChoicesParser):
+ """Composite argument parser for "{platform}/{version}" formatted choices."""
+ def __init__(self, choices: list[str]) -> None:
+ super().__init__(choices, conditions=MatchConditions.CHOICE | MatchConditions.ANY)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+
+ if len(value.split('/')) != 2:
+ raise ParserError(f'invalid platform format: {value}')
+
+ return value
+
+
+class SshConnectionParser(Parser):
+ """
+ Composite argument parser for connecting to a host using SSH.
+ Format: user@host[:port]
+ """
+ EXPECTED_FORMAT = '{user}@{host}[:{port}]'
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = state.current_namespace
+
+ with state.delimit('@'):
+ user = AnyParser(no_match_message=f'Expected {{user}} from: {self.EXPECTED_FORMAT}').parse(state)
+
+ setattr(namespace, 'user', user)
+
+ with state.delimit(':', required=False) as colon: # type: ParserBoundary
+ host = AnyParser(no_match_message=f'Expected {{host}} from: {self.EXPECTED_FORMAT}').parse(state)
+
+ setattr(namespace, 'host', host)
+
+ if colon.match:
+ port = IntegerParser(65535).parse(state)
+ setattr(namespace, 'port', port)
+
+ return namespace
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return self.EXPECTED_FORMAT