diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/cli')
39 files changed, 5084 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/__init__.py b/test/lib/ansible_test/_internal/cli/__init__.py new file mode 100644 index 0000000..3171639 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/__init__.py @@ -0,0 +1,63 @@ +"""Command line parsing.""" +from __future__ import annotations + +import argparse +import os +import sys +import typing as t + +from .argparsing import ( + CompositeActionCompletionFinder, +) + +from .commands import ( + do_commands, +) + +from .epilog import ( + get_epilog, +) + +from .compat import ( + HostSettings, + convert_legacy_args, +) + +from ..util import ( + get_ansible_version, +) + + +def parse_args(argv: t.Optional[list[str]] = None) -> argparse.Namespace: + """Parse command line arguments.""" + completer = CompositeActionCompletionFinder() + + parser = argparse.ArgumentParser(prog='ansible-test', epilog=get_epilog(completer), formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('--version', action='version', version=f'%(prog)s version {get_ansible_version()}') + + do_commands(parser, completer) + + completer( + parser, + always_complete_options=False, + ) + + if argv is None: + argv = sys.argv[1:] + else: + argv = argv[1:] + + args = parser.parse_args(argv) + + if args.explain and not args.verbosity: + args.verbosity = 1 + + if args.no_environment: + pass + elif args.host_path: + args.host_settings = HostSettings.deserialize(os.path.join(args.host_path, 'settings.dat')) + else: + args.host_settings = convert_legacy_args(argv, args, args.target_mode) + args.host_settings.apply_defaults() + + return args diff --git a/test/lib/ansible_test/_internal/cli/actions.py b/test/lib/ansible_test/_internal/cli/actions.py new file mode 100644 index 0000000..3359a84 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/actions.py @@ -0,0 +1,90 @@ +"""Actions for handling composite arguments with argparse.""" +from __future__ import annotations + +from .argparsing import ( + CompositeAction, + NamespaceParser, +) + +from .parsers import ( + DelegatedControllerParser, + NetworkSshTargetParser, + NetworkTargetParser, + OriginControllerParser, + PosixSshTargetParser, + PosixTargetParser, + SanityPythonTargetParser, + UnitsPythonTargetParser, + WindowsSshTargetParser, + WindowsTargetParser, +) + + +class OriginControllerAction(CompositeAction): + """Composite action parser for the controller when the only option is `origin`.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return OriginControllerParser() + + +class DelegatedControllerAction(CompositeAction): + """Composite action parser for the controller when delegation is supported.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return DelegatedControllerParser() + + +class PosixTargetAction(CompositeAction): + """Composite action parser for a POSIX target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return PosixTargetParser() + + +class WindowsTargetAction(CompositeAction): + """Composite action parser for a Windows target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return WindowsTargetParser() + + +class NetworkTargetAction(CompositeAction): + """Composite action parser for a network target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return NetworkTargetParser() + + +class SanityPythonTargetAction(CompositeAction): + """Composite action parser for a sanity target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return SanityPythonTargetParser() + + +class UnitsPythonTargetAction(CompositeAction): + """Composite action parser for a units target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return UnitsPythonTargetParser() + + +class PosixSshTargetAction(CompositeAction): + """Composite action parser for a POSIX SSH target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return PosixSshTargetParser() + + +class WindowsSshTargetAction(CompositeAction): + """Composite action parser for a Windows SSH target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return WindowsSshTargetParser() + + +class NetworkSshTargetAction(CompositeAction): + """Composite action parser for a network SSH target.""" + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + return NetworkSshTargetParser() diff --git a/test/lib/ansible_test/_internal/cli/argparsing/__init__.py b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py new file mode 100644 index 0000000..540cf55 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py @@ -0,0 +1,265 @@ +"""Completion finder which brings together custom options and completion logic.""" +from __future__ import annotations + +import abc +import argparse +import os +import re +import typing as t + +from .argcompletion import ( + OptionCompletionFinder, + get_comp_type, + register_safe_action, + warn, +) + +from .parsers import ( + Completion, + CompletionError, + CompletionSuccess, + CompletionUnavailable, + DocumentationState, + NamespaceParser, + Parser, + ParserError, + ParserMode, + ParserState, +) + + +class RegisteredCompletionFinder(OptionCompletionFinder): + """ + Custom option completion finder for argcomplete which allows completion results to be registered. + These registered completions, if provided, are used to filter the final completion results. + This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221 + """ + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.registered_completions: t.Optional[list[str]] = None + + def completer( + self, + prefix: str, + action: argparse.Action, + parsed_args: argparse.Namespace, + **kwargs, + ) -> list[str]: + """ + Return a list of completions for the specified prefix and action. + Use this as the completer function for argcomplete. + """ + kwargs.clear() + del kwargs + + completions = self.get_completions(prefix, action, parsed_args) + + if action.nargs and not isinstance(action.nargs, int): + # prevent argcomplete from including unrelated arguments in the completion results + self.registered_completions = completions + + return completions + + @abc.abstractmethod + def get_completions( + self, + prefix: str, + action: argparse.Action, + parsed_args: argparse.Namespace, + ) -> list[str]: + """ + Return a list of completions for the specified prefix and action. + Called by the complete function. + """ + + def quote_completions(self, completions, cword_prequote, last_wordbreak_pos): + """Modify completion results before returning them.""" + if self.registered_completions is not None: + # If one of the completion handlers registered their results, only allow those exact results to be returned. + # This prevents argcomplete from adding results from other completers when they are known to be invalid. + allowed_completions = set(self.registered_completions) + completions = [completion for completion in completions if completion in allowed_completions] + + return super().quote_completions(completions, cword_prequote, last_wordbreak_pos) + + +class CompositeAction(argparse.Action, metaclass=abc.ABCMeta): + """Base class for actions that parse composite arguments.""" + documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {} + + def __init__( + self, + *args, + **kwargs, + ): + self.definition = self.create_parser() + self.documentation_state[type(self)] = documentation_state = DocumentationState() + self.definition.document(documentation_state) + + kwargs.update(dest=self.definition.dest) + + super().__init__(*args, **kwargs) + + register_safe_action(type(self)) + + @abc.abstractmethod + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + + def __call__( + self, + parser, + namespace, + values, + option_string=None, + ): + state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values) + + try: + self.definition.parse(state) + except ParserError as ex: + error = str(ex) + except CompletionError as ex: + error = ex.message + else: + return + + if get_comp_type(): + # FUTURE: It may be possible to enhance error handling by surfacing this error message during downstream completion. + return # ignore parse errors during completion to avoid breaking downstream completion + + raise argparse.ArgumentError(self, error) + + +class CompositeActionCompletionFinder(RegisteredCompletionFinder): + """Completion finder with support for composite argument parsing.""" + def get_completions( + self, + prefix: str, + action: argparse.Action, + parsed_args: argparse.Namespace, + ) -> list[str]: + """Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed.""" + assert isinstance(action, CompositeAction) + + state = ParserState( + mode=ParserMode.LIST if self.list_mode else ParserMode.COMPLETE, + remainder=prefix, + namespaces=[parsed_args], + ) + + answer = complete(action.definition, state) + + completions = [] + + if isinstance(answer, CompletionSuccess): + self.disable_completion_mangling = answer.preserve + completions = answer.completions + + if isinstance(answer, CompletionError): + warn(answer.message) + + return completions + + +def detect_file_listing(value: str, mode: ParserMode) -> bool: + """ + Return True if Bash will show a file listing and redraw the prompt, otherwise return False. + + If there are no list results, a file listing will be shown if the value after the last `=` or `:` character: + + - is empty + - matches a full path + - matches a partial path + + Otherwise Bash will play the bell sound and display nothing. + + see: https://github.com/kislyuk/argcomplete/issues/328 + see: https://github.com/kislyuk/argcomplete/pull/284 + """ + listing = False + + if mode == ParserMode.LIST: + right = re.split('[=:]', value)[-1] + listing = not right or os.path.exists(right) + + if not listing: + directory = os.path.dirname(right) + + # noinspection PyBroadException + try: + filenames = os.listdir(directory or '.') + except Exception: # pylint: disable=broad-except + pass + else: + listing = any(filename.startswith(right) for filename in filenames) + + return listing + + +def detect_false_file_completion(value: str, mode: ParserMode) -> bool: + """ + Return True if Bash will provide an incorrect file completion, otherwise return False. + + If there are no completion results, a filename will be automatically completed if the value after the last `=` or `:` character: + + - matches exactly one partial path + + Otherwise Bash will play the bell sound and display nothing. + + see: https://github.com/kislyuk/argcomplete/issues/328 + see: https://github.com/kislyuk/argcomplete/pull/284 + """ + completion = False + + if mode == ParserMode.COMPLETE: + completion = True + + right = re.split('[=:]', value)[-1] + directory, prefix = os.path.split(right) + + # noinspection PyBroadException + try: + filenames = os.listdir(directory or '.') + except Exception: # pylint: disable=broad-except + pass + else: + matches = [filename for filename in filenames if filename.startswith(prefix)] + completion = len(matches) == 1 + + return completion + + +def complete( + completer: Parser, + state: ParserState, +) -> Completion: + """Perform argument completion using the given completer and return the completion result.""" + value = state.remainder + + answer: Completion + + try: + completer.parse(state) + raise ParserError('completion expected') + except CompletionUnavailable as ex: + if detect_file_listing(value, state.mode): + # Displaying a warning before the file listing informs the user it is invalid. Bash will redraw the prompt after the list. + # If the file listing is not shown, a warning could be helpful, but would introduce noise on the terminal since the prompt is not redrawn. + answer = CompletionError(ex.message) + elif detect_false_file_completion(value, state.mode): + # When the current prefix provides no matches, but matches files a single file on disk, Bash will perform an incorrect completion. + # Returning multiple invalid matches instead of no matches will prevent Bash from using its own completion logic in this case. + answer = CompletionSuccess( + list_mode=True, # abuse list mode to enable preservation of the literal results + consumed='', + continuation='', + matches=['completion', 'invalid'] + ) + else: + answer = ex + except Completion as ex: + answer = ex + + return answer diff --git a/test/lib/ansible_test/_internal/cli/argparsing/actions.py b/test/lib/ansible_test/_internal/cli/argparsing/actions.py new file mode 100644 index 0000000..2bcf982 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/actions.py @@ -0,0 +1,18 @@ +"""Actions for argparse.""" +from __future__ import annotations + +import argparse +import enum +import typing as t + + +class EnumAction(argparse.Action): + """Parse an enum using the lowercase enum names.""" + def __init__(self, **kwargs: t.Any) -> None: + self.enum_type: t.Type[enum.Enum] = kwargs.pop('type', None) + kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type)) + super().__init__(**kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + value = self.enum_type[values.upper()] + setattr(namespace, self.dest, value) diff --git a/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py new file mode 100644 index 0000000..cf5776d --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py @@ -0,0 +1,124 @@ +"""Wrapper around argcomplete providing bug fixes and additional features.""" +from __future__ import annotations + +import argparse +import enum +import os +import typing as t + + +class Substitute: + """Substitute for missing class which accepts all arguments.""" + def __init__(self, *args, **kwargs) -> None: + pass + + +try: + import argcomplete + + from argcomplete import ( + CompletionFinder, + default_validator, + ) + + warn = argcomplete.warn # pylint: disable=invalid-name +except ImportError: + argcomplete = None + + CompletionFinder = Substitute + default_validator = Substitute # pylint: disable=invalid-name + warn = Substitute # pylint: disable=invalid-name + + +class CompType(enum.Enum): + """ + Bash COMP_TYPE argument completion types. + For documentation, see: https://www.gnu.org/software/bash/manual/html_node/Bash-Variables.html#index-COMP_005fTYPE + """ + COMPLETION = '\t' + """ + Standard completion, typically triggered by a single tab. + """ + MENU_COMPLETION = '%' + """ + Menu completion, which cycles through each completion instead of showing a list. + For help using this feature, see: https://stackoverflow.com/questions/12044574/getting-complete-and-menu-complete-to-work-together + """ + LIST = '?' + """ + Standard list, typically triggered by a double tab. + """ + LIST_AMBIGUOUS = '!' + """ + Listing with `show-all-if-ambiguous` set. + For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dambiguous + For additional details, see: https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type + """ + LIST_UNMODIFIED = '@' + """ + Listing with `show-all-if-unmodified` set. + For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dunmodified + For additional details, see: : https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type + """ + + @property + def list_mode(self) -> bool: + """True if completion is running in list mode, otherwise False.""" + return self in (CompType.LIST, CompType.LIST_AMBIGUOUS, CompType.LIST_UNMODIFIED) + + +def register_safe_action(action_type: t.Type[argparse.Action]) -> None: + """Register the given action as a safe action for argcomplete to use during completion if it is not already registered.""" + if argcomplete and action_type not in argcomplete.safe_actions: + argcomplete.safe_actions += (action_type,) + + +def get_comp_type() -> t.Optional[CompType]: + """Parse the COMP_TYPE environment variable (if present) and return the associated CompType enum value.""" + value = os.environ.get('COMP_TYPE') + comp_type = CompType(chr(int(value))) if value else None + return comp_type + + +class OptionCompletionFinder(CompletionFinder): + """ + Custom completion finder for argcomplete. + It provides support for running completion in list mode, which argcomplete natively handles the same as standard completion. + """ + enabled = bool(argcomplete) + + def __init__(self, *args, validator=None, **kwargs) -> None: + if validator: + raise ValueError() + + self.comp_type = get_comp_type() + self.list_mode = self.comp_type.list_mode if self.comp_type else False + self.disable_completion_mangling = False + + finder = self + + def custom_validator(completion, prefix): + """Completion validator used to optionally bypass validation.""" + if finder.disable_completion_mangling: + return True + + return default_validator(completion, prefix) + + super().__init__( + *args, + validator=custom_validator, + **kwargs, + ) + + def __call__(self, *args, **kwargs): + if self.enabled: + super().__call__(*args, **kwargs) + + def quote_completions(self, completions, cword_prequote, last_wordbreak_pos): + """Intercept default quoting behavior to optionally block mangling of completion entries.""" + if self.disable_completion_mangling: + # Word breaks have already been handled when generating completions, don't mangle them further. + # This is needed in many cases when returning completion lists which lack the existing completion prefix. + last_wordbreak_pos = None + + return super().quote_completions(completions, cword_prequote, last_wordbreak_pos) diff --git a/test/lib/ansible_test/_internal/cli/argparsing/parsers.py b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py new file mode 100644 index 0000000..d07e03c --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py @@ -0,0 +1,597 @@ +"""General purpose composite argument parsing and completion.""" +from __future__ import annotations + +import abc +import collections.abc as c +import contextlib +import dataclasses +import enum +import os +import re +import typing as t + +# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior. +# +# Recommended characters for assignment and/or continuation: `/` `:` `=` +# +# The recommended assignment_character list is due to how argcomplete handles continuation characters. +# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558 + +PAIR_DELIMITER = ',' +ASSIGNMENT_DELIMITER = '=' +PATH_DELIMITER = '/' + + +# This class was originally frozen. However, that causes issues when running under Python 3.11. +# See: https://github.com/python/cpython/issues/99856 +@dataclasses.dataclass +class Completion(Exception): + """Base class for argument completion results.""" + + +@dataclasses.dataclass +class CompletionUnavailable(Completion): + """Argument completion unavailable.""" + message: str = 'No completions available.' + + +@dataclasses.dataclass +class CompletionError(Completion): + """Argument completion error.""" + message: t.Optional[str] = None + + +@dataclasses.dataclass +class CompletionSuccess(Completion): + """Successful argument completion result.""" + list_mode: bool + consumed: str + continuation: str + matches: list[str] = dataclasses.field(default_factory=list) + + @property + def preserve(self) -> bool: + """ + True if argcomplete should not mangle completion values, otherwise False. + Only used when more than one completion exists to avoid overwriting the word undergoing completion. + """ + return len(self.matches) > 1 and self.list_mode + + @property + def completions(self) -> list[str]: + """List of completion values to return to argcomplete.""" + completions = self.matches + continuation = '' if self.list_mode else self.continuation + + if not self.preserve: + # include the existing prefix to avoid rewriting the word undergoing completion + completions = [f'{self.consumed}{completion}{continuation}' for completion in completions] + + return completions + + +class ParserMode(enum.Enum): + """Mode the parser is operating in.""" + PARSE = enum.auto() + COMPLETE = enum.auto() + LIST = enum.auto() + + +class ParserError(Exception): + """Base class for all parsing exceptions.""" + + +@dataclasses.dataclass +class ParserBoundary: + """Boundary details for parsing composite input.""" + delimiters: str + required: bool + match: t.Optional[str] = None + ready: bool = True + + +@dataclasses.dataclass +class ParserState: + """State of the composite argument parser.""" + mode: ParserMode + remainder: str = '' + consumed: str = '' + boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list) + namespaces: list[t.Any] = dataclasses.field(default_factory=list) + parts: list[str] = dataclasses.field(default_factory=list) + + @property + def incomplete(self) -> bool: + """True if parsing is incomplete (unparsed input remains), otherwise False.""" + return self.remainder is not None + + def match(self, value: str, choices: list[str]) -> bool: + """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False.""" + if self.current_boundary: + delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match + else: + delimiters, delimiter = '', None + + for choice in choices: + if choice.rstrip(delimiters) == choice: + # choice is not delimited + if value == choice: + return True # value matched + else: + # choice is delimited + if f'{value}{delimiter}' == choice: + return True # value and delimiter matched + + return False + + def read(self) -> str: + """Read and return the next input segment, taking into account parsing boundaries.""" + delimiters = "".join(boundary.delimiters for boundary in self.boundaries) + + if delimiters: + pattern = '([' + re.escape(delimiters) + '])' + regex = re.compile(pattern) + parts = regex.split(self.remainder, 1) + else: + parts = [self.remainder] + + if len(parts) > 1: + value, delimiter, remainder = parts + else: + value, delimiter, remainder = parts[0], None, None + + for boundary in reversed(self.boundaries): + if delimiter and delimiter in boundary.delimiters: + boundary.match = delimiter + self.consumed += value + delimiter + break + + boundary.match = None + boundary.ready = False + + if boundary.required: + break + + self.remainder = remainder + + return value + + @property + def root_namespace(self) -> t.Any: + """THe root namespace.""" + return self.namespaces[0] + + @property + def current_namespace(self) -> t.Any: + """The current namespace.""" + return self.namespaces[-1] + + @property + def current_boundary(self) -> t.Optional[ParserBoundary]: + """The current parser boundary, if any, otherwise None.""" + return self.boundaries[-1] if self.boundaries else None + + def set_namespace(self, namespace: t.Any) -> None: + """Set the current namespace.""" + self.namespaces.append(namespace) + + @contextlib.contextmanager + def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]: + """Context manager for delimiting parsing of input.""" + boundary = ParserBoundary(delimiters=delimiters, required=required) + + self.boundaries.append(boundary) + + try: + yield boundary + finally: + self.boundaries.pop() + + if boundary.required and not boundary.match: + raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead') + + +@dataclasses.dataclass +class DocumentationState: + """State of the composite argument parser's generated documentation.""" + sections: dict[str, str] = dataclasses.field(default_factory=dict) + + +class Parser(metaclass=abc.ABCMeta): + """Base class for all composite argument parsers.""" + @abc.abstractmethod + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + raise Exception(f'Undocumented parser: {type(self)}') + + +class MatchConditions(enum.Flag): + """Acceptable condition(s) for matching user input to available choices.""" + CHOICE = enum.auto() + """Match any choice.""" + ANY = enum.auto() + """Match any non-empty string.""" + NOTHING = enum.auto() + """Match an empty string which is not followed by a boundary match.""" + + +class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers which use a list of choices that can be generated during completion.""" + def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.conditions = conditions + + @abc.abstractmethod + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + + def no_completion_match(self, value: str) -> CompletionUnavailable: # pylint: disable=unused-argument + """Return an instance of CompletionUnavailable when no match was found for the given value.""" + return CompletionUnavailable() + + def no_choices_available(self, value: str) -> ParserError: # pylint: disable=unused-argument + """Return an instance of ParserError when parsing fails and no choices are available.""" + return ParserError('No choices available.') + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = state.read() + choices = self.get_choices(value) + + if state.mode == ParserMode.PARSE or state.incomplete: + if self.conditions & MatchConditions.CHOICE and state.match(value, choices): + return value + + if self.conditions & MatchConditions.ANY and value: + return value + + if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match: + return value + + if state.mode == ParserMode.PARSE: + if choices: + raise ParserError(f'"{value}" not in: {", ".join(choices)}') + + raise self.no_choices_available(value) + + raise CompletionUnavailable() + + matches = [choice for choice in choices if choice.startswith(value)] + + if not matches: + raise self.no_completion_match(value) + + continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else '' + + raise CompletionSuccess( + list_mode=state.mode == ParserMode.LIST, + consumed=state.consumed, + continuation=continuation, + matches=matches, + ) + + +class ChoicesParser(DynamicChoicesParser): + """Composite argument parser which relies on a static list of choices.""" + def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.choices = choices + + super().__init__(conditions=conditions) + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + return self.choices + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '|'.join(self.choices) + + +class EnumValueChoicesParser(ChoicesParser): + """Composite argument parser which relies on a static list of choices derived from the values of an enum.""" + def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.enum_type = enum_type + + super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return self.enum_type(value) + + +class IntegerParser(DynamicChoicesParser): + """Composite argument parser for integers.""" + PATTERN = re.compile('^[1-9][0-9]*$') + + def __init__(self, maximum: t.Optional[int] = None) -> None: + self.maximum = maximum + + super().__init__() + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + if not value: + numbers = list(range(1, 10)) + elif self.PATTERN.search(value): + int_prefix = int(value) + base = int_prefix * 10 + numbers = [int_prefix] + [base + i for i in range(0, 10)] + else: + numbers = [] + + # NOTE: the minimum is currently fixed at 1 + + if self.maximum is not None: + numbers = [n for n in numbers if n <= self.maximum] + + return [str(n) for n in numbers] + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return int(value) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{integer}' + + +class BooleanParser(ChoicesParser): + """Composite argument parser for boolean (yes/no) values.""" + def __init__(self) -> None: + super().__init__(['yes', 'no']) + + def parse(self, state: ParserState) -> bool: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return value == 'yes' + + +class AnyParser(ChoicesParser): + """Composite argument parser which accepts any input value.""" + def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None: + self.no_match_message = no_match_message + + conditions = MatchConditions.ANY + + if nothing: + conditions |= MatchConditions.NOTHING + + super().__init__([], conditions=conditions) + + def no_completion_match(self, value: str) -> CompletionUnavailable: + """Return an instance of CompletionUnavailable when no match was found for the given value.""" + if self.no_match_message: + return CompletionUnavailable(message=self.no_match_message) + + return super().no_completion_match(value) + + def no_choices_available(self, value: str) -> ParserError: + """Return an instance of ParserError when parsing fails and no choices are available.""" + if self.no_match_message: + return ParserError(self.no_match_message) + + return super().no_choices_available(value) + + +class RelativePathNameParser(DynamicChoicesParser): + """Composite argument parser for relative path names.""" + RELATIVE_NAMES = ['.', '..'] + + def __init__(self, choices: list[str]) -> None: + self.choices = choices + + super().__init__() + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + choices = list(self.choices) + + if value in self.RELATIVE_NAMES: + # complete relative names, but avoid suggesting them unless the current name is relative + # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../") + choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES) + + return choices + + +class FileParser(Parser): + """Composite argument parser for absolute or relative file paths.""" + def parse(self, state: ParserState) -> str: + """Parse the input from the given state and return the result.""" + if state.mode == ParserMode.PARSE: + path = AnyParser().parse(state) + + if not os.path.isfile(path): + raise ParserError(f'Not a file: {path}') + else: + path = '' + + with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary + while boundary.ready: + directory = path or '.' + + try: + with os.scandir(directory) as scan: # type: c.Iterator[os.DirEntry] + choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan] + except OSError: + choices = [] + + if not path: + choices.append(PATH_DELIMITER) # allow absolute paths + choices.append('../') # suggest relative paths + + part = RelativePathNameParser(choices).parse(state) + path += f'{part}{boundary.match or ""}' + + return path + + +class AbsolutePathParser(Parser): + """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + path = '' + + with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary + while boundary.ready: + if path: + path += AnyParser(nothing=True).parse(state) + else: + path += ChoicesParser([PATH_DELIMITER]).parse(state) + + path += (boundary.match or '') + + return path + + +class NamespaceParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers that store their results in a namespace.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + current = getattr(namespace, self.dest) + + if current and self.limit_one: + if state.mode == ParserMode.PARSE: + raise ParserError('Option cannot be specified more than once.') + + raise CompletionError('Option cannot be specified more than once.') + + value = self.get_value(state) + + if self.use_list: + if not current: + current = [] + setattr(namespace, self.dest, current) + + current.append(value) + else: + setattr(namespace, self.dest, value) + + return value + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + return super().parse(state) + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return not self.use_list + + @property + @abc.abstractmethod + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + + +class NamespaceWrappedParser(NamespaceParser): + """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace.""" + def __init__(self, dest: str, parser: Parser) -> None: + self._dest = dest + self.parser = parser + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + return self.parser.parse(state) + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return self._dest + + +class KeyValueParser(Parser, metaclass=abc.ABCMeta): + """Base class for key/value composite argument parsers.""" + @abc.abstractmethod + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + parsers = self.get_parsers(state) + keys = list(parsers) + + with state.delimit(PAIR_DELIMITER, required=False) as pair: # type: ParserBoundary + while pair.ready: + with state.delimit(ASSIGNMENT_DELIMITER): + key = ChoicesParser(keys).parse(state) + + value = parsers[key].parse(state) + + setattr(namespace, key, value) + + keys.remove(key) + + return namespace + + +class PairParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = self.create_namespace() + + state.set_namespace(namespace) + + with state.delimit(self.delimiter, self.required) as boundary: # type: ParserBoundary + choice = self.get_left_parser(state).parse(state) + + if boundary.match: + self.get_right_parser(choice).parse(state) + + return namespace + + @property + def required(self) -> bool: + """True if the delimiter (and thus right parser) is required, otherwise False.""" + return False + + @property + def delimiter(self) -> str: + """The delimiter to use between the left and right parser.""" + return PAIR_DELIMITER + + @abc.abstractmethod + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + + @abc.abstractmethod + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + + @abc.abstractmethod + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + + +class TypeParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: # pylint: disable=unused-argument + """Return a dictionary of type names and type parsers.""" + return self.get_stateless_parsers() + + @abc.abstractmethod + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + parsers = self.get_parsers(state) + + with state.delimit(':'): + key = ChoicesParser(list(parsers)).parse(state) + + value = parsers[key].parse(state) + + return value diff --git a/test/lib/ansible_test/_internal/cli/commands/__init__.py b/test/lib/ansible_test/_internal/cli/commands/__init__.py new file mode 100644 index 0000000..2eb14ab --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/__init__.py @@ -0,0 +1,241 @@ +"""Command line parsing for all commands.""" +from __future__ import annotations + +import argparse +import functools +import sys + +from ...util import ( + display, +) + +from ..completers import ( + complete_target, + register_completer, +) + +from ..environments import ( + CompositeActionCompletionFinder, +) + +from .coverage import ( + do_coverage, +) + +from .env import ( + do_env, +) + +from .integration import ( + do_integration, +) + +from .sanity import ( + do_sanity, +) + +from .shell import ( + do_shell, +) + +from .units import ( + do_units, +) + + +def do_commands( + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for all commands.""" + common = argparse.ArgumentParser(add_help=False) + + common.add_argument( + '-e', + '--explain', + action='store_true', + help='explain commands that would be executed', + ) + + common.add_argument( + '-v', + '--verbose', + dest='verbosity', + action='count', + default=0, + help='display more output', + ) + + common.add_argument( + '--color', + metavar='COLOR', + nargs='?', + help='generate color output: yes, no, auto', + const='yes', + default='auto', + type=color, + ) + + common.add_argument( + '--debug', + action='store_true', + help='run ansible commands in debug mode', + ) + + common.add_argument( + '--truncate', + dest='truncate', + metavar='COLUMNS', + type=int, + default=display.columns, + help='truncate some long output (0=disabled) (default: auto)', + ) + + common.add_argument( + '--redact', + dest='redact', + action='store_true', + default=True, + help=argparse.SUPPRESS, # kept for backwards compatibility, but no point in advertising since it's the default + ) + + common.add_argument( + '--no-redact', + dest='redact', + action='store_false', + default=False, + help='show sensitive values in output', + ) + + test = argparse.ArgumentParser(add_help=False, parents=[common]) + + testing = test.add_argument_group(title='common testing arguments') + + register_completer(testing.add_argument( + 'include', + metavar='TARGET', + nargs='*', + help='test the specified target', + ), functools.partial(complete_target, completer)) + + register_completer(testing.add_argument( + '--include', + metavar='TARGET', + action='append', + help='include the specified target', + ), functools.partial(complete_target, completer)) + + register_completer(testing.add_argument( + '--exclude', + metavar='TARGET', + action='append', + help='exclude the specified target', + ), functools.partial(complete_target, completer)) + + register_completer(testing.add_argument( + '--require', + metavar='TARGET', + action='append', + help='require the specified target', + ), functools.partial(complete_target, completer)) + + testing.add_argument( + '--coverage', + action='store_true', + help='analyze code coverage when running tests', + ) + + testing.add_argument( + '--coverage-check', + action='store_true', + help='only verify code coverage can be enabled', + ) + + testing.add_argument( + '--metadata', + help=argparse.SUPPRESS, + ) + + testing.add_argument( + '--base-branch', + metavar='BRANCH', + help='base branch used for change detection', + ) + + testing.add_argument( + '--changed', + action='store_true', + help='limit targets based on changes', + ) + + changes = test.add_argument_group(title='change detection arguments') + + changes.add_argument( + '--tracked', + action='store_true', + help=argparse.SUPPRESS, + ) + + changes.add_argument( + '--untracked', + action='store_true', + help='include untracked files', + ) + + changes.add_argument( + '--ignore-committed', + dest='committed', + action='store_false', + help='exclude committed files', + ) + + changes.add_argument( + '--ignore-staged', + dest='staged', + action='store_false', + help='exclude staged files', + ) + + changes.add_argument( + '--ignore-unstaged', + dest='unstaged', + action='store_false', + help='exclude unstaged files', + ) + + changes.add_argument( + '--changed-from', + metavar='PATH', + help=argparse.SUPPRESS, + ) + + changes.add_argument( + '--changed-path', + metavar='PATH', + action='append', + help=argparse.SUPPRESS, + ) + + subparsers = parent.add_subparsers(metavar='COMMAND', required=True) + + do_coverage(subparsers, common, completer) + do_env(subparsers, common, completer) + do_shell(subparsers, common, completer) + + do_integration(subparsers, test, completer) + do_sanity(subparsers, test, completer) + do_units(subparsers, test, completer) + + +def color(value: str) -> bool: + """Strict converter for color option.""" + if value == 'yes': + return True + + if value == 'no': + return False + + if value == 'auto': + return sys.stdout.isatty() + + raise argparse.ArgumentTypeError(f"invalid choice: '{value}' (choose from 'yes', 'no', 'auto')") diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py b/test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py new file mode 100644 index 0000000..28e6770 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/__init__.py @@ -0,0 +1,85 @@ +"""Command line parsing for all `coverage` commands.""" +from __future__ import annotations + +import argparse + +from ....commands.coverage import ( + COVERAGE_GROUPS, +) + +from ...environments import ( + CompositeActionCompletionFinder, +) + +from .analyze import ( + do_analyze, +) + +from .combine import ( + do_combine, +) + +from .erase import ( + do_erase, +) + +from .html import ( + do_html, +) + +from .report import ( + do_report, +) + +from .xml import ( + do_xml, +) + + +def do_coverage( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for all `coverage` commands.""" + coverage_common = argparse.ArgumentParser(add_help=False, parents=[parent]) + + parser = subparsers.add_parser( + 'coverage', + help='code coverage management and reporting', + ) + + coverage_subparsers = parser.add_subparsers(metavar='COMMAND', required=True) + + do_analyze(coverage_subparsers, coverage_common, completer) + do_erase(coverage_subparsers, coverage_common, completer) + + do_combine(coverage_subparsers, parent, add_coverage_common, completer) + do_report(coverage_subparsers, parent, add_coverage_common, completer) + do_html(coverage_subparsers, parent, add_coverage_common, completer) + do_xml(coverage_subparsers, parent, add_coverage_common, completer) + + +def add_coverage_common( + parser: argparse.ArgumentParser, +): + """Add common coverage arguments.""" + parser.add_argument( + '--group-by', + metavar='GROUP', + action='append', + choices=COVERAGE_GROUPS, + help='group output by: %s' % ', '.join(COVERAGE_GROUPS), + ) + + parser.add_argument( + '--all', + action='store_true', + help='include all python/powershell source files', + ) + + parser.add_argument( + '--stub', + action='store_true', + help='generate empty report of all python/powershell source files', + ) diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py new file mode 100644 index 0000000..05fbd23 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/__init__.py @@ -0,0 +1,28 @@ +"""Command line parsing for all `coverage analyze` commands.""" +from __future__ import annotations + +import argparse + +from .targets import ( + do_targets, +) + +from ....environments import ( + CompositeActionCompletionFinder, +) + + +def do_analyze( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for all `coverage analyze` commands.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'analyze', + help='analyze collected coverage data', + ) + + analyze_subparsers = parser.add_subparsers(metavar='COMMAND', required=True) + + do_targets(analyze_subparsers, parent, completer) diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py new file mode 100644 index 0000000..7b6ea3e --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/__init__.py @@ -0,0 +1,48 @@ +"""Command line parsing for all `coverage analyze targets` commands.""" +from __future__ import annotations + +import argparse + +from .....environments import ( + CompositeActionCompletionFinder, +) + +from .combine import ( + do_combine, +) + +from .expand import ( + do_expand, +) + +from .filter import ( + do_filter, +) + +from .generate import ( + do_generate, +) + +from .missing import ( + do_missing, +) + + +def do_targets( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for all `coverage analyze targets` commands.""" + targets = subparsers.add_parser( + 'targets', + help='analyze integration test target coverage', + ) + + targets_subparsers = targets.add_subparsers(metavar='COMMAND', required=True) + + do_generate(targets_subparsers, parent, completer) + do_expand(targets_subparsers, parent, completer) + do_filter(targets_subparsers, parent, completer) + do_combine(targets_subparsers, parent, completer) + do_missing(targets_subparsers, parent, completer) diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py new file mode 100644 index 0000000..7fa49bf --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/combine.py @@ -0,0 +1,49 @@ +"""Command line parsing for the `coverage analyze targets combine` command.""" +from __future__ import annotations + +import argparse + +from ......commands.coverage.analyze.targets.combine import ( + command_coverage_analyze_targets_combine, + CoverageAnalyzeTargetsCombineConfig, +) + +from .....environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_combine( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `coverage analyze targets combine` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'combine', + parents=[parent], + help='combine multiple aggregated coverage files', + ) + + parser.set_defaults( + func=command_coverage_analyze_targets_combine, + config=CoverageAnalyzeTargetsCombineConfig, + ) + + targets_combine = parser.add_argument_group('coverage arguments') + + targets_combine.add_argument( + 'input_file', + nargs='+', + help='input file to read aggregated coverage from', + ) + + targets_combine.add_argument( + 'output_file', + help='output file to write aggregated coverage to', + ) + + add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets combine diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py new file mode 100644 index 0000000..f5f020f --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/expand.py @@ -0,0 +1,48 @@ +"""Command line parsing for the `coverage analyze targets expand` command.""" +from __future__ import annotations + +import argparse + +from ......commands.coverage.analyze.targets.expand import ( + command_coverage_analyze_targets_expand, + CoverageAnalyzeTargetsExpandConfig, +) + +from .....environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_expand( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `coverage analyze targets expand` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'expand', + parents=[parent], + help='expand target names from integers in aggregated coverage', + ) + + parser.set_defaults( + func=command_coverage_analyze_targets_expand, + config=CoverageAnalyzeTargetsExpandConfig, + ) + + targets_expand = parser.add_argument_group(title='coverage arguments') + + targets_expand.add_argument( + 'input_file', + help='input file to read aggregated coverage from', + ) + + targets_expand.add_argument( + 'output_file', + help='output file to write expanded coverage to', + ) + + add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets expand diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py new file mode 100644 index 0000000..afcb828 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/filter.py @@ -0,0 +1,76 @@ +"""Command line parsing for the `coverage analyze targets filter` command.""" +from __future__ import annotations + +import argparse + +from ......commands.coverage.analyze.targets.filter import ( + command_coverage_analyze_targets_filter, + CoverageAnalyzeTargetsFilterConfig, +) + +from .....environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_filter( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `coverage analyze targets filter` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'filter', + parents=[parent], + help='filter aggregated coverage data', + ) + + parser.set_defaults( + func=command_coverage_analyze_targets_filter, + config=CoverageAnalyzeTargetsFilterConfig, + ) + + targets_filter = parser.add_argument_group(title='coverage arguments') + + targets_filter.add_argument( + 'input_file', + help='input file to read aggregated coverage from', + ) + + targets_filter.add_argument( + 'output_file', + help='output file to write expanded coverage to', + ) + + targets_filter.add_argument( + '--include-target', + metavar='TGT', + dest='include_targets', + action='append', + help='include the specified targets', + ) + + targets_filter.add_argument( + '--exclude-target', + metavar='TGT', + dest='exclude_targets', + action='append', + help='exclude the specified targets', + ) + + targets_filter.add_argument( + '--include-path', + metavar='REGEX', + help='include paths matching the given regex', + ) + + targets_filter.add_argument( + '--exclude-path', + metavar='REGEX', + help='exclude paths matching the given regex', + ) + + add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets filter diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py new file mode 100644 index 0000000..0d13933 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/generate.py @@ -0,0 +1,49 @@ +"""Command line parsing for the `coverage analyze targets generate` command.""" +from __future__ import annotations + +import argparse + +from ......commands.coverage.analyze.targets.generate import ( + command_coverage_analyze_targets_generate, + CoverageAnalyzeTargetsGenerateConfig, +) + +from .....environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_generate( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `coverage analyze targets generate` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'generate', + parents=[parent], + help='aggregate coverage by integration test target', + ) + + parser.set_defaults( + func=command_coverage_analyze_targets_generate, + config=CoverageAnalyzeTargetsGenerateConfig, + ) + + targets_generate = parser.add_argument_group(title='coverage arguments') + + targets_generate.add_argument( + 'input_dir', + nargs='?', + help='directory to read coverage from', + ) + + targets_generate.add_argument( + 'output_file', + help='output file for aggregated coverage', + ) + + add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets generate diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py new file mode 100644 index 0000000..8af236f --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/analyze/targets/missing.py @@ -0,0 +1,65 @@ +"""Command line parsing for the `coverage analyze targets missing` command.""" +from __future__ import annotations + +import argparse + +from ......commands.coverage.analyze.targets.missing import ( + command_coverage_analyze_targets_missing, + CoverageAnalyzeTargetsMissingConfig, +) + +from .....environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_missing( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `coverage analyze targets missing` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'missing', + parents=[parent], + help='identify coverage in one file missing in another', + ) + + parser.set_defaults( + func=command_coverage_analyze_targets_missing, + config=CoverageAnalyzeTargetsMissingConfig, + ) + + targets_missing = parser.add_argument_group(title='coverage arguments') + + targets_missing.add_argument( + 'from_file', + help='input file containing aggregated coverage', + ) + + targets_missing.add_argument( + 'to_file', + help='input file containing aggregated coverage', + ) + + targets_missing.add_argument( + 'output_file', + help='output file to write aggregated coverage to', + ) + + targets_missing.add_argument( + '--only-gaps', + action='store_true', + help='report only arcs/lines not hit by any target', + ) + + targets_missing.add_argument( + '--only-exists', + action='store_true', + help='limit results to files that exist', + ) + + add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage analyze targets missing diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/combine.py b/test/lib/ansible_test/_internal/cli/commands/coverage/combine.py new file mode 100644 index 0000000..9b6d34a --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/combine.py @@ -0,0 +1,49 @@ +"""Command line parsing for the `coverage combine` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import typing as t + +from ....commands.coverage.combine import ( + command_coverage_combine, + CoverageCombineConfig, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_combine( + subparsers, + parent: argparse.ArgumentParser, + add_coverage_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for the `coverage combine` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'combine', + parents=[parent], + help='combine coverage data and rewrite remote paths', + ) + + parser.set_defaults( + func=command_coverage_combine, + config=CoverageCombineConfig, + ) + + coverage_combine = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='coverage arguments')) + + add_coverage_common(coverage_combine) + + coverage_combine.add_argument( + '--export', + metavar='DIR', + help='directory to export combined coverage files to', + ) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage combine diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/erase.py b/test/lib/ansible_test/_internal/cli/commands/coverage/erase.py new file mode 100644 index 0000000..ef356f0 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/erase.py @@ -0,0 +1,36 @@ +"""Command line parsing for the `coverage erase` command.""" +from __future__ import annotations + +import argparse + +from ....commands.coverage.erase import ( + command_coverage_erase, + CoverageEraseConfig, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_erase( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for the `coverage erase` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'erase', + parents=[parent], + help='erase coverage data files', + ) + + parser.set_defaults( + func=command_coverage_erase, + config=CoverageEraseConfig, + ) + + add_environments(parser, completer, ControllerMode.ORIGIN, TargetMode.NO_TARGETS) # coverage erase diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/html.py b/test/lib/ansible_test/_internal/cli/commands/coverage/html.py new file mode 100644 index 0000000..5f719de --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/html.py @@ -0,0 +1,43 @@ +"""Command line parsing for the `coverage html` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import typing as t + +from ....commands.coverage.html import ( + command_coverage_html, + CoverageHtmlConfig, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_html( + subparsers, + parent: argparse.ArgumentParser, + add_coverage_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for the `coverage html` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'html', + parents=[parent], + help='generate html coverage report', + ) + + parser.set_defaults( + func=command_coverage_html, + config=CoverageHtmlConfig, + ) + + coverage_combine = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='coverage arguments')) + + add_coverage_common(coverage_combine) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage html diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/report.py b/test/lib/ansible_test/_internal/cli/commands/coverage/report.py new file mode 100644 index 0000000..e6a6e80 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/report.py @@ -0,0 +1,61 @@ +"""Command line parsing for the `coverage report` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import typing as t + +from ....commands.coverage.report import ( + command_coverage_report, + CoverageReportConfig, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_report( + subparsers, + parent: argparse.ArgumentParser, + add_coverage_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for the `coverage report` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'report', + parents=[parent], + help='generate console coverage report', + ) + + parser.set_defaults( + func=command_coverage_report, + config=CoverageReportConfig, + ) + + coverage_report = t.cast(argparse.ArgumentParser, parser.add_argument_group('coverage arguments')) + + add_coverage_common(coverage_report) + + coverage_report.add_argument( + '--show-missing', + action='store_true', + help='show line numbers of statements not executed', + ) + + coverage_report.add_argument( + '--include', + metavar='PAT[,...]', + help='only include paths that match a pattern (accepts quoted shell wildcards)', + ) + + coverage_report.add_argument( + '--omit', + metavar='PAT[,...]', + help='omit paths that match a pattern (accepts quoted shell wildcards)', + ) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage report diff --git a/test/lib/ansible_test/_internal/cli/commands/coverage/xml.py b/test/lib/ansible_test/_internal/cli/commands/coverage/xml.py new file mode 100644 index 0000000..e7b03ca --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/coverage/xml.py @@ -0,0 +1,43 @@ +"""Command line parsing for the `coverage xml` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import typing as t + +from ....commands.coverage.xml import ( + command_coverage_xml, + CoverageXmlConfig, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_xml( + subparsers, + parent: argparse.ArgumentParser, + add_coverage_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +) -> None: + """Command line parsing for the `coverage xml` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'xml', + parents=[parent], + help='generate xml coverage report', + ) + + parser.set_defaults( + func=command_coverage_xml, + config=CoverageXmlConfig, + ) + + coverage_combine = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='coverage arguments')) + + add_coverage_common(coverage_combine) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NO_TARGETS) # coverage xml diff --git a/test/lib/ansible_test/_internal/cli/commands/env.py b/test/lib/ansible_test/_internal/cli/commands/env.py new file mode 100644 index 0000000..0cd2114 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/env.py @@ -0,0 +1,63 @@ +"""Command line parsing for the `env` command.""" +from __future__ import annotations + +import argparse + +from ...commands.env import ( + EnvConfig, + command_env, +) + +from ..environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_env( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `env` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'env', + parents=[parent], + help='show information about the test environment', + ) + + parser.set_defaults( + func=command_env, + config=EnvConfig, + ) + + env = parser.add_argument_group(title='env arguments') + + env.add_argument( + '--show', + action='store_true', + help='show environment on stdout', + ) + + env.add_argument( + '--dump', + action='store_true', + help='dump environment to disk', + ) + + env.add_argument( + '--list-files', + action='store_true', + help='list files on stdout', + ) + + env.add_argument( + '--timeout', + type=int, + metavar='MINUTES', + help='timeout for future ansible-test commands (0 clears)', + ) + + add_environments(parser, completer, ControllerMode.NO_DELEGATION, TargetMode.NO_TARGETS) # env diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/__init__.py b/test/lib/ansible_test/_internal/cli/commands/integration/__init__.py new file mode 100644 index 0000000..dfdefb1 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/integration/__init__.py @@ -0,0 +1,162 @@ +"""Command line parsing for all integration commands.""" +from __future__ import annotations + +import argparse + +from ...completers import ( + complete_target, + register_completer, +) + +from ...environments import ( + CompositeActionCompletionFinder, +) + +from .network import ( + do_network_integration, +) + +from .posix import ( + do_posix_integration, +) + +from .windows import ( + do_windows_integration, +) + + +def do_integration( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for all integration commands.""" + parser = argparse.ArgumentParser( + add_help=False, + parents=[parent], + ) + + do_posix_integration(subparsers, parser, add_integration_common, completer) + do_network_integration(subparsers, parser, add_integration_common, completer) + do_windows_integration(subparsers, parser, add_integration_common, completer) + + +def add_integration_common( + parser: argparse.ArgumentParser, +): + """Add common integration arguments.""" + register_completer(parser.add_argument( + '--start-at', + metavar='TARGET', + help='start at the specified target', + ), complete_target) + + parser.add_argument( + '--start-at-task', + metavar='TASK', + help='start at the specified task', + ) + + parser.add_argument( + '--tags', + metavar='TAGS', + help='only run plays and tasks tagged with these values', + ) + + parser.add_argument( + '--skip-tags', + metavar='TAGS', + help='only run plays and tasks whose tags do not match these values', + ) + + parser.add_argument( + '--diff', + action='store_true', + help='show diff output', + ) + + parser.add_argument( + '--allow-destructive', + action='store_true', + help='allow destructive tests', + ) + + parser.add_argument( + '--allow-root', + action='store_true', + help='allow tests requiring root when not root', + ) + + parser.add_argument( + '--allow-disabled', + action='store_true', + help='allow tests which have been marked as disabled', + ) + + parser.add_argument( + '--allow-unstable', + action='store_true', + help='allow tests which have been marked as unstable', + ) + + parser.add_argument( + '--allow-unstable-changed', + action='store_true', + help='allow tests which have been marked as unstable when focused changes are detected', + ) + + parser.add_argument( + '--allow-unsupported', + action='store_true', + help='allow tests which have been marked as unsupported', + ) + + parser.add_argument( + '--retry-on-error', + action='store_true', + help='retry failed test with increased verbosity', + ) + + parser.add_argument( + '--continue-on-error', + action='store_true', + help='continue after failed test', + ) + + parser.add_argument( + '--debug-strategy', + action='store_true', + help='run test playbooks using the debug strategy', + ) + + parser.add_argument( + '--changed-all-target', + metavar='TARGET', + default='all', + help='target to run when all tests are needed', + ) + + parser.add_argument( + '--changed-all-mode', + metavar='MODE', + choices=('default', 'include', 'exclude'), + help='include/exclude behavior with --changed-all-target: %(choices)s', + ) + + parser.add_argument( + '--list-targets', + action='store_true', + help='list matching targets instead of running tests', + ) + + parser.add_argument( + '--no-temp-workdir', + action='store_true', + help='do not run tests from a temporary directory (use only for verifying broken tests)', + ) + + parser.add_argument( + '--no-temp-unicode', + action='store_true', + help='avoid unicode characters in temporary directory (use only for verifying broken tests)', + ) diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/network.py b/test/lib/ansible_test/_internal/cli/commands/integration/network.py new file mode 100644 index 0000000..a05985b --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/integration/network.py @@ -0,0 +1,86 @@ +"""Command line parsing for the `network-integration` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import os +import typing as t + +from ....commands.integration.network import ( + command_network_integration, +) + +from ....config import ( + NetworkIntegrationConfig, +) + +from ....target import ( + walk_network_integration_targets, +) + +from ....data import ( + data_context, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + +from ...completers import ( + register_completer, +) + + +def do_network_integration( + subparsers, + parent: argparse.ArgumentParser, + add_integration_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `network-integration` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'network-integration', + parents=[parent], + help='network integration tests', + ) + + parser.set_defaults( + func=command_network_integration, + targets_func=walk_network_integration_targets, + config=NetworkIntegrationConfig) + + network_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='network integration test arguments')) + + add_integration_common(network_integration) + + register_completer(network_integration.add_argument( + '--testcase', + metavar='TESTCASE', + help='limit a test to a specified testcase', + ), complete_network_testcase) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NETWORK_INTEGRATION) # network-integration + + +def complete_network_testcase(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]: + """Return a list of test cases matching the given prefix if only one target was parsed from the command line, otherwise return an empty list.""" + testcases = [] + + # since testcases are module specific, don't autocomplete if more than one + # module is specified + if len(parsed_args.include) != 1: + return [] + + target = parsed_args.include[0] + test_dir = os.path.join(data_context().content.integration_targets_path, target, 'tests') + connection_dirs = data_context().content.get_dirs(test_dir) + + for connection_dir in connection_dirs: + for testcase in [os.path.basename(path) for path in data_context().content.get_files(connection_dir)]: + if testcase.startswith(prefix): + testcases.append(testcase.split('.', 1)[0]) + + return testcases diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/posix.py b/test/lib/ansible_test/_internal/cli/commands/integration/posix.py new file mode 100644 index 0000000..78d6165 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/integration/posix.py @@ -0,0 +1,51 @@ +"""Command line parsing for the `integration` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import typing as t + +from ....commands.integration.posix import ( + command_posix_integration, +) + +from ....config import ( + PosixIntegrationConfig, +) + +from ....target import ( + walk_posix_integration_targets, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_posix_integration( + subparsers, + parent: argparse.ArgumentParser, + add_integration_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `integration` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'integration', + parents=[parent], + help='posix integration tests', + ) + + parser.set_defaults( + func=command_posix_integration, + targets_func=walk_posix_integration_targets, + config=PosixIntegrationConfig, + ) + + posix_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='integration test arguments')) + + add_integration_common(posix_integration) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.POSIX_INTEGRATION) # integration diff --git a/test/lib/ansible_test/_internal/cli/commands/integration/windows.py b/test/lib/ansible_test/_internal/cli/commands/integration/windows.py new file mode 100644 index 0000000..ab022e3 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/integration/windows.py @@ -0,0 +1,51 @@ +"""Command line parsing for the `windows-integration` command.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import typing as t + +from ....commands.integration.windows import ( + command_windows_integration, +) + +from ....config import ( + WindowsIntegrationConfig, +) + +from ....target import ( + walk_windows_integration_targets, +) + +from ...environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_windows_integration( + subparsers, + parent: argparse.ArgumentParser, + add_integration_common: c.Callable[[argparse.ArgumentParser], None], + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `windows-integration` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'windows-integration', + parents=[parent], + help='windows integration tests', + ) + + parser.set_defaults( + func=command_windows_integration, + targets_func=walk_windows_integration_targets, + config=WindowsIntegrationConfig, + ) + + windows_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='windows integration test arguments')) + + add_integration_common(windows_integration) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.WINDOWS_INTEGRATION) # windows-integration diff --git a/test/lib/ansible_test/_internal/cli/commands/sanity.py b/test/lib/ansible_test/_internal/cli/commands/sanity.py new file mode 100644 index 0000000..8b4a9ae --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/sanity.py @@ -0,0 +1,119 @@ +"""Command line parsing for the `sanity` command.""" +from __future__ import annotations + +import argparse + +from ...config import ( + SanityConfig, +) + +from ...commands.sanity import ( + command_sanity, + sanity_get_tests, +) + +from ...target import ( + walk_sanity_targets, +) + +from ...data import ( + data_context, +) + +from ..environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_sanity( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `sanity` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'sanity', + parents=[parent], + help='sanity tests', + ) + + parser.set_defaults( + func=command_sanity, + targets_func=walk_sanity_targets, + config=SanityConfig) + + sanity = parser.add_argument_group(title='sanity test arguments') + + sanity.add_argument( + '--test', + metavar='TEST', + action='append', + choices=[test.name for test in sanity_get_tests()], + help='tests to run', + ) + + sanity.add_argument( + '--skip-test', + metavar='TEST', + action='append', + choices=[test.name for test in sanity_get_tests()], + help='tests to skip', + ) + + sanity.add_argument( + '--allow-disabled', + action='store_true', + help='allow tests to run which are disabled by default', + ) + + sanity.add_argument( + '--list-tests', + action='store_true', + help='list available tests', + ) + + sanity.add_argument( + '--enable-optional-errors', + action='store_true', + help='enable optional errors', + ) + + if data_context().content.is_ansible: + sanity.add_argument( + '--keep-git', + action='store_true', + help='transfer git related files to the remote host/container', + ) + else: + sanity.set_defaults( + keep_git=False, + ) + + sanity.add_argument( + '--lint', + action='store_true', + help='write lint output to stdout, everything else stderr', + ) + + sanity.add_argument( + '--junit', + action='store_true', + help='write test failures to junit xml files', + ) + + sanity.add_argument( + '--failure-ok', + action='store_true', + help='exit successfully on failed tests after saving results', + ) + + sanity.add_argument( + '--prime-venvs', + action='store_true', + help='prepare virtual environments without running tests' + ) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.SANITY) # sanity diff --git a/test/lib/ansible_test/_internal/cli/commands/shell.py b/test/lib/ansible_test/_internal/cli/commands/shell.py new file mode 100644 index 0000000..1baffc6 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/shell.py @@ -0,0 +1,59 @@ +"""Command line parsing for the `shell` command.""" +from __future__ import annotations + +import argparse + +from ...commands.shell import ( + command_shell, +) + +from ...config import ( + ShellConfig, +) + +from ..environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_shell( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `shell` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'shell', + parents=[parent], + help='open an interactive shell', + ) + + parser.set_defaults( + func=command_shell, + config=ShellConfig, + ) + + shell = parser.add_argument_group(title='shell arguments') + + shell.add_argument( + 'cmd', + nargs='*', + help='run the specified command', + ) + + shell.add_argument( + '--raw', + action='store_true', + help='direct to shell with no setup', + ) + + shell.add_argument( + '--export', + metavar='PATH', + help='export inventory instead of opening a shell', + ) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.SHELL) # shell diff --git a/test/lib/ansible_test/_internal/cli/commands/units.py b/test/lib/ansible_test/_internal/cli/commands/units.py new file mode 100644 index 0000000..c541a87 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/commands/units.py @@ -0,0 +1,65 @@ +"""Command line parsing for the `units` command.""" +from __future__ import annotations + +import argparse + +from ...config import ( + UnitsConfig, +) + +from ...commands.units import ( + command_units, +) + +from ...target import ( + walk_units_targets, +) + +from ..environments import ( + CompositeActionCompletionFinder, + ControllerMode, + TargetMode, + add_environments, +) + + +def do_units( + subparsers, + parent: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, +): + """Command line parsing for the `units` command.""" + parser: argparse.ArgumentParser = subparsers.add_parser( + 'units', + parents=[parent], + help='unit tests', + ) + + parser.set_defaults( + func=command_units, + targets_func=walk_units_targets, + config=UnitsConfig, + ) + + units = parser.add_argument_group(title='unit test arguments') + + units.add_argument( + '--collect-only', + action='store_true', + help='collect tests but do not execute them', + ) + + units.add_argument( + '--num-workers', + metavar='INT', + type=int, + help='number of workers to use (default: auto)', + ) + + units.add_argument( + '--requirements-mode', + choices=('only', 'skip'), + help=argparse.SUPPRESS, + ) + + add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.UNITS) # units diff --git a/test/lib/ansible_test/_internal/cli/compat.py b/test/lib/ansible_test/_internal/cli/compat.py new file mode 100644 index 0000000..93006d5 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/compat.py @@ -0,0 +1,505 @@ +"""Provides compatibility with first-generation host delegation options in ansible-test.""" +from __future__ import annotations + +import argparse +import collections.abc as c +import dataclasses +import enum +import os +import types +import typing as t + +from ..constants import ( + CONTROLLER_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from ..util import ( + ApplicationError, + display, + filter_args, + sorted_versions, + str_to_version, +) + +from ..docker_util import ( + docker_available, +) + +from ..completion import ( + docker_completion, + remote_completion, + filter_completion, +) + +from ..host_configs import ( + ControllerConfig, + ControllerHostConfig, + DockerConfig, + FallbackDetail, + FallbackReason, + HostConfig, + HostContext, + HostSettings, + NativePythonConfig, + NetworkInventoryConfig, + NetworkRemoteConfig, + OriginConfig, + PosixRemoteConfig, + VirtualPythonConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ..data import ( + data_context, +) + + +def filter_python(version: t.Optional[str], versions: t.Optional[c.Sequence[str]]) -> t.Optional[str]: + """If a Python version is given and is in the given version list, return that Python version, otherwise return None.""" + return version if version in versions else None + + +def controller_python(version: t.Optional[str]) -> t.Optional[str]: + """If a Python version is given and is supported by the controller, return that Python version, otherwise return None.""" + return filter_python(version, CONTROLLER_PYTHON_VERSIONS) + + +def get_fallback_remote_controller() -> str: + """Return the remote fallback platform for the controller.""" + platform = 'freebsd' # lower cost than RHEL and macOS + candidates = [item for item in filter_completion(remote_completion()).values() if item.controller_supported and item.platform == platform] + fallback = sorted(candidates, key=lambda value: str_to_version(value.version), reverse=True)[0] + return fallback.name + + +def get_option_name(name: str) -> str: + """Return a command-line option name from the given option name.""" + if name == 'targets': + name = 'target' + + return f'--{name.replace("_", "-")}' + + +class PythonVersionUnsupportedError(ApplicationError): + """A Python version was requested for a context which does not support that version.""" + def __init__(self, context: str, version: str, versions: c.Iterable[str]) -> None: + super().__init__(f'Python {version} is not supported by environment `{context}`. Supported Python version(s) are: {", ".join(versions)}') + + +class PythonVersionUnspecifiedError(ApplicationError): + """A Python version was not specified for a context which is unknown, thus the Python version is unknown.""" + def __init__(self, context: str) -> None: + super().__init__(f'A Python version was not specified for environment `{context}`. Use the `--python` option to specify a Python version.') + + +class ControllerNotSupportedError(ApplicationError): + """Option(s) were specified which do not provide support for the controller and would be ignored because they are irrelevant for the target.""" + def __init__(self, context: str) -> None: + super().__init__(f'Environment `{context}` does not provide a Python version supported by the controller.') + + +class OptionsConflictError(ApplicationError): + """Option(s) were specified which conflict with other options.""" + def __init__(self, first: c.Iterable[str], second: c.Iterable[str]) -> None: + super().__init__(f'Options `{" ".join(first)}` cannot be combined with options `{" ".join(second)}`.') + + +@dataclasses.dataclass(frozen=True) +class LegacyHostOptions: + """Legacy host options used prior to the availability of separate controller and target host configuration.""" + python: t.Optional[str] = None + python_interpreter: t.Optional[str] = None + local: t.Optional[bool] = None + venv: t.Optional[bool] = None + venv_system_site_packages: t.Optional[bool] = None + remote: t.Optional[str] = None + remote_provider: t.Optional[str] = None + remote_arch: t.Optional[str] = None + docker: t.Optional[str] = None + docker_privileged: t.Optional[bool] = None + docker_seccomp: t.Optional[str] = None + docker_memory: t.Optional[int] = None + windows: t.Optional[list[str]] = None + platform: t.Optional[list[str]] = None + platform_collection: t.Optional[list[tuple[str, str]]] = None + platform_connection: t.Optional[list[tuple[str, str]]] = None + inventory: t.Optional[str] = None + + @staticmethod + def create(namespace: t.Union[argparse.Namespace, types.SimpleNamespace]) -> LegacyHostOptions: + """Create legacy host options from the given namespace.""" + kwargs = {field.name: getattr(namespace, field.name, None) for field in dataclasses.fields(LegacyHostOptions)} + + if kwargs['python'] == 'default': + kwargs['python'] = None + + return LegacyHostOptions(**kwargs) + + @staticmethod + def purge_namespace(namespace: t.Union[argparse.Namespace, types.SimpleNamespace]) -> None: + """Purge legacy host options fields from the given namespace.""" + for field in dataclasses.fields(LegacyHostOptions): + if hasattr(namespace, field.name): + delattr(namespace, field.name) + + @staticmethod + def purge_args(args: list[str]) -> list[str]: + """Purge legacy host options from the given command line arguments.""" + fields: tuple[dataclasses.Field, ...] = dataclasses.fields(LegacyHostOptions) + filters: dict[str, int] = {get_option_name(field.name): 0 if field.type is t.Optional[bool] else 1 for field in fields} + + return filter_args(args, filters) + + def get_options_used(self) -> tuple[str, ...]: + """Return a tuple of the command line options used.""" + fields: tuple[dataclasses.Field, ...] = dataclasses.fields(self) + options = tuple(sorted(get_option_name(field.name) for field in fields if getattr(self, field.name))) + return options + + +class TargetMode(enum.Enum): + """Type of provisioning to use for the targets.""" + WINDOWS_INTEGRATION = enum.auto() # windows-integration + NETWORK_INTEGRATION = enum.auto() # network-integration + POSIX_INTEGRATION = enum.auto() # integration + SANITY = enum.auto() # sanity + UNITS = enum.auto() # units + SHELL = enum.auto() # shell + NO_TARGETS = enum.auto() # coverage + + @property + def one_host(self) -> bool: + """Return True if only one host (the controller) should be used, otherwise return False.""" + return self in (TargetMode.SANITY, TargetMode.UNITS, TargetMode.NO_TARGETS) + + @property + def no_fallback(self) -> bool: + """Return True if no fallback is acceptable for the controller (due to options not applying to the target), otherwise return False.""" + return self in (TargetMode.WINDOWS_INTEGRATION, TargetMode.NETWORK_INTEGRATION, TargetMode.NO_TARGETS) + + @property + def multiple_pythons(self) -> bool: + """Return True if multiple Python versions are allowed, otherwise False.""" + return self in (TargetMode.SANITY, TargetMode.UNITS) + + @property + def has_python(self) -> bool: + """Return True if this mode uses Python, otherwise False.""" + return self in (TargetMode.POSIX_INTEGRATION, TargetMode.SANITY, TargetMode.UNITS, TargetMode.SHELL) + + +def convert_legacy_args( + argv: list[str], + args: t.Union[argparse.Namespace, types.SimpleNamespace], + mode: TargetMode, +) -> HostSettings: + """Convert pre-split host arguments in the given namespace to their split counterparts.""" + old_options = LegacyHostOptions.create(args) + old_options.purge_namespace(args) + + new_options = [ + '--controller', + '--target', + '--target-python', + '--target-posix', + '--target-windows', + '--target-network', + ] + + used_old_options = old_options.get_options_used() + used_new_options = [name for name in new_options if name in argv] + + if used_old_options: + if used_new_options: + raise OptionsConflictError(used_old_options, used_new_options) + + controller, targets, controller_fallback = get_legacy_host_config(mode, old_options) + + if controller_fallback: + if mode.one_host: + display.info(controller_fallback.message, verbosity=1) + else: + display.warning(controller_fallback.message) + + used_default_pythons = mode in (TargetMode.SANITY, TargetMode.UNITS) and not native_python(old_options) + else: + controller = args.controller or OriginConfig() + controller_fallback = None + + if mode == TargetMode.NO_TARGETS: + targets = [] + used_default_pythons = False + elif args.targets: + targets = args.targets + used_default_pythons = False + else: + targets = default_targets(mode, controller) + used_default_pythons = mode in (TargetMode.SANITY, TargetMode.UNITS) + + args.controller = controller + args.targets = targets + + if used_default_pythons: + control_targets = t.cast(list[ControllerConfig], targets) + skipped_python_versions = sorted_versions(list(set(SUPPORTED_PYTHON_VERSIONS) - {target.python.version for target in control_targets})) + else: + skipped_python_versions = [] + + filtered_args = old_options.purge_args(argv) + filtered_args = filter_args(filtered_args, {name: 1 for name in new_options}) + + host_settings = HostSettings( + controller=controller, + targets=targets, + skipped_python_versions=skipped_python_versions, + filtered_args=filtered_args, + controller_fallback=controller_fallback, + ) + + return host_settings + + +def controller_targets( + mode: TargetMode, + options: LegacyHostOptions, + controller: ControllerHostConfig, +) -> list[HostConfig]: + """Return the configuration for controller targets.""" + python = native_python(options) + + targets: list[HostConfig] + + if python: + targets = [ControllerConfig(python=python)] + else: + targets = default_targets(mode, controller) + + return targets + + +def native_python(options: LegacyHostOptions) -> t.Optional[NativePythonConfig]: + """Return a NativePythonConfig for the given version if it is not None, otherwise return None.""" + if not options.python and not options.python_interpreter: + return None + + return NativePythonConfig(version=options.python, path=options.python_interpreter) + + +def get_legacy_host_config( + mode: TargetMode, + options: LegacyHostOptions, +) -> tuple[ControllerHostConfig, list[HostConfig], t.Optional[FallbackDetail]]: + """ + Returns controller and target host configs derived from the provided legacy host options. + The goal is to match the original behavior, by using non-split testing whenever possible. + When the options support the controller, use the options for the controller and use ControllerConfig for the targets. + When the options do not support the controller, use the options for the targets and use a default controller config influenced by the options. + """ + venv_fallback = 'venv/default' + docker_fallback = 'default' + remote_fallback = get_fallback_remote_controller() + + controller_fallback: t.Optional[tuple[str, str, FallbackReason]] = None + + controller: t.Optional[ControllerHostConfig] + targets: list[HostConfig] + + if options.venv: + if controller_python(options.python) or not options.python: + controller = OriginConfig(python=VirtualPythonConfig(version=options.python or 'default', system_site_packages=options.venv_system_site_packages)) + else: + controller_fallback = f'origin:python={venv_fallback}', f'--venv --python {options.python}', FallbackReason.PYTHON + controller = OriginConfig(python=VirtualPythonConfig(version='default', system_site_packages=options.venv_system_site_packages)) + + if mode in (TargetMode.SANITY, TargetMode.UNITS): + python = native_python(options) + + if python: + control_targets = [ControllerConfig(python=python)] + else: + control_targets = controller.get_default_targets(HostContext(controller_config=controller)) + + # Target sanity tests either have no Python requirements or manage their own virtual environments. + # Thus, there is no point in setting up virtual environments ahead of time for them. + + if mode == TargetMode.UNITS: + targets = [ControllerConfig(python=VirtualPythonConfig(version=target.python.version, path=target.python.path, + system_site_packages=options.venv_system_site_packages)) for target in control_targets] + else: + targets = t.cast(list[HostConfig], control_targets) + else: + targets = [ControllerConfig(python=VirtualPythonConfig(version=options.python or 'default', + system_site_packages=options.venv_system_site_packages))] + elif options.docker: + docker_config = filter_completion(docker_completion()).get(options.docker) + + if docker_config: + if options.python and options.python not in docker_config.supported_pythons: + raise PythonVersionUnsupportedError(f'--docker {options.docker}', options.python, docker_config.supported_pythons) + + if docker_config.controller_supported: + if controller_python(options.python) or not options.python: + controller = DockerConfig(name=options.docker, python=native_python(options), + privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory) + targets = controller_targets(mode, options, controller) + else: + controller_fallback = f'docker:{options.docker}', f'--docker {options.docker} --python {options.python}', FallbackReason.PYTHON + controller = DockerConfig(name=options.docker) + targets = controller_targets(mode, options, controller) + else: + controller_fallback = f'docker:{docker_fallback}', f'--docker {options.docker}', FallbackReason.ENVIRONMENT + controller = DockerConfig(name=docker_fallback) + targets = [DockerConfig(name=options.docker, python=native_python(options), + privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)] + else: + if not options.python: + raise PythonVersionUnspecifiedError(f'--docker {options.docker}') + + if controller_python(options.python): + controller = DockerConfig(name=options.docker, python=native_python(options), + privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory) + targets = controller_targets(mode, options, controller) + else: + controller_fallback = f'docker:{docker_fallback}', f'--docker {options.docker} --python {options.python}', FallbackReason.PYTHON + controller = DockerConfig(name=docker_fallback) + targets = [DockerConfig(name=options.docker, python=native_python(options), + privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)] + elif options.remote: + remote_config = filter_completion(remote_completion()).get(options.remote) + context, reason = None, None + + if remote_config: + if options.python and options.python not in remote_config.supported_pythons: + raise PythonVersionUnsupportedError(f'--remote {options.remote}', options.python, remote_config.supported_pythons) + + if remote_config.controller_supported: + if controller_python(options.python) or not options.python: + controller = PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, + arch=options.remote_arch) + targets = controller_targets(mode, options, controller) + else: + controller_fallback = f'remote:{options.remote}', f'--remote {options.remote} --python {options.python}', FallbackReason.PYTHON + controller = PosixRemoteConfig(name=options.remote, provider=options.remote_provider, arch=options.remote_arch) + targets = controller_targets(mode, options, controller) + else: + context, reason = f'--remote {options.remote}', FallbackReason.ENVIRONMENT + controller = None + targets = [PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)] + elif mode == TargetMode.SHELL and options.remote.startswith('windows/'): + if options.python and options.python not in CONTROLLER_PYTHON_VERSIONS: + raise ControllerNotSupportedError(f'--python {options.python}') + + controller = OriginConfig(python=native_python(options)) + targets = [WindowsRemoteConfig(name=options.remote, provider=options.remote_provider, arch=options.remote_arch)] + else: + if not options.python: + raise PythonVersionUnspecifiedError(f'--remote {options.remote}') + + if controller_python(options.python): + controller = PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch) + targets = controller_targets(mode, options, controller) + else: + context, reason = f'--remote {options.remote} --python {options.python}', FallbackReason.PYTHON + controller = None + targets = [PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)] + + if not controller: + if docker_available(): + controller_fallback = f'docker:{docker_fallback}', context, reason + controller = DockerConfig(name=docker_fallback) + else: + controller_fallback = f'remote:{remote_fallback}', context, reason + controller = PosixRemoteConfig(name=remote_fallback) + else: # local/unspecified + # There are several changes in behavior from the legacy implementation when using no delegation (or the `--local` option). + # These changes are due to ansible-test now maintaining consistency between its own Python and that of controller Python subprocesses. + # + # 1) The `--python-interpreter` option (if different from sys.executable) now affects controller subprocesses and triggers re-execution of ansible-test. + # Previously this option was completely ignored except when used with the `--docker` or `--remote` options. + # 2) The `--python` option now triggers re-execution of ansible-test if it differs from sys.version_info. + # Previously it affected Python subprocesses, but not ansible-test itself. + + if controller_python(options.python) or not options.python: + controller = OriginConfig(python=native_python(options)) + targets = controller_targets(mode, options, controller) + else: + controller_fallback = 'origin:python=default', f'--python {options.python}', FallbackReason.PYTHON + controller = OriginConfig() + targets = controller_targets(mode, options, controller) + + if controller_fallback: + controller_option, context, reason = controller_fallback + + if mode.no_fallback: + raise ControllerNotSupportedError(context) + + fallback_detail = FallbackDetail( + reason=reason, + message=f'Using `--controller {controller_option}` since `{context}` does not support the controller.', + ) + else: + fallback_detail = None + + if mode.one_host and any(not isinstance(target, ControllerConfig) for target in targets): + raise ControllerNotSupportedError(controller_fallback[1]) + + if mode == TargetMode.NO_TARGETS: + targets = [] + else: + targets = handle_non_posix_targets(mode, options, targets) + + return controller, targets, fallback_detail + + +def handle_non_posix_targets( + mode: TargetMode, + options: LegacyHostOptions, + targets: list[HostConfig], +) -> list[HostConfig]: + """Return a list of non-POSIX targets if the target mode is non-POSIX.""" + if mode == TargetMode.WINDOWS_INTEGRATION: + if options.windows: + targets = [WindowsRemoteConfig(name=f'windows/{version}', provider=options.remote_provider, arch=options.remote_arch) + for version in options.windows] + else: + targets = [WindowsInventoryConfig(path=options.inventory)] + elif mode == TargetMode.NETWORK_INTEGRATION: + if options.platform: + network_targets = [NetworkRemoteConfig(name=platform, provider=options.remote_provider, arch=options.remote_arch) for platform in options.platform] + + for platform, collection in options.platform_collection or []: + for entry in network_targets: + if entry.platform == platform: + entry.collection = collection + + for platform, connection in options.platform_connection or []: + for entry in network_targets: + if entry.platform == platform: + entry.connection = connection + + targets = t.cast(list[HostConfig], network_targets) + else: + targets = [NetworkInventoryConfig(path=options.inventory)] + + return targets + + +def default_targets( + mode: TargetMode, + controller: ControllerHostConfig, +) -> list[HostConfig]: + """Return a list of default targets for the given target mode.""" + targets: list[HostConfig] + + if mode == TargetMode.WINDOWS_INTEGRATION: + targets = [WindowsInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.winrm')))] + elif mode == TargetMode.NETWORK_INTEGRATION: + targets = [NetworkInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.networking')))] + elif mode.multiple_pythons: + targets = t.cast(list[HostConfig], controller.get_default_targets(HostContext(controller_config=controller))) + else: + targets = [ControllerConfig()] + + return targets diff --git a/test/lib/ansible_test/_internal/cli/completers.py b/test/lib/ansible_test/_internal/cli/completers.py new file mode 100644 index 0000000..903b69b --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/completers.py @@ -0,0 +1,30 @@ +"""Completers for use with argcomplete.""" +from __future__ import annotations + +import argparse + +from ..target import ( + find_target_completion, +) + +from .argparsing.argcompletion import ( + OptionCompletionFinder, +) + + +def complete_target(completer: OptionCompletionFinder, prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]: + """Perform completion for the targets configured for the command being parsed.""" + matches = find_target_completion(parsed_args.targets_func, prefix, completer.list_mode) + completer.disable_completion_mangling = completer.list_mode and len(matches) > 1 + return matches + + +def complete_choices(choices: list[str], prefix: str, **_) -> list[str]: + """Perform completion using the provided choices.""" + matches = [choice for choice in choices if choice.startswith(prefix)] + return matches + + +def register_completer(action: argparse.Action, completer) -> None: + """Register the given completer with the specified action.""" + action.completer = completer # type: ignore[attr-defined] # intentionally using an attribute that does not exist diff --git a/test/lib/ansible_test/_internal/cli/converters.py b/test/lib/ansible_test/_internal/cli/converters.py new file mode 100644 index 0000000..71e0dae --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/converters.py @@ -0,0 +1,19 @@ +"""Converters for use as the type argument for arparse's add_argument method.""" +from __future__ import annotations + +import argparse + + +def key_value_type(value: str) -> tuple[str, str]: + """Wrapper around key_value.""" + return key_value(value) + + +def key_value(value: str) -> tuple[str, str]: + """Type parsing and validation for argparse key/value pairs separated by an '=' character.""" + parts = value.split('=') + + if len(parts) != 2: + raise argparse.ArgumentTypeError('"%s" must be in the format "key=value"' % value) + + return parts[0], parts[1] diff --git a/test/lib/ansible_test/_internal/cli/environments.py b/test/lib/ansible_test/_internal/cli/environments.py new file mode 100644 index 0000000..5063715 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/environments.py @@ -0,0 +1,612 @@ +"""Command line parsing for test environments.""" +from __future__ import annotations + +import argparse +import enum +import functools +import typing as t + +from ..constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_PROVIDERS, + SECCOMP_CHOICES, + SUPPORTED_PYTHON_VERSIONS, +) + +from ..util import ( + REMOTE_ARCHITECTURES, +) + +from ..completion import ( + docker_completion, + network_completion, + remote_completion, + windows_completion, + filter_completion, +) + +from ..cli.argparsing import ( + CompositeAction, + CompositeActionCompletionFinder, +) + +from ..cli.argparsing.actions import ( + EnumAction, +) + +from ..cli.actions import ( + DelegatedControllerAction, + NetworkSshTargetAction, + NetworkTargetAction, + OriginControllerAction, + PosixSshTargetAction, + PosixTargetAction, + SanityPythonTargetAction, + UnitsPythonTargetAction, + WindowsSshTargetAction, + WindowsTargetAction, +) + +from ..cli.compat import ( + TargetMode, +) + +from ..config import ( + TerminateMode, +) + +from .completers import ( + complete_choices, + register_completer, +) + +from .converters import ( + key_value_type, +) + +from .epilog import ( + get_epilog, +) + +from ..ci import ( + get_ci_provider, +) + + +class ControllerMode(enum.Enum): + """Type of provisioning to use for the controller.""" + NO_DELEGATION = enum.auto() + ORIGIN = enum.auto() + DELEGATED = enum.auto() + + +def add_environments( + parser: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, + controller_mode: ControllerMode, + target_mode: TargetMode, +) -> None: + """Add arguments for the environments used to run ansible-test and commands it invokes.""" + no_environment = controller_mode == ControllerMode.NO_DELEGATION and target_mode == TargetMode.NO_TARGETS + + parser.set_defaults(no_environment=no_environment) + + if no_environment: + return + + parser.set_defaults(target_mode=target_mode) + + add_global_options(parser, controller_mode) + add_legacy_environment_options(parser, controller_mode, target_mode) + action_types = add_composite_environment_options(parser, completer, controller_mode, target_mode) + + sections = [f'{heading}\n{content}' + for action_type, documentation_state in CompositeAction.documentation_state.items() if action_type in action_types + for heading, content in documentation_state.sections.items()] + + if not get_ci_provider().supports_core_ci_auth(): + sections.append('Remote provisioning options have been hidden since no Ansible Core CI API key was found.') + + sections.append(get_epilog(completer)) + + parser.formatter_class = argparse.RawDescriptionHelpFormatter + parser.epilog = '\n\n'.join(sections) + + +def add_global_options( + parser: argparse.ArgumentParser, + controller_mode: ControllerMode, +): + """Add global options for controlling the test environment that work with both the legacy and composite options.""" + global_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='global environment arguments')) + + global_parser.add_argument( + '--containers', + metavar='JSON', + help=argparse.SUPPRESS, + ) + + global_parser.add_argument( + '--pypi-proxy', + action='store_true', + help=argparse.SUPPRESS, + ) + + global_parser.add_argument( + '--pypi-endpoint', + metavar='URI', + help=argparse.SUPPRESS, + ) + + global_parser.add_argument( + '--requirements', + action='store_true', + default=False, + help='install command requirements', + ) + + global_parser.add_argument( + '--no-pip-check', + action='store_true', + help=argparse.SUPPRESS, # deprecated, kept for now (with a warning) for backwards compatibility + ) + + add_global_remote(global_parser, controller_mode) + add_global_docker(global_parser, controller_mode) + + +def add_composite_environment_options( + parser: argparse.ArgumentParser, + completer: CompositeActionCompletionFinder, + controller_mode: ControllerMode, + target_mode: TargetMode, +) -> list[t.Type[CompositeAction]]: + """Add composite options for controlling the test environment.""" + composite_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group( + title='composite environment arguments (mutually exclusive with "environment arguments" above)')) + + composite_parser.add_argument( + '--host-path', + help=argparse.SUPPRESS, + ) + + action_types: list[t.Type[CompositeAction]] = [] + + def register_action_type(action_type: t.Type[CompositeAction]) -> t.Type[CompositeAction]: + """Register the provided composite action type and return it.""" + action_types.append(action_type) + return action_type + + if controller_mode == ControllerMode.NO_DELEGATION: + composite_parser.set_defaults(controller=None) + else: + register_completer(composite_parser.add_argument( + '--controller', + metavar='OPT', + action=register_action_type(DelegatedControllerAction if controller_mode == ControllerMode.DELEGATED else OriginControllerAction), + help='configuration for the controller', + ), completer.completer) + + if target_mode == TargetMode.NO_TARGETS: + composite_parser.set_defaults(targets=[]) + elif target_mode == TargetMode.SHELL: + group = composite_parser.add_mutually_exclusive_group() + + register_completer(group.add_argument( + '--target-posix', + metavar='OPT', + action=register_action_type(PosixSshTargetAction), + help='configuration for the target', + ), completer.completer) + + suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS + + register_completer(group.add_argument( + '--target-windows', + metavar='OPT', + action=WindowsSshTargetAction if suppress else register_action_type(WindowsSshTargetAction), + help=suppress or 'configuration for the target', + ), completer.completer) + + register_completer(group.add_argument( + '--target-network', + metavar='OPT', + action=NetworkSshTargetAction if suppress else register_action_type(NetworkSshTargetAction), + help=suppress or 'configuration for the target', + ), completer.completer) + else: + if target_mode.multiple_pythons: + target_option = '--target-python' + target_help = 'configuration for the target python interpreter(s)' + elif target_mode == TargetMode.POSIX_INTEGRATION: + target_option = '--target' + target_help = 'configuration for the target' + else: + target_option = '--target' + target_help = 'configuration for the target(s)' + + target_actions = { + TargetMode.POSIX_INTEGRATION: PosixTargetAction, + TargetMode.WINDOWS_INTEGRATION: WindowsTargetAction, + TargetMode.NETWORK_INTEGRATION: NetworkTargetAction, + TargetMode.SANITY: SanityPythonTargetAction, + TargetMode.UNITS: UnitsPythonTargetAction, + } + + target_action = target_actions[target_mode] + + register_completer(composite_parser.add_argument( + target_option, + metavar='OPT', + action=register_action_type(target_action), + help=target_help, + ), completer.completer) + + return action_types + + +def add_legacy_environment_options( + parser: argparse.ArgumentParser, + controller_mode: ControllerMode, + target_mode: TargetMode, +): + """Add legacy options for controlling the test environment.""" + environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private + title='environment arguments (mutually exclusive with "composite environment arguments" below)') + + add_environments_python(environment, target_mode) + add_environments_host(environment, controller_mode, target_mode) + + +def add_environments_python( + environments_parser: argparse.ArgumentParser, + target_mode: TargetMode, +) -> None: + """Add environment arguments to control the Python version(s) used.""" + python_versions: tuple[str, ...] + + if target_mode.has_python: + python_versions = SUPPORTED_PYTHON_VERSIONS + else: + python_versions = CONTROLLER_PYTHON_VERSIONS + + environments_parser.add_argument( + '--python', + metavar='X.Y', + choices=python_versions + ('default',), + help='python version: %s' % ', '.join(python_versions), + ) + + environments_parser.add_argument( + '--python-interpreter', + metavar='PATH', + help='path to the python interpreter', + ) + + +def add_environments_host( + environments_parser: argparse.ArgumentParser, + controller_mode: ControllerMode, + target_mode: TargetMode, +) -> None: + """Add environment arguments for the given host and argument modes.""" + environments_exclusive_group: argparse.ArgumentParser = environments_parser.add_mutually_exclusive_group() # type: ignore[assignment] # real type private + + add_environment_local(environments_exclusive_group) + add_environment_venv(environments_exclusive_group, environments_parser) + + if controller_mode == ControllerMode.DELEGATED: + add_environment_remote(environments_exclusive_group, environments_parser, target_mode) + add_environment_docker(environments_exclusive_group, environments_parser, target_mode) + + if target_mode == TargetMode.WINDOWS_INTEGRATION: + add_environment_windows(environments_parser) + + if target_mode == TargetMode.NETWORK_INTEGRATION: + add_environment_network(environments_parser) + + +def add_environment_network( + environments_parser: argparse.ArgumentParser, +) -> None: + """Add environment arguments for running on a windows host.""" + register_completer(environments_parser.add_argument( + '--platform', + metavar='PLATFORM', + action='append', + help='network platform/version', + ), complete_network_platform) + + register_completer(environments_parser.add_argument( + '--platform-collection', + type=key_value_type, + metavar='PLATFORM=COLLECTION', + action='append', + help='collection used to test platform', + ), complete_network_platform_collection) + + register_completer(environments_parser.add_argument( + '--platform-connection', + type=key_value_type, + metavar='PLATFORM=CONNECTION', + action='append', + help='connection used to test platform', + ), complete_network_platform_connection) + + environments_parser.add_argument( + '--inventory', + metavar='PATH', + help='path to inventory used for tests', + ) + + +def add_environment_windows( + environments_parser: argparse.ArgumentParser, +) -> None: + """Add environment arguments for running on a windows host.""" + register_completer(environments_parser.add_argument( + '--windows', + metavar='VERSION', + action='append', + help='windows version', + ), complete_windows) + + environments_parser.add_argument( + '--inventory', + metavar='PATH', + help='path to inventory used for tests', + ) + + +def add_environment_local( + exclusive_parser: argparse.ArgumentParser, +) -> None: + """Add environment arguments for running on the local (origin) host.""" + exclusive_parser.add_argument( + '--local', + action='store_true', + help='run from the local environment', + ) + + +def add_environment_venv( + exclusive_parser: argparse.ArgumentParser, + environments_parser: argparse.ArgumentParser, +) -> None: + """Add environment arguments for running in ansible-test managed virtual environments.""" + exclusive_parser.add_argument( + '--venv', + action='store_true', + help='run from a virtual environment', + ) + + environments_parser.add_argument( + '--venv-system-site-packages', + action='store_true', + help='enable system site packages') + + +def add_global_docker( + parser: argparse.ArgumentParser, + controller_mode: ControllerMode, +) -> None: + """Add global options for Docker.""" + if controller_mode != ControllerMode.DELEGATED: + parser.set_defaults( + docker_no_pull=False, + docker_network=None, + docker_terminate=None, + prime_containers=False, + dev_systemd_debug=False, + dev_probe_cgroups=None, + ) + + return + + parser.add_argument( + '--docker-no-pull', + action='store_true', + help=argparse.SUPPRESS, # deprecated, kept for now (with a warning) for backwards compatibility + ) + + parser.add_argument( + '--docker-network', + metavar='NET', + help='run using the specified network', + ) + + parser.add_argument( + '--docker-terminate', + metavar='T', + default=TerminateMode.ALWAYS, + type=TerminateMode, + action=EnumAction, + help='terminate the container: %(choices)s (default: %(default)s)', + ) + + parser.add_argument( + '--prime-containers', + action='store_true', + help='download containers without running tests', + ) + + # Docker support isn't related to ansible-core-ci. + # However, ansible-core-ci support is a reasonable indicator that the user may need the `--dev-*` options. + suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS + + parser.add_argument( + '--dev-systemd-debug', + action='store_true', + help=suppress or 'enable systemd debugging in containers', + ) + + parser.add_argument( + '--dev-probe-cgroups', + metavar='DIR', + nargs='?', + const='', + help=suppress or 'probe container cgroups, with optional log dir', + ) + + +def add_environment_docker( + exclusive_parser: argparse.ArgumentParser, + environments_parser: argparse.ArgumentParser, + target_mode: TargetMode, +) -> None: + """Add environment arguments for running in docker containers.""" + if target_mode in (TargetMode.POSIX_INTEGRATION, TargetMode.SHELL): + docker_images = sorted(filter_completion(docker_completion())) + else: + docker_images = sorted(filter_completion(docker_completion(), controller_only=True)) + + register_completer(exclusive_parser.add_argument( + '--docker', + metavar='IMAGE', + nargs='?', + const='default', + help='run from a docker container', + ), functools.partial(complete_choices, docker_images)) + + environments_parser.add_argument( + '--docker-privileged', + action='store_true', + help='run docker container in privileged mode', + ) + + environments_parser.add_argument( + '--docker-seccomp', + metavar='SC', + choices=SECCOMP_CHOICES, + help='set seccomp confinement for the test container: %(choices)s', + ) + + environments_parser.add_argument( + '--docker-memory', + metavar='INT', + type=int, + help='memory limit for docker in bytes', + ) + + +def add_global_remote( + parser: argparse.ArgumentParser, + controller_mode: ControllerMode, +) -> None: + """Add global options for remote instances.""" + if controller_mode != ControllerMode.DELEGATED: + parser.set_defaults( + remote_stage=None, + remote_endpoint=None, + remote_terminate=None, + ) + + return + + suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS + + register_completer(parser.add_argument( + '--remote-stage', + metavar='STAGE', + default='prod', + help=suppress or 'remote stage to use: prod, dev', + ), complete_remote_stage) + + parser.add_argument( + '--remote-endpoint', + metavar='EP', + help=suppress or 'remote provisioning endpoint to use', + ) + + parser.add_argument( + '--remote-terminate', + metavar='T', + default=TerminateMode.NEVER, + type=TerminateMode, + action=EnumAction, + help=suppress or 'terminate the remote instance: %(choices)s (default: %(default)s)', + ) + + +def add_environment_remote( + exclusive_parser: argparse.ArgumentParser, + environments_parser: argparse.ArgumentParser, + target_mode: TargetMode, +) -> None: + """Add environment arguments for running in ansible-core-ci provisioned remote virtual machines.""" + if target_mode == TargetMode.POSIX_INTEGRATION: + remote_platforms = get_remote_platform_choices() + elif target_mode == TargetMode.SHELL: + remote_platforms = sorted(set(get_remote_platform_choices()) | set(get_windows_platform_choices())) + else: + remote_platforms = get_remote_platform_choices(True) + + suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS + + register_completer(exclusive_parser.add_argument( + '--remote', + metavar='NAME', + help=suppress or 'run from a remote instance', + ), functools.partial(complete_choices, remote_platforms)) + + environments_parser.add_argument( + '--remote-provider', + metavar='PR', + choices=REMOTE_PROVIDERS, + help=suppress or 'remote provider to use: %(choices)s', + ) + + environments_parser.add_argument( + '--remote-arch', + metavar='ARCH', + choices=REMOTE_ARCHITECTURES, + help=suppress or 'remote arch to use: %(choices)s', + ) + + +def complete_remote_stage(prefix: str, **_) -> list[str]: + """Return a list of supported stages matching the given prefix.""" + return [stage for stage in ('prod', 'dev') if stage.startswith(prefix)] + + +def complete_windows(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]: + """Return a list of supported Windows versions matching the given prefix, excluding versions already parsed from the command line.""" + return [i for i in get_windows_version_choices() if i.startswith(prefix) and (not parsed_args.windows or i not in parsed_args.windows)] + + +def complete_network_platform(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]: + """Return a list of supported network platforms matching the given prefix, excluding platforms already parsed from the command line.""" + images = sorted(filter_completion(network_completion())) + + return [i for i in images if i.startswith(prefix) and (not parsed_args.platform or i not in parsed_args.platform)] + + +def complete_network_platform_collection(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]: + """Return a list of supported network platforms matching the given prefix, excluding collection platforms already parsed from the command line.""" + left = prefix.split('=')[0] + images = sorted(set(image.platform for image in filter_completion(network_completion()).values())) + + return [i + '=' for i in images if i.startswith(left) and (not parsed_args.platform_collection or i not in [x[0] for x in parsed_args.platform_collection])] + + +def complete_network_platform_connection(prefix: str, parsed_args: argparse.Namespace, **_) -> list[str]: + """Return a list of supported network platforms matching the given prefix, excluding connection platforms already parsed from the command line.""" + left = prefix.split('=')[0] + images = sorted(set(image.platform for image in filter_completion(network_completion()).values())) + + return [i + '=' for i in images if i.startswith(left) and (not parsed_args.platform_connection or i not in [x[0] for x in parsed_args.platform_connection])] + + +def get_remote_platform_choices(controller: bool = False) -> list[str]: + """Return a list of supported remote platforms matching the given prefix.""" + return sorted(filter_completion(remote_completion(), controller_only=controller)) + + +def get_windows_platform_choices() -> list[str]: + """Return a list of supported Windows versions matching the given prefix.""" + return sorted(f'windows/{windows.version}' for windows in filter_completion(windows_completion()).values()) + + +def get_windows_version_choices() -> list[str]: + """Return a list of supported Windows versions.""" + return sorted(windows.version for windows in filter_completion(windows_completion()).values()) diff --git a/test/lib/ansible_test/_internal/cli/epilog.py b/test/lib/ansible_test/_internal/cli/epilog.py new file mode 100644 index 0000000..3800ff1 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/epilog.py @@ -0,0 +1,23 @@ +"""Argument parsing epilog generation.""" +from __future__ import annotations + +from .argparsing import ( + CompositeActionCompletionFinder, +) + +from ..data import ( + data_context, +) + + +def get_epilog(completer: CompositeActionCompletionFinder) -> str: + """Generate and return the epilog to use for help output.""" + if completer.enabled: + epilog = 'Tab completion available using the "argcomplete" python package.' + else: + epilog = 'Install the "argcomplete" python package to enable tab completion.' + + if data_context().content.unsupported: + epilog += '\n\n' + data_context().explain_working_directory() + + return epilog diff --git a/test/lib/ansible_test/_internal/cli/parsers/__init__.py b/test/lib/ansible_test/_internal/cli/parsers/__init__.py new file mode 100644 index 0000000..1aedf63 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/__init__.py @@ -0,0 +1,303 @@ +"""Composite argument parsers for ansible-test specific command-line arguments.""" +from __future__ import annotations + +import typing as t + +from ...constants import ( + SUPPORTED_PYTHON_VERSIONS, +) + +from ...ci import ( + get_ci_provider, +) + +from ...host_configs import ( + ControllerConfig, + NetworkConfig, + NetworkInventoryConfig, + PosixConfig, + WindowsConfig, + WindowsInventoryConfig, +) + +from ..argparsing.parsers import ( + DocumentationState, + Parser, + ParserState, + TypeParser, +) + +from .value_parsers import ( + PythonParser, +) + +from .host_config_parsers import ( + ControllerParser, + DockerParser, + NetworkInventoryParser, + NetworkRemoteParser, + OriginParser, + PosixRemoteParser, + PosixSshParser, + WindowsInventoryParser, + WindowsRemoteParser, +) + + +from .base_argument_parsers import ( + ControllerNamespaceParser, + TargetNamespaceParser, + TargetsNamespaceParser, +) + + +class OriginControllerParser(ControllerNamespaceParser, TypeParser): + """Composite argument parser for the controller when delegation is not supported.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return dict( + origin=OriginParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = '--controller options:' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class DelegatedControllerParser(ControllerNamespaceParser, TypeParser): + """Composite argument parser for the controller when delegation is supported.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = dict( + origin=OriginParser(), + docker=DockerParser(controller=True), + ) + + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=PosixRemoteParser(controller=True), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = '--controller options:' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class PosixTargetParser(TargetNamespaceParser, TypeParser): + """Composite argument parser for a POSIX target.""" + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = dict( + controller=ControllerParser(), + docker=DockerParser(controller=False), + ) + + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=PosixRemoteParser(controller=False), + ) + + parsers.update( + ssh=PosixSshParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class WindowsTargetParser(TargetsNamespaceParser, TypeParser): + """Composite argument parser for a Windows target.""" + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return True + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers(state.root_namespace.targets) + + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers([]) + + def get_internal_parsers(self, targets: list[WindowsConfig]) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = {} + + if self.allow_inventory and not targets: + parsers.update( + inventory=WindowsInventoryParser(), + ) + + if not targets or not any(isinstance(target, WindowsInventoryConfig) for target in targets): + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=WindowsRemoteParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class NetworkTargetParser(TargetsNamespaceParser, TypeParser): + """Composite argument parser for a network target.""" + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return True + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers(state.root_namespace.targets) + + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + return self.get_internal_parsers([]) + + def get_internal_parsers(self, targets: list[NetworkConfig]) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + parsers: dict[str, Parser] = {} + + if self.allow_inventory and not targets: + parsers.update( + inventory=NetworkInventoryParser(), + ) + + if not targets or not any(isinstance(target, NetworkInventoryConfig) for target in targets): + if get_ci_provider().supports_core_ci_auth(): + parsers.update( + remote=NetworkRemoteParser(), + ) + + return parsers + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '' # place this section before the sections created by the parsers below + state.sections[section] = '\n'.join([f' {name}:{parser.document(state)}' for name, parser in self.get_stateless_parsers().items()]) + + return None + + +class PythonTargetParser(TargetsNamespaceParser, Parser): + """Composite argument parser for a Python target.""" + def __init__(self, allow_venv: bool) -> None: + super().__init__() + + self.allow_venv = allow_venv + + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-python' + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + versions = list(SUPPORTED_PYTHON_VERSIONS) + + for target in state.root_namespace.targets or []: # type: PosixConfig + versions.remove(target.python.version) + + parser = PythonParser(versions, allow_venv=self.allow_venv, allow_default=True) + python = parser.parse(state) + + value = ControllerConfig(python=python) + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section = f'{self.option_name} options (choose one):' + + state.sections[section] = '\n'.join([ + f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller', + f' {PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller', + ]) + + return None + + +class SanityPythonTargetParser(PythonTargetParser): + """Composite argument parser for a sanity Python target.""" + def __init__(self) -> None: + super().__init__(allow_venv=False) + + +class UnitsPythonTargetParser(PythonTargetParser): + """Composite argument parser for a units Python target.""" + def __init__(self) -> None: + super().__init__(allow_venv=True) + + +class PosixSshTargetParser(PosixTargetParser): + """Composite argument parser for a POSIX SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-posix' + + +class WindowsSshTargetParser(WindowsTargetParser): + """Composite argument parser for a Windows SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-windows' + + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True + + +class NetworkSshTargetParser(NetworkTargetParser): + """Composite argument parser for a network SSH target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target-network' + + @property + def allow_inventory(self) -> bool: + """True if inventory is allowed, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True diff --git a/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py new file mode 100644 index 0000000..aac7a69 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/base_argument_parsers.py @@ -0,0 +1,73 @@ +"""Base classes for the primary parsers for composite command line arguments.""" +from __future__ import annotations + +import abc +import typing as t + +from ..argparsing.parsers import ( + CompletionError, + NamespaceParser, + ParserState, +) + + +class ControllerNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta): + """Base class for controller namespace parsers.""" + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return 'controller' + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + if state.root_namespace.targets: + raise ControllerRequiredFirstError() + + return super().parse(state) + + +class TargetNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta): + """Base class for target namespace parsers involving a single target.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target' + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return 'targets' + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return True + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return True + + +class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta): + """Base class for controller namespace parsers involving multiple targets.""" + @property + def option_name(self) -> str: + """The option name used for this parser.""" + return '--target' + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return 'targets' + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return True + + +class ControllerRequiredFirstError(CompletionError): + """Exception raised when controller and target options are specified out-of-order.""" + def __init__(self) -> None: + super().__init__('The `--controller` option must be specified before `--target` option(s).') diff --git a/test/lib/ansible_test/_internal/cli/parsers/helpers.py b/test/lib/ansible_test/_internal/cli/parsers/helpers.py new file mode 100644 index 0000000..836a893 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/helpers.py @@ -0,0 +1,57 @@ +"""Helper functions for composite parsers.""" +from __future__ import annotations + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...completion import ( + docker_completion, + remote_completion, + filter_completion, +) + +from ...host_configs import ( + DockerConfig, + HostConfig, + PosixRemoteConfig, +) + + +def get_docker_pythons(name: str, controller: bool, strict: bool) -> list[str]: + """Return a list of docker instance Python versions supported by the specified host config.""" + image_config = filter_completion(docker_completion()).get(name) + available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS + + if not image_config: + return [] if strict else list(available_pythons) + + supported_pythons = [python for python in image_config.supported_pythons if python in available_pythons] + + return supported_pythons + + +def get_remote_pythons(name: str, controller: bool, strict: bool) -> list[str]: + """Return a list of remote instance Python versions supported by the specified host config.""" + platform_config = filter_completion(remote_completion()).get(name) + available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS + + if not platform_config: + return [] if strict else list(available_pythons) + + supported_pythons = [python for python in platform_config.supported_pythons if python in available_pythons] + + return supported_pythons + + +def get_controller_pythons(controller_config: HostConfig, strict: bool) -> list[str]: + """Return a list of controller Python versions supported by the specified host config.""" + if isinstance(controller_config, DockerConfig): + pythons = get_docker_pythons(controller_config.name, False, strict) + elif isinstance(controller_config, PosixRemoteConfig): + pythons = get_remote_pythons(controller_config.name, False, strict) + else: + pythons = list(SUPPORTED_PYTHON_VERSIONS) + + return pythons diff --git a/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py new file mode 100644 index 0000000..ee6f146 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py @@ -0,0 +1,310 @@ +"""Composite parsers for the various types of hosts.""" +from __future__ import annotations + +import typing as t + +from ...completion import ( + docker_completion, + network_completion, + remote_completion, + windows_completion, + filter_completion, +) + +from ...host_configs import ( + ControllerConfig, + DockerConfig, + NetworkInventoryConfig, + NetworkRemoteConfig, + OriginConfig, + PosixRemoteConfig, + PosixSshConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ..compat import ( + get_fallback_remote_controller, +) + +from ..argparsing.parsers import ( + ChoicesParser, + DocumentationState, + FileParser, + MatchConditions, + NamespaceWrappedParser, + PairParser, + Parser, + ParserError, + ParserState, +) + +from .value_parsers import ( + PlatformParser, + SshConnectionParser, +) + +from .key_value_parsers import ( + ControllerKeyValueParser, + DockerKeyValueParser, + EmptyKeyValueParser, + NetworkRemoteKeyValueParser, + OriginKeyValueParser, + PosixRemoteKeyValueParser, + PosixSshKeyValueParser, + WindowsRemoteKeyValueParser, +) + +from .helpers import ( + get_docker_pythons, + get_remote_pythons, +) + + +class OriginParser(Parser): + """Composite argument parser for the origin.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = OriginConfig() + + state.set_namespace(namespace) + + parser = OriginKeyValueParser() + parser.parse(state) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return OriginKeyValueParser().document(state) + + +class ControllerParser(Parser): + """Composite argument parser for the controller.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = ControllerConfig() + + state.set_namespace(namespace) + + parser = ControllerKeyValueParser() + parser.parse(state) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return ControllerKeyValueParser().document(state) + + +class DockerParser(PairParser): + """Composite argument parser for a docker host.""" + def __init__(self, controller: bool) -> None: + self.controller = controller + + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return DockerConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('name', ChoicesParser(list(filter_completion(docker_completion(), controller_only=self.controller)), + conditions=MatchConditions.CHOICE | MatchConditions.ANY)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return DockerKeyValueParser(choice, self.controller) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value: DockerConfig = super().parse(state) + + if not value.python and not get_docker_pythons(value.name, self.controller, True): + raise ParserError(f'Python version required for docker image: {value.name}') + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + default = 'default' + content = '\n'.join([f' {image} ({", ".join(get_docker_pythons(image, self.controller, False))})' + for image, item in filter_completion(docker_completion(), controller_only=self.controller).items()]) + + content += '\n'.join([ + '', + ' {image} # python must be specified for custom images', + ]) + + state.sections[f'{"controller" if self.controller else "target"} docker images and supported python version (choose one):'] = content + + return f'{{image}}[,{DockerKeyValueParser(default, self.controller).document(state)}]' + + +class PosixRemoteParser(PairParser): + """Composite argument parser for a POSIX remote host.""" + def __init__(self, controller: bool) -> None: + self.controller = controller + + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return PosixRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('name', PlatformParser(list(filter_completion(remote_completion(), controller_only=self.controller)))) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return PosixRemoteKeyValueParser(choice, self.controller) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value: PosixRemoteConfig = super().parse(state) + + if not value.python and not get_remote_pythons(value.name, self.controller, True): + raise ParserError(f'Python version required for remote: {value.name}') + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + default = get_fallback_remote_controller() + content = '\n'.join([f' {name} ({", ".join(get_remote_pythons(name, self.controller, False))})' + for name, item in filter_completion(remote_completion(), controller_only=self.controller).items()]) + + content += '\n'.join([ + '', + ' {platform}/{version} # python must be specified for unknown systems', + ]) + + state.sections[f'{"controller" if self.controller else "target"} remote systems and supported python versions (choose one):'] = content + + return f'{{system}}[,{PosixRemoteKeyValueParser(default, self.controller).document(state)}]' + + +class WindowsRemoteParser(PairParser): + """Composite argument parser for a Windows remote host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return WindowsRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + names = list(filter_completion(windows_completion())) + + for target in state.root_namespace.targets or []: # type: WindowsRemoteConfig + names.remove(target.name) + + return NamespaceWrappedParser('name', PlatformParser(names)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return WindowsRemoteKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + content = '\n'.join([f' {name}' for name, item in filter_completion(windows_completion()).items()]) + + content += '\n'.join([ + '', + ' windows/{version} # use an unknown windows version', + ]) + + state.sections['target remote systems (choose one):'] = content + + return f'{{system}}[,{WindowsRemoteKeyValueParser().document(state)}]' + + +class NetworkRemoteParser(PairParser): + """Composite argument parser for a network remote host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return NetworkRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + names = list(filter_completion(network_completion())) + + for target in state.root_namespace.targets or []: # type: NetworkRemoteConfig + names.remove(target.name) + + return NamespaceWrappedParser('name', PlatformParser(names)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return NetworkRemoteKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + content = '\n'.join([f' {name}' for name, item in filter_completion(network_completion()).items()]) + + content += '\n'.join([ + '', + ' {platform}/{version} # use an unknown platform and version', + ]) + + state.sections['target remote systems (choose one):'] = content + + return f'{{system}}[,{NetworkRemoteKeyValueParser().document(state)}]' + + +class WindowsInventoryParser(PairParser): + """Composite argument parser for a Windows inventory.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return WindowsInventoryConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('path', FileParser()) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return EmptyKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{path} # INI format inventory file' + + +class NetworkInventoryParser(PairParser): + """Composite argument parser for a network inventory.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return NetworkInventoryConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('path', FileParser()) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return EmptyKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{path} # INI format inventory file' + + +class PosixSshParser(PairParser): + """Composite argument parser for a POSIX SSH host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return PosixSshConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return SshConnectionParser() + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return PosixSshKeyValueParser() + + @property + def required(self) -> bool: + """True if the delimiter (and thus right parser) is required, otherwise False.""" + return True + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return f'{SshConnectionParser().document(state)}[,{PosixSshKeyValueParser().document(state)}]' diff --git a/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py new file mode 100644 index 0000000..049b71e --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/key_value_parsers.py @@ -0,0 +1,239 @@ +"""Composite argument key-value parsers used by other parsers.""" +from __future__ import annotations + +import typing as t + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_PROVIDERS, + SECCOMP_CHOICES, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...completion import ( + AuditMode, + CGroupVersion, +) + +from ...util import ( + REMOTE_ARCHITECTURES, +) + +from ...host_configs import ( + OriginConfig, +) + +from ...become import ( + SUPPORTED_BECOME_METHODS, +) + +from ..argparsing.parsers import ( + AnyParser, + BooleanParser, + ChoicesParser, + DocumentationState, + EnumValueChoicesParser, + IntegerParser, + KeyValueParser, + Parser, + ParserState, +) + +from .value_parsers import ( + PythonParser, +) + +from .helpers import ( + get_controller_pythons, + get_remote_pythons, + get_docker_pythons, +) + + +class OriginKeyValueParser(KeyValueParser): + """Composite argument parser for origin key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + versions = CONTROLLER_PYTHON_VERSIONS + + return dict( + python=PythonParser(versions=versions, allow_venv=True, allow_default=True), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=CONTROLLER_PYTHON_VERSIONS, allow_venv=True, allow_default=True) + + section_name = 'origin options' + + state.sections[f'controller {section_name} (comma separated):'] = '\n'.join([ + f' python={python_parser.document(state)}', + ]) + + return f'{{{section_name}}} # default' + + +class ControllerKeyValueParser(KeyValueParser): + """Composite argument parser for controller key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + versions = get_controller_pythons(state.root_namespace.controller, False) + allow_default = bool(get_controller_pythons(state.root_namespace.controller, True)) + allow_venv = isinstance(state.root_namespace.controller, OriginConfig) or not state.root_namespace.controller + + return dict( + python=PythonParser(versions=versions, allow_venv=allow_venv, allow_default=allow_default), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section_name = 'controller options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=True).document(state)} # non-origin controller', + f' python={PythonParser(SUPPORTED_PYTHON_VERSIONS, allow_venv=True, allow_default=True).document(state)} # origin controller', + ]) + + return f'{{{section_name}}} # default' + + +class DockerKeyValueParser(KeyValueParser): + """Composite argument parser for docker key/value pairs.""" + def __init__(self, image: str, controller: bool) -> None: + self.controller = controller + self.versions = get_docker_pythons(image, controller, False) + self.allow_default = bool(get_docker_pythons(image, controller, True)) + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default), + seccomp=ChoicesParser(SECCOMP_CHOICES), + cgroup=EnumValueChoicesParser(CGroupVersion), + audit=EnumValueChoicesParser(AuditMode), + privileged=BooleanParser(), + memory=IntegerParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default) + + section_name = 'docker options' + + state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([ + f' python={python_parser.document(state)}', + f' seccomp={ChoicesParser(SECCOMP_CHOICES).document(state)}', + f' cgroup={EnumValueChoicesParser(CGroupVersion).document(state)}', + f' audit={EnumValueChoicesParser(AuditMode).document(state)}', + f' privileged={BooleanParser().document(state)}', + f' memory={IntegerParser().document(state)} # bytes', + ]) + + return f'{{{section_name}}}' + + +class PosixRemoteKeyValueParser(KeyValueParser): + """Composite argument parser for POSIX remote key/value pairs.""" + def __init__(self, name: str, controller: bool) -> None: + self.controller = controller + self.versions = get_remote_pythons(name, controller, False) + self.allow_default = bool(get_remote_pythons(name, controller, True)) + + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + become=ChoicesParser(list(SUPPORTED_BECOME_METHODS)), + provider=ChoicesParser(REMOTE_PROVIDERS), + arch=ChoicesParser(REMOTE_ARCHITECTURES), + python=PythonParser(versions=self.versions, allow_venv=False, allow_default=self.allow_default), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=[], allow_venv=False, allow_default=self.allow_default) + + section_name = 'remote options' + + state.sections[f'{"controller" if self.controller else "target"} {section_name} (comma separated):'] = '\n'.join([ + f' become={ChoicesParser(list(SUPPORTED_BECOME_METHODS)).document(state)}', + f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}', + f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}', + f' python={python_parser.document(state)}', + ]) + + return f'{{{section_name}}}' + + +class WindowsRemoteKeyValueParser(KeyValueParser): + """Composite argument parser for Windows remote key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + provider=ChoicesParser(REMOTE_PROVIDERS), + arch=ChoicesParser(REMOTE_ARCHITECTURES), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section_name = 'remote options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}', + f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}', + ]) + + return f'{{{section_name}}}' + + +class NetworkRemoteKeyValueParser(KeyValueParser): + """Composite argument parser for network remote key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + provider=ChoicesParser(REMOTE_PROVIDERS), + arch=ChoicesParser(REMOTE_ARCHITECTURES), + collection=AnyParser(), + connection=AnyParser(), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + section_name = 'remote options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' provider={ChoicesParser(REMOTE_PROVIDERS).document(state)}', + f' arch={ChoicesParser(REMOTE_ARCHITECTURES).document(state)}', + ' collection={collection}', + ' connection={connection}', + ]) + + return f'{{{section_name}}}' + + +class PosixSshKeyValueParser(KeyValueParser): + """Composite argument parser for POSIX SSH host key/value pairs.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return dict( + python=PythonParser(versions=list(SUPPORTED_PYTHON_VERSIONS), allow_venv=False, allow_default=False), + ) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + python_parser = PythonParser(versions=SUPPORTED_PYTHON_VERSIONS, allow_venv=False, allow_default=False) + + section_name = 'ssh options' + + state.sections[f'target {section_name} (comma separated):'] = '\n'.join([ + f' python={python_parser.document(state)}', + ]) + + return f'{{{section_name}}}' + + +class EmptyKeyValueParser(KeyValueParser): + """Composite argument parser when a key/value parser is required but there are no keys available.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + return {} diff --git a/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py new file mode 100644 index 0000000..9453b76 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/value_parsers.py @@ -0,0 +1,179 @@ +"""Composite argument value parsers used by other parsers.""" +from __future__ import annotations + +import collections.abc as c +import typing as t + +from ...host_configs import ( + NativePythonConfig, + PythonConfig, + VirtualPythonConfig, +) + +from ..argparsing.parsers import ( + AbsolutePathParser, + AnyParser, + ChoicesParser, + DocumentationState, + IntegerParser, + MatchConditions, + Parser, + ParserError, + ParserState, + ParserBoundary, +) + + +class PythonParser(Parser): + """ + Composite argument parser for Python versions, with support for specifying paths and using virtual environments. + + Allowed formats: + + {version} + venv/{version} + venv/system-site-packages/{version} + + The `{version}` has two possible formats: + + X.Y + X.Y@{path} + + Where `X.Y` is the Python major and minor version number and `{path}` is an absolute path with one of the following formats: + + /path/to/python + /path/to/python/directory/ + + When a trailing slash is present, it is considered a directory, and `python{version}` will be appended to it automatically. + + The default path depends on the context: + + - Known docker/remote environments can declare their own path. + - The origin host uses `sys.executable` if `{version}` matches the current version in `sys.version_info`. + - The origin host (as a controller or target) use the `$PATH` environment variable to find `python{version}`. + - As a fallback/default, the path `/usr/bin/python{version}` is used. + + NOTE: The Python path determines where to find the Python interpreter. + In the case of an ansible-test managed virtual environment, that Python interpreter will be used to create the virtual environment. + So the path given will not be the one actually used for the controller or target. + + Known docker/remote environments limit the available Python versions to configured values known to be valid. + The origin host and unknown environments assume all relevant Python versions are available. + """ + def __init__(self, + versions: c.Sequence[str], + *, + allow_default: bool, + allow_venv: bool, + ): + version_choices = list(versions) + + if allow_default: + version_choices.append('default') + + first_choices = list(version_choices) + + if allow_venv: + first_choices.append('venv/') + + venv_choices = list(version_choices) + ['system-site-packages/'] + + self.versions = versions + self.allow_default = allow_default + self.allow_venv = allow_venv + self.version_choices = version_choices + self.first_choices = first_choices + self.venv_choices = venv_choices + self.venv_choices = venv_choices + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + boundary: ParserBoundary + + with state.delimit('@/', required=False) as boundary: + version = ChoicesParser(self.first_choices).parse(state) + + python: PythonConfig + + if version == 'venv': + with state.delimit('@/', required=False) as boundary: + version = ChoicesParser(self.venv_choices).parse(state) + + if version == 'system-site-packages': + system_site_packages = True + + with state.delimit('@', required=False) as boundary: + version = ChoicesParser(self.version_choices).parse(state) + else: + system_site_packages = False + + python = VirtualPythonConfig(version=version, system_site_packages=system_site_packages) + else: + python = NativePythonConfig(version=version) + + if boundary.match == '@': + # FUTURE: For OriginConfig or ControllerConfig->OriginConfig the path could be validated with an absolute path parser (file or directory). + python.path = AbsolutePathParser().parse(state) + + return python + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + + docs = '[venv/[system-site-packages/]]' if self.allow_venv else '' + + if self.versions: + docs += '|'.join(self.version_choices) + else: + docs += '{X.Y}' + + docs += '[@{path|dir/}]' + + return docs + + +class PlatformParser(ChoicesParser): + """Composite argument parser for "{platform}/{version}" formatted choices.""" + def __init__(self, choices: list[str]) -> None: + super().__init__(choices, conditions=MatchConditions.CHOICE | MatchConditions.ANY) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + + if len(value.split('/')) != 2: + raise ParserError(f'invalid platform format: {value}') + + return value + + +class SshConnectionParser(Parser): + """ + Composite argument parser for connecting to a host using SSH. + Format: user@host[:port] + """ + EXPECTED_FORMAT = '{user}@{host}[:{port}]' + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + + with state.delimit('@'): + user = AnyParser(no_match_message=f'Expected {{user}} from: {self.EXPECTED_FORMAT}').parse(state) + + setattr(namespace, 'user', user) + + with state.delimit(':', required=False) as colon: # type: ParserBoundary + host = AnyParser(no_match_message=f'Expected {{host}} from: {self.EXPECTED_FORMAT}').parse(state) + + setattr(namespace, 'host', host) + + if colon.match: + port = IntegerParser(65535).parse(state) + setattr(namespace, 'port', port) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return self.EXPECTED_FORMAT |