diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/completion.py')
-rw-r--r-- | test/lib/ansible_test/_internal/completion.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/completion.py b/test/lib/ansible_test/_internal/completion.py new file mode 100644 index 0000000..f443181 --- /dev/null +++ b/test/lib/ansible_test/_internal/completion.py @@ -0,0 +1,310 @@ +"""Loading, parsing and storing of completion configurations.""" +from __future__ import annotations + +import abc +import dataclasses +import enum +import os +import typing as t + +from .constants import ( + CONTROLLER_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from .util import ( + ANSIBLE_TEST_DATA_ROOT, + cache, + read_lines_without_comments, +) + +from .data import ( + data_context, +) + +from .become import ( + SUPPORTED_BECOME_METHODS, +) + + +class CGroupVersion(enum.Enum): + """The control group version(s) required by a container.""" + NONE = 'none' + V1_ONLY = 'v1-only' + V2_ONLY = 'v2-only' + V1_V2 = 'v1-v2' + + def __repr__(self) -> str: + return f'{self.__class__.__name__}.{self.name}' + + +class AuditMode(enum.Enum): + """The audit requirements of a container.""" + NONE = 'none' + REQUIRED = 'required' + + def __repr__(self) -> str: + return f'{self.__class__.__name__}.{self.name}' + + +@dataclasses.dataclass(frozen=True) +class CompletionConfig(metaclass=abc.ABCMeta): + """Base class for completion configuration.""" + name: str + + @property + @abc.abstractmethod + def is_default(self) -> bool: + """True if the completion entry is only used for defaults, otherwise False.""" + + +@dataclasses.dataclass(frozen=True) +class PosixCompletionConfig(CompletionConfig, metaclass=abc.ABCMeta): + """Base class for completion configuration of POSIX environments.""" + @property + @abc.abstractmethod + def supported_pythons(self) -> list[str]: + """Return a list of the supported Python versions.""" + + @abc.abstractmethod + def get_python_path(self, version: str) -> str: + """Return the path of the requested Python version.""" + + def get_default_python(self, controller: bool) -> str: + """Return the default Python version for a controller or target as specified.""" + context_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS + version = [python for python in self.supported_pythons if python in context_pythons][0] + return version + + @property + def controller_supported(self) -> bool: + """True if at least one Python version is provided which supports the controller, otherwise False.""" + return any(version in CONTROLLER_PYTHON_VERSIONS for version in self.supported_pythons) + + +@dataclasses.dataclass(frozen=True) +class PythonCompletionConfig(PosixCompletionConfig, metaclass=abc.ABCMeta): + """Base class for completion configuration of Python environments.""" + python: str = '' + python_dir: str = '/usr/bin' + + @property + def supported_pythons(self) -> list[str]: + """Return a list of the supported Python versions.""" + versions = self.python.split(',') if self.python else [] + versions = [version for version in versions if version in SUPPORTED_PYTHON_VERSIONS] + return versions + + def get_python_path(self, version: str) -> str: + """Return the path of the requested Python version.""" + return os.path.join(self.python_dir, f'python{version}') + + +@dataclasses.dataclass(frozen=True) +class RemoteCompletionConfig(CompletionConfig): + """Base class for completion configuration of remote environments provisioned through Ansible Core CI.""" + provider: t.Optional[str] = None + arch: t.Optional[str] = None + + @property + def platform(self) -> str: + """The name of the platform.""" + return self.name.partition('/')[0] + + @property + def version(self) -> str: + """The version of the platform.""" + return self.name.partition('/')[2] + + @property + def is_default(self) -> bool: + """True if the completion entry is only used for defaults, otherwise False.""" + return not self.version + + def __post_init__(self): + if not self.provider: + raise Exception(f'Remote completion entry "{self.name}" must provide a "provider" setting.') + + if not self.arch: + raise Exception(f'Remote completion entry "{self.name}" must provide a "arch" setting.') + + +@dataclasses.dataclass(frozen=True) +class InventoryCompletionConfig(CompletionConfig): + """Configuration for inventory files.""" + def __init__(self) -> None: + super().__init__(name='inventory') + + @property + def is_default(self) -> bool: + """True if the completion entry is only used for defaults, otherwise False.""" + return False + + +@dataclasses.dataclass(frozen=True) +class PosixSshCompletionConfig(PythonCompletionConfig): + """Configuration for a POSIX host reachable over SSH.""" + def __init__(self, user: str, host: str) -> None: + super().__init__( + name=f'{user}@{host}', + python=','.join(SUPPORTED_PYTHON_VERSIONS), + ) + + @property + def is_default(self) -> bool: + """True if the completion entry is only used for defaults, otherwise False.""" + return False + + +@dataclasses.dataclass(frozen=True) +class DockerCompletionConfig(PythonCompletionConfig): + """Configuration for Docker containers.""" + image: str = '' + seccomp: str = 'default' + cgroup: str = CGroupVersion.V1_V2.value + audit: str = AuditMode.REQUIRED.value # most containers need this, so the default is required, leaving it to be opt-out for containers which don't need it + placeholder: bool = False + + @property + def is_default(self) -> bool: + """True if the completion entry is only used for defaults, otherwise False.""" + return False + + @property + def audit_enum(self) -> AuditMode: + """The audit requirements for the container. Raises an exception if the value is invalid.""" + try: + return AuditMode(self.audit) + except ValueError: + raise ValueError(f'Docker completion entry "{self.name}" has an invalid value "{self.audit}" for the "audit" setting.') from None + + @property + def cgroup_enum(self) -> CGroupVersion: + """The control group version(s) required by the container. Raises an exception if the value is invalid.""" + try: + return CGroupVersion(self.cgroup) + except ValueError: + raise ValueError(f'Docker completion entry "{self.name}" has an invalid value "{self.cgroup}" for the "cgroup" setting.') from None + + def __post_init__(self): + if not self.image: + raise Exception(f'Docker completion entry "{self.name}" must provide an "image" setting.') + + if not self.supported_pythons and not self.placeholder: + raise Exception(f'Docker completion entry "{self.name}" must provide a "python" setting.') + + # verify properties can be correctly parsed to enums + assert self.audit_enum + assert self.cgroup_enum + + +@dataclasses.dataclass(frozen=True) +class NetworkRemoteCompletionConfig(RemoteCompletionConfig): + """Configuration for remote network platforms.""" + collection: str = '' + connection: str = '' + placeholder: bool = False + + def __post_init__(self): + if not self.placeholder: + super().__post_init__() + + +@dataclasses.dataclass(frozen=True) +class PosixRemoteCompletionConfig(RemoteCompletionConfig, PythonCompletionConfig): + """Configuration for remote POSIX platforms.""" + become: t.Optional[str] = None + placeholder: bool = False + + def __post_init__(self): + if not self.placeholder: + super().__post_init__() + + if self.become and self.become not in SUPPORTED_BECOME_METHODS: + raise Exception(f'POSIX remote completion entry "{self.name}" setting "become" must be omitted or one of: {", ".join(SUPPORTED_BECOME_METHODS)}') + + if not self.supported_pythons: + if self.version and not self.placeholder: + raise Exception(f'POSIX remote completion entry "{self.name}" must provide a "python" setting.') + else: + if not self.version: + raise Exception(f'POSIX remote completion entry "{self.name}" is a platform default and cannot provide a "python" setting.') + + +@dataclasses.dataclass(frozen=True) +class WindowsRemoteCompletionConfig(RemoteCompletionConfig): + """Configuration for remote Windows platforms.""" + + +TCompletionConfig = t.TypeVar('TCompletionConfig', bound=CompletionConfig) + + +def load_completion(name: str, completion_type: t.Type[TCompletionConfig]) -> dict[str, TCompletionConfig]: + """Load the named completion entries, returning them in dictionary form using the specified completion type.""" + lines = read_lines_without_comments(os.path.join(ANSIBLE_TEST_DATA_ROOT, 'completion', '%s.txt' % name), remove_blank_lines=True) + + if data_context().content.collection: + context = 'collection' + else: + context = 'ansible-core' + + items = {name: data for name, data in [parse_completion_entry(line) for line in lines] if data.get('context', context) == context} + + for item in items.values(): + item.pop('context', None) + item.pop('placeholder', None) + + completion = {name: completion_type(name=name, **data) for name, data in items.items()} + + return completion + + +def parse_completion_entry(value: str) -> tuple[str, dict[str, str]]: + """Parse the given completion entry, returning the entry name and a dictionary of key/value settings.""" + values = value.split() + + name = values[0] + data = {kvp[0]: kvp[1] if len(kvp) > 1 else '' for kvp in [item.split('=', 1) for item in values[1:]]} + + return name, data + + +def filter_completion( + completion: dict[str, TCompletionConfig], + controller_only: bool = False, + include_defaults: bool = False, +) -> dict[str, TCompletionConfig]: + """Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified.""" + if controller_only: + # The cast is needed because mypy gets confused here and forgets that completion values are TCompletionConfig. + completion = {name: t.cast(TCompletionConfig, config) for name, config in completion.items() if + isinstance(config, PosixCompletionConfig) and config.controller_supported} + + if not include_defaults: + completion = {name: config for name, config in completion.items() if not config.is_default} + + return completion + + +@cache +def docker_completion() -> dict[str, DockerCompletionConfig]: + """Return docker completion entries.""" + return load_completion('docker', DockerCompletionConfig) + + +@cache +def remote_completion() -> dict[str, PosixRemoteCompletionConfig]: + """Return remote completion entries.""" + return load_completion('remote', PosixRemoteCompletionConfig) + + +@cache +def windows_completion() -> dict[str, WindowsRemoteCompletionConfig]: + """Return windows completion entries.""" + return load_completion('windows', WindowsRemoteCompletionConfig) + + +@cache +def network_completion() -> dict[str, NetworkRemoteCompletionConfig]: + """Return network completion entries.""" + return load_completion('network', NetworkRemoteCompletionConfig) |