summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/cli/argparsing
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/argparsing')
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/__init__.py265
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/actions.py18
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py124
-rw-r--r--test/lib/ansible_test/_internal/cli/argparsing/parsers.py597
4 files changed, 1004 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/__init__.py b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py
new file mode 100644
index 0000000..540cf55
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/__init__.py
@@ -0,0 +1,265 @@
+"""Completion finder which brings together custom options and completion logic."""
+from __future__ import annotations
+
+import abc
+import argparse
+import os
+import re
+import typing as t
+
+from .argcompletion import (
+ OptionCompletionFinder,
+ get_comp_type,
+ register_safe_action,
+ warn,
+)
+
+from .parsers import (
+ Completion,
+ CompletionError,
+ CompletionSuccess,
+ CompletionUnavailable,
+ DocumentationState,
+ NamespaceParser,
+ Parser,
+ ParserError,
+ ParserMode,
+ ParserState,
+)
+
+
+class RegisteredCompletionFinder(OptionCompletionFinder):
+ """
+ Custom option completion finder for argcomplete which allows completion results to be registered.
+ These registered completions, if provided, are used to filter the final completion results.
+ This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221
+ """
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ self.registered_completions: t.Optional[list[str]] = None
+
+ def completer(
+ self,
+ prefix: str,
+ action: argparse.Action,
+ parsed_args: argparse.Namespace,
+ **kwargs,
+ ) -> list[str]:
+ """
+ Return a list of completions for the specified prefix and action.
+ Use this as the completer function for argcomplete.
+ """
+ kwargs.clear()
+ del kwargs
+
+ completions = self.get_completions(prefix, action, parsed_args)
+
+ if action.nargs and not isinstance(action.nargs, int):
+ # prevent argcomplete from including unrelated arguments in the completion results
+ self.registered_completions = completions
+
+ return completions
+
+ @abc.abstractmethod
+ def get_completions(
+ self,
+ prefix: str,
+ action: argparse.Action,
+ parsed_args: argparse.Namespace,
+ ) -> list[str]:
+ """
+ Return a list of completions for the specified prefix and action.
+ Called by the complete function.
+ """
+
+ def quote_completions(self, completions, cword_prequote, last_wordbreak_pos):
+ """Modify completion results before returning them."""
+ if self.registered_completions is not None:
+ # If one of the completion handlers registered their results, only allow those exact results to be returned.
+ # This prevents argcomplete from adding results from other completers when they are known to be invalid.
+ allowed_completions = set(self.registered_completions)
+ completions = [completion for completion in completions if completion in allowed_completions]
+
+ return super().quote_completions(completions, cword_prequote, last_wordbreak_pos)
+
+
+class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
+ """Base class for actions that parse composite arguments."""
+ documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {}
+
+ def __init__(
+ self,
+ *args,
+ **kwargs,
+ ):
+ self.definition = self.create_parser()
+ self.documentation_state[type(self)] = documentation_state = DocumentationState()
+ self.definition.document(documentation_state)
+
+ kwargs.update(dest=self.definition.dest)
+
+ super().__init__(*args, **kwargs)
+
+ register_safe_action(type(self))
+
+ @abc.abstractmethod
+ def create_parser(self) -> NamespaceParser:
+ """Return a namespace parser to parse the argument associated with this action."""
+
+ def __call__(
+ self,
+ parser,
+ namespace,
+ values,
+ option_string=None,
+ ):
+ state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values)
+
+ try:
+ self.definition.parse(state)
+ except ParserError as ex:
+ error = str(ex)
+ except CompletionError as ex:
+ error = ex.message
+ else:
+ return
+
+ if get_comp_type():
+ # FUTURE: It may be possible to enhance error handling by surfacing this error message during downstream completion.
+ return # ignore parse errors during completion to avoid breaking downstream completion
+
+ raise argparse.ArgumentError(self, error)
+
+
+class CompositeActionCompletionFinder(RegisteredCompletionFinder):
+ """Completion finder with support for composite argument parsing."""
+ def get_completions(
+ self,
+ prefix: str,
+ action: argparse.Action,
+ parsed_args: argparse.Namespace,
+ ) -> list[str]:
+ """Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed."""
+ assert isinstance(action, CompositeAction)
+
+ state = ParserState(
+ mode=ParserMode.LIST if self.list_mode else ParserMode.COMPLETE,
+ remainder=prefix,
+ namespaces=[parsed_args],
+ )
+
+ answer = complete(action.definition, state)
+
+ completions = []
+
+ if isinstance(answer, CompletionSuccess):
+ self.disable_completion_mangling = answer.preserve
+ completions = answer.completions
+
+ if isinstance(answer, CompletionError):
+ warn(answer.message)
+
+ return completions
+
+
+def detect_file_listing(value: str, mode: ParserMode) -> bool:
+ """
+ Return True if Bash will show a file listing and redraw the prompt, otherwise return False.
+
+ If there are no list results, a file listing will be shown if the value after the last `=` or `:` character:
+
+ - is empty
+ - matches a full path
+ - matches a partial path
+
+ Otherwise Bash will play the bell sound and display nothing.
+
+ see: https://github.com/kislyuk/argcomplete/issues/328
+ see: https://github.com/kislyuk/argcomplete/pull/284
+ """
+ listing = False
+
+ if mode == ParserMode.LIST:
+ right = re.split('[=:]', value)[-1]
+ listing = not right or os.path.exists(right)
+
+ if not listing:
+ directory = os.path.dirname(right)
+
+ # noinspection PyBroadException
+ try:
+ filenames = os.listdir(directory or '.')
+ except Exception: # pylint: disable=broad-except
+ pass
+ else:
+ listing = any(filename.startswith(right) for filename in filenames)
+
+ return listing
+
+
+def detect_false_file_completion(value: str, mode: ParserMode) -> bool:
+ """
+ Return True if Bash will provide an incorrect file completion, otherwise return False.
+
+ If there are no completion results, a filename will be automatically completed if the value after the last `=` or `:` character:
+
+ - matches exactly one partial path
+
+ Otherwise Bash will play the bell sound and display nothing.
+
+ see: https://github.com/kislyuk/argcomplete/issues/328
+ see: https://github.com/kislyuk/argcomplete/pull/284
+ """
+ completion = False
+
+ if mode == ParserMode.COMPLETE:
+ completion = True
+
+ right = re.split('[=:]', value)[-1]
+ directory, prefix = os.path.split(right)
+
+ # noinspection PyBroadException
+ try:
+ filenames = os.listdir(directory or '.')
+ except Exception: # pylint: disable=broad-except
+ pass
+ else:
+ matches = [filename for filename in filenames if filename.startswith(prefix)]
+ completion = len(matches) == 1
+
+ return completion
+
+
+def complete(
+ completer: Parser,
+ state: ParserState,
+) -> Completion:
+ """Perform argument completion using the given completer and return the completion result."""
+ value = state.remainder
+
+ answer: Completion
+
+ try:
+ completer.parse(state)
+ raise ParserError('completion expected')
+ except CompletionUnavailable as ex:
+ if detect_file_listing(value, state.mode):
+ # Displaying a warning before the file listing informs the user it is invalid. Bash will redraw the prompt after the list.
+ # If the file listing is not shown, a warning could be helpful, but would introduce noise on the terminal since the prompt is not redrawn.
+ answer = CompletionError(ex.message)
+ elif detect_false_file_completion(value, state.mode):
+ # When the current prefix provides no matches, but matches files a single file on disk, Bash will perform an incorrect completion.
+ # Returning multiple invalid matches instead of no matches will prevent Bash from using its own completion logic in this case.
+ answer = CompletionSuccess(
+ list_mode=True, # abuse list mode to enable preservation of the literal results
+ consumed='',
+ continuation='',
+ matches=['completion', 'invalid']
+ )
+ else:
+ answer = ex
+ except Completion as ex:
+ answer = ex
+
+ return answer
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/actions.py b/test/lib/ansible_test/_internal/cli/argparsing/actions.py
new file mode 100644
index 0000000..2bcf982
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/actions.py
@@ -0,0 +1,18 @@
+"""Actions for argparse."""
+from __future__ import annotations
+
+import argparse
+import enum
+import typing as t
+
+
+class EnumAction(argparse.Action):
+ """Parse an enum using the lowercase enum names."""
+ def __init__(self, **kwargs: t.Any) -> None:
+ self.enum_type: t.Type[enum.Enum] = kwargs.pop('type', None)
+ kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type))
+ super().__init__(**kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ value = self.enum_type[values.upper()]
+ setattr(namespace, self.dest, value)
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py
new file mode 100644
index 0000000..cf5776d
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/argcompletion.py
@@ -0,0 +1,124 @@
+"""Wrapper around argcomplete providing bug fixes and additional features."""
+from __future__ import annotations
+
+import argparse
+import enum
+import os
+import typing as t
+
+
+class Substitute:
+ """Substitute for missing class which accepts all arguments."""
+ def __init__(self, *args, **kwargs) -> None:
+ pass
+
+
+try:
+ import argcomplete
+
+ from argcomplete import (
+ CompletionFinder,
+ default_validator,
+ )
+
+ warn = argcomplete.warn # pylint: disable=invalid-name
+except ImportError:
+ argcomplete = None
+
+ CompletionFinder = Substitute
+ default_validator = Substitute # pylint: disable=invalid-name
+ warn = Substitute # pylint: disable=invalid-name
+
+
+class CompType(enum.Enum):
+ """
+ Bash COMP_TYPE argument completion types.
+ For documentation, see: https://www.gnu.org/software/bash/manual/html_node/Bash-Variables.html#index-COMP_005fTYPE
+ """
+ COMPLETION = '\t'
+ """
+ Standard completion, typically triggered by a single tab.
+ """
+ MENU_COMPLETION = '%'
+ """
+ Menu completion, which cycles through each completion instead of showing a list.
+ For help using this feature, see: https://stackoverflow.com/questions/12044574/getting-complete-and-menu-complete-to-work-together
+ """
+ LIST = '?'
+ """
+ Standard list, typically triggered by a double tab.
+ """
+ LIST_AMBIGUOUS = '!'
+ """
+ Listing with `show-all-if-ambiguous` set.
+ For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dambiguous
+ For additional details, see: https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type
+ """
+ LIST_UNMODIFIED = '@'
+ """
+ Listing with `show-all-if-unmodified` set.
+ For documentation, see https://www.gnu.org/software/bash/manual/html_node/Readline-Init-File-Syntax.html#index-show_002dall_002dif_002dunmodified
+ For additional details, see: : https://unix.stackexchange.com/questions/614123/explanation-of-bash-completion-comp-type
+ """
+
+ @property
+ def list_mode(self) -> bool:
+ """True if completion is running in list mode, otherwise False."""
+ return self in (CompType.LIST, CompType.LIST_AMBIGUOUS, CompType.LIST_UNMODIFIED)
+
+
+def register_safe_action(action_type: t.Type[argparse.Action]) -> None:
+ """Register the given action as a safe action for argcomplete to use during completion if it is not already registered."""
+ if argcomplete and action_type not in argcomplete.safe_actions:
+ argcomplete.safe_actions += (action_type,)
+
+
+def get_comp_type() -> t.Optional[CompType]:
+ """Parse the COMP_TYPE environment variable (if present) and return the associated CompType enum value."""
+ value = os.environ.get('COMP_TYPE')
+ comp_type = CompType(chr(int(value))) if value else None
+ return comp_type
+
+
+class OptionCompletionFinder(CompletionFinder):
+ """
+ Custom completion finder for argcomplete.
+ It provides support for running completion in list mode, which argcomplete natively handles the same as standard completion.
+ """
+ enabled = bool(argcomplete)
+
+ def __init__(self, *args, validator=None, **kwargs) -> None:
+ if validator:
+ raise ValueError()
+
+ self.comp_type = get_comp_type()
+ self.list_mode = self.comp_type.list_mode if self.comp_type else False
+ self.disable_completion_mangling = False
+
+ finder = self
+
+ def custom_validator(completion, prefix):
+ """Completion validator used to optionally bypass validation."""
+ if finder.disable_completion_mangling:
+ return True
+
+ return default_validator(completion, prefix)
+
+ super().__init__(
+ *args,
+ validator=custom_validator,
+ **kwargs,
+ )
+
+ def __call__(self, *args, **kwargs):
+ if self.enabled:
+ super().__call__(*args, **kwargs)
+
+ def quote_completions(self, completions, cword_prequote, last_wordbreak_pos):
+ """Intercept default quoting behavior to optionally block mangling of completion entries."""
+ if self.disable_completion_mangling:
+ # Word breaks have already been handled when generating completions, don't mangle them further.
+ # This is needed in many cases when returning completion lists which lack the existing completion prefix.
+ last_wordbreak_pos = None
+
+ return super().quote_completions(completions, cword_prequote, last_wordbreak_pos)
diff --git a/test/lib/ansible_test/_internal/cli/argparsing/parsers.py b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py
new file mode 100644
index 0000000..d07e03c
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/argparsing/parsers.py
@@ -0,0 +1,597 @@
+"""General purpose composite argument parsing and completion."""
+from __future__ import annotations
+
+import abc
+import collections.abc as c
+import contextlib
+import dataclasses
+import enum
+import os
+import re
+import typing as t
+
+# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior.
+#
+# Recommended characters for assignment and/or continuation: `/` `:` `=`
+#
+# The recommended assignment_character list is due to how argcomplete handles continuation characters.
+# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558
+
+PAIR_DELIMITER = ','
+ASSIGNMENT_DELIMITER = '='
+PATH_DELIMITER = '/'
+
+
+# This class was originally frozen. However, that causes issues when running under Python 3.11.
+# See: https://github.com/python/cpython/issues/99856
+@dataclasses.dataclass
+class Completion(Exception):
+ """Base class for argument completion results."""
+
+
+@dataclasses.dataclass
+class CompletionUnavailable(Completion):
+ """Argument completion unavailable."""
+ message: str = 'No completions available.'
+
+
+@dataclasses.dataclass
+class CompletionError(Completion):
+ """Argument completion error."""
+ message: t.Optional[str] = None
+
+
+@dataclasses.dataclass
+class CompletionSuccess(Completion):
+ """Successful argument completion result."""
+ list_mode: bool
+ consumed: str
+ continuation: str
+ matches: list[str] = dataclasses.field(default_factory=list)
+
+ @property
+ def preserve(self) -> bool:
+ """
+ True if argcomplete should not mangle completion values, otherwise False.
+ Only used when more than one completion exists to avoid overwriting the word undergoing completion.
+ """
+ return len(self.matches) > 1 and self.list_mode
+
+ @property
+ def completions(self) -> list[str]:
+ """List of completion values to return to argcomplete."""
+ completions = self.matches
+ continuation = '' if self.list_mode else self.continuation
+
+ if not self.preserve:
+ # include the existing prefix to avoid rewriting the word undergoing completion
+ completions = [f'{self.consumed}{completion}{continuation}' for completion in completions]
+
+ return completions
+
+
+class ParserMode(enum.Enum):
+ """Mode the parser is operating in."""
+ PARSE = enum.auto()
+ COMPLETE = enum.auto()
+ LIST = enum.auto()
+
+
+class ParserError(Exception):
+ """Base class for all parsing exceptions."""
+
+
+@dataclasses.dataclass
+class ParserBoundary:
+ """Boundary details for parsing composite input."""
+ delimiters: str
+ required: bool
+ match: t.Optional[str] = None
+ ready: bool = True
+
+
+@dataclasses.dataclass
+class ParserState:
+ """State of the composite argument parser."""
+ mode: ParserMode
+ remainder: str = ''
+ consumed: str = ''
+ boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list)
+ namespaces: list[t.Any] = dataclasses.field(default_factory=list)
+ parts: list[str] = dataclasses.field(default_factory=list)
+
+ @property
+ def incomplete(self) -> bool:
+ """True if parsing is incomplete (unparsed input remains), otherwise False."""
+ return self.remainder is not None
+
+ def match(self, value: str, choices: list[str]) -> bool:
+ """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False."""
+ if self.current_boundary:
+ delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match
+ else:
+ delimiters, delimiter = '', None
+
+ for choice in choices:
+ if choice.rstrip(delimiters) == choice:
+ # choice is not delimited
+ if value == choice:
+ return True # value matched
+ else:
+ # choice is delimited
+ if f'{value}{delimiter}' == choice:
+ return True # value and delimiter matched
+
+ return False
+
+ def read(self) -> str:
+ """Read and return the next input segment, taking into account parsing boundaries."""
+ delimiters = "".join(boundary.delimiters for boundary in self.boundaries)
+
+ if delimiters:
+ pattern = '([' + re.escape(delimiters) + '])'
+ regex = re.compile(pattern)
+ parts = regex.split(self.remainder, 1)
+ else:
+ parts = [self.remainder]
+
+ if len(parts) > 1:
+ value, delimiter, remainder = parts
+ else:
+ value, delimiter, remainder = parts[0], None, None
+
+ for boundary in reversed(self.boundaries):
+ if delimiter and delimiter in boundary.delimiters:
+ boundary.match = delimiter
+ self.consumed += value + delimiter
+ break
+
+ boundary.match = None
+ boundary.ready = False
+
+ if boundary.required:
+ break
+
+ self.remainder = remainder
+
+ return value
+
+ @property
+ def root_namespace(self) -> t.Any:
+ """THe root namespace."""
+ return self.namespaces[0]
+
+ @property
+ def current_namespace(self) -> t.Any:
+ """The current namespace."""
+ return self.namespaces[-1]
+
+ @property
+ def current_boundary(self) -> t.Optional[ParserBoundary]:
+ """The current parser boundary, if any, otherwise None."""
+ return self.boundaries[-1] if self.boundaries else None
+
+ def set_namespace(self, namespace: t.Any) -> None:
+ """Set the current namespace."""
+ self.namespaces.append(namespace)
+
+ @contextlib.contextmanager
+ def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]:
+ """Context manager for delimiting parsing of input."""
+ boundary = ParserBoundary(delimiters=delimiters, required=required)
+
+ self.boundaries.append(boundary)
+
+ try:
+ yield boundary
+ finally:
+ self.boundaries.pop()
+
+ if boundary.required and not boundary.match:
+ raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead')
+
+
+@dataclasses.dataclass
+class DocumentationState:
+ """State of the composite argument parser's generated documentation."""
+ sections: dict[str, str] = dataclasses.field(default_factory=dict)
+
+
+class Parser(metaclass=abc.ABCMeta):
+ """Base class for all composite argument parsers."""
+ @abc.abstractmethod
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ raise Exception(f'Undocumented parser: {type(self)}')
+
+
+class MatchConditions(enum.Flag):
+ """Acceptable condition(s) for matching user input to available choices."""
+ CHOICE = enum.auto()
+ """Match any choice."""
+ ANY = enum.auto()
+ """Match any non-empty string."""
+ NOTHING = enum.auto()
+ """Match an empty string which is not followed by a boundary match."""
+
+
+class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers which use a list of choices that can be generated during completion."""
+ def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None:
+ self.conditions = conditions
+
+ @abc.abstractmethod
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+
+ def no_completion_match(self, value: str) -> CompletionUnavailable: # pylint: disable=unused-argument
+ """Return an instance of CompletionUnavailable when no match was found for the given value."""
+ return CompletionUnavailable()
+
+ def no_choices_available(self, value: str) -> ParserError: # pylint: disable=unused-argument
+ """Return an instance of ParserError when parsing fails and no choices are available."""
+ return ParserError('No choices available.')
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = state.read()
+ choices = self.get_choices(value)
+
+ if state.mode == ParserMode.PARSE or state.incomplete:
+ if self.conditions & MatchConditions.CHOICE and state.match(value, choices):
+ return value
+
+ if self.conditions & MatchConditions.ANY and value:
+ return value
+
+ if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match:
+ return value
+
+ if state.mode == ParserMode.PARSE:
+ if choices:
+ raise ParserError(f'"{value}" not in: {", ".join(choices)}')
+
+ raise self.no_choices_available(value)
+
+ raise CompletionUnavailable()
+
+ matches = [choice for choice in choices if choice.startswith(value)]
+
+ if not matches:
+ raise self.no_completion_match(value)
+
+ continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else ''
+
+ raise CompletionSuccess(
+ list_mode=state.mode == ParserMode.LIST,
+ consumed=state.consumed,
+ continuation=continuation,
+ matches=matches,
+ )
+
+
+class ChoicesParser(DynamicChoicesParser):
+ """Composite argument parser which relies on a static list of choices."""
+ def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
+ self.choices = choices
+
+ super().__init__(conditions=conditions)
+
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+ return self.choices
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '|'.join(self.choices)
+
+
+class EnumValueChoicesParser(ChoicesParser):
+ """Composite argument parser which relies on a static list of choices derived from the values of an enum."""
+ def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
+ self.enum_type = enum_type
+
+ super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions)
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+ return self.enum_type(value)
+
+
+class IntegerParser(DynamicChoicesParser):
+ """Composite argument parser for integers."""
+ PATTERN = re.compile('^[1-9][0-9]*$')
+
+ def __init__(self, maximum: t.Optional[int] = None) -> None:
+ self.maximum = maximum
+
+ super().__init__()
+
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+ if not value:
+ numbers = list(range(1, 10))
+ elif self.PATTERN.search(value):
+ int_prefix = int(value)
+ base = int_prefix * 10
+ numbers = [int_prefix] + [base + i for i in range(0, 10)]
+ else:
+ numbers = []
+
+ # NOTE: the minimum is currently fixed at 1
+
+ if self.maximum is not None:
+ numbers = [n for n in numbers if n <= self.maximum]
+
+ return [str(n) for n in numbers]
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+ return int(value)
+
+ def document(self, state: DocumentationState) -> t.Optional[str]:
+ """Generate and return documentation for this parser."""
+ return '{integer}'
+
+
+class BooleanParser(ChoicesParser):
+ """Composite argument parser for boolean (yes/no) values."""
+ def __init__(self) -> None:
+ super().__init__(['yes', 'no'])
+
+ def parse(self, state: ParserState) -> bool:
+ """Parse the input from the given state and return the result."""
+ value = super().parse(state)
+ return value == 'yes'
+
+
+class AnyParser(ChoicesParser):
+ """Composite argument parser which accepts any input value."""
+ def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None:
+ self.no_match_message = no_match_message
+
+ conditions = MatchConditions.ANY
+
+ if nothing:
+ conditions |= MatchConditions.NOTHING
+
+ super().__init__([], conditions=conditions)
+
+ def no_completion_match(self, value: str) -> CompletionUnavailable:
+ """Return an instance of CompletionUnavailable when no match was found for the given value."""
+ if self.no_match_message:
+ return CompletionUnavailable(message=self.no_match_message)
+
+ return super().no_completion_match(value)
+
+ def no_choices_available(self, value: str) -> ParserError:
+ """Return an instance of ParserError when parsing fails and no choices are available."""
+ if self.no_match_message:
+ return ParserError(self.no_match_message)
+
+ return super().no_choices_available(value)
+
+
+class RelativePathNameParser(DynamicChoicesParser):
+ """Composite argument parser for relative path names."""
+ RELATIVE_NAMES = ['.', '..']
+
+ def __init__(self, choices: list[str]) -> None:
+ self.choices = choices
+
+ super().__init__()
+
+ def get_choices(self, value: str) -> list[str]:
+ """Return a list of valid choices based on the given input value."""
+ choices = list(self.choices)
+
+ if value in self.RELATIVE_NAMES:
+ # complete relative names, but avoid suggesting them unless the current name is relative
+ # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../")
+ choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES)
+
+ return choices
+
+
+class FileParser(Parser):
+ """Composite argument parser for absolute or relative file paths."""
+ def parse(self, state: ParserState) -> str:
+ """Parse the input from the given state and return the result."""
+ if state.mode == ParserMode.PARSE:
+ path = AnyParser().parse(state)
+
+ if not os.path.isfile(path):
+ raise ParserError(f'Not a file: {path}')
+ else:
+ path = ''
+
+ with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary
+ while boundary.ready:
+ directory = path or '.'
+
+ try:
+ with os.scandir(directory) as scan: # type: c.Iterator[os.DirEntry]
+ choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan]
+ except OSError:
+ choices = []
+
+ if not path:
+ choices.append(PATH_DELIMITER) # allow absolute paths
+ choices.append('../') # suggest relative paths
+
+ part = RelativePathNameParser(choices).parse(state)
+ path += f'{part}{boundary.match or ""}'
+
+ return path
+
+
+class AbsolutePathParser(Parser):
+ """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ path = ''
+
+ with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary
+ while boundary.ready:
+ if path:
+ path += AnyParser(nothing=True).parse(state)
+ else:
+ path += ChoicesParser([PATH_DELIMITER]).parse(state)
+
+ path += (boundary.match or '')
+
+ return path
+
+
+class NamespaceParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers that store their results in a namespace."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = state.current_namespace
+ current = getattr(namespace, self.dest)
+
+ if current and self.limit_one:
+ if state.mode == ParserMode.PARSE:
+ raise ParserError('Option cannot be specified more than once.')
+
+ raise CompletionError('Option cannot be specified more than once.')
+
+ value = self.get_value(state)
+
+ if self.use_list:
+ if not current:
+ current = []
+ setattr(namespace, self.dest, current)
+
+ current.append(value)
+ else:
+ setattr(namespace, self.dest, value)
+
+ return value
+
+ def get_value(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result, without storing the result in the namespace."""
+ return super().parse(state)
+
+ @property
+ def use_list(self) -> bool:
+ """True if the destination is a list, otherwise False."""
+ return False
+
+ @property
+ def limit_one(self) -> bool:
+ """True if only one target is allowed, otherwise False."""
+ return not self.use_list
+
+ @property
+ @abc.abstractmethod
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+
+
+class NamespaceWrappedParser(NamespaceParser):
+ """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace."""
+ def __init__(self, dest: str, parser: Parser) -> None:
+ self._dest = dest
+ self.parser = parser
+
+ def get_value(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result, without storing the result in the namespace."""
+ return self.parser.parse(state)
+
+ @property
+ def dest(self) -> str:
+ """The name of the attribute where the value should be stored."""
+ return self._dest
+
+
+class KeyValueParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for key/value composite argument parsers."""
+ @abc.abstractmethod
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]:
+ """Return a dictionary of key names and value parsers."""
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = state.current_namespace
+ parsers = self.get_parsers(state)
+ keys = list(parsers)
+
+ with state.delimit(PAIR_DELIMITER, required=False) as pair: # type: ParserBoundary
+ while pair.ready:
+ with state.delimit(ASSIGNMENT_DELIMITER):
+ key = ChoicesParser(keys).parse(state)
+
+ value = parsers[key].parse(state)
+
+ setattr(namespace, key, value)
+
+ keys.remove(key)
+
+ return namespace
+
+
+class PairParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter."""
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ namespace = self.create_namespace()
+
+ state.set_namespace(namespace)
+
+ with state.delimit(self.delimiter, self.required) as boundary: # type: ParserBoundary
+ choice = self.get_left_parser(state).parse(state)
+
+ if boundary.match:
+ self.get_right_parser(choice).parse(state)
+
+ return namespace
+
+ @property
+ def required(self) -> bool:
+ """True if the delimiter (and thus right parser) is required, otherwise False."""
+ return False
+
+ @property
+ def delimiter(self) -> str:
+ """The delimiter to use between the left and right parser."""
+ return PAIR_DELIMITER
+
+ @abc.abstractmethod
+ def create_namespace(self) -> t.Any:
+ """Create and return a namespace."""
+
+ @abc.abstractmethod
+ def get_left_parser(self, state: ParserState) -> Parser:
+ """Return the parser for the left side."""
+
+ @abc.abstractmethod
+ def get_right_parser(self, choice: t.Any) -> Parser:
+ """Return the parser for the right side."""
+
+
+class TypeParser(Parser, metaclass=abc.ABCMeta):
+ """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name."""
+ def get_parsers(self, state: ParserState) -> dict[str, Parser]: # pylint: disable=unused-argument
+ """Return a dictionary of type names and type parsers."""
+ return self.get_stateless_parsers()
+
+ @abc.abstractmethod
+ def get_stateless_parsers(self) -> dict[str, Parser]:
+ """Return a dictionary of type names and type parsers."""
+
+ def parse(self, state: ParserState) -> t.Any:
+ """Parse the input from the given state and return the result."""
+ parsers = self.get_parsers(state)
+
+ with state.delimit(':'):
+ key = ChoicesParser(list(parsers)).parse(state)
+
+ value = parsers[key].parse(state)
+
+ return value