diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/argparsing')
4 files changed, 1004 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/__init__.py b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py new file mode 100644 index 0000000..540cf55 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py @@ -0,0 +1,265 @@ +"""Completion finder which brings together custom options and completion logic.""" +from __future__ import annotations + +import abc +import argparse +import os +import re +import typing as t + +from .argcompletion import ( + OptionCompletionFinder, + get_comp_type, + register_safe_action, + warn, +) + +from .parsers import ( + Completion, + CompletionError, + CompletionSuccess, + CompletionUnavailable, + DocumentationState, + NamespaceParser, + Parser, + ParserError, + ParserMode, + ParserState, +) + + +class RegisteredCompletionFinder(OptionCompletionFinder): + """ + Custom option completion finder for argcomplete which allows completion results to be registered. + These registered completions, if provided, are used to filter the final completion results. + This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221 + """ + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.registered_completions: t.Optional[list[str]] = None + + def completer( + self, + prefix: str, + action: argparse.Action, + parsed_args: argparse.Namespace, + **kwargs, + ) -> list[str]: + """ + Return a list of completions for the specified prefix and action. + Use this as the completer function for argcomplete. + """ + kwargs.clear() + del kwargs + + completions = self.get_completions(prefix, action, parsed_args) + + if action.nargs and not isinstance(action.nargs, int): + # prevent argcomplete from including unrelated arguments in the completion results + self.registered_completions = completions + + return completions + + @abc.abstractmethod + def get_completions( + self, + prefix: str, + action: argparse.Action, + parsed_args: argparse.Namespace, + ) -> list[str]: + """ + Return a list of completions for the specified prefix and action. + Called by the complete function. + """ + + def quote_completions(self, completions, cword_prequote, last_wordbreak_pos): + """Modify completion results before returning them.""" + if self.registered_completions is not None: + # If one of the completion handlers registered their results, only allow those exact results to be returned. + # This prevents argcomplete from adding results from other completers when they are known to be invalid. + allowed_completions = set(self.registered_completions) + completions = [completion for completion in completions if completion in allowed_completions] + + return super().quote_completions(completions, cword_prequote, last_wordbreak_pos) + + +class CompositeAction(argparse.Action, metaclass=abc.ABCMeta): + """Base class for actions that parse composite arguments.""" + documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {} + + def __init__( + self, + *args, + **kwargs, + ): + self.definition = self.create_parser() + self.documentation_state[type(self)] = documentation_state = DocumentationState() + self.definition.document(documentation_state) + + kwargs.update(dest=self.definition.dest) + + super().__init__(*args, **kwargs) + + register_safe_action(type(self)) + + @abc.abstractmethod + def create_parser(self) -> NamespaceParser: + """Return a namespace parser to parse the argument associated with this action.""" + + def __call__( + self, + parser, + namespace, + values, + option_string=None, + ): + state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values) + + try: + self.definition.parse(state) + except ParserError as ex: + error = str(ex) + except CompletionError as ex: + error = ex.message + else: + return + + if get_comp_type(): + # FUTURE: It may be possible to enhance error handling by surfacing this error message during downstream completion. + return # ignore parse errors during completion to avoid breaking downstream completion + + raise argparse.ArgumentError(self, error) + + +class CompositeActionCompletionFinder(RegisteredCompletionFinder): + """Completion finder with support for composite argument parsing.""" + def get_completions( + self, + prefix: str, + action: argparse.Action, + parsed_args: argparse.Namespace, + ) -> list[str]: + """Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed.""" + assert isinstance(action, CompositeAction) + + state = ParserState( + mode=ParserMode.LIST if self.list_mode else ParserMode.COMPLETE, + remainder=prefix, + namespaces=[parsed_args], + ) + + answer = complete(action.definition, state) + + completions = [] + + if isinstance(answer, CompletionSuccess): + self.disable_completion_mangling = answer.preserve + completions = answer.completions + + if isinstance(answer, CompletionError): + warn(answer.message) + + return completions + + +def detect_file_listing(value: str, mode: ParserMode) -> bool: + """ + Return True if Bash will show a file listing and redraw the prompt, otherwise return False. + + If there are no list results, a file listing will be shown if the value after the last `=` or `:` character: + + - is empty + - matches a full path + - matches a partial path + + Otherwise Bash will play the bell sound and display nothing. + + see: https://github.com/kislyuk/argcomplete/issues/328 + see: https://github.com/kislyuk/argcomplete/pull/284 + """ + listing = False + + if mode == ParserMode.LIST: + right = re.split('[=:]', value)[-1] + listing = not right or os.path.exists(right) + + if not listing: + directory = os.path.dirname(right) + + # noinspection PyBroadException + try: + filenames = os.listdir(directory or '.') + except Exception: # pylint: disable=broad-except + pass + else: + listing = any(filename.startswith(right) for filename in filenames) + + return listing + + +def detect_false_file_completion(value: str, mode: ParserMode) -> bool: + """ + Return True if Bash will provide an incorrect file completion, otherwise return False. + + If there are no completion results, a filename will be automatically completed if the value after the last `=` or `:` character: + + - matches exactly one partial path + + Otherwise Bash will play the bell sound and display nothing. + + see: https://github.com/kislyuk/argcomplete/issues/328 + see: https://github.com/kislyuk/argcomplete/pull/284 + """ + completion = False + + if mode == ParserMode.COMPLETE: + completion = True + + right = re.split('[=:]', value)[-1] + directory, prefix = os.path.split(right) + + # noinspection PyBroadException + try: + filenames = os.listdir(directory or '.') + except Exception: # pylint: disable=broad-except + pass + else: + matches = [filename for filename in filenames if filename.startswith(prefix)] + completion = len(matches) == 1 + + return completion + + +def complete( + completer: Parser, + state: ParserState, +) -> Completion: + """Perform argument completion using the given completer and return the completion result.""" + value = state.remainder + + answer: Completion + + try: + completer.parse(state) + raise ParserError('completion expected') + except CompletionUnavailable as ex: + if detect_file_listing(value, state.mode): + # Displaying a warning before the file listing informs the user it is invalid. Bash will redraw the prompt after the list. + # If the file listing is not shown, a warning could be helpful, but would introduce noise on the terminal since the prompt is not redrawn. + answer = CompletionError(ex.message) + elif detect_false_file_completion(value, state.mode): + # When the current prefix provides no matches, but matches files a single file on disk, Bash will perform an incorrect completion. + # Returning multiple invalid matches instead of no matches will prevent Bash from using its own completion logic in this case. + answer = CompletionSuccess( + list_mode=True, # abuse list mode to enable preservation of the literal results + consumed='', + continuation='', + matches=['completion', 'invalid'] + ) + else: + answer = ex + except Completion as ex: + answer = ex + + return answer diff --git a/test/lib/ansible_test/_internal/cli/argparsing/actions.py b/test/lib/ansible_test/_internal/cli/argparsing/actions.py new file mode 100644 index 0000000..2bcf982 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/actions.py @@ -0,0 +1,18 @@ +"""Actions for argparse.""" +from __future__ import annotations + +import argparse +import enum +import typing as t + + +class EnumAction(argparse.Action): + """Parse an enum using the lowercase enum names.""" + def __init__(self, **kwargs: t.Any) -> None: + self.enum_type: t.Type[enum.Enum] = kwargs.pop('type', None) + kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type)) + super().__init__(**kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + value = self.enum_type[values.upper()] + setattr(namespace, self.dest, value) diff --git a/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py new file mode 100644 index 0000000..cf5776d --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py @@ -0,0 +1,124 @@ +"""Wrapper around argcomplete providing bug fixes and additional features.""" +from __future__ import annotations + +import argparse +import enum +import os +import typing as t + + +class Substitute: + """Substitute for missing class which accepts all arguments.""" + def __init__(self, *args, **kwargs) -> None: + pass + + +try: + import argcomplete + + from argcomplete import ( + CompletionFinder, + default_validator, + ) + + warn = argcomplete.warn # pylint: disable=invalid-name +except ImportError: + argcomplete = None + + CompletionFinder = Substitute + default_validator = Substitute # pylint: disable=invalid-name + warn = Substitute # pylint: disable=invalid-name + + +class CompType(enum.Enum): + """ + Bash COMP_TYPE argument completion types. + For documentation, see: https://www.gnu.org/software/bash/manual/html_node/Bash-Variables.html#index-COMP_005fTYPE + """ + COMPLETION = '\t' + """ + Standard completion, typically triggered by a single tab. + """ + MENU_COMPLETION = '%' + """ + Menu completion, which cycles through each completion instead of showing a list. + For help using this feature, see: https://stackoverflow.com/questions/12044574/getting-complete-and-menu-complete-to-work-together + """ + LIST = '?' + """ + Standard list, typically triggered by a double tab. + """ + LIST_AMBIGUOUS = '!' + """ + Listing with `show-all-if-ambiguous` set. + For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dambiguous + For additional details, see: https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type + """ + LIST_UNMODIFIED = '@' + """ + Listing with `show-all-if-unmodified` set. + For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dunmodified + For additional details, see: : https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type + """ + + @property + def list_mode(self) -> bool: + """True if completion is running in list mode, otherwise False.""" + return self in (CompType.LIST, CompType.LIST_AMBIGUOUS, CompType.LIST_UNMODIFIED) + + +def register_safe_action(action_type: t.Type[argparse.Action]) -> None: + """Register the given action as a safe action for argcomplete to use during completion if it is not already registered.""" + if argcomplete and action_type not in argcomplete.safe_actions: + argcomplete.safe_actions += (action_type,) + + +def get_comp_type() -> t.Optional[CompType]: + """Parse the COMP_TYPE environment variable (if present) and return the associated CompType enum value.""" + value = os.environ.get('COMP_TYPE') + comp_type = CompType(chr(int(value))) if value else None + return comp_type + + +class OptionCompletionFinder(CompletionFinder): + """ + Custom completion finder for argcomplete. + It provides support for running completion in list mode, which argcomplete natively handles the same as standard completion. + """ + enabled = bool(argcomplete) + + def __init__(self, *args, validator=None, **kwargs) -> None: + if validator: + raise ValueError() + + self.comp_type = get_comp_type() + self.list_mode = self.comp_type.list_mode if self.comp_type else False + self.disable_completion_mangling = False + + finder = self + + def custom_validator(completion, prefix): + """Completion validator used to optionally bypass validation.""" + if finder.disable_completion_mangling: + return True + + return default_validator(completion, prefix) + + super().__init__( + *args, + validator=custom_validator, + **kwargs, + ) + + def __call__(self, *args, **kwargs): + if self.enabled: + super().__call__(*args, **kwargs) + + def quote_completions(self, completions, cword_prequote, last_wordbreak_pos): + """Intercept default quoting behavior to optionally block mangling of completion entries.""" + if self.disable_completion_mangling: + # Word breaks have already been handled when generating completions, don't mangle them further. + # This is needed in many cases when returning completion lists which lack the existing completion prefix. + last_wordbreak_pos = None + + return super().quote_completions(completions, cword_prequote, last_wordbreak_pos) diff --git a/test/lib/ansible_test/_internal/cli/argparsing/parsers.py b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py new file mode 100644 index 0000000..d07e03c --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py @@ -0,0 +1,597 @@ +"""General purpose composite argument parsing and completion.""" +from __future__ import annotations + +import abc +import collections.abc as c +import contextlib +import dataclasses +import enum +import os +import re +import typing as t + +# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior. +# +# Recommended characters for assignment and/or continuation: `/` `:` `=` +# +# The recommended assignment_character list is due to how argcomplete handles continuation characters. +# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558 + +PAIR_DELIMITER = ',' +ASSIGNMENT_DELIMITER = '=' +PATH_DELIMITER = '/' + + +# This class was originally frozen. However, that causes issues when running under Python 3.11. +# See: https://github.com/python/cpython/issues/99856 +@dataclasses.dataclass +class Completion(Exception): + """Base class for argument completion results.""" + + +@dataclasses.dataclass +class CompletionUnavailable(Completion): + """Argument completion unavailable.""" + message: str = 'No completions available.' + + +@dataclasses.dataclass +class CompletionError(Completion): + """Argument completion error.""" + message: t.Optional[str] = None + + +@dataclasses.dataclass +class CompletionSuccess(Completion): + """Successful argument completion result.""" + list_mode: bool + consumed: str + continuation: str + matches: list[str] = dataclasses.field(default_factory=list) + + @property + def preserve(self) -> bool: + """ + True if argcomplete should not mangle completion values, otherwise False. + Only used when more than one completion exists to avoid overwriting the word undergoing completion. + """ + return len(self.matches) > 1 and self.list_mode + + @property + def completions(self) -> list[str]: + """List of completion values to return to argcomplete.""" + completions = self.matches + continuation = '' if self.list_mode else self.continuation + + if not self.preserve: + # include the existing prefix to avoid rewriting the word undergoing completion + completions = [f'{self.consumed}{completion}{continuation}' for completion in completions] + + return completions + + +class ParserMode(enum.Enum): + """Mode the parser is operating in.""" + PARSE = enum.auto() + COMPLETE = enum.auto() + LIST = enum.auto() + + +class ParserError(Exception): + """Base class for all parsing exceptions.""" + + +@dataclasses.dataclass +class ParserBoundary: + """Boundary details for parsing composite input.""" + delimiters: str + required: bool + match: t.Optional[str] = None + ready: bool = True + + +@dataclasses.dataclass +class ParserState: + """State of the composite argument parser.""" + mode: ParserMode + remainder: str = '' + consumed: str = '' + boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list) + namespaces: list[t.Any] = dataclasses.field(default_factory=list) + parts: list[str] = dataclasses.field(default_factory=list) + + @property + def incomplete(self) -> bool: + """True if parsing is incomplete (unparsed input remains), otherwise False.""" + return self.remainder is not None + + def match(self, value: str, choices: list[str]) -> bool: + """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False.""" + if self.current_boundary: + delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match + else: + delimiters, delimiter = '', None + + for choice in choices: + if choice.rstrip(delimiters) == choice: + # choice is not delimited + if value == choice: + return True # value matched + else: + # choice is delimited + if f'{value}{delimiter}' == choice: + return True # value and delimiter matched + + return False + + def read(self) -> str: + """Read and return the next input segment, taking into account parsing boundaries.""" + delimiters = "".join(boundary.delimiters for boundary in self.boundaries) + + if delimiters: + pattern = '([' + re.escape(delimiters) + '])' + regex = re.compile(pattern) + parts = regex.split(self.remainder, 1) + else: + parts = [self.remainder] + + if len(parts) > 1: + value, delimiter, remainder = parts + else: + value, delimiter, remainder = parts[0], None, None + + for boundary in reversed(self.boundaries): + if delimiter and delimiter in boundary.delimiters: + boundary.match = delimiter + self.consumed += value + delimiter + break + + boundary.match = None + boundary.ready = False + + if boundary.required: + break + + self.remainder = remainder + + return value + + @property + def root_namespace(self) -> t.Any: + """THe root namespace.""" + return self.namespaces[0] + + @property + def current_namespace(self) -> t.Any: + """The current namespace.""" + return self.namespaces[-1] + + @property + def current_boundary(self) -> t.Optional[ParserBoundary]: + """The current parser boundary, if any, otherwise None.""" + return self.boundaries[-1] if self.boundaries else None + + def set_namespace(self, namespace: t.Any) -> None: + """Set the current namespace.""" + self.namespaces.append(namespace) + + @contextlib.contextmanager + def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]: + """Context manager for delimiting parsing of input.""" + boundary = ParserBoundary(delimiters=delimiters, required=required) + + self.boundaries.append(boundary) + + try: + yield boundary + finally: + self.boundaries.pop() + + if boundary.required and not boundary.match: + raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead') + + +@dataclasses.dataclass +class DocumentationState: + """State of the composite argument parser's generated documentation.""" + sections: dict[str, str] = dataclasses.field(default_factory=dict) + + +class Parser(metaclass=abc.ABCMeta): + """Base class for all composite argument parsers.""" + @abc.abstractmethod + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + raise Exception(f'Undocumented parser: {type(self)}') + + +class MatchConditions(enum.Flag): + """Acceptable condition(s) for matching user input to available choices.""" + CHOICE = enum.auto() + """Match any choice.""" + ANY = enum.auto() + """Match any non-empty string.""" + NOTHING = enum.auto() + """Match an empty string which is not followed by a boundary match.""" + + +class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers which use a list of choices that can be generated during completion.""" + def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.conditions = conditions + + @abc.abstractmethod + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + + def no_completion_match(self, value: str) -> CompletionUnavailable: # pylint: disable=unused-argument + """Return an instance of CompletionUnavailable when no match was found for the given value.""" + return CompletionUnavailable() + + def no_choices_available(self, value: str) -> ParserError: # pylint: disable=unused-argument + """Return an instance of ParserError when parsing fails and no choices are available.""" + return ParserError('No choices available.') + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = state.read() + choices = self.get_choices(value) + + if state.mode == ParserMode.PARSE or state.incomplete: + if self.conditions & MatchConditions.CHOICE and state.match(value, choices): + return value + + if self.conditions & MatchConditions.ANY and value: + return value + + if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match: + return value + + if state.mode == ParserMode.PARSE: + if choices: + raise ParserError(f'"{value}" not in: {", ".join(choices)}') + + raise self.no_choices_available(value) + + raise CompletionUnavailable() + + matches = [choice for choice in choices if choice.startswith(value)] + + if not matches: + raise self.no_completion_match(value) + + continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else '' + + raise CompletionSuccess( + list_mode=state.mode == ParserMode.LIST, + consumed=state.consumed, + continuation=continuation, + matches=matches, + ) + + +class ChoicesParser(DynamicChoicesParser): + """Composite argument parser which relies on a static list of choices.""" + def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.choices = choices + + super().__init__(conditions=conditions) + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + return self.choices + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '|'.join(self.choices) + + +class EnumValueChoicesParser(ChoicesParser): + """Composite argument parser which relies on a static list of choices derived from the values of an enum.""" + def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None: + self.enum_type = enum_type + + super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return self.enum_type(value) + + +class IntegerParser(DynamicChoicesParser): + """Composite argument parser for integers.""" + PATTERN = re.compile('^[1-9][0-9]*$') + + def __init__(self, maximum: t.Optional[int] = None) -> None: + self.maximum = maximum + + super().__init__() + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + if not value: + numbers = list(range(1, 10)) + elif self.PATTERN.search(value): + int_prefix = int(value) + base = int_prefix * 10 + numbers = [int_prefix] + [base + i for i in range(0, 10)] + else: + numbers = [] + + # NOTE: the minimum is currently fixed at 1 + + if self.maximum is not None: + numbers = [n for n in numbers if n <= self.maximum] + + return [str(n) for n in numbers] + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return int(value) + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{integer}' + + +class BooleanParser(ChoicesParser): + """Composite argument parser for boolean (yes/no) values.""" + def __init__(self) -> None: + super().__init__(['yes', 'no']) + + def parse(self, state: ParserState) -> bool: + """Parse the input from the given state and return the result.""" + value = super().parse(state) + return value == 'yes' + + +class AnyParser(ChoicesParser): + """Composite argument parser which accepts any input value.""" + def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None: + self.no_match_message = no_match_message + + conditions = MatchConditions.ANY + + if nothing: + conditions |= MatchConditions.NOTHING + + super().__init__([], conditions=conditions) + + def no_completion_match(self, value: str) -> CompletionUnavailable: + """Return an instance of CompletionUnavailable when no match was found for the given value.""" + if self.no_match_message: + return CompletionUnavailable(message=self.no_match_message) + + return super().no_completion_match(value) + + def no_choices_available(self, value: str) -> ParserError: + """Return an instance of ParserError when parsing fails and no choices are available.""" + if self.no_match_message: + return ParserError(self.no_match_message) + + return super().no_choices_available(value) + + +class RelativePathNameParser(DynamicChoicesParser): + """Composite argument parser for relative path names.""" + RELATIVE_NAMES = ['.', '..'] + + def __init__(self, choices: list[str]) -> None: + self.choices = choices + + super().__init__() + + def get_choices(self, value: str) -> list[str]: + """Return a list of valid choices based on the given input value.""" + choices = list(self.choices) + + if value in self.RELATIVE_NAMES: + # complete relative names, but avoid suggesting them unless the current name is relative + # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../") + choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES) + + return choices + + +class FileParser(Parser): + """Composite argument parser for absolute or relative file paths.""" + def parse(self, state: ParserState) -> str: + """Parse the input from the given state and return the result.""" + if state.mode == ParserMode.PARSE: + path = AnyParser().parse(state) + + if not os.path.isfile(path): + raise ParserError(f'Not a file: {path}') + else: + path = '' + + with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary + while boundary.ready: + directory = path or '.' + + try: + with os.scandir(directory) as scan: # type: c.Iterator[os.DirEntry] + choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan] + except OSError: + choices = [] + + if not path: + choices.append(PATH_DELIMITER) # allow absolute paths + choices.append('../') # suggest relative paths + + part = RelativePathNameParser(choices).parse(state) + path += f'{part}{boundary.match or ""}' + + return path + + +class AbsolutePathParser(Parser): + """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + path = '' + + with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary + while boundary.ready: + if path: + path += AnyParser(nothing=True).parse(state) + else: + path += ChoicesParser([PATH_DELIMITER]).parse(state) + + path += (boundary.match or '') + + return path + + +class NamespaceParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers that store their results in a namespace.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + current = getattr(namespace, self.dest) + + if current and self.limit_one: + if state.mode == ParserMode.PARSE: + raise ParserError('Option cannot be specified more than once.') + + raise CompletionError('Option cannot be specified more than once.') + + value = self.get_value(state) + + if self.use_list: + if not current: + current = [] + setattr(namespace, self.dest, current) + + current.append(value) + else: + setattr(namespace, self.dest, value) + + return value + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + return super().parse(state) + + @property + def use_list(self) -> bool: + """True if the destination is a list, otherwise False.""" + return False + + @property + def limit_one(self) -> bool: + """True if only one target is allowed, otherwise False.""" + return not self.use_list + + @property + @abc.abstractmethod + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + + +class NamespaceWrappedParser(NamespaceParser): + """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace.""" + def __init__(self, dest: str, parser: Parser) -> None: + self._dest = dest + self.parser = parser + + def get_value(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result, without storing the result in the namespace.""" + return self.parser.parse(state) + + @property + def dest(self) -> str: + """The name of the attribute where the value should be stored.""" + return self._dest + + +class KeyValueParser(Parser, metaclass=abc.ABCMeta): + """Base class for key/value composite argument parsers.""" + @abc.abstractmethod + def get_parsers(self, state: ParserState) -> dict[str, Parser]: + """Return a dictionary of key names and value parsers.""" + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = state.current_namespace + parsers = self.get_parsers(state) + keys = list(parsers) + + with state.delimit(PAIR_DELIMITER, required=False) as pair: # type: ParserBoundary + while pair.ready: + with state.delimit(ASSIGNMENT_DELIMITER): + key = ChoicesParser(keys).parse(state) + + value = parsers[key].parse(state) + + setattr(namespace, key, value) + + keys.remove(key) + + return namespace + + +class PairParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = self.create_namespace() + + state.set_namespace(namespace) + + with state.delimit(self.delimiter, self.required) as boundary: # type: ParserBoundary + choice = self.get_left_parser(state).parse(state) + + if boundary.match: + self.get_right_parser(choice).parse(state) + + return namespace + + @property + def required(self) -> bool: + """True if the delimiter (and thus right parser) is required, otherwise False.""" + return False + + @property + def delimiter(self) -> str: + """The delimiter to use between the left and right parser.""" + return PAIR_DELIMITER + + @abc.abstractmethod + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + + @abc.abstractmethod + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + + @abc.abstractmethod + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + + +class TypeParser(Parser, metaclass=abc.ABCMeta): + """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name.""" + def get_parsers(self, state: ParserState) -> dict[str, Parser]: # pylint: disable=unused-argument + """Return a dictionary of type names and type parsers.""" + return self.get_stateless_parsers() + + @abc.abstractmethod + def get_stateless_parsers(self) -> dict[str, Parser]: + """Return a dictionary of type names and type parsers.""" + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + parsers = self.get_parsers(state) + + with state.delimit(':'): + key = ChoicesParser(list(parsers)).parse(state) + + value = parsers[key].parse(state) + + return value |