summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/integration
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/integration')
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/__init__.py967
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py389
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/acme.py79
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/aws.py131
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/azure.py166
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py62
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/cs.py174
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py55
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py94
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py168
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py55
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py106
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py92
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/nios.py97
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py60
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py114
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py56
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py138
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py55
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/coverage.py417
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/filters.py279
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/network.py77
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/posix.py48
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/windows.py81
24 files changed, 3960 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/integration/__init__.py b/test/lib/ansible_test/_internal/commands/integration/__init__.py
new file mode 100644
index 0000000..8864d2e
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/__init__.py
@@ -0,0 +1,967 @@
+"""Ansible integration test infrastructure."""
+from __future__ import annotations
+
+import collections.abc as c
+import contextlib
+import datetime
+import json
+import os
+import re
+import shutil
+import tempfile
+import time
+import typing as t
+
+from ...encoding import (
+ to_bytes,
+)
+
+from ...ansible_util import (
+ ansible_environment,
+)
+
+from ...executor import (
+ get_changes_filter,
+ AllTargetsSkipped,
+ Delegate,
+ ListTargets,
+)
+
+from ...python_requirements import (
+ install_requirements,
+)
+
+from ...ci import (
+ get_ci_provider,
+)
+
+from ...target import (
+ analyze_integration_target_dependencies,
+ walk_integration_targets,
+ IntegrationTarget,
+ walk_internal_targets,
+ TIntegrationTarget,
+ IntegrationTargetType,
+)
+
+from ...config import (
+ IntegrationConfig,
+ NetworkIntegrationConfig,
+ PosixIntegrationConfig,
+ WindowsIntegrationConfig,
+ TIntegrationConfig,
+)
+
+from ...io import (
+ make_dirs,
+ read_text_file,
+)
+
+from ...util import (
+ ApplicationError,
+ display,
+ SubprocessError,
+ remove_tree,
+)
+
+from ...util_common import (
+ named_temporary_file,
+ ResultType,
+ run_command,
+ write_json_test_results,
+ check_pyyaml,
+)
+
+from ...coverage_util import (
+ cover_python,
+)
+
+from ...cache import (
+ CommonCache,
+)
+
+from .cloud import (
+ CloudEnvironmentConfig,
+ cloud_filter,
+ cloud_init,
+ get_cloud_environment,
+ get_cloud_platforms,
+)
+
+from ...data import (
+ data_context,
+)
+
+from ...host_configs import (
+ InventoryConfig,
+ OriginConfig,
+)
+
+from ...host_profiles import (
+ ControllerProfile,
+ ControllerHostProfile,
+ HostProfile,
+ PosixProfile,
+ SshTargetHostProfile,
+)
+
+from ...provisioning import (
+ HostState,
+ prepare_profiles,
+)
+
+from ...pypi_proxy import (
+ configure_pypi_proxy,
+)
+
+from ...inventory import (
+ create_controller_inventory,
+ create_windows_inventory,
+ create_network_inventory,
+ create_posix_inventory,
+)
+
+from .filters import (
+ get_target_filter,
+)
+
+from .coverage import (
+ CoverageManager,
+)
+
+THostProfile = t.TypeVar('THostProfile', bound=HostProfile)
+
+
+def generate_dependency_map(integration_targets: list[IntegrationTarget]) -> dict[str, set[IntegrationTarget]]:
+ """Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend."""
+ targets_dict = dict((target.name, target) for target in integration_targets)
+ target_dependencies = analyze_integration_target_dependencies(integration_targets)
+ dependency_map: dict[str, set[IntegrationTarget]] = {}
+
+ invalid_targets = set()
+
+ for dependency, dependents in target_dependencies.items():
+ dependency_target = targets_dict.get(dependency)
+
+ if not dependency_target:
+ invalid_targets.add(dependency)
+ continue
+
+ for dependent in dependents:
+ if dependent not in dependency_map:
+ dependency_map[dependent] = set()
+
+ dependency_map[dependent].add(dependency_target)
+
+ if invalid_targets:
+ raise ApplicationError('Non-existent target dependencies: %s' % ', '.join(sorted(invalid_targets)))
+
+ return dependency_map
+
+
+def get_files_needed(target_dependencies: list[IntegrationTarget]) -> list[str]:
+ """Return a list of files needed by the given list of target dependencies."""
+ files_needed: list[str] = []
+
+ for target_dependency in target_dependencies:
+ files_needed += target_dependency.needs_file
+
+ files_needed = sorted(set(files_needed))
+
+ invalid_paths = [path for path in files_needed if not os.path.isfile(path)]
+
+ if invalid_paths:
+ raise ApplicationError('Invalid "needs/file/*" aliases:\n%s' % '\n'.join(invalid_paths))
+
+ return files_needed
+
+
+def check_inventory(args: IntegrationConfig, inventory_path: str) -> None:
+ """Check the given inventory for issues."""
+ if not isinstance(args.controller, OriginConfig):
+ if os.path.exists(inventory_path):
+ inventory = read_text_file(inventory_path)
+
+ if 'ansible_ssh_private_key_file' in inventory:
+ display.warning('Use of "ansible_ssh_private_key_file" in inventory with the --docker or --remote option is unsupported and will likely fail.')
+
+
+def get_inventory_absolute_path(args: IntegrationConfig, target: InventoryConfig) -> str:
+ """Return the absolute inventory path used for the given integration configuration or target inventory config (if provided)."""
+ path = target.path or os.path.basename(get_inventory_relative_path(args))
+
+ if args.host_path:
+ path = os.path.join(data_context().content.root, path) # post-delegation, path is relative to the content root
+ else:
+ path = os.path.join(data_context().content.root, data_context().content.integration_path, path)
+
+ return path
+
+
+def get_inventory_relative_path(args: IntegrationConfig) -> str:
+ """Return the inventory path used for the given integration configuration relative to the content root."""
+ inventory_names: dict[t.Type[IntegrationConfig], str] = {
+ PosixIntegrationConfig: 'inventory',
+ WindowsIntegrationConfig: 'inventory.winrm',
+ NetworkIntegrationConfig: 'inventory.networking',
+ }
+
+ return os.path.join(data_context().content.integration_path, inventory_names[type(args)])
+
+
+def delegate_inventory(args: IntegrationConfig, inventory_path_src: str) -> None:
+ """Make the given inventory available during delegation."""
+ if isinstance(args, PosixIntegrationConfig):
+ return
+
+ def inventory_callback(files: list[tuple[str, str]]) -> None:
+ """
+ Add the inventory file to the payload file list.
+ This will preserve the file during delegation even if it is ignored or is outside the content and install roots.
+ """
+ inventory_path = get_inventory_relative_path(args)
+ inventory_tuple = inventory_path_src, inventory_path
+
+ if os.path.isfile(inventory_path_src) and inventory_tuple not in files:
+ originals = [item for item in files if item[1] == inventory_path]
+
+ if originals:
+ for original in originals:
+ files.remove(original)
+
+ display.warning('Overriding inventory file "%s" with "%s".' % (inventory_path, inventory_path_src))
+ else:
+ display.notice('Sourcing inventory file "%s" from "%s".' % (inventory_path, inventory_path_src))
+
+ files.append(inventory_tuple)
+
+ data_context().register_payload_callback(inventory_callback)
+
+
+@contextlib.contextmanager
+def integration_test_environment(
+ args: IntegrationConfig,
+ target: IntegrationTarget,
+ inventory_path_src: str,
+) -> c.Iterator[IntegrationEnvironment]:
+ """Context manager that prepares the integration test environment and cleans it up."""
+ ansible_config_src = args.get_ansible_config()
+ ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command)
+
+ if args.no_temp_workdir or 'no/temp_workdir/' in target.aliases:
+ display.warning('Disabling the temp work dir is a temporary debugging feature that may be removed in the future without notice.')
+
+ integration_dir = os.path.join(data_context().content.root, data_context().content.integration_path)
+ targets_dir = os.path.join(data_context().content.root, data_context().content.integration_targets_path)
+ inventory_path = inventory_path_src
+ ansible_config = ansible_config_src
+ vars_file = os.path.join(data_context().content.root, data_context().content.integration_vars_path)
+
+ yield IntegrationEnvironment(data_context().content.root, integration_dir, targets_dir, inventory_path, ansible_config, vars_file)
+ return
+
+ # When testing a collection, the temporary directory must reside within the collection.
+ # This is necessary to enable support for the default collection for non-collection content (playbooks and roles).
+ root_temp_dir = os.path.join(ResultType.TMP.path, 'integration')
+
+ prefix = '%s-' % target.name
+ suffix = '-\u00c5\u00d1\u015a\u00cc\u03b2\u0141\u00c8'
+
+ if args.no_temp_unicode or 'no/temp_unicode/' in target.aliases:
+ display.warning('Disabling unicode in the temp work dir is a temporary debugging feature that may be removed in the future without notice.')
+ suffix = '-ansible'
+
+ if args.explain:
+ temp_dir = os.path.join(root_temp_dir, '%stemp%s' % (prefix, suffix))
+ else:
+ make_dirs(root_temp_dir)
+ temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_temp_dir)
+
+ try:
+ display.info('Preparing temporary directory: %s' % temp_dir, verbosity=2)
+
+ inventory_relative_path = get_inventory_relative_path(args)
+ inventory_path = os.path.join(temp_dir, inventory_relative_path)
+
+ cache = IntegrationCache(args)
+
+ target_dependencies = sorted([target] + list(cache.dependency_map.get(target.name, set())))
+
+ files_needed = get_files_needed(target_dependencies)
+
+ integration_dir = os.path.join(temp_dir, data_context().content.integration_path)
+ targets_dir = os.path.join(temp_dir, data_context().content.integration_targets_path)
+ ansible_config = os.path.join(temp_dir, ansible_config_relative)
+
+ vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path)
+ vars_file = os.path.join(temp_dir, data_context().content.integration_vars_path)
+
+ file_copies = [
+ (ansible_config_src, ansible_config),
+ (inventory_path_src, inventory_path),
+ ]
+
+ if os.path.exists(vars_file_src):
+ file_copies.append((vars_file_src, vars_file))
+
+ file_copies += [(path, os.path.join(temp_dir, path)) for path in files_needed]
+
+ integration_targets_relative_path = data_context().content.integration_targets_path
+
+ directory_copies = [
+ (
+ os.path.join(integration_targets_relative_path, target.relative_path),
+ os.path.join(temp_dir, integration_targets_relative_path, target.relative_path)
+ )
+ for target in target_dependencies
+ ]
+
+ directory_copies = sorted(set(directory_copies))
+ file_copies = sorted(set(file_copies))
+
+ if not args.explain:
+ make_dirs(integration_dir)
+
+ for dir_src, dir_dst in directory_copies:
+ display.info('Copying %s/ to %s/' % (dir_src, dir_dst), verbosity=2)
+
+ if not args.explain:
+ shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True) # type: ignore[arg-type] # incorrect type stub omits bytes path support
+
+ for file_src, file_dst in file_copies:
+ display.info('Copying %s to %s' % (file_src, file_dst), verbosity=2)
+
+ if not args.explain:
+ make_dirs(os.path.dirname(file_dst))
+ shutil.copy2(file_src, file_dst)
+
+ yield IntegrationEnvironment(temp_dir, integration_dir, targets_dir, inventory_path, ansible_config, vars_file)
+ finally:
+ if not args.explain:
+ remove_tree(temp_dir)
+
+
+@contextlib.contextmanager
+def integration_test_config_file(
+ args: IntegrationConfig,
+ env_config: CloudEnvironmentConfig,
+ integration_dir: str,
+) -> c.Iterator[t.Optional[str]]:
+ """Context manager that provides a config file for integration tests, if needed."""
+ if not env_config:
+ yield None
+ return
+
+ config_vars = (env_config.ansible_vars or {}).copy()
+
+ config_vars.update(dict(
+ ansible_test=dict(
+ environment=env_config.env_vars,
+ module_defaults=env_config.module_defaults,
+ )
+ ))
+
+ config_file = json.dumps(config_vars, indent=4, sort_keys=True)
+
+ with named_temporary_file(args, 'config-file-', '.json', integration_dir, config_file) as path: # type: str
+ filename = os.path.relpath(path, integration_dir)
+
+ display.info('>>> Config File: %s\n%s' % (filename, config_file), verbosity=3)
+
+ yield path
+
+
+def create_inventory(
+ args: IntegrationConfig,
+ host_state: HostState,
+ inventory_path: str,
+ target: IntegrationTarget,
+) -> None:
+ """Create inventory."""
+ if isinstance(args, PosixIntegrationConfig):
+ if target.target_type == IntegrationTargetType.CONTROLLER:
+ display.info('Configuring controller inventory.', verbosity=1)
+ create_controller_inventory(args, inventory_path, host_state.controller_profile)
+ elif target.target_type == IntegrationTargetType.TARGET:
+ display.info('Configuring target inventory.', verbosity=1)
+ create_posix_inventory(args, inventory_path, host_state.target_profiles, 'needs/ssh/' in target.aliases)
+ else:
+ raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}')
+ elif isinstance(args, WindowsIntegrationConfig):
+ display.info('Configuring target inventory.', verbosity=1)
+ target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target)
+ create_windows_inventory(args, inventory_path, target_profiles)
+ elif isinstance(args, NetworkIntegrationConfig):
+ display.info('Configuring target inventory.', verbosity=1)
+ target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target)
+ create_network_inventory(args, inventory_path, target_profiles)
+
+
+def command_integration_filtered(
+ args: IntegrationConfig,
+ host_state: HostState,
+ targets: tuple[IntegrationTarget, ...],
+ all_targets: tuple[IntegrationTarget, ...],
+ inventory_path: str,
+ pre_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None,
+ post_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None,
+):
+ """Run integration tests for the specified targets."""
+ found = False
+ passed = []
+ failed = []
+
+ targets_iter = iter(targets)
+ all_targets_dict = dict((target.name, target) for target in all_targets)
+
+ setup_errors = []
+ setup_targets_executed: set[str] = set()
+
+ for target in all_targets:
+ for setup_target in target.setup_once + target.setup_always:
+ if setup_target not in all_targets_dict:
+ setup_errors.append('Target "%s" contains invalid setup target: %s' % (target.name, setup_target))
+
+ if setup_errors:
+ raise ApplicationError('Found %d invalid setup aliases:\n%s' % (len(setup_errors), '\n'.join(setup_errors)))
+
+ check_pyyaml(host_state.controller_profile.python)
+
+ test_dir = os.path.join(ResultType.TMP.path, 'output_dir')
+
+ if not args.explain and any('needs/ssh/' in target.aliases for target in targets):
+ max_tries = 20
+ display.info('SSH connection to controller required by tests. Checking the connection.')
+ for i in range(1, max_tries + 1):
+ try:
+ run_command(args, ['ssh', '-o', 'BatchMode=yes', 'localhost', 'id'], capture=True)
+ display.info('SSH service responded.')
+ break
+ except SubprocessError:
+ if i == max_tries:
+ raise
+ seconds = 3
+ display.warning('SSH service not responding. Waiting %d second(s) before checking again.' % seconds)
+ time.sleep(seconds)
+
+ start_at_task = args.start_at_task
+
+ results = {}
+
+ target_profile = host_state.target_profiles[0]
+
+ if isinstance(target_profile, PosixProfile):
+ target_python = target_profile.python
+
+ if isinstance(target_profile, ControllerProfile):
+ if host_state.controller_profile.python.path != target_profile.python.path:
+ install_requirements(args, target_python, command=True, controller=False) # integration
+ elif isinstance(target_profile, SshTargetHostProfile):
+ connection = target_profile.get_controller_target_connections()[0]
+ install_requirements(args, target_python, command=True, controller=False, connection=connection) # integration
+
+ coverage_manager = CoverageManager(args, host_state, inventory_path)
+ coverage_manager.setup()
+
+ try:
+ for target in targets_iter:
+ if args.start_at and not found:
+ found = target.name == args.start_at
+
+ if not found:
+ continue
+
+ create_inventory(args, host_state, inventory_path, target)
+
+ tries = 2 if args.retry_on_error else 1
+ verbosity = args.verbosity
+
+ cloud_environment = get_cloud_environment(args, target)
+
+ try:
+ while tries:
+ tries -= 1
+
+ try:
+ if cloud_environment:
+ cloud_environment.setup_once()
+
+ run_setup_targets(args, host_state, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path,
+ coverage_manager, False)
+
+ start_time = time.time()
+
+ if pre_target:
+ pre_target(target)
+
+ run_setup_targets(args, host_state, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path,
+ coverage_manager, True)
+
+ if not args.explain:
+ # create a fresh test directory for each test target
+ remove_tree(test_dir)
+ make_dirs(test_dir)
+
+ try:
+ if target.script_path:
+ command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager)
+ else:
+ command_integration_role(args, host_state, target, start_at_task, test_dir, inventory_path, coverage_manager)
+ start_at_task = None
+ finally:
+ if post_target:
+ post_target(target)
+
+ end_time = time.time()
+
+ results[target.name] = dict(
+ name=target.name,
+ type=target.type,
+ aliases=target.aliases,
+ modules=target.modules,
+ run_time_seconds=int(end_time - start_time),
+ setup_once=target.setup_once,
+ setup_always=target.setup_always,
+ )
+
+ break
+ except SubprocessError:
+ if cloud_environment:
+ cloud_environment.on_failure(target, tries)
+
+ if not tries:
+ raise
+
+ if target.retry_never:
+ display.warning(f'Skipping retry of test target "{target.name}" since it has been excluded from retries.')
+ raise
+
+ display.warning('Retrying test target "%s" with maximum verbosity.' % target.name)
+ display.verbosity = args.verbosity = 6
+
+ passed.append(target)
+ except Exception as ex:
+ failed.append(target)
+
+ if args.continue_on_error:
+ display.error(str(ex))
+ continue
+
+ display.notice('To resume at this test target, use the option: --start-at %s' % target.name)
+
+ next_target = next(targets_iter, None)
+
+ if next_target:
+ display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name)
+
+ raise
+ finally:
+ display.verbosity = args.verbosity = verbosity
+
+ finally:
+ if not args.explain:
+ coverage_manager.teardown()
+
+ result_name = '%s-%s.json' % (
+ args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0))))
+
+ data = dict(
+ targets=results,
+ )
+
+ write_json_test_results(ResultType.DATA, result_name, data)
+
+ if failed:
+ raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % (
+ len(failed), len(passed) + len(failed), '\n'.join(target.name for target in failed)))
+
+
+def command_integration_script(
+ args: IntegrationConfig,
+ host_state: HostState,
+ target: IntegrationTarget,
+ test_dir: str,
+ inventory_path: str,
+ coverage_manager: CoverageManager,
+):
+ """Run an integration test script."""
+ display.info('Running %s integration test script' % target.name)
+
+ env_config = None
+
+ if isinstance(args, PosixIntegrationConfig):
+ cloud_environment = get_cloud_environment(args, target)
+
+ if cloud_environment:
+ env_config = cloud_environment.get_environment_config()
+
+ if env_config:
+ display.info('>>> Environment Config\n%s' % json.dumps(dict(
+ env_vars=env_config.env_vars,
+ ansible_vars=env_config.ansible_vars,
+ callback_plugins=env_config.callback_plugins,
+ module_defaults=env_config.module_defaults,
+ ), indent=4, sort_keys=True), verbosity=3)
+
+ with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment
+ cmd = ['./%s' % os.path.basename(target.script_path)]
+
+ if args.verbosity:
+ cmd.append('-' + ('v' * args.verbosity))
+
+ env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env)
+ cwd = os.path.join(test_env.targets_dir, target.relative_path)
+
+ env.update(dict(
+ # support use of adhoc ansible commands in collections without specifying the fully qualified collection name
+ ANSIBLE_PLAYBOOK_DIR=cwd,
+ ))
+
+ if env_config and env_config.env_vars:
+ env.update(env_config.env_vars)
+
+ with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path: # type: t.Optional[str]
+ if config_path:
+ cmd += ['-e', '@%s' % config_path]
+
+ env.update(coverage_manager.get_environment(target.name, target.aliases))
+ cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False)
+
+
+def command_integration_role(
+ args: IntegrationConfig,
+ host_state: HostState,
+ target: IntegrationTarget,
+ start_at_task: t.Optional[str],
+ test_dir: str,
+ inventory_path: str,
+ coverage_manager: CoverageManager,
+):
+ """Run an integration test role."""
+ display.info('Running %s integration test role' % target.name)
+
+ env_config = None
+
+ vars_files = []
+ variables = dict(
+ output_dir=test_dir,
+ )
+
+ if isinstance(args, WindowsIntegrationConfig):
+ hosts = 'windows'
+ gather_facts = False
+ variables.update(dict(
+ win_output_dir=r'C:\ansible_testing',
+ ))
+ elif isinstance(args, NetworkIntegrationConfig):
+ hosts = target.network_platform
+ gather_facts = False
+ else:
+ hosts = 'testhost'
+ gather_facts = True
+
+ if 'gather_facts/yes/' in target.aliases:
+ gather_facts = True
+ elif 'gather_facts/no/' in target.aliases:
+ gather_facts = False
+
+ if not isinstance(args, NetworkIntegrationConfig):
+ cloud_environment = get_cloud_environment(args, target)
+
+ if cloud_environment:
+ env_config = cloud_environment.get_environment_config()
+
+ if env_config:
+ display.info('>>> Environment Config\n%s' % json.dumps(dict(
+ env_vars=env_config.env_vars,
+ ansible_vars=env_config.ansible_vars,
+ callback_plugins=env_config.callback_plugins,
+ module_defaults=env_config.module_defaults,
+ ), indent=4, sort_keys=True), verbosity=3)
+
+ with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment
+ if os.path.exists(test_env.vars_file):
+ vars_files.append(os.path.relpath(test_env.vars_file, test_env.integration_dir))
+
+ play = dict(
+ hosts=hosts,
+ gather_facts=gather_facts,
+ vars_files=vars_files,
+ vars=variables,
+ roles=[
+ target.name,
+ ],
+ )
+
+ if env_config:
+ if env_config.ansible_vars:
+ variables.update(env_config.ansible_vars)
+
+ play.update(dict(
+ environment=env_config.env_vars,
+ module_defaults=env_config.module_defaults,
+ ))
+
+ playbook = json.dumps([play], indent=4, sort_keys=True)
+
+ with named_temporary_file(args=args, directory=test_env.integration_dir, prefix='%s-' % target.name, suffix='.yml', content=playbook) as playbook_path:
+ filename = os.path.basename(playbook_path)
+
+ display.info('>>> Playbook: %s\n%s' % (filename, playbook.strip()), verbosity=3)
+
+ cmd = ['ansible-playbook', filename, '-i', os.path.relpath(test_env.inventory_path, test_env.integration_dir)]
+
+ if start_at_task:
+ cmd += ['--start-at-task', start_at_task]
+
+ if args.tags:
+ cmd += ['--tags', args.tags]
+
+ if args.skip_tags:
+ cmd += ['--skip-tags', args.skip_tags]
+
+ if args.diff:
+ cmd += ['--diff']
+
+ if isinstance(args, NetworkIntegrationConfig):
+ if args.testcase:
+ cmd += ['-e', 'testcase=%s' % args.testcase]
+
+ if args.verbosity:
+ cmd.append('-' + ('v' * args.verbosity))
+
+ env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env)
+ cwd = test_env.integration_dir
+
+ env.update(dict(
+ # support use of adhoc ansible commands in collections without specifying the fully qualified collection name
+ ANSIBLE_PLAYBOOK_DIR=cwd,
+ ))
+
+ if env_config and env_config.env_vars:
+ env.update(env_config.env_vars)
+
+ env['ANSIBLE_ROLES_PATH'] = test_env.targets_dir
+
+ env.update(coverage_manager.get_environment(target.name, target.aliases))
+ cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False)
+
+
+def run_setup_targets(
+ args: IntegrationConfig,
+ host_state: HostState,
+ test_dir: str,
+ target_names: c.Sequence[str],
+ targets_dict: dict[str, IntegrationTarget],
+ targets_executed: set[str],
+ inventory_path: str,
+ coverage_manager: CoverageManager,
+ always: bool,
+):
+ """Run setup targets."""
+ for target_name in target_names:
+ if not always and target_name in targets_executed:
+ continue
+
+ target = targets_dict[target_name]
+
+ if not args.explain:
+ # create a fresh test directory for each test target
+ remove_tree(test_dir)
+ make_dirs(test_dir)
+
+ if target.script_path:
+ command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager)
+ else:
+ command_integration_role(args, host_state, target, None, test_dir, inventory_path, coverage_manager)
+
+ targets_executed.add(target_name)
+
+
+def integration_environment(
+ args: IntegrationConfig,
+ target: IntegrationTarget,
+ test_dir: str,
+ inventory_path: str,
+ ansible_config: t.Optional[str],
+ env_config: t.Optional[CloudEnvironmentConfig],
+ test_env: IntegrationEnvironment,
+) -> dict[str, str]:
+ """Return a dictionary of environment variables to use when running the given integration test target."""
+ env = ansible_environment(args, ansible_config=ansible_config)
+
+ callback_plugins = ['junit'] + (env_config.callback_plugins or [] if env_config else [])
+
+ integration = dict(
+ JUNIT_OUTPUT_DIR=ResultType.JUNIT.path,
+ JUNIT_TASK_RELATIVE_PATH=test_env.test_dir,
+ JUNIT_REPLACE_OUT_OF_TREE_PATH='out-of-tree:',
+ ANSIBLE_CALLBACKS_ENABLED=','.join(sorted(set(callback_plugins))),
+ ANSIBLE_TEST_CI=args.metadata.ci_provider or get_ci_provider().code,
+ ANSIBLE_TEST_COVERAGE='check' if args.coverage_check else ('yes' if args.coverage else ''),
+ OUTPUT_DIR=test_dir,
+ INVENTORY_PATH=os.path.abspath(inventory_path),
+ )
+
+ if args.debug_strategy:
+ env.update(dict(ANSIBLE_STRATEGY='debug'))
+
+ if 'non_local/' in target.aliases:
+ if args.coverage:
+ display.warning('Skipping coverage reporting on Ansible modules for non-local test: %s' % target.name)
+
+ env.update(dict(ANSIBLE_TEST_REMOTE_INTERPRETER=''))
+
+ env.update(integration)
+
+ return env
+
+
+class IntegrationEnvironment:
+ """Details about the integration environment."""
+ def __init__(self, test_dir: str, integration_dir: str, targets_dir: str, inventory_path: str, ansible_config: str, vars_file: str) -> None:
+ self.test_dir = test_dir
+ self.integration_dir = integration_dir
+ self.targets_dir = targets_dir
+ self.inventory_path = inventory_path
+ self.ansible_config = ansible_config
+ self.vars_file = vars_file
+
+
+class IntegrationCache(CommonCache):
+ """Integration cache."""
+ @property
+ def integration_targets(self) -> list[IntegrationTarget]:
+ """The list of integration test targets."""
+ return self.get('integration_targets', lambda: list(walk_integration_targets()))
+
+ @property
+ def dependency_map(self) -> dict[str, set[IntegrationTarget]]:
+ """The dependency map of integration test targets."""
+ return self.get('dependency_map', lambda: generate_dependency_map(self.integration_targets))
+
+
+def filter_profiles_for_target(args: IntegrationConfig, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]:
+ """Return a list of profiles after applying target filters."""
+ if target.target_type == IntegrationTargetType.CONTROLLER:
+ profile_filter = get_target_filter(args, [args.controller], True)
+ elif target.target_type == IntegrationTargetType.TARGET:
+ profile_filter = get_target_filter(args, args.targets, False)
+ else:
+ raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}')
+
+ profiles = profile_filter.filter_profiles(profiles, target)
+
+ return profiles
+
+
+def get_integration_filter(args: IntegrationConfig, targets: list[IntegrationTarget]) -> set[str]:
+ """Return a list of test targets to skip based on the host(s) that will be used to run the specified test targets."""
+ invalid_targets = sorted(target.name for target in targets if target.target_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET))
+
+ if invalid_targets and not args.list_targets:
+ message = f'''Unable to determine context for the following test targets: {", ".join(invalid_targets)}
+
+Make sure the test targets are correctly named:
+
+ - Modules - The target name should match the module name.
+ - Plugins - The target name should be "{{plugin_type}}_{{plugin_name}}".
+
+If necessary, context can be controlled by adding entries to the "aliases" file for a test target:
+
+ - Add the name(s) of modules which are tested.
+ - Add "context/target" for module and module_utils tests (these will run on the target host).
+ - Add "context/controller" for other test types (these will run on the controller).'''
+
+ raise ApplicationError(message)
+
+ invalid_targets = sorted(target.name for target in targets if target.actual_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET))
+
+ if invalid_targets:
+ if data_context().content.is_ansible:
+ display.warning(f'Unable to determine context for the following test targets: {", ".join(invalid_targets)}')
+ else:
+ display.warning(f'Unable to determine context for the following test targets, they will be run on the target host: {", ".join(invalid_targets)}')
+
+ exclude: set[str] = set()
+
+ controller_targets = [target for target in targets if target.target_type == IntegrationTargetType.CONTROLLER]
+ target_targets = [target for target in targets if target.target_type == IntegrationTargetType.TARGET]
+
+ controller_filter = get_target_filter(args, [args.controller], True)
+ target_filter = get_target_filter(args, args.targets, False)
+
+ controller_filter.filter_targets(controller_targets, exclude)
+ target_filter.filter_targets(target_targets, exclude)
+
+ return exclude
+
+
+def command_integration_filter(args: TIntegrationConfig,
+ targets: c.Iterable[TIntegrationTarget],
+ ) -> tuple[HostState, tuple[TIntegrationTarget, ...]]:
+ """Filter the given integration test targets."""
+ targets = tuple(target for target in targets if 'hidden/' not in target.aliases)
+ changes = get_changes_filter(args)
+
+ # special behavior when the --changed-all-target target is selected based on changes
+ if args.changed_all_target in changes:
+ # act as though the --changed-all-target target was in the include list
+ if args.changed_all_mode == 'include' and args.changed_all_target not in args.include:
+ args.include.append(args.changed_all_target)
+ args.delegate_args += ['--include', args.changed_all_target]
+ # act as though the --changed-all-target target was in the exclude list
+ elif args.changed_all_mode == 'exclude' and args.changed_all_target not in args.exclude:
+ args.exclude.append(args.changed_all_target)
+
+ require = args.require + changes
+ exclude = args.exclude
+
+ internal_targets = walk_internal_targets(targets, args.include, exclude, require)
+ environment_exclude = get_integration_filter(args, list(internal_targets))
+
+ environment_exclude |= set(cloud_filter(args, internal_targets))
+
+ if environment_exclude:
+ exclude = sorted(set(exclude) | environment_exclude)
+ internal_targets = walk_internal_targets(targets, args.include, exclude, require)
+
+ if not internal_targets:
+ raise AllTargetsSkipped()
+
+ if args.start_at and not any(target.name == args.start_at for target in internal_targets):
+ raise ApplicationError('Start at target matches nothing: %s' % args.start_at)
+
+ cloud_init(args, internal_targets)
+
+ vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path)
+
+ if os.path.exists(vars_file_src):
+ def integration_config_callback(files: list[tuple[str, str]]) -> None:
+ """
+ Add the integration config vars file to the payload file list.
+ This will preserve the file during delegation even if the file is ignored by source control.
+ """
+ files.append((vars_file_src, data_context().content.integration_vars_path))
+
+ data_context().register_payload_callback(integration_config_callback)
+
+ if args.list_targets:
+ raise ListTargets([target.name for target in internal_targets])
+
+ # requirements are installed using a callback since the windows-integration and network-integration host status checks depend on them
+ host_state = prepare_profiles(args, targets_use_pypi=True, requirements=requirements) # integration, windows-integration, network-integration
+
+ if args.delegate:
+ raise Delegate(host_state=host_state, require=require, exclude=exclude)
+
+ return host_state, internal_targets
+
+
+def requirements(host_profile: HostProfile) -> None:
+ """Install requirements after bootstrapping and delegation."""
+ if isinstance(host_profile, ControllerHostProfile) and host_profile.controller:
+ configure_pypi_proxy(host_profile.args, host_profile) # integration, windows-integration, network-integration
+ install_requirements(host_profile.args, host_profile.python, ansible=True, command=True) # integration, windows-integration, network-integration
+ elif isinstance(host_profile, PosixProfile) and not isinstance(host_profile, ControllerProfile):
+ configure_pypi_proxy(host_profile.args, host_profile) # integration
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py
new file mode 100644
index 0000000..0c078b9
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py
@@ -0,0 +1,389 @@
+"""Plugin system for cloud providers and environments for use in integration tests."""
+from __future__ import annotations
+
+import abc
+import atexit
+import datetime
+import os
+import re
+import tempfile
+import time
+import typing as t
+
+from ....encoding import (
+ to_bytes,
+)
+
+from ....io import (
+ read_text_file,
+)
+
+from ....util import (
+ ANSIBLE_TEST_CONFIG_ROOT,
+ ApplicationError,
+ display,
+ import_plugins,
+ load_plugins,
+ cache,
+)
+
+from ....util_common import (
+ ResultType,
+ write_json_test_results,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....config import (
+ IntegrationConfig,
+ TestConfig,
+)
+
+from ....ci import (
+ get_ci_provider,
+)
+
+from ....data import (
+ data_context,
+)
+
+from ....docker_util import (
+ docker_available,
+)
+
+
+@cache
+def get_cloud_plugins() -> tuple[dict[str, t.Type[CloudProvider]], dict[str, t.Type[CloudEnvironment]]]:
+ """Import cloud plugins and load them into the plugin dictionaries."""
+ import_plugins('commands/integration/cloud')
+
+ providers: dict[str, t.Type[CloudProvider]] = {}
+ environments: dict[str, t.Type[CloudEnvironment]] = {}
+
+ load_plugins(CloudProvider, providers)
+ load_plugins(CloudEnvironment, environments)
+
+ return providers, environments
+
+
+@cache
+def get_provider_plugins() -> dict[str, t.Type[CloudProvider]]:
+ """Return a dictionary of the available cloud provider plugins."""
+ return get_cloud_plugins()[0]
+
+
+@cache
+def get_environment_plugins() -> dict[str, t.Type[CloudEnvironment]]:
+ """Return a dictionary of the available cloud environment plugins."""
+ return get_cloud_plugins()[1]
+
+
+def get_cloud_platforms(args: TestConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[str]:
+ """Return cloud platform names for the specified targets."""
+ if isinstance(args, IntegrationConfig):
+ if args.list_targets:
+ return []
+
+ if targets is None:
+ cloud_platforms = set(args.metadata.cloud_config or [])
+ else:
+ cloud_platforms = set(get_cloud_platform(target) for target in targets)
+
+ cloud_platforms.discard(None)
+
+ return sorted(cloud_platforms)
+
+
+def get_cloud_platform(target: IntegrationTarget) -> t.Optional[str]:
+ """Return the name of the cloud platform used for the given target, or None if no cloud platform is used."""
+ cloud_platforms = set(a.split('/')[1] for a in target.aliases if a.startswith('cloud/') and a.endswith('/') and a != 'cloud/')
+
+ if not cloud_platforms:
+ return None
+
+ if len(cloud_platforms) == 1:
+ cloud_platform = cloud_platforms.pop()
+
+ if cloud_platform not in get_provider_plugins():
+ raise ApplicationError('Target %s aliases contains unknown cloud platform: %s' % (target.name, cloud_platform))
+
+ return cloud_platform
+
+ raise ApplicationError('Target %s aliases contains multiple cloud platforms: %s' % (target.name, ', '.join(sorted(cloud_platforms))))
+
+
+def get_cloud_providers(args: IntegrationConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[CloudProvider]:
+ """Return a list of cloud providers for the given targets."""
+ return [get_provider_plugins()[p](args) for p in get_cloud_platforms(args, targets)]
+
+
+def get_cloud_environment(args: IntegrationConfig, target: IntegrationTarget) -> t.Optional[CloudEnvironment]:
+ """Return the cloud environment for the given target, or None if no cloud environment is used for the target."""
+ cloud_platform = get_cloud_platform(target)
+
+ if not cloud_platform:
+ return None
+
+ return get_environment_plugins()[cloud_platform](args)
+
+
+def cloud_filter(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> list[str]:
+ """Return a list of target names to exclude based on the given targets."""
+ if args.metadata.cloud_config is not None:
+ return [] # cloud filter already performed prior to delegation
+
+ exclude: list[str] = []
+
+ for provider in get_cloud_providers(args, targets):
+ provider.filter(targets, exclude)
+
+ return exclude
+
+
+def cloud_init(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> None:
+ """Initialize cloud plugins for the given targets."""
+ if args.metadata.cloud_config is not None:
+ return # cloud configuration already established prior to delegation
+
+ args.metadata.cloud_config = {}
+
+ results = {}
+
+ for provider in get_cloud_providers(args, targets):
+ if args.prime_containers and not provider.uses_docker:
+ continue
+
+ args.metadata.cloud_config[provider.platform] = {}
+
+ start_time = time.time()
+ provider.setup()
+ end_time = time.time()
+
+ results[provider.platform] = dict(
+ platform=provider.platform,
+ setup_seconds=int(end_time - start_time),
+ targets=[target.name for target in targets],
+ )
+
+ if not args.explain and results:
+ result_name = '%s-%s.json' % (
+ args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0))))
+
+ data = dict(
+ clouds=results,
+ )
+
+ write_json_test_results(ResultType.DATA, result_name, data)
+
+
+class CloudBase(metaclass=abc.ABCMeta):
+ """Base class for cloud plugins."""
+ _CONFIG_PATH = 'config_path'
+ _RESOURCE_PREFIX = 'resource_prefix'
+ _MANAGED = 'managed'
+ _SETUP_EXECUTED = 'setup_executed'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ self.args = args
+ self.platform = self.__module__.rsplit('.', 1)[-1]
+
+ def config_callback(files: list[tuple[str, str]]) -> None:
+ """Add the config file to the payload file list."""
+ if self.platform not in self.args.metadata.cloud_config:
+ return # platform was initialized, but not used -- such as being skipped due to all tests being disabled
+
+ if self._get_cloud_config(self._CONFIG_PATH, ''):
+ pair = (self.config_path, os.path.relpath(self.config_path, data_context().content.root))
+
+ if pair not in files:
+ display.info('Including %s config: %s -> %s' % (self.platform, pair[0], pair[1]), verbosity=3)
+ files.append(pair)
+
+ data_context().register_payload_callback(config_callback)
+
+ @property
+ def setup_executed(self) -> bool:
+ """True if setup has been executed, otherwise False."""
+ return t.cast(bool, self._get_cloud_config(self._SETUP_EXECUTED, False))
+
+ @setup_executed.setter
+ def setup_executed(self, value: bool) -> None:
+ """True if setup has been executed, otherwise False."""
+ self._set_cloud_config(self._SETUP_EXECUTED, value)
+
+ @property
+ def config_path(self) -> str:
+ """Path to the configuration file."""
+ return os.path.join(data_context().content.root, str(self._get_cloud_config(self._CONFIG_PATH)))
+
+ @config_path.setter
+ def config_path(self, value: str) -> None:
+ """Path to the configuration file."""
+ self._set_cloud_config(self._CONFIG_PATH, value)
+
+ @property
+ def resource_prefix(self) -> str:
+ """Resource prefix."""
+ return str(self._get_cloud_config(self._RESOURCE_PREFIX))
+
+ @resource_prefix.setter
+ def resource_prefix(self, value: str) -> None:
+ """Resource prefix."""
+ self._set_cloud_config(self._RESOURCE_PREFIX, value)
+
+ @property
+ def managed(self) -> bool:
+ """True if resources are managed by ansible-test, otherwise False."""
+ return t.cast(bool, self._get_cloud_config(self._MANAGED))
+
+ @managed.setter
+ def managed(self, value: bool) -> None:
+ """True if resources are managed by ansible-test, otherwise False."""
+ self._set_cloud_config(self._MANAGED, value)
+
+ def _get_cloud_config(self, key: str, default: t.Optional[t.Union[str, int, bool]] = None) -> t.Union[str, int, bool]:
+ """Return the specified value from the internal configuration."""
+ if default is not None:
+ return self.args.metadata.cloud_config[self.platform].get(key, default)
+
+ return self.args.metadata.cloud_config[self.platform][key]
+
+ def _set_cloud_config(self, key: str, value: t.Union[str, int, bool]) -> None:
+ """Set the specified key and value in the internal configuration."""
+ self.args.metadata.cloud_config[self.platform][key] = value
+
+
+class CloudProvider(CloudBase):
+ """Base class for cloud provider plugins. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig, config_extension: str = '.ini') -> None:
+ super().__init__(args)
+
+ self.ci_provider = get_ci_provider()
+ self.remove_config = False
+ self.config_static_name = 'cloud-config-%s%s' % (self.platform, config_extension)
+ self.config_static_path = os.path.join(data_context().content.integration_path, self.config_static_name)
+ self.config_template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, '%s.template' % self.config_static_name)
+ self.config_extension = config_extension
+
+ self.uses_config = False
+ self.uses_docker = False
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ if not self.uses_docker and not self.uses_config:
+ return
+
+ if self.uses_docker and docker_available():
+ return
+
+ if self.uses_config and os.path.exists(self.config_static_path):
+ return
+
+ skip = 'cloud/%s/' % self.platform
+ skipped = [target.name for target in targets if skip in target.aliases]
+
+ if skipped:
+ exclude.append(skip)
+
+ if not self.uses_docker and self.uses_config:
+ display.warning('Excluding tests marked "%s" which require a "%s" config file (see "%s"): %s'
+ % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped)))
+ elif self.uses_docker and not self.uses_config:
+ display.warning('Excluding tests marked "%s" which requires container support: %s'
+ % (skip.rstrip('/'), ', '.join(skipped)))
+ elif self.uses_docker and self.uses_config:
+ display.warning('Excluding tests marked "%s" which requires container support or a "%s" config file (see "%s"): %s'
+ % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped)))
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ self.resource_prefix = self.ci_provider.generate_resource_prefix()
+ self.resource_prefix = re.sub(r'[^a-zA-Z0-9]+', '-', self.resource_prefix)[:63].lower().rstrip('-')
+
+ atexit.register(self.cleanup)
+
+ def cleanup(self) -> None:
+ """Clean up the cloud resource and any temporary configuration files after tests complete."""
+ if self.remove_config:
+ os.remove(self.config_path)
+
+ def _use_static_config(self) -> bool:
+ """Use a static config file if available. Returns True if static config is used, otherwise returns False."""
+ if os.path.isfile(self.config_static_path):
+ display.info('Using existing %s cloud config: %s' % (self.platform, self.config_static_path), verbosity=1)
+ self.config_path = self.config_static_path
+ static = True
+ else:
+ static = False
+
+ self.managed = not static
+
+ return static
+
+ def _write_config(self, content: str) -> None:
+ """Write the given content to the config file."""
+ prefix = '%s-' % os.path.splitext(os.path.basename(self.config_static_path))[0]
+
+ with tempfile.NamedTemporaryFile(dir=data_context().content.integration_path, prefix=prefix, suffix=self.config_extension, delete=False) as config_fd:
+ filename = os.path.join(data_context().content.integration_path, os.path.basename(config_fd.name))
+
+ self.config_path = filename
+ self.remove_config = True
+
+ display.info('>>> Config: %s\n%s' % (filename, content.strip()), verbosity=3)
+
+ config_fd.write(to_bytes(content))
+ config_fd.flush()
+
+ def _read_config_template(self) -> str:
+ """Read and return the configuration template."""
+ lines = read_text_file(self.config_template_path).splitlines()
+ lines = [line for line in lines if not line.startswith('#')]
+ config = '\n'.join(lines).strip() + '\n'
+ return config
+
+ @staticmethod
+ def _populate_config_template(template: str, values: dict[str, str]) -> str:
+ """Populate and return the given template with the provided values."""
+ for key in sorted(values):
+ value = values[key]
+ template = template.replace('@%s' % key, value)
+
+ return template
+
+
+class CloudEnvironment(CloudBase):
+ """Base class for cloud environment plugins. Updates integration test environment after delegation."""
+ def setup_once(self) -> None:
+ """Run setup if it has not already been run."""
+ if self.setup_executed:
+ return
+
+ self.setup()
+ self.setup_executed = True
+
+ def setup(self) -> None:
+ """Setup which should be done once per environment instead of once per test target."""
+
+ @abc.abstractmethod
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+
+ def on_failure(self, target: IntegrationTarget, tries: int) -> None:
+ """Callback to run when an integration target fails."""
+
+
+class CloudEnvironmentConfig:
+ """Configuration for the environment."""
+ def __init__(self,
+ env_vars: t.Optional[dict[str, str]] = None,
+ ansible_vars: t.Optional[dict[str, t.Any]] = None,
+ module_defaults: t.Optional[dict[str, dict[str, t.Any]]] = None,
+ callback_plugins: t.Optional[list[str]] = None,
+ ):
+ self.env_vars = env_vars
+ self.ansible_vars = ansible_vars
+ self.module_defaults = module_defaults
+ self.callback_plugins = callback_plugins
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py
new file mode 100644
index 0000000..007d383
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py
@@ -0,0 +1,79 @@
+"""ACME plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class ACMEProvider(CloudProvider):
+ """ACME plugin. Sets up cloud resources for tests."""
+ DOCKER_SIMULATOR_NAME = 'acme-simulator'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ # The simulator must be pinned to a specific version to guarantee CI passes with the version used.
+ if os.environ.get('ANSIBLE_ACME_CONTAINER'):
+ self.image = os.environ.get('ANSIBLE_ACME_CONTAINER')
+ else:
+ self.image = 'quay.io/ansible/acme-test-container:2.1.0'
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Create a ACME test container using docker."""
+ ports = [
+ 5000, # control port for flask app in container
+ 14000, # Pebble ACME CA
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('acme_host', self.DOCKER_SIMULATOR_NAME)
+
+ def _setup_static(self) -> None:
+ raise NotImplementedError()
+
+
+class ACMEEnvironment(CloudEnvironment):
+ """ACME environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ ansible_vars = dict(
+ acme_host=self._get_cloud_config('acme_host'),
+ )
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py
new file mode 100644
index 0000000..234f311
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py
@@ -0,0 +1,131 @@
+"""AWS plugin for integration tests."""
+from __future__ import annotations
+
+import os
+import uuid
+import configparser
+import typing as t
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....core_ci import (
+ AnsibleCoreCI,
+ CloudResource,
+)
+
+from ....host_configs import (
+ OriginConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class AwsCloudProvider(CloudProvider):
+ """AWS cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ aci = self._create_ansible_core_ci()
+
+ if aci.available:
+ return
+
+ super().filter(targets, exclude)
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ aws_config_path = os.path.expanduser('~/.aws')
+
+ if os.path.exists(aws_config_path) and isinstance(self.args.controller, OriginConfig):
+ raise ApplicationError('Rename "%s" or use the --docker or --remote option to isolate tests.' % aws_config_path)
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Request AWS credentials through the Ansible Core CI service."""
+ display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1)
+
+ config = self._read_config_template()
+
+ aci = self._create_ansible_core_ci()
+
+ response = aci.start()
+
+ if not self.args.explain:
+ credentials = response['aws']['credentials']
+
+ values = dict(
+ ACCESS_KEY=credentials['access_key'],
+ SECRET_KEY=credentials['secret_key'],
+ SECURITY_TOKEN=credentials['session_token'],
+ REGION='us-east-1',
+ )
+
+ display.sensitive.add(values['SECRET_KEY'])
+ display.sensitive.add(values['SECURITY_TOKEN'])
+
+ config = self._populate_config_template(config, values)
+
+ self._write_config(config)
+
+ def _create_ansible_core_ci(self) -> AnsibleCoreCI:
+ """Return an AWS instance of AnsibleCoreCI."""
+ return AnsibleCoreCI(self.args, CloudResource(platform='aws'))
+
+
+class AwsCloudEnvironment(CloudEnvironment):
+ """AWS cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ ansible_vars: dict[str, t.Any] = dict(
+ resource_prefix=self.resource_prefix,
+ tiny_prefix=uuid.uuid4().hex[0:12]
+ )
+
+ ansible_vars.update(dict(parser.items('default')))
+
+ display.sensitive.add(ansible_vars.get('aws_secret_key'))
+ display.sensitive.add(ansible_vars.get('security_token'))
+
+ if 'aws_cleanup' not in ansible_vars:
+ ansible_vars['aws_cleanup'] = not self.managed
+
+ env_vars = {'ANSIBLE_DEBUG_BOTOCORE_LOGS': 'True'}
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ callback_plugins=['aws_resource_actions'],
+ )
+
+ def on_failure(self, target: IntegrationTarget, tries: int) -> None:
+ """Callback to run when an integration target fails."""
+ if not tries and self.managed:
+ display.notice('If %s failed due to permissions, the IAM test policy may need to be updated. '
+ 'https://docs.ansible.com/ansible/devel/collections/amazon/aws/docsite/dev_guidelines.html#aws-permissions-for-integration-tests'
+ % target.name)
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py
new file mode 100644
index 0000000..dc5136a
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py
@@ -0,0 +1,166 @@
+"""Azure plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+import typing as t
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....core_ci import (
+ AnsibleCoreCI,
+ CloudResource,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class AzureCloudProvider(CloudProvider):
+ """Azure cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.aci: t.Optional[AnsibleCoreCI] = None
+
+ self.uses_config = True
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ aci = self._create_ansible_core_ci()
+
+ if aci.available:
+ return
+
+ super().filter(targets, exclude)
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ get_config(self.config_path) # check required variables
+
+ def cleanup(self) -> None:
+ """Clean up the cloud resource and any temporary configuration files after tests complete."""
+ if self.aci:
+ self.aci.stop()
+
+ super().cleanup()
+
+ def _setup_dynamic(self) -> None:
+ """Request Azure credentials through ansible-core-ci."""
+ display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1)
+
+ config = self._read_config_template()
+ response = {}
+
+ aci = self._create_ansible_core_ci()
+
+ aci_result = aci.start()
+
+ if not self.args.explain:
+ response = aci_result['azure']
+ self.aci = aci
+
+ if not self.args.explain:
+ values = dict(
+ AZURE_CLIENT_ID=response['clientId'],
+ AZURE_SECRET=response['clientSecret'],
+ AZURE_SUBSCRIPTION_ID=response['subscriptionId'],
+ AZURE_TENANT=response['tenantId'],
+ RESOURCE_GROUP=response['resourceGroupNames'][0],
+ RESOURCE_GROUP_SECONDARY=response['resourceGroupNames'][1],
+ )
+
+ display.sensitive.add(values['AZURE_SECRET'])
+
+ config = '\n'.join('%s: %s' % (key, values[key]) for key in sorted(values))
+
+ config = '[default]\n' + config
+
+ self._write_config(config)
+
+ def _create_ansible_core_ci(self) -> AnsibleCoreCI:
+ """Return an Azure instance of AnsibleCoreCI."""
+ return AnsibleCoreCI(self.args, CloudResource(platform='azure'))
+
+
+class AzureCloudEnvironment(CloudEnvironment):
+ """Azure cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ env_vars = get_config(self.config_path)
+
+ display.sensitive.add(env_vars.get('AZURE_SECRET'))
+ display.sensitive.add(env_vars.get('AZURE_PASSWORD'))
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items()))
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
+
+ def on_failure(self, target: IntegrationTarget, tries: int) -> None:
+ """Callback to run when an integration target fails."""
+ if not tries and self.managed:
+ display.notice('If %s failed due to permissions, the test policy may need to be updated.' % target.name)
+
+
+def get_config(config_path: str) -> dict[str, str]:
+ """Return a configuration dictionary parsed from the given configuration path."""
+ parser = configparser.ConfigParser()
+ parser.read(config_path)
+
+ config = dict((key.upper(), value) for key, value in parser.items('default'))
+
+ rg_vars = (
+ 'RESOURCE_GROUP',
+ 'RESOURCE_GROUP_SECONDARY',
+ )
+
+ sp_vars = (
+ 'AZURE_CLIENT_ID',
+ 'AZURE_SECRET',
+ 'AZURE_SUBSCRIPTION_ID',
+ 'AZURE_TENANT',
+ )
+
+ ad_vars = (
+ 'AZURE_AD_USER',
+ 'AZURE_PASSWORD',
+ 'AZURE_SUBSCRIPTION_ID',
+ )
+
+ rg_ok = all(var in config for var in rg_vars)
+ sp_ok = all(var in config for var in sp_vars)
+ ad_ok = all(var in config for var in ad_vars)
+
+ if not rg_ok:
+ raise ApplicationError('Resource groups must be defined with: %s' % ', '.join(sorted(rg_vars)))
+
+ if not sp_ok and not ad_ok:
+ raise ApplicationError('Credentials must be defined using either:\nService Principal: %s\nActive Directory: %s' % (
+ ', '.join(sorted(sp_vars)), ', '.join(sorted(ad_vars))))
+
+ return config
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py
new file mode 100644
index 0000000..f453ef3
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# (c) 2018, Gaudenz Steinlin <gaudenz.steinlin@cloudscale.ch>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+"""Cloudscale plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class CloudscaleCloudProvider(CloudProvider):
+ """Cloudscale cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class CloudscaleCloudEnvironment(CloudEnvironment):
+ """Cloudscale cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ CLOUDSCALE_API_TOKEN=parser.get('default', 'cloudscale_api_token'),
+ )
+
+ display.sensitive.add(env_vars['CLOUDSCALE_API_TOKEN'])
+
+ ansible_vars = dict(
+ cloudscale_resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items()))
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py
new file mode 100644
index 0000000..0037b42
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py
@@ -0,0 +1,174 @@
+"""CloudStack plugin for integration tests."""
+from __future__ import annotations
+
+import json
+import configparser
+import os
+import urllib.parse
+import typing as t
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....docker_util import (
+ docker_exec,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+ wait_for_file,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class CsCloudProvider(CloudProvider):
+ """CloudStack cloud provider plugin. Sets up cloud resources before delegation."""
+ DOCKER_SIMULATOR_NAME = 'cloudstack-sim'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.image = os.environ.get('ANSIBLE_CLOUDSTACK_CONTAINER', 'quay.io/ansible/cloudstack-test-container:1.4.0')
+ self.host = ''
+ self.port = 0
+
+ self.uses_docker = True
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_static(self) -> None:
+ """Configure CloudStack tests for use with static configuration."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_static_path)
+
+ endpoint = parser.get('cloudstack', 'endpoint')
+
+ parts = urllib.parse.urlparse(endpoint)
+
+ self.host = parts.hostname
+
+ if not self.host:
+ raise ApplicationError('Could not determine host from endpoint: %s' % endpoint)
+
+ if parts.port:
+ self.port = parts.port
+ elif parts.scheme == 'http':
+ self.port = 80
+ elif parts.scheme == 'https':
+ self.port = 443
+ else:
+ raise ApplicationError('Could not determine port from endpoint: %s' % endpoint)
+
+ display.info('Read cs host "%s" and port %d from config: %s' % (self.host, self.port, self.config_static_path), verbosity=1)
+
+ def _setup_dynamic(self) -> None:
+ """Create a CloudStack simulator using docker."""
+ config = self._read_config_template()
+
+ self.port = 8888
+
+ ports = [
+ self.port,
+ ]
+
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ if not descriptor:
+ return
+
+ # apply work-around for OverlayFS issue
+ # https://github.com/docker/for-linux/issues/72#issuecomment-319904698
+ docker_exec(self.args, self.DOCKER_SIMULATOR_NAME, ['find', '/var/lib/mysql', '-type', 'f', '-exec', 'touch', '{}', ';'], capture=True)
+
+ if self.args.explain:
+ values = dict(
+ HOST=self.host,
+ PORT=str(self.port),
+ )
+ else:
+ credentials = self._get_credentials(self.DOCKER_SIMULATOR_NAME)
+
+ values = dict(
+ HOST=self.DOCKER_SIMULATOR_NAME,
+ PORT=str(self.port),
+ KEY=credentials['apikey'],
+ SECRET=credentials['secretkey'],
+ )
+
+ display.sensitive.add(values['SECRET'])
+
+ config = self._populate_config_template(config, values)
+
+ self._write_config(config)
+
+ def _get_credentials(self, container_name: str) -> dict[str, t.Any]:
+ """Wait for the CloudStack simulator to return credentials."""
+ def check(value) -> bool:
+ """Return True if the given configuration is valid JSON, otherwise return False."""
+ # noinspection PyBroadException
+ try:
+ json.loads(value)
+ except Exception: # pylint: disable=broad-except
+ return False # sometimes the file exists but is not yet valid JSON
+
+ return True
+
+ stdout = wait_for_file(self.args, container_name, '/var/www/html/admin.json', sleep=10, tries=30, check=check)
+
+ return json.loads(stdout)
+
+
+class CsCloudEnvironment(CloudEnvironment):
+ """CloudStack cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ config = dict(parser.items('default'))
+
+ env_vars = dict(
+ CLOUDSTACK_ENDPOINT=config['endpoint'],
+ CLOUDSTACK_KEY=config['key'],
+ CLOUDSTACK_SECRET=config['secret'],
+ CLOUDSTACK_TIMEOUT=config['timeout'],
+ )
+
+ display.sensitive.add(env_vars['CLOUDSTACK_SECRET'])
+
+ ansible_vars = dict(
+ cs_resource_prefix=self.resource_prefix,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py
new file mode 100644
index 0000000..a46bf70
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py
@@ -0,0 +1,55 @@
+"""DigitalOcean plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class DigitalOceanCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class DigitalOceanCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ DO_API_KEY=parser.get('default', 'key'),
+ )
+
+ display.sensitive.add(env_vars['DO_API_KEY'])
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py
new file mode 100644
index 0000000..c2413ee
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py
@@ -0,0 +1,94 @@
+"""Foreman plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class ForemanProvider(CloudProvider):
+ """Foreman plugin. Sets up Foreman stub server for tests."""
+ DOCKER_SIMULATOR_NAME = 'foreman-stub'
+
+ # Default image to run Foreman stub from.
+ #
+ # The simulator must be pinned to a specific version
+ # to guarantee CI passes with the version used.
+ #
+ # It's source source itself resides at:
+ # https://github.com/ansible/foreman-test-container
+ DOCKER_IMAGE = 'quay.io/ansible/foreman-test-container:1.4.0'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.__container_from_env = os.environ.get('ANSIBLE_FRMNSIM_CONTAINER')
+ """
+ Overrides target container, might be used for development.
+
+ Use ANSIBLE_FRMNSIM_CONTAINER=whatever_you_want if you want
+ to use other image. Omit/empty otherwise.
+ """
+ self.image = self.__container_from_env or self.DOCKER_IMAGE
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup cloud resource before delegation and reg cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Spawn a Foreman stub within docker container."""
+ foreman_port = 8080
+
+ ports = [
+ foreman_port,
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('FOREMAN_HOST', self.DOCKER_SIMULATOR_NAME)
+ self._set_cloud_config('FOREMAN_PORT', str(foreman_port))
+
+ def _setup_static(self) -> None:
+ raise NotImplementedError()
+
+
+class ForemanEnvironment(CloudEnvironment):
+ """Foreman environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ env_vars = dict(
+ FOREMAN_HOST=str(self._get_cloud_config('FOREMAN_HOST')),
+ FOREMAN_PORT=str(self._get_cloud_config('FOREMAN_PORT')),
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py
new file mode 100644
index 0000000..e180a02
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py
@@ -0,0 +1,168 @@
+"""Galaxy (ansible-galaxy) plugin for integration tests."""
+from __future__ import annotations
+
+import os
+import tempfile
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....docker_util import (
+ docker_cp_to,
+)
+
+from ....containers import (
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+# We add BasicAuthentication, to make the tasks that deal with
+# direct API access easier to deal with across galaxy_ng and pulp
+SETTINGS = b'''
+CONTENT_ORIGIN = 'http://ansible-ci-pulp:80'
+ANSIBLE_API_HOSTNAME = 'http://ansible-ci-pulp:80'
+ANSIBLE_CONTENT_HOSTNAME = 'http://ansible-ci-pulp:80/pulp/content'
+TOKEN_AUTH_DISABLED = True
+GALAXY_REQUIRE_CONTENT_APPROVAL = False
+GALAXY_AUTHENTICATION_CLASSES = [
+ "rest_framework.authentication.SessionAuthentication",
+ "rest_framework.authentication.TokenAuthentication",
+ "rest_framework.authentication.BasicAuthentication",
+]
+'''
+
+SET_ADMIN_PASSWORD = b'''#!/usr/bin/execlineb -S0
+foreground {
+ redirfd -w 1 /dev/null
+ redirfd -w 2 /dev/null
+ export DJANGO_SETTINGS_MODULE pulpcore.app.settings
+ export PULP_CONTENT_ORIGIN localhost
+ s6-setuidgid postgres
+ if { /usr/local/bin/django-admin reset-admin-password --password password }
+ if { /usr/local/bin/pulpcore-manager create-group system:partner-engineers --users admin }
+}
+'''
+
+# There are 2 overrides here:
+# 1. Change the gunicorn bind address from 127.0.0.1 to 0.0.0.0 now that Galaxy NG does not allow us to access the
+# Pulp API through it.
+# 2. Grant access allowing us to DELETE a namespace in Galaxy NG. This is as CI deletes and recreates repos and
+# distributions in Pulp which now breaks the namespace in Galaxy NG. Recreating it is the "simple" fix to get it
+# working again.
+# These may not be needed in the future, especially if 1 becomes configurable by an env var but for now they must be
+# done.
+OVERRIDES = b'''#!/usr/bin/execlineb -S0
+foreground {
+ sed -i "0,/\\"127.0.0.1:24817\\"/s//\\"0.0.0.0:24817\\"/" /etc/services.d/pulpcore-api/run
+}
+
+# This sed calls changes the first occurrence to "allow" which is conveniently the delete operation for a namespace.
+# https://github.com/ansible/galaxy_ng/blob/master/galaxy_ng/app/access_control/statements/standalone.py#L9-L11.
+backtick NG_PREFIX { python -c "import galaxy_ng; print(galaxy_ng.__path__[0], end='')" }
+importas ng_prefix NG_PREFIX
+foreground {
+ sed -i "0,/\\"effect\\": \\"deny\\"/s//\\"effect\\": \\"allow\\"/" ${ng_prefix}/app/access_control/statements/standalone.py
+}'''
+
+
+class GalaxyProvider(CloudProvider):
+ """
+ Galaxy plugin. Sets up pulp (ansible-galaxy) servers for tests.
+ The pulp source itself resides at: https://github.com/pulp/pulp-oci-images
+ """
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ # Cannot use the latest container image as either galaxy_ng 4.2.0rc2 or pulp 0.5.0 has sporatic issues with
+ # dropping published collections in CI. Try running the tests multiple times when updating. Will also need to
+ # comment out the cache tests in 'test/integration/targets/ansible-galaxy-collection/tasks/install.yml' when
+ # the newer update is available.
+ self.pulp = os.environ.get(
+ 'ANSIBLE_PULP_CONTAINER',
+ 'quay.io/ansible/pulp-galaxy-ng:b79a7be64eff'
+ )
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup cloud resource before delegation and reg cleanup callback."""
+ super().setup()
+
+ galaxy_port = 80
+ pulp_host = 'ansible-ci-pulp'
+ pulp_port = 24817
+
+ ports = [
+ galaxy_port,
+ pulp_port,
+ ]
+
+ # Create the container, don't run it, we need to inject configs before it starts
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.pulp,
+ pulp_host,
+ ports,
+ start=False,
+ allow_existing=True,
+ )
+
+ if not descriptor:
+ return
+
+ if not descriptor.running:
+ pulp_id = descriptor.container_id
+
+ injected_files = {
+ '/etc/pulp/settings.py': SETTINGS,
+ '/etc/cont-init.d/111-postgres': SET_ADMIN_PASSWORD,
+ '/etc/cont-init.d/000-ansible-test-overrides': OVERRIDES,
+ }
+ for path, content in injected_files.items():
+ with tempfile.NamedTemporaryFile() as temp_fd:
+ temp_fd.write(content)
+ temp_fd.flush()
+ docker_cp_to(self.args, pulp_id, temp_fd.name, path)
+
+ descriptor.start(self.args)
+
+ self._set_cloud_config('PULP_HOST', pulp_host)
+ self._set_cloud_config('PULP_PORT', str(pulp_port))
+ self._set_cloud_config('GALAXY_PORT', str(galaxy_port))
+ self._set_cloud_config('PULP_USER', 'admin')
+ self._set_cloud_config('PULP_PASSWORD', 'password')
+
+
+class GalaxyEnvironment(CloudEnvironment):
+ """Galaxy environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ pulp_user = str(self._get_cloud_config('PULP_USER'))
+ pulp_password = str(self._get_cloud_config('PULP_PASSWORD'))
+ pulp_host = self._get_cloud_config('PULP_HOST')
+ galaxy_port = self._get_cloud_config('GALAXY_PORT')
+ pulp_port = self._get_cloud_config('PULP_PORT')
+
+ return CloudEnvironmentConfig(
+ ansible_vars=dict(
+ pulp_user=pulp_user,
+ pulp_password=pulp_password,
+ pulp_api='http://%s:%s' % (pulp_host, pulp_port),
+ pulp_server='http://%s:%s/pulp_ansible/galaxy/' % (pulp_host, pulp_port),
+ galaxy_ng_server='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port),
+ ),
+ env_vars=dict(
+ PULP_USER=pulp_user,
+ PULP_PASSWORD=pulp_password,
+ PULP_SERVER='http://%s:%s/pulp_ansible/galaxy/api/' % (pulp_host, pulp_port),
+ GALAXY_NG_SERVER='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port),
+ ),
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py
new file mode 100644
index 0000000..28ffb7b
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py
@@ -0,0 +1,55 @@
+# Copyright: (c) 2018, Google Inc.
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""GCP plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class GcpCloudProvider(CloudProvider):
+ """GCP cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ display.notice(
+ 'static configuration could not be used. are you missing a template file?'
+ )
+
+
+class GcpCloudEnvironment(CloudEnvironment):
+ """GCP cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict(parser.items('default')))
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py
new file mode 100644
index 0000000..4d75f22
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py
@@ -0,0 +1,106 @@
+"""Hetzner Cloud plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....core_ci import (
+ AnsibleCoreCI,
+ CloudResource,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class HcloudCloudProvider(CloudProvider):
+ """Hetzner Cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ aci = self._create_ansible_core_ci()
+
+ if aci.available:
+ return
+
+ super().filter(targets, exclude)
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Request Hetzner credentials through the Ansible Core CI service."""
+ display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1)
+
+ config = self._read_config_template()
+
+ aci = self._create_ansible_core_ci()
+
+ response = aci.start()
+
+ if not self.args.explain:
+ token = response['hetzner']['token']
+
+ display.sensitive.add(token)
+ display.info('Hetzner Cloud Token: %s' % token, verbosity=1)
+
+ values = dict(
+ TOKEN=token,
+ )
+
+ display.sensitive.add(values['TOKEN'])
+
+ config = self._populate_config_template(config, values)
+
+ self._write_config(config)
+
+ def _create_ansible_core_ci(self) -> AnsibleCoreCI:
+ """Return a Heztner instance of AnsibleCoreCI."""
+ return AnsibleCoreCI(self.args, CloudResource(platform='hetzner'))
+
+
+class HcloudCloudEnvironment(CloudEnvironment):
+ """Hetzner Cloud cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ HCLOUD_TOKEN=parser.get('default', 'hcloud_api_token'),
+ )
+
+ display.sensitive.add(env_vars['HCLOUD_TOKEN'])
+
+ ansible_vars = dict(
+ hcloud_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items()))
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py
new file mode 100644
index 0000000..e250eed
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py
@@ -0,0 +1,92 @@
+"""HTTP Tester plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....util import (
+ display,
+ generate_password,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+KRB5_PASSWORD_ENV = 'KRB5_PASSWORD'
+
+
+class HttptesterProvider(CloudProvider):
+ """HTTP Tester provider plugin. Sets up resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.image = os.environ.get('ANSIBLE_HTTP_TEST_CONTAINER', 'quay.io/ansible/http-test-container:2.1.0')
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup resources before delegation."""
+ super().setup()
+
+ ports = [
+ 80,
+ 88,
+ 443,
+ 444,
+ 749,
+ ]
+
+ aliases = [
+ 'ansible.http.tests',
+ 'sni1.ansible.http.tests',
+ 'fail.ansible.http.tests',
+ 'self-signed.ansible.http.tests',
+ ]
+
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ 'http-test-container',
+ ports,
+ aliases=aliases,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ env={
+ KRB5_PASSWORD_ENV: generate_password(),
+ },
+ )
+
+ if not descriptor:
+ return
+
+ # Read the password from the container environment.
+ # This allows the tests to work when re-using an existing container.
+ # The password is marked as sensitive, since it may differ from the one we generated.
+ krb5_password = descriptor.details.container.env_dict()[KRB5_PASSWORD_ENV]
+ display.sensitive.add(krb5_password)
+
+ self._set_cloud_config(KRB5_PASSWORD_ENV, krb5_password)
+
+
+class HttptesterEnvironment(CloudEnvironment):
+ """HTTP Tester environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ return CloudEnvironmentConfig(
+ env_vars=dict(
+ HTTPTESTER='1', # backwards compatibility for tests intended to work with or without HTTP Tester
+ KRB5_PASSWORD=str(self._get_cloud_config(KRB5_PASSWORD_ENV)),
+ )
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py
new file mode 100644
index 0000000..df0ebb0
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py
@@ -0,0 +1,97 @@
+"""NIOS plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class NiosProvider(CloudProvider):
+ """Nios plugin. Sets up NIOS mock server for tests."""
+ DOCKER_SIMULATOR_NAME = 'nios-simulator'
+
+ # Default image to run the nios simulator.
+ #
+ # The simulator must be pinned to a specific version
+ # to guarantee CI passes with the version used.
+ #
+ # It's source source itself resides at:
+ # https://github.com/ansible/nios-test-container
+ DOCKER_IMAGE = 'quay.io/ansible/nios-test-container:1.4.0'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.__container_from_env = os.environ.get('ANSIBLE_NIOSSIM_CONTAINER')
+ """
+ Overrides target container, might be used for development.
+
+ Use ANSIBLE_NIOSSIM_CONTAINER=whatever_you_want if you want
+ to use other image. Omit/empty otherwise.
+ """
+
+ self.image = self.__container_from_env or self.DOCKER_IMAGE
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup cloud resource before delegation and reg cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Spawn a NIOS simulator within docker container."""
+ nios_port = 443
+
+ ports = [
+ nios_port,
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('NIOS_HOST', self.DOCKER_SIMULATOR_NAME)
+
+ def _setup_static(self) -> None:
+ raise NotImplementedError()
+
+
+class NiosEnvironment(CloudEnvironment):
+ """NIOS environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ ansible_vars = dict(
+ nios_provider=dict(
+ host=self._get_cloud_config('NIOS_HOST'),
+ username='admin',
+ password='infoblox',
+ ),
+ )
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py
new file mode 100644
index 0000000..d005a3c
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py
@@ -0,0 +1,60 @@
+"""OpenNebula plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class OpenNebulaCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ self.uses_config = True
+
+ def _setup_dynamic(self) -> None:
+ display.info('No config file provided, will run test from fixtures')
+
+ config = self._read_config_template()
+ values = dict(
+ URL="http://localhost/RPC2",
+ USERNAME='oneadmin',
+ PASSWORD='onepass',
+ FIXTURES='true',
+ REPLAY='true',
+ )
+ config = self._populate_config_template(config, values)
+ self._write_config(config)
+
+
+class OpenNebulaCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict(parser.items('default')))
+
+ display.sensitive.add(ansible_vars.get('opennebula_password'))
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py
new file mode 100644
index 0000000..da930c0
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py
@@ -0,0 +1,114 @@
+"""OpenShift plugin for integration tests."""
+from __future__ import annotations
+
+import re
+
+from ....io import (
+ read_text_file,
+)
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+ wait_for_file,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class OpenShiftCloudProvider(CloudProvider):
+ """OpenShift cloud provider plugin. Sets up cloud resources before delegation."""
+ DOCKER_CONTAINER_NAME = 'openshift-origin'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args, config_extension='.kubeconfig')
+
+ # The image must be pinned to a specific version to guarantee CI passes with the version used.
+ self.image = 'quay.io/ansible/openshift-origin:v3.9.0'
+
+ self.uses_docker = True
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_static(self) -> None:
+ """Configure OpenShift tests for use with static configuration."""
+ config = read_text_file(self.config_static_path)
+
+ match = re.search(r'^ *server: (?P<server>.*)$', config, flags=re.MULTILINE)
+
+ if not match:
+ display.warning('Could not find OpenShift endpoint in kubeconfig.')
+
+ def _setup_dynamic(self) -> None:
+ """Create a OpenShift container using docker."""
+ port = 8443
+
+ ports = [
+ port,
+ ]
+
+ cmd = ['start', 'master', '--listen', 'https://0.0.0.0:%d' % port]
+
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_CONTAINER_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ cmd=cmd,
+ )
+
+ if not descriptor:
+ return
+
+ if self.args.explain:
+ config = '# Unknown'
+ else:
+ config = self._get_config(self.DOCKER_CONTAINER_NAME, 'https://%s:%s/' % (self.DOCKER_CONTAINER_NAME, port))
+
+ self._write_config(config)
+
+ def _get_config(self, container_name: str, server: str) -> str:
+ """Get OpenShift config from container."""
+ stdout = wait_for_file(self.args, container_name, '/var/lib/origin/openshift.local.config/master/admin.kubeconfig', sleep=10, tries=30)
+
+ config = stdout
+ config = re.sub(r'^( *)certificate-authority-data: .*$', r'\1insecure-skip-tls-verify: true', config, flags=re.MULTILINE)
+ config = re.sub(r'^( *)server: .*$', r'\1server: %s' % server, config, flags=re.MULTILINE)
+
+ return config
+
+
+class OpenShiftCloudEnvironment(CloudEnvironment):
+ """OpenShift cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ env_vars = dict(
+ K8S_AUTH_KUBECONFIG=self.config_path,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py
new file mode 100644
index 0000000..04c2d89
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py
@@ -0,0 +1,56 @@
+"""Scaleway plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class ScalewayCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class ScalewayCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ SCW_API_KEY=parser.get('default', 'key'),
+ SCW_ORG=parser.get('default', 'org')
+ )
+
+ display.sensitive.add(env_vars['SCW_API_KEY'])
+
+ ansible_vars = dict(
+ scw_org=parser.get('default', 'org'),
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py
new file mode 100644
index 0000000..df1651f
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py
@@ -0,0 +1,138 @@
+"""VMware vCenter plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+import os
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class VcenterProvider(CloudProvider):
+ """VMware vcenter/esx plugin. Sets up cloud resources for tests."""
+ DOCKER_SIMULATOR_NAME = 'vcenter-simulator'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ # The simulator must be pinned to a specific version to guarantee CI passes with the version used.
+ if os.environ.get('ANSIBLE_VCSIM_CONTAINER'):
+ self.image = os.environ.get('ANSIBLE_VCSIM_CONTAINER')
+ else:
+ self.image = 'quay.io/ansible/vcenter-test-container:1.7.0'
+
+ # VMware tests can be run on govcsim or BYO with a static config file.
+ # The simulator is the default if no config is provided.
+ self.vmware_test_platform = os.environ.get('VMWARE_TEST_PLATFORM', 'govcsim')
+
+ if self.vmware_test_platform == 'govcsim':
+ self.uses_docker = True
+ self.uses_config = False
+ elif self.vmware_test_platform == 'static':
+ self.uses_docker = False
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._set_cloud_config('vmware_test_platform', self.vmware_test_platform)
+
+ if self.vmware_test_platform == 'govcsim':
+ self._setup_dynamic_simulator()
+ self.managed = True
+ elif self.vmware_test_platform == 'static':
+ self._use_static_config()
+ self._setup_static()
+ else:
+ raise ApplicationError('Unknown vmware_test_platform: %s' % self.vmware_test_platform)
+
+ def _setup_dynamic_simulator(self) -> None:
+ """Create a vcenter simulator using docker."""
+ ports = [
+ 443,
+ 8080,
+ 8989,
+ 5000, # control port for flask app in simulator
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('vcenter_hostname', self.DOCKER_SIMULATOR_NAME)
+
+ def _setup_static(self) -> None:
+ if not os.path.exists(self.config_static_path):
+ raise ApplicationError('Configuration file does not exist: %s' % self.config_static_path)
+
+
+class VcenterEnvironment(CloudEnvironment):
+ """VMware vcenter/esx environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ try:
+ # We may be in a container, so we cannot just reach VMWARE_TEST_PLATFORM,
+ # We do a try/except instead
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path) # static
+
+ env_vars = {}
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+ ansible_vars.update(dict(parser.items('DEFAULT', raw=True)))
+ except KeyError: # govcsim
+ env_vars = dict(
+ VCENTER_HOSTNAME=str(self._get_cloud_config('vcenter_hostname')),
+ VCENTER_USERNAME='user',
+ VCENTER_PASSWORD='pass',
+ )
+
+ ansible_vars = dict(
+ vcsim=str(self._get_cloud_config('vcenter_hostname')),
+ vcenter_hostname=str(self._get_cloud_config('vcenter_hostname')),
+ vcenter_username='user',
+ vcenter_password='pass',
+ )
+
+ for key, value in ansible_vars.items():
+ if key.endswith('_password'):
+ display.sensitive.add(value)
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ module_defaults={
+ 'group/vmware': {
+ 'hostname': ansible_vars['vcenter_hostname'],
+ 'username': ansible_vars['vcenter_username'],
+ 'password': ansible_vars['vcenter_password'],
+ 'port': ansible_vars.get('vcenter_port', '443'),
+ 'validate_certs': ansible_vars.get('vmware_validate_certs', 'no'),
+ },
+ },
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py
new file mode 100644
index 0000000..1993cda
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py
@@ -0,0 +1,55 @@
+"""Vultr plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class VultrCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class VultrCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ VULTR_API_KEY=parser.get('default', 'key'),
+ )
+
+ display.sensitive.add(env_vars['VULTR_API_KEY'])
+
+ ansible_vars = dict(
+ vultr_resource_prefix=self.resource_prefix,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/coverage.py b/test/lib/ansible_test/_internal/commands/integration/coverage.py
new file mode 100644
index 0000000..5a486e9
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/coverage.py
@@ -0,0 +1,417 @@
+"""Code coverage support for integration tests."""
+from __future__ import annotations
+
+import abc
+import os
+import shutil
+import tempfile
+import typing as t
+import zipfile
+
+from ...io import (
+ write_text_file,
+)
+
+from ...ansible_util import (
+ run_playbook,
+)
+
+from ...config import (
+ IntegrationConfig,
+)
+
+from ...util import (
+ COVERAGE_CONFIG_NAME,
+ MODE_DIRECTORY,
+ MODE_DIRECTORY_WRITE,
+ MODE_FILE,
+ SubprocessError,
+ cache,
+ display,
+ generate_name,
+ get_generic_type,
+ get_type_map,
+ remove_tree,
+ sanitize_host_name,
+ verified_chmod,
+)
+
+from ...util_common import (
+ ResultType,
+)
+
+from ...coverage_util import (
+ generate_coverage_config,
+ get_coverage_platform,
+)
+
+from ...host_configs import (
+ HostConfig,
+ PosixConfig,
+ WindowsConfig,
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from ...data import (
+ data_context,
+)
+
+from ...host_profiles import (
+ ControllerProfile,
+ HostProfile,
+ PosixProfile,
+ SshTargetHostProfile,
+)
+
+from ...provisioning import (
+ HostState,
+)
+
+from ...connections import (
+ LocalConnection,
+)
+
+from ...inventory import (
+ create_windows_inventory,
+ create_posix_inventory,
+)
+
+THostConfig = t.TypeVar('THostConfig', bound=HostConfig)
+
+
+class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta):
+ """Base class for configuring hosts for integration test code coverage."""
+ def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
+ self.args = args
+ self.host_state = host_state
+ self.inventory_path = inventory_path
+ self.profiles = self.get_profiles()
+
+ def get_profiles(self) -> list[HostProfile]:
+ """Return a list of profiles relevant for this handler."""
+ profile_type = get_generic_type(type(self), HostConfig)
+ profiles = [profile for profile in self.host_state.target_profiles if isinstance(profile.config, profile_type)]
+
+ return profiles
+
+ @property
+ @abc.abstractmethod
+ def is_active(self) -> bool:
+ """True if the handler should be used, otherwise False."""
+
+ @abc.abstractmethod
+ def setup(self) -> None:
+ """Perform setup for code coverage."""
+
+ @abc.abstractmethod
+ def teardown(self) -> None:
+ """Perform teardown for code coverage."""
+
+ @abc.abstractmethod
+ def create_inventory(self) -> None:
+ """Create inventory, if needed."""
+
+ @abc.abstractmethod
+ def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
+ """Return a dictionary of environment variables for running tests with code coverage."""
+
+ def run_playbook(self, playbook: str, variables: dict[str, str]) -> None:
+ """Run the specified playbook using the current inventory."""
+ self.create_inventory()
+ run_playbook(self.args, self.inventory_path, playbook, capture=False, variables=variables)
+
+
+class PosixCoverageHandler(CoverageHandler[PosixConfig]):
+ """Configure integration test code coverage for POSIX hosts."""
+ def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
+ super().__init__(args, host_state, inventory_path)
+
+ # Common temporary directory used on all POSIX hosts that will be created world writeable.
+ self.common_temp_path = f'/tmp/ansible-test-{generate_name()}'
+
+ def get_profiles(self) -> list[HostProfile]:
+ """Return a list of profiles relevant for this handler."""
+ profiles = super().get_profiles()
+ profiles = [profile for profile in profiles if not isinstance(profile, ControllerProfile) or
+ profile.python.path != self.host_state.controller_profile.python.path]
+
+ return profiles
+
+ @property
+ def is_active(self) -> bool:
+ """True if the handler should be used, otherwise False."""
+ return True
+
+ @property
+ def target_profile(self) -> t.Optional[PosixProfile]:
+ """The POSIX target profile, if it uses a different Python interpreter than the controller, otherwise None."""
+ return t.cast(PosixProfile, self.profiles[0]) if self.profiles else None
+
+ def setup(self) -> None:
+ """Perform setup for code coverage."""
+ self.setup_controller()
+ self.setup_target()
+
+ def teardown(self) -> None:
+ """Perform teardown for code coverage."""
+ self.teardown_controller()
+ self.teardown_target()
+
+ def setup_controller(self) -> None:
+ """Perform setup for code coverage on the controller."""
+ coverage_config_path = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME)
+ coverage_output_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name)
+
+ coverage_config = generate_coverage_config(self.args)
+
+ write_text_file(coverage_config_path, coverage_config, create_directories=True)
+
+ verified_chmod(coverage_config_path, MODE_FILE)
+ os.mkdir(coverage_output_path)
+ verified_chmod(coverage_output_path, MODE_DIRECTORY_WRITE)
+
+ def setup_target(self) -> None:
+ """Perform setup for code coverage on the target."""
+ if not self.target_profile:
+ return
+
+ if isinstance(self.target_profile, ControllerProfile):
+ return
+
+ self.run_playbook('posix_coverage_setup.yml', self.get_playbook_variables())
+
+ def teardown_controller(self) -> None:
+ """Perform teardown for code coverage on the controller."""
+ coverage_temp_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name)
+ platform = get_coverage_platform(self.args.controller)
+
+ for filename in os.listdir(coverage_temp_path):
+ shutil.copyfile(os.path.join(coverage_temp_path, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform)))
+
+ remove_tree(self.common_temp_path)
+
+ def teardown_target(self) -> None:
+ """Perform teardown for code coverage on the target."""
+ if not self.target_profile:
+ return
+
+ if isinstance(self.target_profile, ControllerProfile):
+ return
+
+ profile = t.cast(SshTargetHostProfile, self.target_profile)
+ platform = get_coverage_platform(profile.config)
+ con = profile.get_controller_target_connections()[0]
+
+ with tempfile.NamedTemporaryFile(prefix='ansible-test-coverage-', suffix='.tgz') as coverage_tgz:
+ try:
+ con.create_archive(chdir=self.common_temp_path, name=ResultType.COVERAGE.name, dst=coverage_tgz)
+ except SubprocessError as ex:
+ display.warning(f'Failed to download coverage results: {ex}')
+ else:
+ coverage_tgz.seek(0)
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ local_con = LocalConnection(self.args)
+ local_con.extract_archive(chdir=temp_dir, src=coverage_tgz)
+
+ base_dir = os.path.join(temp_dir, ResultType.COVERAGE.name)
+
+ for filename in os.listdir(base_dir):
+ shutil.copyfile(os.path.join(base_dir, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform)))
+
+ self.run_playbook('posix_coverage_teardown.yml', self.get_playbook_variables())
+
+ def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
+ """Return a dictionary of environment variables for running tests with code coverage."""
+
+ # Enable code coverage collection on Ansible modules (both local and remote).
+ # Used by the AnsiballZ wrapper generator in lib/ansible/executor/module_common.py to support code coverage.
+ config_file = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME)
+
+ # Include the command, target and platform marker so the remote host can create a filename with that info.
+ # The generated AnsiballZ wrapper is responsible for adding '=python-{X.Y}=coverage.{hostname}.{pid}.{id}'
+ coverage_file = os.path.join(self.common_temp_path, ResultType.COVERAGE.name, '='.join((self.args.command, target_name, 'platform')))
+
+ if self.args.coverage_check:
+ # cause the 'coverage' module to be found, but not imported or enabled
+ coverage_file = ''
+
+ variables = dict(
+ _ANSIBLE_COVERAGE_CONFIG=config_file,
+ _ANSIBLE_COVERAGE_OUTPUT=coverage_file,
+ )
+
+ return variables
+
+ def create_inventory(self) -> None:
+ """Create inventory."""
+ create_posix_inventory(self.args, self.inventory_path, self.host_state.target_profiles)
+
+ def get_playbook_variables(self) -> dict[str, str]:
+ """Return a dictionary of variables for setup and teardown of POSIX coverage."""
+ return dict(
+ common_temp_dir=self.common_temp_path,
+ coverage_config=generate_coverage_config(self.args),
+ coverage_config_path=os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME),
+ coverage_output_path=os.path.join(self.common_temp_path, ResultType.COVERAGE.name),
+ mode_directory=f'{MODE_DIRECTORY:04o}',
+ mode_directory_write=f'{MODE_DIRECTORY_WRITE:04o}',
+ mode_file=f'{MODE_FILE:04o}',
+ )
+
+
+class WindowsCoverageHandler(CoverageHandler[WindowsConfig]):
+ """Configure integration test code coverage for Windows hosts."""
+ def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
+ super().__init__(args, host_state, inventory_path)
+
+ # Common temporary directory used on all Windows hosts that will be created writable by everyone.
+ self.remote_temp_path = f'C:\\ansible_test_coverage_{generate_name()}'
+
+ @property
+ def is_active(self) -> bool:
+ """True if the handler should be used, otherwise False."""
+ return bool(self.profiles) and not self.args.coverage_check
+
+ def setup(self) -> None:
+ """Perform setup for code coverage."""
+ self.run_playbook('windows_coverage_setup.yml', self.get_playbook_variables())
+
+ def teardown(self) -> None:
+ """Perform teardown for code coverage."""
+ with tempfile.TemporaryDirectory() as local_temp_path:
+ variables = self.get_playbook_variables()
+ variables.update(
+ local_temp_path=local_temp_path,
+ )
+
+ self.run_playbook('windows_coverage_teardown.yml', variables)
+
+ for filename in os.listdir(local_temp_path):
+ if all(isinstance(profile.config, WindowsRemoteConfig) for profile in self.profiles):
+ prefix = 'remote'
+ elif all(isinstance(profile.config, WindowsInventoryConfig) for profile in self.profiles):
+ prefix = 'inventory'
+ else:
+ raise NotImplementedError()
+
+ platform = f'{prefix}-{sanitize_host_name(os.path.splitext(filename)[0])}'
+
+ with zipfile.ZipFile(os.path.join(local_temp_path, filename)) as coverage_zip:
+ for item in coverage_zip.infolist():
+ if item.is_dir():
+ raise Exception(f'Unexpected directory in zip file: {item.filename}')
+
+ item.filename = update_coverage_filename(item.filename, platform)
+
+ coverage_zip.extract(item, ResultType.COVERAGE.path)
+
+ def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
+ """Return a dictionary of environment variables for running tests with code coverage."""
+
+ # Include the command, target and platform marker so the remote host can create a filename with that info.
+ # The remote is responsible for adding '={language-version}=coverage.{hostname}.{pid}.{id}'
+ coverage_name = '='.join((self.args.command, target_name, 'platform'))
+
+ variables = dict(
+ _ANSIBLE_COVERAGE_REMOTE_OUTPUT=os.path.join(self.remote_temp_path, coverage_name),
+ _ANSIBLE_COVERAGE_REMOTE_PATH_FILTER=os.path.join(data_context().content.root, '*'),
+ )
+
+ return variables
+
+ def create_inventory(self) -> None:
+ """Create inventory."""
+ create_windows_inventory(self.args, self.inventory_path, self.host_state.target_profiles)
+
+ def get_playbook_variables(self) -> dict[str, str]:
+ """Return a dictionary of variables for setup and teardown of Windows coverage."""
+ return dict(
+ remote_temp_path=self.remote_temp_path,
+ )
+
+
+class CoverageManager:
+ """Manager for code coverage configuration and state."""
+ def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
+ self.args = args
+ self.host_state = host_state
+ self.inventory_path = inventory_path
+
+ if self.args.coverage:
+ handler_types = set(get_handler_type(type(profile.config)) for profile in host_state.profiles)
+ handler_types.discard(None)
+ else:
+ handler_types = set()
+
+ handlers = [handler_type(args=args, host_state=host_state, inventory_path=inventory_path) for handler_type in handler_types]
+
+ self.handlers = [handler for handler in handlers if handler.is_active]
+
+ def setup(self) -> None:
+ """Perform setup for code coverage."""
+ if not self.args.coverage:
+ return
+
+ for handler in self.handlers:
+ handler.setup()
+
+ def teardown(self) -> None:
+ """Perform teardown for code coverage."""
+ if not self.args.coverage:
+ return
+
+ for handler in self.handlers:
+ handler.teardown()
+
+ def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]:
+ """Return a dictionary of environment variables for running tests with code coverage."""
+ if not self.args.coverage or 'non_local/' in aliases:
+ return {}
+
+ env = {}
+
+ for handler in self.handlers:
+ env.update(handler.get_environment(target_name, aliases))
+
+ return env
+
+
+@cache
+def get_config_handler_type_map() -> dict[t.Type[HostConfig], t.Type[CoverageHandler]]:
+ """Create and return a mapping of HostConfig types to CoverageHandler types."""
+ return get_type_map(CoverageHandler, HostConfig)
+
+
+def get_handler_type(config_type: t.Type[HostConfig]) -> t.Optional[t.Type[CoverageHandler]]:
+ """Return the coverage handler type associated with the given host config type if found, otherwise return None."""
+ queue = [config_type]
+ type_map = get_config_handler_type_map()
+
+ while queue:
+ config_type = queue.pop(0)
+ handler_type = type_map.get(config_type)
+
+ if handler_type:
+ return handler_type
+
+ queue.extend(config_type.__bases__)
+
+ return None
+
+
+def update_coverage_filename(original_filename: str, platform: str) -> str:
+ """Validate the given filename and insert the specified platform, then return the result."""
+ parts = original_filename.split('=')
+
+ if original_filename != os.path.basename(original_filename) or len(parts) != 5 or parts[2] != 'platform':
+ raise Exception(f'Unexpected coverage filename: {original_filename}')
+
+ parts[2] = platform
+
+ updated_filename = '='.join(parts)
+
+ display.info(f'Coverage file for platform "{platform}": {original_filename} -> {updated_filename}', verbosity=3)
+
+ return updated_filename
diff --git a/test/lib/ansible_test/_internal/commands/integration/filters.py b/test/lib/ansible_test/_internal/commands/integration/filters.py
new file mode 100644
index 0000000..be03d7f
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/filters.py
@@ -0,0 +1,279 @@
+"""Logic for filtering out integration test targets which are unsupported for the currently provided arguments and available hosts."""
+from __future__ import annotations
+
+import abc
+import typing as t
+
+from ...config import (
+ IntegrationConfig,
+)
+
+from ...util import (
+ cache,
+ detect_architecture,
+ display,
+ get_type_map,
+)
+
+from ...target import (
+ IntegrationTarget,
+)
+
+from ...host_configs import (
+ ControllerConfig,
+ DockerConfig,
+ FallbackReason,
+ HostConfig,
+ NetworkInventoryConfig,
+ NetworkRemoteConfig,
+ OriginConfig,
+ PosixConfig,
+ PosixRemoteConfig,
+ PosixSshConfig,
+ RemoteConfig,
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from ...host_profiles import (
+ HostProfile,
+)
+
+THostConfig = t.TypeVar('THostConfig', bound=HostConfig)
+TPosixConfig = t.TypeVar('TPosixConfig', bound=PosixConfig)
+TRemoteConfig = t.TypeVar('TRemoteConfig', bound=RemoteConfig)
+THostProfile = t.TypeVar('THostProfile', bound=HostProfile)
+
+
+class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta):
+ """Base class for target filters."""
+ def __init__(self, args: IntegrationConfig, configs: list[THostConfig], controller: bool) -> None:
+ self.args = args
+ self.configs = configs
+ self.controller = controller
+ self.host_type = 'controller' if controller else 'target'
+
+ # values which are not host specific
+ self.include_targets = args.include
+ self.allow_root = args.allow_root
+ self.allow_destructive = args.allow_destructive
+
+ @property
+ def config(self) -> THostConfig:
+ """The configuration to filter. Only valid when there is a single config."""
+ if len(self.configs) != 1:
+ raise Exception()
+
+ return self.configs[0]
+
+ def skip(
+ self,
+ skip: str,
+ reason: str,
+ targets: list[IntegrationTarget],
+ exclude: set[str],
+ override: t.Optional[list[str]] = None,
+ ) -> None:
+ """Apply the specified skip rule to the given targets by updating the provided exclude list."""
+ if skip.startswith('skip/'):
+ skipped = [target.name for target in targets if skip in target.skips and (not override or target.name not in override)]
+ else:
+ skipped = [target.name for target in targets if f'{skip}/' in target.aliases and (not override or target.name not in override)]
+
+ self.apply_skip(f'"{skip}"', reason, skipped, exclude)
+
+ def apply_skip(self, marked: str, reason: str, skipped: list[str], exclude: set[str]) -> None:
+ """Apply the provided skips to the given exclude list."""
+ if not skipped:
+ return
+
+ exclude.update(skipped)
+ display.warning(f'Excluding {self.host_type} tests marked {marked} {reason}: {", ".join(skipped)}')
+
+ def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]:
+ """Filter the list of profiles, returning only those which are not skipped for the given target."""
+ del target
+ return profiles
+
+ def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
+ """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
+ if self.controller and self.args.host_settings.controller_fallback and targets:
+ affected_targets = [target.name for target in targets]
+ reason = self.args.host_settings.controller_fallback.reason
+
+ if reason == FallbackReason.ENVIRONMENT:
+ exclude.update(affected_targets)
+ display.warning(f'Excluding {self.host_type} tests since a fallback controller is in use: {", ".join(affected_targets)}')
+ elif reason == FallbackReason.PYTHON:
+ display.warning(f'Some {self.host_type} tests may be redundant since a fallback python is in use: {", ".join(affected_targets)}')
+
+ if not self.allow_destructive and not self.config.is_managed:
+ override_destructive = set(target for target in self.include_targets if target.startswith('destructive/'))
+ override = [target.name for target in targets if override_destructive & set(target.aliases)]
+
+ self.skip('destructive', 'which require --allow-destructive or prefixing with "destructive/" to run on unmanaged hosts', targets, exclude, override)
+
+ if not self.args.allow_disabled:
+ override_disabled = set(target for target in self.args.include if target.startswith('disabled/'))
+ override = [target.name for target in targets if override_disabled & set(target.aliases)]
+
+ self.skip('disabled', 'which require --allow-disabled or prefixing with "disabled/"', targets, exclude, override)
+
+ if not self.args.allow_unsupported:
+ override_unsupported = set(target for target in self.args.include if target.startswith('unsupported/'))
+ override = [target.name for target in targets if override_unsupported & set(target.aliases)]
+
+ self.skip('unsupported', 'which require --allow-unsupported or prefixing with "unsupported/"', targets, exclude, override)
+
+ if not self.args.allow_unstable:
+ override_unstable = set(target for target in self.args.include if target.startswith('unstable/'))
+
+ if self.args.allow_unstable_changed:
+ override_unstable |= set(self.args.metadata.change_description.focused_targets or [])
+
+ override = [target.name for target in targets if override_unstable & set(target.aliases)]
+
+ self.skip('unstable', 'which require --allow-unstable or prefixing with "unstable/"', targets, exclude, override)
+
+
+class PosixTargetFilter(TargetFilter[TPosixConfig]):
+ """Target filter for POSIX hosts."""
+ def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
+ """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
+ super().filter_targets(targets, exclude)
+
+ if not self.allow_root and not self.config.have_root:
+ self.skip('needs/root', 'which require --allow-root or running as root', targets, exclude)
+
+ self.skip(f'skip/python{self.config.python.version}', f'which are not supported by Python {self.config.python.version}', targets, exclude)
+ self.skip(f'skip/python{self.config.python.major_version}', f'which are not supported by Python {self.config.python.major_version}', targets, exclude)
+
+
+class DockerTargetFilter(PosixTargetFilter[DockerConfig]):
+ """Target filter for docker hosts."""
+ def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
+ """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
+ super().filter_targets(targets, exclude)
+
+ self.skip('skip/docker', 'which cannot run under docker', targets, exclude)
+
+ if not self.config.privileged:
+ self.skip('needs/privileged', 'which require --docker-privileged to run under docker', targets, exclude)
+
+
+class PosixSshTargetFilter(PosixTargetFilter[PosixSshConfig]):
+ """Target filter for POSIX SSH hosts."""
+
+
+class RemoteTargetFilter(TargetFilter[TRemoteConfig]):
+ """Target filter for remote Ansible Core CI managed hosts."""
+ def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]:
+ """Filter the list of profiles, returning only those which are not skipped for the given target."""
+ profiles = super().filter_profiles(profiles, target)
+
+ skipped_profiles = [profile for profile in profiles if any(skip in target.skips for skip in get_remote_skip_aliases(profile.config))]
+
+ if skipped_profiles:
+ configs: list[TRemoteConfig] = [profile.config for profile in skipped_profiles]
+ display.warning(f'Excluding skipped hosts from inventory: {", ".join(config.name for config in configs)}')
+
+ profiles = [profile for profile in profiles if profile not in skipped_profiles]
+
+ return profiles
+
+ def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
+ """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
+ super().filter_targets(targets, exclude)
+
+ if len(self.configs) > 1:
+ host_skips = {host.name: get_remote_skip_aliases(host) for host in self.configs}
+
+ # Skip only targets which skip all hosts.
+ # Targets that skip only some hosts will be handled during inventory generation.
+ skipped = [target.name for target in targets if all(any(skip in target.skips for skip in skips) for skips in host_skips.values())]
+
+ if skipped:
+ exclude.update(skipped)
+ display.warning(f'Excluding tests which do not support {", ".join(host_skips.keys())}: {", ".join(skipped)}')
+ else:
+ skips = get_remote_skip_aliases(self.config)
+
+ for skip, reason in skips.items():
+ self.skip(skip, reason, targets, exclude)
+
+
+class PosixRemoteTargetFilter(PosixTargetFilter[PosixRemoteConfig], RemoteTargetFilter[PosixRemoteConfig]):
+ """Target filter for POSIX remote hosts."""
+
+
+class WindowsRemoteTargetFilter(RemoteTargetFilter[WindowsRemoteConfig]):
+ """Target filter for remote Windows hosts."""
+
+
+class WindowsInventoryTargetFilter(TargetFilter[WindowsInventoryConfig]):
+ """Target filter for Windows inventory."""
+
+
+class NetworkRemoteTargetFilter(RemoteTargetFilter[NetworkRemoteConfig]):
+ """Target filter for remote network hosts."""
+
+
+class NetworkInventoryTargetFilter(TargetFilter[NetworkInventoryConfig]):
+ """Target filter for network inventory."""
+
+
+class OriginTargetFilter(PosixTargetFilter[OriginConfig]):
+ """Target filter for localhost."""
+ def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
+ """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
+ super().filter_targets(targets, exclude)
+
+ arch = detect_architecture(self.config.python.path)
+
+ if arch:
+ self.skip(f'skip/{arch}', f'which are not supported by {arch}', targets, exclude)
+
+
+@cache
+def get_host_target_type_map() -> dict[t.Type[HostConfig], t.Type[TargetFilter]]:
+ """Create and return a mapping of HostConfig types to TargetFilter types."""
+ return get_type_map(TargetFilter, HostConfig)
+
+
+def get_target_filter(args: IntegrationConfig, configs: list[HostConfig], controller: bool) -> TargetFilter:
+ """Return an integration test target filter instance for the provided host configurations."""
+ target_type = type(configs[0])
+
+ if issubclass(target_type, ControllerConfig):
+ target_type = type(args.controller)
+ configs = [args.controller]
+
+ filter_type = get_host_target_type_map()[target_type]
+ filter_instance = filter_type(args, configs, controller)
+
+ return filter_instance
+
+
+def get_remote_skip_aliases(config: RemoteConfig) -> dict[str, str]:
+ """Return a dictionary of skip aliases and the reason why they apply."""
+ return get_platform_skip_aliases(config.platform, config.version, config.arch)
+
+
+def get_platform_skip_aliases(platform: str, version: str, arch: t.Optional[str]) -> dict[str, str]:
+ """Return a dictionary of skip aliases and the reason why they apply."""
+ skips = {
+ f'skip/{platform}': platform,
+ f'skip/{platform}/{version}': f'{platform} {version}',
+ f'skip/{platform}{version}': f'{platform} {version}', # legacy syntax, use above format
+ }
+
+ if arch:
+ skips.update({
+ f'skip/{arch}': arch,
+ f'skip/{arch}/{platform}': f'{platform} on {arch}',
+ f'skip/{arch}/{platform}/{version}': f'{platform} {version} on {arch}',
+ })
+
+ skips = {alias: f'which are not supported by {description}' for alias, description in skips.items()}
+
+ return skips
diff --git a/test/lib/ansible_test/_internal/commands/integration/network.py b/test/lib/ansible_test/_internal/commands/integration/network.py
new file mode 100644
index 0000000..d28416c
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/network.py
@@ -0,0 +1,77 @@
+"""Network integration testing."""
+from __future__ import annotations
+
+import os
+
+from ...util import (
+ ApplicationError,
+ ANSIBLE_TEST_CONFIG_ROOT,
+)
+
+from ...util_common import (
+ handle_layout_messages,
+)
+
+from ...target import (
+ walk_network_integration_targets,
+)
+
+from ...config import (
+ NetworkIntegrationConfig,
+)
+
+from . import (
+ command_integration_filter,
+ command_integration_filtered,
+ get_inventory_absolute_path,
+ get_inventory_relative_path,
+ check_inventory,
+ delegate_inventory,
+)
+
+from ...data import (
+ data_context,
+)
+
+from ...host_configs import (
+ NetworkInventoryConfig,
+ NetworkRemoteConfig,
+)
+
+
+def command_network_integration(args: NetworkIntegrationConfig) -> None:
+ """Entry point for the `network-integration` command."""
+ handle_layout_messages(data_context().content.integration_messages)
+
+ inventory_relative_path = get_inventory_relative_path(args)
+ template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template'
+
+ if issubclass(args.target_type, NetworkInventoryConfig):
+ target = args.only_target(NetworkInventoryConfig)
+ inventory_path = get_inventory_absolute_path(args, target)
+
+ if args.delegate or not target.path:
+ target.path = inventory_relative_path
+ else:
+ inventory_path = os.path.join(data_context().content.root, inventory_relative_path)
+
+ if args.no_temp_workdir:
+ # temporary solution to keep DCI tests working
+ inventory_exists = os.path.exists(inventory_path)
+ else:
+ inventory_exists = os.path.isfile(inventory_path)
+
+ if not args.explain and not issubclass(args.target_type, NetworkRemoteConfig) and not inventory_exists:
+ raise ApplicationError(
+ 'Inventory not found: %s\n'
+ 'Use --inventory to specify the inventory path.\n'
+ 'Use --platform to provision resources and generate an inventory file.\n'
+ 'See also inventory template: %s' % (inventory_path, template_path)
+ )
+
+ check_inventory(args, inventory_path)
+ delegate_inventory(args, inventory_path)
+
+ all_targets = tuple(walk_network_integration_targets(include_hidden=True))
+ host_state, internal_targets = command_integration_filter(args, all_targets)
+ command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path)
diff --git a/test/lib/ansible_test/_internal/commands/integration/posix.py b/test/lib/ansible_test/_internal/commands/integration/posix.py
new file mode 100644
index 0000000..d4c50d3
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/posix.py
@@ -0,0 +1,48 @@
+"""POSIX integration testing."""
+from __future__ import annotations
+
+import os
+
+from ...util_common import (
+ handle_layout_messages,
+)
+
+from ...containers import (
+ create_container_hooks,
+ local_ssh,
+ root_ssh,
+)
+
+from ...target import (
+ walk_posix_integration_targets,
+)
+
+from ...config import (
+ PosixIntegrationConfig,
+)
+
+from . import (
+ command_integration_filter,
+ command_integration_filtered,
+ get_inventory_relative_path,
+)
+
+from ...data import (
+ data_context,
+)
+
+
+def command_posix_integration(args: PosixIntegrationConfig) -> None:
+ """Entry point for the `integration` command."""
+ handle_layout_messages(data_context().content.integration_messages)
+
+ inventory_relative_path = get_inventory_relative_path(args)
+ inventory_path = os.path.join(data_context().content.root, inventory_relative_path)
+
+ all_targets = tuple(walk_posix_integration_targets(include_hidden=True))
+ host_state, internal_targets = command_integration_filter(args, all_targets)
+ control_connections = [local_ssh(args, host_state.controller_profile.python)]
+ managed_connections = [root_ssh(ssh) for ssh in host_state.get_controller_target_connections()]
+ pre_target, post_target = create_container_hooks(args, control_connections, managed_connections)
+
+ command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target)
diff --git a/test/lib/ansible_test/_internal/commands/integration/windows.py b/test/lib/ansible_test/_internal/commands/integration/windows.py
new file mode 100644
index 0000000..aa201c4
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/windows.py
@@ -0,0 +1,81 @@
+"""Windows integration testing."""
+from __future__ import annotations
+
+import os
+
+from ...util import (
+ ApplicationError,
+ ANSIBLE_TEST_CONFIG_ROOT,
+)
+
+from ...util_common import (
+ handle_layout_messages,
+)
+
+from ...containers import (
+ create_container_hooks,
+ local_ssh,
+ root_ssh,
+)
+
+from ...target import (
+ walk_windows_integration_targets,
+)
+
+from ...config import (
+ WindowsIntegrationConfig,
+)
+
+from ...host_configs import (
+ WindowsInventoryConfig,
+ WindowsRemoteConfig,
+)
+
+from . import (
+ command_integration_filter,
+ command_integration_filtered,
+ get_inventory_absolute_path,
+ get_inventory_relative_path,
+ check_inventory,
+ delegate_inventory,
+)
+
+from ...data import (
+ data_context,
+)
+
+
+def command_windows_integration(args: WindowsIntegrationConfig) -> None:
+ """Entry point for the `windows-integration` command."""
+ handle_layout_messages(data_context().content.integration_messages)
+
+ inventory_relative_path = get_inventory_relative_path(args)
+ template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template'
+
+ if issubclass(args.target_type, WindowsInventoryConfig):
+ target = args.only_target(WindowsInventoryConfig)
+ inventory_path = get_inventory_absolute_path(args, target)
+
+ if args.delegate or not target.path:
+ target.path = inventory_relative_path
+ else:
+ inventory_path = os.path.join(data_context().content.root, inventory_relative_path)
+
+ if not args.explain and not issubclass(args.target_type, WindowsRemoteConfig) and not os.path.isfile(inventory_path):
+ raise ApplicationError(
+ 'Inventory not found: %s\n'
+ 'Use --inventory to specify the inventory path.\n'
+ 'Use --windows to provision resources and generate an inventory file.\n'
+ 'See also inventory template: %s' % (inventory_path, template_path)
+ )
+
+ check_inventory(args, inventory_path)
+ delegate_inventory(args, inventory_path)
+
+ all_targets = tuple(walk_windows_integration_targets(include_hidden=True))
+ host_state, internal_targets = command_integration_filter(args, all_targets)
+ control_connections = [local_ssh(args, host_state.controller_profile.python)]
+ managed_connections = [root_ssh(ssh) for ssh in host_state.get_controller_target_connections()]
+ pre_target, post_target = create_container_hooks(args, control_connections, managed_connections)
+
+ command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target)