summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/cli/compat.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/cli/compat.py')
-rw-r--r--test/lib/ansible_test/_internal/cli/compat.py505
1 files changed, 505 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/cli/compat.py b/test/lib/ansible_test/_internal/cli/compat.py
new file mode 100644
index 0000000..93006d5
--- /dev/null
+++ b/test/lib/ansible_test/_internal/cli/compat.py
@@ -0,0 +1,505 @@
+"""Provides compatibility with first-generation host delegation options in ansible-test."""
+from __future__ import annotations
+
+import argparse
+import collections.abc as c
+import dataclasses
+import enum
+import os
+import types
+import typing as t
+
+from ..constants import (
+ CONTROLLER_PYTHON_VERSIONS,
+ SUPPORTED_PYTHON_VERSIONS,
+)
+
+from ..util import (
+ ApplicationError,
+ display,
+ filter_args,
+ sorted_versions,
+ str_to_version,
+)
+
+from ..docker_util import (
+ docker_available,
+)
+
+from ..completion import (
+ docker_completion,
+ remote_completion,
+ filter_completion,
+)
+
+from ..host_configs import (
+ ControllerConfig,
+ ControllerHostConfig,
+ DockerConfig,
+ FallbackDetail,
+ FallbackReason,
+ HostConfig,
+ HostContext,
+ HostSettings,
+ NativePythonConfig,
+ NetworkInventoryConfig,
+ NetworkRemoteConfig,
+ OriginConfig,
+ PosixRemoteConfig,
+ VirtualPythonConfig,
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from ..data import (
+ data_context,
+)
+
+
+def filter_python(version: t.Optional[str], versions: t.Optional[c.Sequence[str]]) -> t.Optional[str]:
+ """If a Python version is given and is in the given version list, return that Python version, otherwise return None."""
+ return version if version in versions else None
+
+
+def controller_python(version: t.Optional[str]) -> t.Optional[str]:
+ """If a Python version is given and is supported by the controller, return that Python version, otherwise return None."""
+ return filter_python(version, CONTROLLER_PYTHON_VERSIONS)
+
+
+def get_fallback_remote_controller() -> str:
+ """Return the remote fallback platform for the controller."""
+ platform = 'freebsd' # lower cost than RHEL and macOS
+ candidates = [item for item in filter_completion(remote_completion()).values() if item.controller_supported and item.platform == platform]
+ fallback = sorted(candidates, key=lambda value: str_to_version(value.version), reverse=True)[0]
+ return fallback.name
+
+
+def get_option_name(name: str) -> str:
+ """Return a command-line option name from the given option name."""
+ if name == 'targets':
+ name = 'target'
+
+ return f'--{name.replace("_", "-")}'
+
+
+class PythonVersionUnsupportedError(ApplicationError):
+ """A Python version was requested for a context which does not support that version."""
+ def __init__(self, context: str, version: str, versions: c.Iterable[str]) -> None:
+ super().__init__(f'Python {version} is not supported by environment `{context}`. Supported Python version(s) are: {", ".join(versions)}')
+
+
+class PythonVersionUnspecifiedError(ApplicationError):
+ """A Python version was not specified for a context which is unknown, thus the Python version is unknown."""
+ def __init__(self, context: str) -> None:
+ super().__init__(f'A Python version was not specified for environment `{context}`. Use the `--python` option to specify a Python version.')
+
+
+class ControllerNotSupportedError(ApplicationError):
+ """Option(s) were specified which do not provide support for the controller and would be ignored because they are irrelevant for the target."""
+ def __init__(self, context: str) -> None:
+ super().__init__(f'Environment `{context}` does not provide a Python version supported by the controller.')
+
+
+class OptionsConflictError(ApplicationError):
+ """Option(s) were specified which conflict with other options."""
+ def __init__(self, first: c.Iterable[str], second: c.Iterable[str]) -> None:
+ super().__init__(f'Options `{" ".join(first)}` cannot be combined with options `{" ".join(second)}`.')
+
+
+@dataclasses.dataclass(frozen=True)
+class LegacyHostOptions:
+ """Legacy host options used prior to the availability of separate controller and target host configuration."""
+ python: t.Optional[str] = None
+ python_interpreter: t.Optional[str] = None
+ local: t.Optional[bool] = None
+ venv: t.Optional[bool] = None
+ venv_system_site_packages: t.Optional[bool] = None
+ remote: t.Optional[str] = None
+ remote_provider: t.Optional[str] = None
+ remote_arch: t.Optional[str] = None
+ docker: t.Optional[str] = None
+ docker_privileged: t.Optional[bool] = None
+ docker_seccomp: t.Optional[str] = None
+ docker_memory: t.Optional[int] = None
+ windows: t.Optional[list[str]] = None
+ platform: t.Optional[list[str]] = None
+ platform_collection: t.Optional[list[tuple[str, str]]] = None
+ platform_connection: t.Optional[list[tuple[str, str]]] = None
+ inventory: t.Optional[str] = None
+
+ @staticmethod
+ def create(namespace: t.Union[argparse.Namespace, types.SimpleNamespace]) -> LegacyHostOptions:
+ """Create legacy host options from the given namespace."""
+ kwargs = {field.name: getattr(namespace, field.name, None) for field in dataclasses.fields(LegacyHostOptions)}
+
+ if kwargs['python'] == 'default':
+ kwargs['python'] = None
+
+ return LegacyHostOptions(**kwargs)
+
+ @staticmethod
+ def purge_namespace(namespace: t.Union[argparse.Namespace, types.SimpleNamespace]) -> None:
+ """Purge legacy host options fields from the given namespace."""
+ for field in dataclasses.fields(LegacyHostOptions):
+ if hasattr(namespace, field.name):
+ delattr(namespace, field.name)
+
+ @staticmethod
+ def purge_args(args: list[str]) -> list[str]:
+ """Purge legacy host options from the given command line arguments."""
+ fields: tuple[dataclasses.Field, ...] = dataclasses.fields(LegacyHostOptions)
+ filters: dict[str, int] = {get_option_name(field.name): 0 if field.type is t.Optional[bool] else 1 for field in fields}
+
+ return filter_args(args, filters)
+
+ def get_options_used(self) -> tuple[str, ...]:
+ """Return a tuple of the command line options used."""
+ fields: tuple[dataclasses.Field, ...] = dataclasses.fields(self)
+ options = tuple(sorted(get_option_name(field.name) for field in fields if getattr(self, field.name)))
+ return options
+
+
+class TargetMode(enum.Enum):
+ """Type of provisioning to use for the targets."""
+ WINDOWS_INTEGRATION = enum.auto() # windows-integration
+ NETWORK_INTEGRATION = enum.auto() # network-integration
+ POSIX_INTEGRATION = enum.auto() # integration
+ SANITY = enum.auto() # sanity
+ UNITS = enum.auto() # units
+ SHELL = enum.auto() # shell
+ NO_TARGETS = enum.auto() # coverage
+
+ @property
+ def one_host(self) -> bool:
+ """Return True if only one host (the controller) should be used, otherwise return False."""
+ return self in (TargetMode.SANITY, TargetMode.UNITS, TargetMode.NO_TARGETS)
+
+ @property
+ def no_fallback(self) -> bool:
+ """Return True if no fallback is acceptable for the controller (due to options not applying to the target), otherwise return False."""
+ return self in (TargetMode.WINDOWS_INTEGRATION, TargetMode.NETWORK_INTEGRATION, TargetMode.NO_TARGETS)
+
+ @property
+ def multiple_pythons(self) -> bool:
+ """Return True if multiple Python versions are allowed, otherwise False."""
+ return self in (TargetMode.SANITY, TargetMode.UNITS)
+
+ @property
+ def has_python(self) -> bool:
+ """Return True if this mode uses Python, otherwise False."""
+ return self in (TargetMode.POSIX_INTEGRATION, TargetMode.SANITY, TargetMode.UNITS, TargetMode.SHELL)
+
+
+def convert_legacy_args(
+ argv: list[str],
+ args: t.Union[argparse.Namespace, types.SimpleNamespace],
+ mode: TargetMode,
+) -> HostSettings:
+ """Convert pre-split host arguments in the given namespace to their split counterparts."""
+ old_options = LegacyHostOptions.create(args)
+ old_options.purge_namespace(args)
+
+ new_options = [
+ '--controller',
+ '--target',
+ '--target-python',
+ '--target-posix',
+ '--target-windows',
+ '--target-network',
+ ]
+
+ used_old_options = old_options.get_options_used()
+ used_new_options = [name for name in new_options if name in argv]
+
+ if used_old_options:
+ if used_new_options:
+ raise OptionsConflictError(used_old_options, used_new_options)
+
+ controller, targets, controller_fallback = get_legacy_host_config(mode, old_options)
+
+ if controller_fallback:
+ if mode.one_host:
+ display.info(controller_fallback.message, verbosity=1)
+ else:
+ display.warning(controller_fallback.message)
+
+ used_default_pythons = mode in (TargetMode.SANITY, TargetMode.UNITS) and not native_python(old_options)
+ else:
+ controller = args.controller or OriginConfig()
+ controller_fallback = None
+
+ if mode == TargetMode.NO_TARGETS:
+ targets = []
+ used_default_pythons = False
+ elif args.targets:
+ targets = args.targets
+ used_default_pythons = False
+ else:
+ targets = default_targets(mode, controller)
+ used_default_pythons = mode in (TargetMode.SANITY, TargetMode.UNITS)
+
+ args.controller = controller
+ args.targets = targets
+
+ if used_default_pythons:
+ control_targets = t.cast(list[ControllerConfig], targets)
+ skipped_python_versions = sorted_versions(list(set(SUPPORTED_PYTHON_VERSIONS) - {target.python.version for target in control_targets}))
+ else:
+ skipped_python_versions = []
+
+ filtered_args = old_options.purge_args(argv)
+ filtered_args = filter_args(filtered_args, {name: 1 for name in new_options})
+
+ host_settings = HostSettings(
+ controller=controller,
+ targets=targets,
+ skipped_python_versions=skipped_python_versions,
+ filtered_args=filtered_args,
+ controller_fallback=controller_fallback,
+ )
+
+ return host_settings
+
+
+def controller_targets(
+ mode: TargetMode,
+ options: LegacyHostOptions,
+ controller: ControllerHostConfig,
+) -> list[HostConfig]:
+ """Return the configuration for controller targets."""
+ python = native_python(options)
+
+ targets: list[HostConfig]
+
+ if python:
+ targets = [ControllerConfig(python=python)]
+ else:
+ targets = default_targets(mode, controller)
+
+ return targets
+
+
+def native_python(options: LegacyHostOptions) -> t.Optional[NativePythonConfig]:
+ """Return a NativePythonConfig for the given version if it is not None, otherwise return None."""
+ if not options.python and not options.python_interpreter:
+ return None
+
+ return NativePythonConfig(version=options.python, path=options.python_interpreter)
+
+
+def get_legacy_host_config(
+ mode: TargetMode,
+ options: LegacyHostOptions,
+) -> tuple[ControllerHostConfig, list[HostConfig], t.Optional[FallbackDetail]]:
+ """
+ Returns controller and target host configs derived from the provided legacy host options.
+ The goal is to match the original behavior, by using non-split testing whenever possible.
+ When the options support the controller, use the options for the controller and use ControllerConfig for the targets.
+ When the options do not support the controller, use the options for the targets and use a default controller config influenced by the options.
+ """
+ venv_fallback = 'venv/default'
+ docker_fallback = 'default'
+ remote_fallback = get_fallback_remote_controller()
+
+ controller_fallback: t.Optional[tuple[str, str, FallbackReason]] = None
+
+ controller: t.Optional[ControllerHostConfig]
+ targets: list[HostConfig]
+
+ if options.venv:
+ if controller_python(options.python) or not options.python:
+ controller = OriginConfig(python=VirtualPythonConfig(version=options.python or 'default', system_site_packages=options.venv_system_site_packages))
+ else:
+ controller_fallback = f'origin:python={venv_fallback}', f'--venv --python {options.python}', FallbackReason.PYTHON
+ controller = OriginConfig(python=VirtualPythonConfig(version='default', system_site_packages=options.venv_system_site_packages))
+
+ if mode in (TargetMode.SANITY, TargetMode.UNITS):
+ python = native_python(options)
+
+ if python:
+ control_targets = [ControllerConfig(python=python)]
+ else:
+ control_targets = controller.get_default_targets(HostContext(controller_config=controller))
+
+ # Target sanity tests either have no Python requirements or manage their own virtual environments.
+ # Thus, there is no point in setting up virtual environments ahead of time for them.
+
+ if mode == TargetMode.UNITS:
+ targets = [ControllerConfig(python=VirtualPythonConfig(version=target.python.version, path=target.python.path,
+ system_site_packages=options.venv_system_site_packages)) for target in control_targets]
+ else:
+ targets = t.cast(list[HostConfig], control_targets)
+ else:
+ targets = [ControllerConfig(python=VirtualPythonConfig(version=options.python or 'default',
+ system_site_packages=options.venv_system_site_packages))]
+ elif options.docker:
+ docker_config = filter_completion(docker_completion()).get(options.docker)
+
+ if docker_config:
+ if options.python and options.python not in docker_config.supported_pythons:
+ raise PythonVersionUnsupportedError(f'--docker {options.docker}', options.python, docker_config.supported_pythons)
+
+ if docker_config.controller_supported:
+ if controller_python(options.python) or not options.python:
+ controller = DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'docker:{options.docker}', f'--docker {options.docker} --python {options.python}', FallbackReason.PYTHON
+ controller = DockerConfig(name=options.docker)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'docker:{docker_fallback}', f'--docker {options.docker}', FallbackReason.ENVIRONMENT
+ controller = DockerConfig(name=docker_fallback)
+ targets = [DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)]
+ else:
+ if not options.python:
+ raise PythonVersionUnspecifiedError(f'--docker {options.docker}')
+
+ if controller_python(options.python):
+ controller = DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'docker:{docker_fallback}', f'--docker {options.docker} --python {options.python}', FallbackReason.PYTHON
+ controller = DockerConfig(name=docker_fallback)
+ targets = [DockerConfig(name=options.docker, python=native_python(options),
+ privileged=options.docker_privileged, seccomp=options.docker_seccomp, memory=options.docker_memory)]
+ elif options.remote:
+ remote_config = filter_completion(remote_completion()).get(options.remote)
+ context, reason = None, None
+
+ if remote_config:
+ if options.python and options.python not in remote_config.supported_pythons:
+ raise PythonVersionUnsupportedError(f'--remote {options.remote}', options.python, remote_config.supported_pythons)
+
+ if remote_config.controller_supported:
+ if controller_python(options.python) or not options.python:
+ controller = PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider,
+ arch=options.remote_arch)
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = f'remote:{options.remote}', f'--remote {options.remote} --python {options.python}', FallbackReason.PYTHON
+ controller = PosixRemoteConfig(name=options.remote, provider=options.remote_provider, arch=options.remote_arch)
+ targets = controller_targets(mode, options, controller)
+ else:
+ context, reason = f'--remote {options.remote}', FallbackReason.ENVIRONMENT
+ controller = None
+ targets = [PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)]
+ elif mode == TargetMode.SHELL and options.remote.startswith('windows/'):
+ if options.python and options.python not in CONTROLLER_PYTHON_VERSIONS:
+ raise ControllerNotSupportedError(f'--python {options.python}')
+
+ controller = OriginConfig(python=native_python(options))
+ targets = [WindowsRemoteConfig(name=options.remote, provider=options.remote_provider, arch=options.remote_arch)]
+ else:
+ if not options.python:
+ raise PythonVersionUnspecifiedError(f'--remote {options.remote}')
+
+ if controller_python(options.python):
+ controller = PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)
+ targets = controller_targets(mode, options, controller)
+ else:
+ context, reason = f'--remote {options.remote} --python {options.python}', FallbackReason.PYTHON
+ controller = None
+ targets = [PosixRemoteConfig(name=options.remote, python=native_python(options), provider=options.remote_provider, arch=options.remote_arch)]
+
+ if not controller:
+ if docker_available():
+ controller_fallback = f'docker:{docker_fallback}', context, reason
+ controller = DockerConfig(name=docker_fallback)
+ else:
+ controller_fallback = f'remote:{remote_fallback}', context, reason
+ controller = PosixRemoteConfig(name=remote_fallback)
+ else: # local/unspecified
+ # There are several changes in behavior from the legacy implementation when using no delegation (or the `--local` option).
+ # These changes are due to ansible-test now maintaining consistency between its own Python and that of controller Python subprocesses.
+ #
+ # 1) The `--python-interpreter` option (if different from sys.executable) now affects controller subprocesses and triggers re-execution of ansible-test.
+ # Previously this option was completely ignored except when used with the `--docker` or `--remote` options.
+ # 2) The `--python` option now triggers re-execution of ansible-test if it differs from sys.version_info.
+ # Previously it affected Python subprocesses, but not ansible-test itself.
+
+ if controller_python(options.python) or not options.python:
+ controller = OriginConfig(python=native_python(options))
+ targets = controller_targets(mode, options, controller)
+ else:
+ controller_fallback = 'origin:python=default', f'--python {options.python}', FallbackReason.PYTHON
+ controller = OriginConfig()
+ targets = controller_targets(mode, options, controller)
+
+ if controller_fallback:
+ controller_option, context, reason = controller_fallback
+
+ if mode.no_fallback:
+ raise ControllerNotSupportedError(context)
+
+ fallback_detail = FallbackDetail(
+ reason=reason,
+ message=f'Using `--controller {controller_option}` since `{context}` does not support the controller.',
+ )
+ else:
+ fallback_detail = None
+
+ if mode.one_host and any(not isinstance(target, ControllerConfig) for target in targets):
+ raise ControllerNotSupportedError(controller_fallback[1])
+
+ if mode == TargetMode.NO_TARGETS:
+ targets = []
+ else:
+ targets = handle_non_posix_targets(mode, options, targets)
+
+ return controller, targets, fallback_detail
+
+
+def handle_non_posix_targets(
+ mode: TargetMode,
+ options: LegacyHostOptions,
+ targets: list[HostConfig],
+) -> list[HostConfig]:
+ """Return a list of non-POSIX targets if the target mode is non-POSIX."""
+ if mode == TargetMode.WINDOWS_INTEGRATION:
+ if options.windows:
+ targets = [WindowsRemoteConfig(name=f'windows/{version}', provider=options.remote_provider, arch=options.remote_arch)
+ for version in options.windows]
+ else:
+ targets = [WindowsInventoryConfig(path=options.inventory)]
+ elif mode == TargetMode.NETWORK_INTEGRATION:
+ if options.platform:
+ network_targets = [NetworkRemoteConfig(name=platform, provider=options.remote_provider, arch=options.remote_arch) for platform in options.platform]
+
+ for platform, collection in options.platform_collection or []:
+ for entry in network_targets:
+ if entry.platform == platform:
+ entry.collection = collection
+
+ for platform, connection in options.platform_connection or []:
+ for entry in network_targets:
+ if entry.platform == platform:
+ entry.connection = connection
+
+ targets = t.cast(list[HostConfig], network_targets)
+ else:
+ targets = [NetworkInventoryConfig(path=options.inventory)]
+
+ return targets
+
+
+def default_targets(
+ mode: TargetMode,
+ controller: ControllerHostConfig,
+) -> list[HostConfig]:
+ """Return a list of default targets for the given target mode."""
+ targets: list[HostConfig]
+
+ if mode == TargetMode.WINDOWS_INTEGRATION:
+ targets = [WindowsInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.winrm')))]
+ elif mode == TargetMode.NETWORK_INTEGRATION:
+ targets = [NetworkInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.networking')))]
+ elif mode.multiple_pythons:
+ targets = t.cast(list[HostConfig], controller.get_default_targets(HostContext(controller_config=controller)))
+ else:
+ targets = [ControllerConfig()]
+
+ return targets