diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py')
-rw-r--r-- | test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py new file mode 100644 index 0000000..ee6f146 --- /dev/null +++ b/test/lib/ansible_test/_internal/cli/parsers/host_config_parsers.py @@ -0,0 +1,310 @@ +"""Composite parsers for the various types of hosts.""" +from __future__ import annotations + +import typing as t + +from ...completion import ( + docker_completion, + network_completion, + remote_completion, + windows_completion, + filter_completion, +) + +from ...host_configs import ( + ControllerConfig, + DockerConfig, + NetworkInventoryConfig, + NetworkRemoteConfig, + OriginConfig, + PosixRemoteConfig, + PosixSshConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ..compat import ( + get_fallback_remote_controller, +) + +from ..argparsing.parsers import ( + ChoicesParser, + DocumentationState, + FileParser, + MatchConditions, + NamespaceWrappedParser, + PairParser, + Parser, + ParserError, + ParserState, +) + +from .value_parsers import ( + PlatformParser, + SshConnectionParser, +) + +from .key_value_parsers import ( + ControllerKeyValueParser, + DockerKeyValueParser, + EmptyKeyValueParser, + NetworkRemoteKeyValueParser, + OriginKeyValueParser, + PosixRemoteKeyValueParser, + PosixSshKeyValueParser, + WindowsRemoteKeyValueParser, +) + +from .helpers import ( + get_docker_pythons, + get_remote_pythons, +) + + +class OriginParser(Parser): + """Composite argument parser for the origin.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = OriginConfig() + + state.set_namespace(namespace) + + parser = OriginKeyValueParser() + parser.parse(state) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return OriginKeyValueParser().document(state) + + +class ControllerParser(Parser): + """Composite argument parser for the controller.""" + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + namespace = ControllerConfig() + + state.set_namespace(namespace) + + parser = ControllerKeyValueParser() + parser.parse(state) + + return namespace + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return ControllerKeyValueParser().document(state) + + +class DockerParser(PairParser): + """Composite argument parser for a docker host.""" + def __init__(self, controller: bool) -> None: + self.controller = controller + + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return DockerConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('name', ChoicesParser(list(filter_completion(docker_completion(), controller_only=self.controller)), + conditions=MatchConditions.CHOICE | MatchConditions.ANY)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return DockerKeyValueParser(choice, self.controller) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value: DockerConfig = super().parse(state) + + if not value.python and not get_docker_pythons(value.name, self.controller, True): + raise ParserError(f'Python version required for docker image: {value.name}') + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + default = 'default' + content = '\n'.join([f' {image} ({", ".join(get_docker_pythons(image, self.controller, False))})' + for image, item in filter_completion(docker_completion(), controller_only=self.controller).items()]) + + content += '\n'.join([ + '', + ' {image} # python must be specified for custom images', + ]) + + state.sections[f'{"controller" if self.controller else "target"} docker images and supported python version (choose one):'] = content + + return f'{{image}}[,{DockerKeyValueParser(default, self.controller).document(state)}]' + + +class PosixRemoteParser(PairParser): + """Composite argument parser for a POSIX remote host.""" + def __init__(self, controller: bool) -> None: + self.controller = controller + + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return PosixRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('name', PlatformParser(list(filter_completion(remote_completion(), controller_only=self.controller)))) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return PosixRemoteKeyValueParser(choice, self.controller) + + def parse(self, state: ParserState) -> t.Any: + """Parse the input from the given state and return the result.""" + value: PosixRemoteConfig = super().parse(state) + + if not value.python and not get_remote_pythons(value.name, self.controller, True): + raise ParserError(f'Python version required for remote: {value.name}') + + return value + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + default = get_fallback_remote_controller() + content = '\n'.join([f' {name} ({", ".join(get_remote_pythons(name, self.controller, False))})' + for name, item in filter_completion(remote_completion(), controller_only=self.controller).items()]) + + content += '\n'.join([ + '', + ' {platform}/{version} # python must be specified for unknown systems', + ]) + + state.sections[f'{"controller" if self.controller else "target"} remote systems and supported python versions (choose one):'] = content + + return f'{{system}}[,{PosixRemoteKeyValueParser(default, self.controller).document(state)}]' + + +class WindowsRemoteParser(PairParser): + """Composite argument parser for a Windows remote host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return WindowsRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + names = list(filter_completion(windows_completion())) + + for target in state.root_namespace.targets or []: # type: WindowsRemoteConfig + names.remove(target.name) + + return NamespaceWrappedParser('name', PlatformParser(names)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return WindowsRemoteKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + content = '\n'.join([f' {name}' for name, item in filter_completion(windows_completion()).items()]) + + content += '\n'.join([ + '', + ' windows/{version} # use an unknown windows version', + ]) + + state.sections['target remote systems (choose one):'] = content + + return f'{{system}}[,{WindowsRemoteKeyValueParser().document(state)}]' + + +class NetworkRemoteParser(PairParser): + """Composite argument parser for a network remote host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return NetworkRemoteConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + names = list(filter_completion(network_completion())) + + for target in state.root_namespace.targets or []: # type: NetworkRemoteConfig + names.remove(target.name) + + return NamespaceWrappedParser('name', PlatformParser(names)) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return NetworkRemoteKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + content = '\n'.join([f' {name}' for name, item in filter_completion(network_completion()).items()]) + + content += '\n'.join([ + '', + ' {platform}/{version} # use an unknown platform and version', + ]) + + state.sections['target remote systems (choose one):'] = content + + return f'{{system}}[,{NetworkRemoteKeyValueParser().document(state)}]' + + +class WindowsInventoryParser(PairParser): + """Composite argument parser for a Windows inventory.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return WindowsInventoryConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('path', FileParser()) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return EmptyKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{path} # INI format inventory file' + + +class NetworkInventoryParser(PairParser): + """Composite argument parser for a network inventory.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return NetworkInventoryConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return NamespaceWrappedParser('path', FileParser()) + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return EmptyKeyValueParser() + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return '{path} # INI format inventory file' + + +class PosixSshParser(PairParser): + """Composite argument parser for a POSIX SSH host.""" + def create_namespace(self) -> t.Any: + """Create and return a namespace.""" + return PosixSshConfig() + + def get_left_parser(self, state: ParserState) -> Parser: + """Return the parser for the left side.""" + return SshConnectionParser() + + def get_right_parser(self, choice: t.Any) -> Parser: + """Return the parser for the right side.""" + return PosixSshKeyValueParser() + + @property + def required(self) -> bool: + """True if the delimiter (and thus right parser) is required, otherwise False.""" + return True + + def document(self, state: DocumentationState) -> t.Optional[str]: + """Generate and return documentation for this parser.""" + return f'{SshConnectionParser().document(state)}[,{PosixSshKeyValueParser().document(state)}]' |