diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /test/lib/ansible_test/_internal/cli/argparsing/parsers.py | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream.tar.xz ansible-core-upstream.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/argparsing/parsers.py')
-rw-r--r-- | test/lib/ansible_test/_internal/cli/argparsing/parsers.py | 597 |
1 files changed, 597 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/parsers.py b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py new file mode 100644 index 0000000..d07e03c --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py @@ -0,0 +1,597 @@ +"""General purpose composite argument parsing and completion.""" +from __future__ import annotations + +import abc +import collections.abc as c +import contextlib +import dataclasses +import enum +import os +import re +import typing as t + +# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior. +# +# Recommended characters for assignment and/or continuation: `/` `:` `=` +# +# The recommended assignment_character list is due to how argcomplete handles continuation characters. +# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558 + +PAIR_DELIMITER = ',' +ASSIGNMENT_DELIMITER = '=' +PATH_DELIMITER = '/' + + +# This class was originally frozen. However, that causes issues when running under Python 3.11. +# See: https://github.com/python/cpython/issues/99856 +@dataclasses.dataclass +class Completion(Exception): + """Base class for argument completion results.""" + + +@dataclasses.dataclass +class CompletionUnavailable(Completion): + """Argument completion unavailable.""" + message: str = 'No completions available.' + + +@dataclasses.dataclass +class CompletionError(Completion): + """Argument completion error.""" + message: t.Optional[str] = None + + +@dataclasses.dataclass +class CompletionSuccess(Completion): + """Successful argument completion result.""" + list_mode: bool + consumed: str + continuation: str + matches: list[str] = dataclasses.field(default_factory=list) + + @property + def preserve(self) -> bool: + """ + True if argcomplete should not mangle completion values, otherwise False. + Only used when more than one completion exists to avoid overwriting the word undergoing completion. + """ + return len(self.matches) > 1 and self.list_mode + + @property + def completions(self) -> list[str]: + """List of completion values to return to argcomplete.""" + completions = self.matches + continuation = '' if self.list_mode else self.continuation + + if not self.preserve: + # include the existing prefix to avoid rewriting the word undergoing completion + completions = [f'{self.consumed}{completion}{continuation}' for completion in completions] + + return completions + + +class ParserMode(enum.Enum): + """Mode the parser is operating in.""" + PARSE = enum.auto() + COMPLETE = enum.auto() + LIST = enum.auto() + + +class ParserError(Exception): + """Base class for all parsing exceptions.""" + + +@dataclasses.dataclass +class ParserBoundary: + """Boundary details for parsing composite input.""" + delimiters: str + required: bool + match: t.Optional[str] = None + ready: bool = True + + +@dataclasses.dataclass +class ParserState: + """State of the composite argument parser.""" + mode: ParserMode + remainder: str = '' + consumed: str = '' + boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list) + namespaces: list[t.Any] = dataclasses.field(default_factory=list) + parts: list[str] = dataclasses.field(default_factory=list) + + @property + def incomplete(self) -> bool: + """True if parsing is incomplete (unparsed input remains), otherwise False.""" + return self.remainder is not None + + def match(self, value: str, choices: list[str]) -> bool: + """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False.""" + if self.current_boundary: + delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match + else: + delimiters, delimiter = '', None + + for choice in choices: + if choice.rstrip(delimiters) == choice: + # choice is not delimited + if value == choice: + return True # value matched + else: + # choice is delimited + if f'{value}{delimiter}' == choice: + return True # value and delimiter matched + + return False + + def read(self) -> str: + """Read and return the next input segment, taking into account parsing boundaries.""" + delimiters = "".join(boundary.delimiters for boundary in self.boundaries) + + if delimiters: + pattern = '([' + re.escape(delimiters) + '])' + regex = re.compile(pattern) + parts = regex.split(self.remainder, 1) + else: + parts = [self.remainder] + + if len(parts) > 1: + value, delimiter, remainder = parts + else: + value, delimiter, remainder = parts[0], None, None + + for boundary in reversed(self.boundaries): + if delimiter and delimiter in boundary.delimiters: + boundary.match = delimiter + self.consumed += value + delimiter + break + + boundary.match = None + boundary.ready = False + + if boundary.required: + break + + self.remainder = remainder + + return value + + @property + def root_namespace(self) -> t.Any: + """THe root namespace.""" + return self.namespaces[0] + + @property + def current_namespace(self) -> t.Any: + """The current namespace.""" + return self.namespaces[-1] + + @property + def current_boundary(self) -> t.Optional[ParserBoundary]: + """The current parser boundary, if any, otherwise None.""" + return self.boundaries[-1] if self.boundaries else None + + def set_namespace(self, namespace: t.Any) -> None: + """Set the current namespace.""" + self.namespaces.append(namespace) + + @contextlib.contextmanager + def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]: + """Context manager for delimiting parsing of input.""" + boundary = ParserBoundary(delimiters=delimiters, required=required) + + self.boundaries.append(boundary) + + try: + yield boundary + finally: + self.boundaries.pop() + + if boundary.required and not boundary.match: + raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead') + + +@dataclasses.dataclass +class DocumentationState: + """State of the composite argument parser's generated documentation.""" + sections: dict[str, str] = dataclasses.field(default_factory=dict) + + +class Parser(metaclass=abc.ABCMeta): + """Base class for all composite argument parsers.""" + @abc.abstractmethod + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + raise Exception(f'Undocumented parser: {type(self)}') + + +class MatchConditions(enum.Flag): + """Acceptable condition(s) for matching user input to available choices.""" + CHOICE = enum.auto() + """Match any choice.""" + ANY = enum.auto() + """Match any non-empty string.""" + NOTHING = enum.auto() + """Match an empty string which is not followed by a boundary match.""" + + +class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers which use a list of choices that can be generated during completion.""" + def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.conditions = conditions + + @abc.abstractmethod + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + + def no_completion_match(self, value: str) -> CompletionUnavailable: # pylint: disable=unused-argument + """Return an instance of CompletionUnavailable when no match was found for the given value.""" + return CompletionUnavailable() + + def no_choices_available(self, value: str) -> ParserError: # pylint: disable=unused-argument + """Return an instance of ParserError when parsing fails and no choices are available.""" + return ParserError('No choices available.') + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = state.read() + choices = self.get_choices(value) + + if state.mode == ParserMode.PARSE or state.incomplete: + if self.conditions & MatchConditions.CHOICE and state.match(value, choices): + return value + + if self.conditions & MatchConditions.ANY and value: + return value + + if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match: + return value + + if state.mode == ParserMode.PARSE: + if choices: + raise ParserError(f'"{value}" not in: {", ".join(choices)}') + + raise self.no_choices_available(value) + + raise CompletionUnavailable() + + matches = [choice for choice in choices if choice.startswith(value)] + + if not matches: + raise self.no_completion_match(value) + + continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else '' + + raise CompletionSuccess( + list_mode=state.mode == ParserMode.LIST, + consumed=state.consumed, + continuation=continuation, + matches=matches, + ) + + +class ChoicesParser(DynamicChoicesParser): + """Composite argument parser which relies on a static list of choices.""" + def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.choices = choices + + super().__init__(conditions=conditions) + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + return self.choices + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '|'.join(self.choices) + + +class EnumValueChoicesParser(ChoicesParser): + """Composite argument parser which relies on a static list of choices derived from the values of an enum.""" + def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.enum_type = enum_type + + super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return self.enum_type(value) + + +class IntegerParser(DynamicChoicesParser): + """Composite argument parser for integers.""" + PATTERN = re.compile('^[1-9][0-9]*$') + + def __init__(self, maximum: t.Optional[int] = None) -> None: + self.maximum = maximum + + super().__init__() + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + if not value: + numbers = list(range(1, 10)) + elif self.PATTERN.search(value): + int_prefix = int(value) + base = int_prefix * 10 + numbers = [int_prefix] + [base + i for i in range(0, 10)] + else: + numbers = [] + + # NOTE: the minimum is currently fixed at 1 + + if self.maximum is not None: + numbers = [n for n in numbers if n <= self.maximum] + + return [str(n) for n in numbers] + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return int(value) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{integer}' + + +class BooleanParser(ChoicesParser): + """Composite argument parser for boolean (yes/no) values.""" + def __init__(self) -> None: + super().__init__(['yes', 'no']) + + def parse(self, state: ParserState) -> bool: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return value == 'yes' + + +class AnyParser(ChoicesParser): + """Composite argument parser which accepts any input value.""" + def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None: + self.no_match_message = no_match_message + + conditions = MatchConditions.ANY + + if nothing: + conditions |= MatchConditions.NOTHING + + super().__init__([], conditions=conditions) + + def no_completion_match(self, value: str) -> CompletionUnavailable: + """Return an instance of CompletionUnavailable when no match was found for the given value.""" + if self.no_match_message: + return CompletionUnavailable(message=self.no_match_message) + + return super().no_completion_match(value) + + def no_choices_available(self, value: str) -> ParserError: + """Return an instance of ParserError when parsing fails and no choices are available.""" + if self.no_match_message: + return ParserError(self.no_match_message) + + return super().no_choices_available(value) + + +class RelativePathNameParser(DynamicChoicesParser): + """Composite argument parser for relative path names.""" + RELATIVE_NAMES = ['.', '..'] + + def __init__(self, choices: list[str]) -> None: + self.choices = choices + + super().__init__() + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + choices = list(self.choices) + + if value in self.RELATIVE_NAMES: + # complete relative names, but avoid suggesting them unless the current name is relative + # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../") + choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES) + + return choices + + +class FileParser(Parser): + """Composite argument parser for absolute or relative file paths.""" + def parse(self, state: ParserState) -> str: + """Parse the input from the given state and return the result.""" + if state.mode == ParserMode.PARSE: + path = AnyParser().parse(state) + + if not os.path.isfile(path): + raise ParserError(f'Not a file: {path}') + else: + path = '' + + with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary + while boundary.ready: + directory = path or '.' + + try: + with os.scandir(directory) as scan: # type: c.Iterator[os.DirEntry] + choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan] + except OSError: + choices = [] + + if not path: + choices.append(PATH_DELIMITER) # allow absolute paths + choices.append('../') # suggest relative paths + + part = RelativePathNameParser(choices).parse(state) + path += f'{part}{boundary.match or ""}' + + return path + + +class AbsolutePathParser(Parser): + """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + path = '' + + with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary + while boundary.ready: + if path: + path += AnyParser(nothing=True).parse(state) + else: + path += ChoicesParser([PATH_DELIMITER]).parse(state) + + path += (boundary.match or '') + + return path + + +class NamespaceParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers that store their results in a namespace.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + current = getattr(namespace, self.dest) + + if current and self.limit_one: + if state.mode == ParserMode.PARSE: + raise ParserError('Option cannot be specified more than once.') + + raise CompletionError('Option cannot be specified more than once.') + + value = self.get_value(state) + + if self.use_list: + if not current: + current = [] + setattr(namespace, self.dest, current) + + current.append(value) + else: + setattr(namespace, self.dest, value) + + return value + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + return super().parse(state) + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return not self.use_list + + @property + @abc.abstractmethod + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + + +class NamespaceWrappedParser(NamespaceParser): + """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace.""" + def __init__(self, dest: str, parser: Parser) -> None: + self._dest = dest + self.parser = parser + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + return self.parser.parse(state) + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return self._dest + + +class KeyValueParser(Parser, metaclass=abc.ABCMeta): + """Base class for key/value composite argument parsers.""" + @abc.abstractmethod + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + parsers = self.get_parsers(state) + keys = list(parsers) + + with state.delimit(PAIR_DELIMITER, required=False) as pair: # type: ParserBoundary + while pair.ready: + with state.delimit(ASSIGNMENT_DELIMITER): + key = ChoicesParser(keys).parse(state) + + value = parsers[key].parse(state) + + setattr(namespace, key, value) + + keys.remove(key) + + return namespace + + +class PairParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = self.create_namespace() + + state.set_namespace(namespace) + + with state.delimit(self.delimiter, self.required) as boundary: # type: ParserBoundary + choice = self.get_left_parser(state).parse(state) + + if boundary.match: + self.get_right_parser(choice).parse(state) + + return namespace + + @property + def required(self) -> bool: + """True if the delimiter (and thus right parser) is required, otherwise False.""" + return False + + @property + def delimiter(self) -> str: + """The delimiter to use between the left and right parser.""" + return PAIR_DELIMITER + + @abc.abstractmethod + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + + @abc.abstractmethod + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + + @abc.abstractmethod + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + + +class TypeParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: # pylint: disable=unused-argument + """Return a dictionary of type names and type parsers.""" + return self.get_stateless_parsers() + + @abc.abstractmethod + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + parsers = self.get_parsers(state) + + with state.delimit(':'): + key = ChoicesParser(list(parsers)).parse(state) + + value = parsers[key].parse(state) + + return value |