diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/integration/__init__.py')
-rw-r--r-- | test/lib/ansible_test/_internal/commands/integration/__init__.py | 967 |
1 files changed, 967 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/integration/__init__.py b/test/lib/ansible_test/_internal/commands/integration/__init__.py new file mode 100644 index 0000000..8864d2e --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/__init__.py @@ -0,0 +1,967 @@ +"""Ansible integration test infrastructure.""" +from __future__ import annotations + +import collections.abc as c +import contextlib +import datetime +import json +import os +import re +import shutil +import tempfile +import time +import typing as t + +from ...encoding import ( + to_bytes, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...executor import ( + get_changes_filter, + AllTargetsSkipped, + Delegate, + ListTargets, +) + +from ...python_requirements import ( + install_requirements, +) + +from ...ci import ( + get_ci_provider, +) + +from ...target import ( + analyze_integration_target_dependencies, + walk_integration_targets, + IntegrationTarget, + walk_internal_targets, + TIntegrationTarget, + IntegrationTargetType, +) + +from ...config import ( + IntegrationConfig, + NetworkIntegrationConfig, + PosixIntegrationConfig, + WindowsIntegrationConfig, + TIntegrationConfig, +) + +from ...io import ( + make_dirs, + read_text_file, +) + +from ...util import ( + ApplicationError, + display, + SubprocessError, + remove_tree, +) + +from ...util_common import ( + named_temporary_file, + ResultType, + run_command, + write_json_test_results, + check_pyyaml, +) + +from ...coverage_util import ( + cover_python, +) + +from ...cache import ( + CommonCache, +) + +from .cloud import ( + CloudEnvironmentConfig, + cloud_filter, + cloud_init, + get_cloud_environment, + get_cloud_platforms, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + InventoryConfig, + OriginConfig, +) + +from ...host_profiles import ( + ControllerProfile, + ControllerHostProfile, + HostProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + HostState, + prepare_profiles, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...inventory import ( + create_controller_inventory, + create_windows_inventory, + create_network_inventory, + create_posix_inventory, +) + +from .filters import ( + get_target_filter, +) + +from .coverage import ( + CoverageManager, +) + +THostProfile = t.TypeVar('THostProfile', bound=HostProfile) + + +def generate_dependency_map(integration_targets: list[IntegrationTarget]) -> dict[str, set[IntegrationTarget]]: + """Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend.""" + targets_dict = dict((target.name, target) for target in integration_targets) + target_dependencies = analyze_integration_target_dependencies(integration_targets) + dependency_map: dict[str, set[IntegrationTarget]] = {} + + invalid_targets = set() + + for dependency, dependents in target_dependencies.items(): + dependency_target = targets_dict.get(dependency) + + if not dependency_target: + invalid_targets.add(dependency) + continue + + for dependent in dependents: + if dependent not in dependency_map: + dependency_map[dependent] = set() + + dependency_map[dependent].add(dependency_target) + + if invalid_targets: + raise ApplicationError('Non-existent target dependencies: %s' % ', '.join(sorted(invalid_targets))) + + return dependency_map + + +def get_files_needed(target_dependencies: list[IntegrationTarget]) -> list[str]: + """Return a list of files needed by the given list of target dependencies.""" + files_needed: list[str] = [] + + for target_dependency in target_dependencies: + files_needed += target_dependency.needs_file + + files_needed = sorted(set(files_needed)) + + invalid_paths = [path for path in files_needed if not os.path.isfile(path)] + + if invalid_paths: + raise ApplicationError('Invalid "needs/file/*" aliases:\n%s' % '\n'.join(invalid_paths)) + + return files_needed + + +def check_inventory(args: IntegrationConfig, inventory_path: str) -> None: + """Check the given inventory for issues.""" + if not isinstance(args.controller, OriginConfig): + if os.path.exists(inventory_path): + inventory = read_text_file(inventory_path) + + if 'ansible_ssh_private_key_file' in inventory: + display.warning('Use of "ansible_ssh_private_key_file" in inventory with the --docker or --remote option is unsupported and will likely fail.') + + +def get_inventory_absolute_path(args: IntegrationConfig, target: InventoryConfig) -> str: + """Return the absolute inventory path used for the given integration configuration or target inventory config (if provided).""" + path = target.path or os.path.basename(get_inventory_relative_path(args)) + + if args.host_path: + path = os.path.join(data_context().content.root, path) # post-delegation, path is relative to the content root + else: + path = os.path.join(data_context().content.root, data_context().content.integration_path, path) + + return path + + +def get_inventory_relative_path(args: IntegrationConfig) -> str: + """Return the inventory path used for the given integration configuration relative to the content root.""" + inventory_names: dict[t.Type[IntegrationConfig], str] = { + PosixIntegrationConfig: 'inventory', + WindowsIntegrationConfig: 'inventory.winrm', + NetworkIntegrationConfig: 'inventory.networking', + } + + return os.path.join(data_context().content.integration_path, inventory_names[type(args)]) + + +def delegate_inventory(args: IntegrationConfig, inventory_path_src: str) -> None: + """Make the given inventory available during delegation.""" + if isinstance(args, PosixIntegrationConfig): + return + + def inventory_callback(files: list[tuple[str, str]]) -> None: + """ + Add the inventory file to the payload file list. + This will preserve the file during delegation even if it is ignored or is outside the content and install roots. + """ + inventory_path = get_inventory_relative_path(args) + inventory_tuple = inventory_path_src, inventory_path + + if os.path.isfile(inventory_path_src) and inventory_tuple not in files: + originals = [item for item in files if item[1] == inventory_path] + + if originals: + for original in originals: + files.remove(original) + + display.warning('Overriding inventory file "%s" with "%s".' % (inventory_path, inventory_path_src)) + else: + display.notice('Sourcing inventory file "%s" from "%s".' % (inventory_path, inventory_path_src)) + + files.append(inventory_tuple) + + data_context().register_payload_callback(inventory_callback) + + +@contextlib.contextmanager +def integration_test_environment( + args: IntegrationConfig, + target: IntegrationTarget, + inventory_path_src: str, +) -> c.Iterator[IntegrationEnvironment]: + """Context manager that prepares the integration test environment and cleans it up.""" + ansible_config_src = args.get_ansible_config() + ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command) + + if args.no_temp_workdir or 'no/temp_workdir/' in target.aliases: + display.warning('Disabling the temp work dir is a temporary debugging feature that may be removed in the future without notice.') + + integration_dir = os.path.join(data_context().content.root, data_context().content.integration_path) + targets_dir = os.path.join(data_context().content.root, data_context().content.integration_targets_path) + inventory_path = inventory_path_src + ansible_config = ansible_config_src + vars_file = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + + yield IntegrationEnvironment(data_context().content.root, integration_dir, targets_dir, inventory_path, ansible_config, vars_file) + return + + # When testing a collection, the temporary directory must reside within the collection. + # This is necessary to enable support for the default collection for non-collection content (playbooks and roles). + root_temp_dir = os.path.join(ResultType.TMP.path, 'integration') + + prefix = '%s-' % target.name + suffix = '-\u00c5\u00d1\u015a\u00cc\u03b2\u0141\u00c8' + + if args.no_temp_unicode or 'no/temp_unicode/' in target.aliases: + display.warning('Disabling unicode in the temp work dir is a temporary debugging feature that may be removed in the future without notice.') + suffix = '-ansible' + + if args.explain: + temp_dir = os.path.join(root_temp_dir, '%stemp%s' % (prefix, suffix)) + else: + make_dirs(root_temp_dir) + temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_temp_dir) + + try: + display.info('Preparing temporary directory: %s' % temp_dir, verbosity=2) + + inventory_relative_path = get_inventory_relative_path(args) + inventory_path = os.path.join(temp_dir, inventory_relative_path) + + cache = IntegrationCache(args) + + target_dependencies = sorted([target] + list(cache.dependency_map.get(target.name, set()))) + + files_needed = get_files_needed(target_dependencies) + + integration_dir = os.path.join(temp_dir, data_context().content.integration_path) + targets_dir = os.path.join(temp_dir, data_context().content.integration_targets_path) + ansible_config = os.path.join(temp_dir, ansible_config_relative) + + vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + vars_file = os.path.join(temp_dir, data_context().content.integration_vars_path) + + file_copies = [ + (ansible_config_src, ansible_config), + (inventory_path_src, inventory_path), + ] + + if os.path.exists(vars_file_src): + file_copies.append((vars_file_src, vars_file)) + + file_copies += [(path, os.path.join(temp_dir, path)) for path in files_needed] + + integration_targets_relative_path = data_context().content.integration_targets_path + + directory_copies = [ + ( + os.path.join(integration_targets_relative_path, target.relative_path), + os.path.join(temp_dir, integration_targets_relative_path, target.relative_path) + ) + for target in target_dependencies + ] + + directory_copies = sorted(set(directory_copies)) + file_copies = sorted(set(file_copies)) + + if not args.explain: + make_dirs(integration_dir) + + for dir_src, dir_dst in directory_copies: + display.info('Copying %s/ to %s/' % (dir_src, dir_dst), verbosity=2) + + if not args.explain: + shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True) # type: ignore[arg-type] # incorrect type stub omits bytes path support + + for file_src, file_dst in file_copies: + display.info('Copying %s to %s' % (file_src, file_dst), verbosity=2) + + if not args.explain: + make_dirs(os.path.dirname(file_dst)) + shutil.copy2(file_src, file_dst) + + yield IntegrationEnvironment(temp_dir, integration_dir, targets_dir, inventory_path, ansible_config, vars_file) + finally: + if not args.explain: + remove_tree(temp_dir) + + +@contextlib.contextmanager +def integration_test_config_file( + args: IntegrationConfig, + env_config: CloudEnvironmentConfig, + integration_dir: str, +) -> c.Iterator[t.Optional[str]]: + """Context manager that provides a config file for integration tests, if needed.""" + if not env_config: + yield None + return + + config_vars = (env_config.ansible_vars or {}).copy() + + config_vars.update(dict( + ansible_test=dict( + environment=env_config.env_vars, + module_defaults=env_config.module_defaults, + ) + )) + + config_file = json.dumps(config_vars, indent=4, sort_keys=True) + + with named_temporary_file(args, 'config-file-', '.json', integration_dir, config_file) as path: # type: str + filename = os.path.relpath(path, integration_dir) + + display.info('>>> Config File: %s\n%s' % (filename, config_file), verbosity=3) + + yield path + + +def create_inventory( + args: IntegrationConfig, + host_state: HostState, + inventory_path: str, + target: IntegrationTarget, +) -> None: + """Create inventory.""" + if isinstance(args, PosixIntegrationConfig): + if target.target_type == IntegrationTargetType.CONTROLLER: + display.info('Configuring controller inventory.', verbosity=1) + create_controller_inventory(args, inventory_path, host_state.controller_profile) + elif target.target_type == IntegrationTargetType.TARGET: + display.info('Configuring target inventory.', verbosity=1) + create_posix_inventory(args, inventory_path, host_state.target_profiles, 'needs/ssh/' in target.aliases) + else: + raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}') + elif isinstance(args, WindowsIntegrationConfig): + display.info('Configuring target inventory.', verbosity=1) + target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target) + create_windows_inventory(args, inventory_path, target_profiles) + elif isinstance(args, NetworkIntegrationConfig): + display.info('Configuring target inventory.', verbosity=1) + target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target) + create_network_inventory(args, inventory_path, target_profiles) + + +def command_integration_filtered( + args: IntegrationConfig, + host_state: HostState, + targets: tuple[IntegrationTarget, ...], + all_targets: tuple[IntegrationTarget, ...], + inventory_path: str, + pre_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, + post_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, +): + """Run integration tests for the specified targets.""" + found = False + passed = [] + failed = [] + + targets_iter = iter(targets) + all_targets_dict = dict((target.name, target) for target in all_targets) + + setup_errors = [] + setup_targets_executed: set[str] = set() + + for target in all_targets: + for setup_target in target.setup_once + target.setup_always: + if setup_target not in all_targets_dict: + setup_errors.append('Target "%s" contains invalid setup target: %s' % (target.name, setup_target)) + + if setup_errors: + raise ApplicationError('Found %d invalid setup aliases:\n%s' % (len(setup_errors), '\n'.join(setup_errors))) + + check_pyyaml(host_state.controller_profile.python) + + test_dir = os.path.join(ResultType.TMP.path, 'output_dir') + + if not args.explain and any('needs/ssh/' in target.aliases for target in targets): + max_tries = 20 + display.info('SSH connection to controller required by tests. Checking the connection.') + for i in range(1, max_tries + 1): + try: + run_command(args, ['ssh', '-o', 'BatchMode=yes', 'localhost', 'id'], capture=True) + display.info('SSH service responded.') + break + except SubprocessError: + if i == max_tries: + raise + seconds = 3 + display.warning('SSH service not responding. Waiting %d second(s) before checking again.' % seconds) + time.sleep(seconds) + + start_at_task = args.start_at_task + + results = {} + + target_profile = host_state.target_profiles[0] + + if isinstance(target_profile, PosixProfile): + target_python = target_profile.python + + if isinstance(target_profile, ControllerProfile): + if host_state.controller_profile.python.path != target_profile.python.path: + install_requirements(args, target_python, command=True, controller=False) # integration + elif isinstance(target_profile, SshTargetHostProfile): + connection = target_profile.get_controller_target_connections()[0] + install_requirements(args, target_python, command=True, controller=False, connection=connection) # integration + + coverage_manager = CoverageManager(args, host_state, inventory_path) + coverage_manager.setup() + + try: + for target in targets_iter: + if args.start_at and not found: + found = target.name == args.start_at + + if not found: + continue + + create_inventory(args, host_state, inventory_path, target) + + tries = 2 if args.retry_on_error else 1 + verbosity = args.verbosity + + cloud_environment = get_cloud_environment(args, target) + + try: + while tries: + tries -= 1 + + try: + if cloud_environment: + cloud_environment.setup_once() + + run_setup_targets(args, host_state, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path, + coverage_manager, False) + + start_time = time.time() + + if pre_target: + pre_target(target) + + run_setup_targets(args, host_state, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path, + coverage_manager, True) + + if not args.explain: + # create a fresh test directory for each test target + remove_tree(test_dir) + make_dirs(test_dir) + + try: + if target.script_path: + command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager) + else: + command_integration_role(args, host_state, target, start_at_task, test_dir, inventory_path, coverage_manager) + start_at_task = None + finally: + if post_target: + post_target(target) + + end_time = time.time() + + results[target.name] = dict( + name=target.name, + type=target.type, + aliases=target.aliases, + modules=target.modules, + run_time_seconds=int(end_time - start_time), + setup_once=target.setup_once, + setup_always=target.setup_always, + ) + + break + except SubprocessError: + if cloud_environment: + cloud_environment.on_failure(target, tries) + + if not tries: + raise + + if target.retry_never: + display.warning(f'Skipping retry of test target "{target.name}" since it has been excluded from retries.') + raise + + display.warning('Retrying test target "%s" with maximum verbosity.' % target.name) + display.verbosity = args.verbosity = 6 + + passed.append(target) + except Exception as ex: + failed.append(target) + + if args.continue_on_error: + display.error(str(ex)) + continue + + display.notice('To resume at this test target, use the option: --start-at %s' % target.name) + + next_target = next(targets_iter, None) + + if next_target: + display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name) + + raise + finally: + display.verbosity = args.verbosity = verbosity + + finally: + if not args.explain: + coverage_manager.teardown() + + result_name = '%s-%s.json' % ( + args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0)))) + + data = dict( + targets=results, + ) + + write_json_test_results(ResultType.DATA, result_name, data) + + if failed: + raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % ( + len(failed), len(passed) + len(failed), '\n'.join(target.name for target in failed))) + + +def command_integration_script( + args: IntegrationConfig, + host_state: HostState, + target: IntegrationTarget, + test_dir: str, + inventory_path: str, + coverage_manager: CoverageManager, +): + """Run an integration test script.""" + display.info('Running %s integration test script' % target.name) + + env_config = None + + if isinstance(args, PosixIntegrationConfig): + cloud_environment = get_cloud_environment(args, target) + + if cloud_environment: + env_config = cloud_environment.get_environment_config() + + if env_config: + display.info('>>> Environment Config\n%s' % json.dumps(dict( + env_vars=env_config.env_vars, + ansible_vars=env_config.ansible_vars, + callback_plugins=env_config.callback_plugins, + module_defaults=env_config.module_defaults, + ), indent=4, sort_keys=True), verbosity=3) + + with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment + cmd = ['./%s' % os.path.basename(target.script_path)] + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env) + cwd = os.path.join(test_env.targets_dir, target.relative_path) + + env.update(dict( + # support use of adhoc ansible commands in collections without specifying the fully qualified collection name + ANSIBLE_PLAYBOOK_DIR=cwd, + )) + + if env_config and env_config.env_vars: + env.update(env_config.env_vars) + + with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path: # type: t.Optional[str] + if config_path: + cmd += ['-e', '@%s' % config_path] + + env.update(coverage_manager.get_environment(target.name, target.aliases)) + cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False) + + +def command_integration_role( + args: IntegrationConfig, + host_state: HostState, + target: IntegrationTarget, + start_at_task: t.Optional[str], + test_dir: str, + inventory_path: str, + coverage_manager: CoverageManager, +): + """Run an integration test role.""" + display.info('Running %s integration test role' % target.name) + + env_config = None + + vars_files = [] + variables = dict( + output_dir=test_dir, + ) + + if isinstance(args, WindowsIntegrationConfig): + hosts = 'windows' + gather_facts = False + variables.update(dict( + win_output_dir=r'C:\ansible_testing', + )) + elif isinstance(args, NetworkIntegrationConfig): + hosts = target.network_platform + gather_facts = False + else: + hosts = 'testhost' + gather_facts = True + + if 'gather_facts/yes/' in target.aliases: + gather_facts = True + elif 'gather_facts/no/' in target.aliases: + gather_facts = False + + if not isinstance(args, NetworkIntegrationConfig): + cloud_environment = get_cloud_environment(args, target) + + if cloud_environment: + env_config = cloud_environment.get_environment_config() + + if env_config: + display.info('>>> Environment Config\n%s' % json.dumps(dict( + env_vars=env_config.env_vars, + ansible_vars=env_config.ansible_vars, + callback_plugins=env_config.callback_plugins, + module_defaults=env_config.module_defaults, + ), indent=4, sort_keys=True), verbosity=3) + + with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment + if os.path.exists(test_env.vars_file): + vars_files.append(os.path.relpath(test_env.vars_file, test_env.integration_dir)) + + play = dict( + hosts=hosts, + gather_facts=gather_facts, + vars_files=vars_files, + vars=variables, + roles=[ + target.name, + ], + ) + + if env_config: + if env_config.ansible_vars: + variables.update(env_config.ansible_vars) + + play.update(dict( + environment=env_config.env_vars, + module_defaults=env_config.module_defaults, + )) + + playbook = json.dumps([play], indent=4, sort_keys=True) + + with named_temporary_file(args=args, directory=test_env.integration_dir, prefix='%s-' % target.name, suffix='.yml', content=playbook) as playbook_path: + filename = os.path.basename(playbook_path) + + display.info('>>> Playbook: %s\n%s' % (filename, playbook.strip()), verbosity=3) + + cmd = ['ansible-playbook', filename, '-i', os.path.relpath(test_env.inventory_path, test_env.integration_dir)] + + if start_at_task: + cmd += ['--start-at-task', start_at_task] + + if args.tags: + cmd += ['--tags', args.tags] + + if args.skip_tags: + cmd += ['--skip-tags', args.skip_tags] + + if args.diff: + cmd += ['--diff'] + + if isinstance(args, NetworkIntegrationConfig): + if args.testcase: + cmd += ['-e', 'testcase=%s' % args.testcase] + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env) + cwd = test_env.integration_dir + + env.update(dict( + # support use of adhoc ansible commands in collections without specifying the fully qualified collection name + ANSIBLE_PLAYBOOK_DIR=cwd, + )) + + if env_config and env_config.env_vars: + env.update(env_config.env_vars) + + env['ANSIBLE_ROLES_PATH'] = test_env.targets_dir + + env.update(coverage_manager.get_environment(target.name, target.aliases)) + cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False) + + +def run_setup_targets( + args: IntegrationConfig, + host_state: HostState, + test_dir: str, + target_names: c.Sequence[str], + targets_dict: dict[str, IntegrationTarget], + targets_executed: set[str], + inventory_path: str, + coverage_manager: CoverageManager, + always: bool, +): + """Run setup targets.""" + for target_name in target_names: + if not always and target_name in targets_executed: + continue + + target = targets_dict[target_name] + + if not args.explain: + # create a fresh test directory for each test target + remove_tree(test_dir) + make_dirs(test_dir) + + if target.script_path: + command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager) + else: + command_integration_role(args, host_state, target, None, test_dir, inventory_path, coverage_manager) + + targets_executed.add(target_name) + + +def integration_environment( + args: IntegrationConfig, + target: IntegrationTarget, + test_dir: str, + inventory_path: str, + ansible_config: t.Optional[str], + env_config: t.Optional[CloudEnvironmentConfig], + test_env: IntegrationEnvironment, +) -> dict[str, str]: + """Return a dictionary of environment variables to use when running the given integration test target.""" + env = ansible_environment(args, ansible_config=ansible_config) + + callback_plugins = ['junit'] + (env_config.callback_plugins or [] if env_config else []) + + integration = dict( + JUNIT_OUTPUT_DIR=ResultType.JUNIT.path, + JUNIT_TASK_RELATIVE_PATH=test_env.test_dir, + JUNIT_REPLACE_OUT_OF_TREE_PATH='out-of-tree:', + ANSIBLE_CALLBACKS_ENABLED=','.join(sorted(set(callback_plugins))), + ANSIBLE_TEST_CI=args.metadata.ci_provider or get_ci_provider().code, + ANSIBLE_TEST_COVERAGE='check' if args.coverage_check else ('yes' if args.coverage else ''), + OUTPUT_DIR=test_dir, + INVENTORY_PATH=os.path.abspath(inventory_path), + ) + + if args.debug_strategy: + env.update(dict(ANSIBLE_STRATEGY='debug')) + + if 'non_local/' in target.aliases: + if args.coverage: + display.warning('Skipping coverage reporting on Ansible modules for non-local test: %s' % target.name) + + env.update(dict(ANSIBLE_TEST_REMOTE_INTERPRETER='')) + + env.update(integration) + + return env + + +class IntegrationEnvironment: + """Details about the integration environment.""" + def __init__(self, test_dir: str, integration_dir: str, targets_dir: str, inventory_path: str, ansible_config: str, vars_file: str) -> None: + self.test_dir = test_dir + self.integration_dir = integration_dir + self.targets_dir = targets_dir + self.inventory_path = inventory_path + self.ansible_config = ansible_config + self.vars_file = vars_file + + +class IntegrationCache(CommonCache): + """Integration cache.""" + @property + def integration_targets(self) -> list[IntegrationTarget]: + """The list of integration test targets.""" + return self.get('integration_targets', lambda: list(walk_integration_targets())) + + @property + def dependency_map(self) -> dict[str, set[IntegrationTarget]]: + """The dependency map of integration test targets.""" + return self.get('dependency_map', lambda: generate_dependency_map(self.integration_targets)) + + +def filter_profiles_for_target(args: IntegrationConfig, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Return a list of profiles after applying target filters.""" + if target.target_type == IntegrationTargetType.CONTROLLER: + profile_filter = get_target_filter(args, [args.controller], True) + elif target.target_type == IntegrationTargetType.TARGET: + profile_filter = get_target_filter(args, args.targets, False) + else: + raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}') + + profiles = profile_filter.filter_profiles(profiles, target) + + return profiles + + +def get_integration_filter(args: IntegrationConfig, targets: list[IntegrationTarget]) -> set[str]: + """Return a list of test targets to skip based on the host(s) that will be used to run the specified test targets.""" + invalid_targets = sorted(target.name for target in targets if target.target_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET)) + + if invalid_targets and not args.list_targets: + message = f'''Unable to determine context for the following test targets: {", ".join(invalid_targets)} + +Make sure the test targets are correctly named: + + - Modules - The target name should match the module name. + - Plugins - The target name should be "{{plugin_type}}_{{plugin_name}}". + +If necessary, context can be controlled by adding entries to the "aliases" file for a test target: + + - Add the name(s) of modules which are tested. + - Add "context/target" for module and module_utils tests (these will run on the target host). + - Add "context/controller" for other test types (these will run on the controller).''' + + raise ApplicationError(message) + + invalid_targets = sorted(target.name for target in targets if target.actual_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET)) + + if invalid_targets: + if data_context().content.is_ansible: + display.warning(f'Unable to determine context for the following test targets: {", ".join(invalid_targets)}') + else: + display.warning(f'Unable to determine context for the following test targets, they will be run on the target host: {", ".join(invalid_targets)}') + + exclude: set[str] = set() + + controller_targets = [target for target in targets if target.target_type == IntegrationTargetType.CONTROLLER] + target_targets = [target for target in targets if target.target_type == IntegrationTargetType.TARGET] + + controller_filter = get_target_filter(args, [args.controller], True) + target_filter = get_target_filter(args, args.targets, False) + + controller_filter.filter_targets(controller_targets, exclude) + target_filter.filter_targets(target_targets, exclude) + + return exclude + + +def command_integration_filter(args: TIntegrationConfig, + targets: c.Iterable[TIntegrationTarget], + ) -> tuple[HostState, tuple[TIntegrationTarget, ...]]: + """Filter the given integration test targets.""" + targets = tuple(target for target in targets if 'hidden/' not in target.aliases) + changes = get_changes_filter(args) + + # special behavior when the --changed-all-target target is selected based on changes + if args.changed_all_target in changes: + # act as though the --changed-all-target target was in the include list + if args.changed_all_mode == 'include' and args.changed_all_target not in args.include: + args.include.append(args.changed_all_target) + args.delegate_args += ['--include', args.changed_all_target] + # act as though the --changed-all-target target was in the exclude list + elif args.changed_all_mode == 'exclude' and args.changed_all_target not in args.exclude: + args.exclude.append(args.changed_all_target) + + require = args.require + changes + exclude = args.exclude + + internal_targets = walk_internal_targets(targets, args.include, exclude, require) + environment_exclude = get_integration_filter(args, list(internal_targets)) + + environment_exclude |= set(cloud_filter(args, internal_targets)) + + if environment_exclude: + exclude = sorted(set(exclude) | environment_exclude) + internal_targets = walk_internal_targets(targets, args.include, exclude, require) + + if not internal_targets: + raise AllTargetsSkipped() + + if args.start_at and not any(target.name == args.start_at for target in internal_targets): + raise ApplicationError('Start at target matches nothing: %s' % args.start_at) + + cloud_init(args, internal_targets) + + vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + + if os.path.exists(vars_file_src): + def integration_config_callback(files: list[tuple[str, str]]) -> None: + """ + Add the integration config vars file to the payload file list. + This will preserve the file during delegation even if the file is ignored by source control. + """ + files.append((vars_file_src, data_context().content.integration_vars_path)) + + data_context().register_payload_callback(integration_config_callback) + + if args.list_targets: + raise ListTargets([target.name for target in internal_targets]) + + # requirements are installed using a callback since the windows-integration and network-integration host status checks depend on them + host_state = prepare_profiles(args, targets_use_pypi=True, requirements=requirements) # integration, windows-integration, network-integration + + if args.delegate: + raise Delegate(host_state=host_state, require=require, exclude=exclude) + + return host_state, internal_targets + + +def requirements(host_profile: HostProfile) -> None: + """Install requirements after bootstrapping and delegation.""" + if isinstance(host_profile, ControllerHostProfile) and host_profile.controller: + configure_pypi_proxy(host_profile.args, host_profile) # integration, windows-integration, network-integration + install_requirements(host_profile.args, host_profile.python, ansible=True, command=True) # integration, windows-integration, network-integration + elif isinstance(host_profile, PosixProfile) and not isinstance(host_profile, ControllerProfile): + configure_pypi_proxy(host_profile.args, host_profile) # integration |