diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /test/lib/ansible_test/_internal/commands/integration | |
parent | Initial commit. (diff) | |
download | ansible-core-8a754e0858d922e955e71b253c139e071ecec432.tar.xz ansible-core-8a754e0858d922e955e71b253c139e071ecec432.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
24 files changed, 3960 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/integration/__init__.py b/test/lib/ansible_test/_internal/commands/integration/__init__.py new file mode 100644 index 0000000..8864d2e --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/__init__.py @@ -0,0 +1,967 @@ +"""Ansible integration test infrastructure.""" +from __future__ import annotations + +import collections.abc as c +import contextlib +import datetime +import json +import os +import re +import shutil +import tempfile +import time +import typing as t + +from ...encoding import ( + to_bytes, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...executor import ( + get_changes_filter, + AllTargetsSkipped, + Delegate, + ListTargets, +) + +from ...python_requirements import ( + install_requirements, +) + +from ...ci import ( + get_ci_provider, +) + +from ...target import ( + analyze_integration_target_dependencies, + walk_integration_targets, + IntegrationTarget, + walk_internal_targets, + TIntegrationTarget, + IntegrationTargetType, +) + +from ...config import ( + IntegrationConfig, + NetworkIntegrationConfig, + PosixIntegrationConfig, + WindowsIntegrationConfig, + TIntegrationConfig, +) + +from ...io import ( + make_dirs, + read_text_file, +) + +from ...util import ( + ApplicationError, + display, + SubprocessError, + remove_tree, +) + +from ...util_common import ( + named_temporary_file, + ResultType, + run_command, + write_json_test_results, + check_pyyaml, +) + +from ...coverage_util import ( + cover_python, +) + +from ...cache import ( + CommonCache, +) + +from .cloud import ( + CloudEnvironmentConfig, + cloud_filter, + cloud_init, + get_cloud_environment, + get_cloud_platforms, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + InventoryConfig, + OriginConfig, +) + +from ...host_profiles import ( + ControllerProfile, + ControllerHostProfile, + HostProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + HostState, + prepare_profiles, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...inventory import ( + create_controller_inventory, + create_windows_inventory, + create_network_inventory, + create_posix_inventory, +) + +from .filters import ( + get_target_filter, +) + +from .coverage import ( + CoverageManager, +) + +THostProfile = t.TypeVar('THostProfile', bound=HostProfile) + + +def generate_dependency_map(integration_targets: list[IntegrationTarget]) -> dict[str, set[IntegrationTarget]]: + """Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend.""" + targets_dict = dict((target.name, target) for target in integration_targets) + target_dependencies = analyze_integration_target_dependencies(integration_targets) + dependency_map: dict[str, set[IntegrationTarget]] = {} + + invalid_targets = set() + + for dependency, dependents in target_dependencies.items(): + dependency_target = targets_dict.get(dependency) + + if not dependency_target: + invalid_targets.add(dependency) + continue + + for dependent in dependents: + if dependent not in dependency_map: + dependency_map[dependent] = set() + + dependency_map[dependent].add(dependency_target) + + if invalid_targets: + raise ApplicationError('Non-existent target dependencies: %s' % ', '.join(sorted(invalid_targets))) + + return dependency_map + + +def get_files_needed(target_dependencies: list[IntegrationTarget]) -> list[str]: + """Return a list of files needed by the given list of target dependencies.""" + files_needed: list[str] = [] + + for target_dependency in target_dependencies: + files_needed += target_dependency.needs_file + + files_needed = sorted(set(files_needed)) + + invalid_paths = [path for path in files_needed if not os.path.isfile(path)] + + if invalid_paths: + raise ApplicationError('Invalid "needs/file/*" aliases:\n%s' % '\n'.join(invalid_paths)) + + return files_needed + + +def check_inventory(args: IntegrationConfig, inventory_path: str) -> None: + """Check the given inventory for issues.""" + if not isinstance(args.controller, OriginConfig): + if os.path.exists(inventory_path): + inventory = read_text_file(inventory_path) + + if 'ansible_ssh_private_key_file' in inventory: + display.warning('Use of "ansible_ssh_private_key_file" in inventory with the --docker or --remote option is unsupported and will likely fail.') + + +def get_inventory_absolute_path(args: IntegrationConfig, target: InventoryConfig) -> str: + """Return the absolute inventory path used for the given integration configuration or target inventory config (if provided).""" + path = target.path or os.path.basename(get_inventory_relative_path(args)) + + if args.host_path: + path = os.path.join(data_context().content.root, path) # post-delegation, path is relative to the content root + else: + path = os.path.join(data_context().content.root, data_context().content.integration_path, path) + + return path + + +def get_inventory_relative_path(args: IntegrationConfig) -> str: + """Return the inventory path used for the given integration configuration relative to the content root.""" + inventory_names: dict[t.Type[IntegrationConfig], str] = { + PosixIntegrationConfig: 'inventory', + WindowsIntegrationConfig: 'inventory.winrm', + NetworkIntegrationConfig: 'inventory.networking', + } + + return os.path.join(data_context().content.integration_path, inventory_names[type(args)]) + + +def delegate_inventory(args: IntegrationConfig, inventory_path_src: str) -> None: + """Make the given inventory available during delegation.""" + if isinstance(args, PosixIntegrationConfig): + return + + def inventory_callback(files: list[tuple[str, str]]) -> None: + """ + Add the inventory file to the payload file list. + This will preserve the file during delegation even if it is ignored or is outside the content and install roots. + """ + inventory_path = get_inventory_relative_path(args) + inventory_tuple = inventory_path_src, inventory_path + + if os.path.isfile(inventory_path_src) and inventory_tuple not in files: + originals = [item for item in files if item[1] == inventory_path] + + if originals: + for original in originals: + files.remove(original) + + display.warning('Overriding inventory file "%s" with "%s".' % (inventory_path, inventory_path_src)) + else: + display.notice('Sourcing inventory file "%s" from "%s".' % (inventory_path, inventory_path_src)) + + files.append(inventory_tuple) + + data_context().register_payload_callback(inventory_callback) + + +@contextlib.contextmanager +def integration_test_environment( + args: IntegrationConfig, + target: IntegrationTarget, + inventory_path_src: str, +) -> c.Iterator[IntegrationEnvironment]: + """Context manager that prepares the integration test environment and cleans it up.""" + ansible_config_src = args.get_ansible_config() + ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command) + + if args.no_temp_workdir or 'no/temp_workdir/' in target.aliases: + display.warning('Disabling the temp work dir is a temporary debugging feature that may be removed in the future without notice.') + + integration_dir = os.path.join(data_context().content.root, data_context().content.integration_path) + targets_dir = os.path.join(data_context().content.root, data_context().content.integration_targets_path) + inventory_path = inventory_path_src + ansible_config = ansible_config_src + vars_file = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + + yield IntegrationEnvironment(data_context().content.root, integration_dir, targets_dir, inventory_path, ansible_config, vars_file) + return + + # When testing a collection, the temporary directory must reside within the collection. + # This is necessary to enable support for the default collection for non-collection content (playbooks and roles). + root_temp_dir = os.path.join(ResultType.TMP.path, 'integration') + + prefix = '%s-' % target.name + suffix = '-\u00c5\u00d1\u015a\u00cc\u03b2\u0141\u00c8' + + if args.no_temp_unicode or 'no/temp_unicode/' in target.aliases: + display.warning('Disabling unicode in the temp work dir is a temporary debugging feature that may be removed in the future without notice.') + suffix = '-ansible' + + if args.explain: + temp_dir = os.path.join(root_temp_dir, '%stemp%s' % (prefix, suffix)) + else: + make_dirs(root_temp_dir) + temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_temp_dir) + + try: + display.info('Preparing temporary directory: %s' % temp_dir, verbosity=2) + + inventory_relative_path = get_inventory_relative_path(args) + inventory_path = os.path.join(temp_dir, inventory_relative_path) + + cache = IntegrationCache(args) + + target_dependencies = sorted([target] + list(cache.dependency_map.get(target.name, set()))) + + files_needed = get_files_needed(target_dependencies) + + integration_dir = os.path.join(temp_dir, data_context().content.integration_path) + targets_dir = os.path.join(temp_dir, data_context().content.integration_targets_path) + ansible_config = os.path.join(temp_dir, ansible_config_relative) + + vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + vars_file = os.path.join(temp_dir, data_context().content.integration_vars_path) + + file_copies = [ + (ansible_config_src, ansible_config), + (inventory_path_src, inventory_path), + ] + + if os.path.exists(vars_file_src): + file_copies.append((vars_file_src, vars_file)) + + file_copies += [(path, os.path.join(temp_dir, path)) for path in files_needed] + + integration_targets_relative_path = data_context().content.integration_targets_path + + directory_copies = [ + ( + os.path.join(integration_targets_relative_path, target.relative_path), + os.path.join(temp_dir, integration_targets_relative_path, target.relative_path) + ) + for target in target_dependencies + ] + + directory_copies = sorted(set(directory_copies)) + file_copies = sorted(set(file_copies)) + + if not args.explain: + make_dirs(integration_dir) + + for dir_src, dir_dst in directory_copies: + display.info('Copying %s/ to %s/' % (dir_src, dir_dst), verbosity=2) + + if not args.explain: + shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True) # type: ignore[arg-type] # incorrect type stub omits bytes path support + + for file_src, file_dst in file_copies: + display.info('Copying %s to %s' % (file_src, file_dst), verbosity=2) + + if not args.explain: + make_dirs(os.path.dirname(file_dst)) + shutil.copy2(file_src, file_dst) + + yield IntegrationEnvironment(temp_dir, integration_dir, targets_dir, inventory_path, ansible_config, vars_file) + finally: + if not args.explain: + remove_tree(temp_dir) + + +@contextlib.contextmanager +def integration_test_config_file( + args: IntegrationConfig, + env_config: CloudEnvironmentConfig, + integration_dir: str, +) -> c.Iterator[t.Optional[str]]: + """Context manager that provides a config file for integration tests, if needed.""" + if not env_config: + yield None + return + + config_vars = (env_config.ansible_vars or {}).copy() + + config_vars.update(dict( + ansible_test=dict( + environment=env_config.env_vars, + module_defaults=env_config.module_defaults, + ) + )) + + config_file = json.dumps(config_vars, indent=4, sort_keys=True) + + with named_temporary_file(args, 'config-file-', '.json', integration_dir, config_file) as path: # type: str + filename = os.path.relpath(path, integration_dir) + + display.info('>>> Config File: %s\n%s' % (filename, config_file), verbosity=3) + + yield path + + +def create_inventory( + args: IntegrationConfig, + host_state: HostState, + inventory_path: str, + target: IntegrationTarget, +) -> None: + """Create inventory.""" + if isinstance(args, PosixIntegrationConfig): + if target.target_type == IntegrationTargetType.CONTROLLER: + display.info('Configuring controller inventory.', verbosity=1) + create_controller_inventory(args, inventory_path, host_state.controller_profile) + elif target.target_type == IntegrationTargetType.TARGET: + display.info('Configuring target inventory.', verbosity=1) + create_posix_inventory(args, inventory_path, host_state.target_profiles, 'needs/ssh/' in target.aliases) + else: + raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}') + elif isinstance(args, WindowsIntegrationConfig): + display.info('Configuring target inventory.', verbosity=1) + target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target) + create_windows_inventory(args, inventory_path, target_profiles) + elif isinstance(args, NetworkIntegrationConfig): + display.info('Configuring target inventory.', verbosity=1) + target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target) + create_network_inventory(args, inventory_path, target_profiles) + + +def command_integration_filtered( + args: IntegrationConfig, + host_state: HostState, + targets: tuple[IntegrationTarget, ...], + all_targets: tuple[IntegrationTarget, ...], + inventory_path: str, + pre_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, + post_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, +): + """Run integration tests for the specified targets.""" + found = False + passed = [] + failed = [] + + targets_iter = iter(targets) + all_targets_dict = dict((target.name, target) for target in all_targets) + + setup_errors = [] + setup_targets_executed: set[str] = set() + + for target in all_targets: + for setup_target in target.setup_once + target.setup_always: + if setup_target not in all_targets_dict: + setup_errors.append('Target "%s" contains invalid setup target: %s' % (target.name, setup_target)) + + if setup_errors: + raise ApplicationError('Found %d invalid setup aliases:\n%s' % (len(setup_errors), '\n'.join(setup_errors))) + + check_pyyaml(host_state.controller_profile.python) + + test_dir = os.path.join(ResultType.TMP.path, 'output_dir') + + if not args.explain and any('needs/ssh/' in target.aliases for target in targets): + max_tries = 20 + display.info('SSH connection to controller required by tests. Checking the connection.') + for i in range(1, max_tries + 1): + try: + run_command(args, ['ssh', '-o', 'BatchMode=yes', 'localhost', 'id'], capture=True) + display.info('SSH service responded.') + break + except SubprocessError: + if i == max_tries: + raise + seconds = 3 + display.warning('SSH service not responding. Waiting %d second(s) before checking again.' % seconds) + time.sleep(seconds) + + start_at_task = args.start_at_task + + results = {} + + target_profile = host_state.target_profiles[0] + + if isinstance(target_profile, PosixProfile): + target_python = target_profile.python + + if isinstance(target_profile, ControllerProfile): + if host_state.controller_profile.python.path != target_profile.python.path: + install_requirements(args, target_python, command=True, controller=False) # integration + elif isinstance(target_profile, SshTargetHostProfile): + connection = target_profile.get_controller_target_connections()[0] + install_requirements(args, target_python, command=True, controller=False, connection=connection) # integration + + coverage_manager = CoverageManager(args, host_state, inventory_path) + coverage_manager.setup() + + try: + for target in targets_iter: + if args.start_at and not found: + found = target.name == args.start_at + + if not found: + continue + + create_inventory(args, host_state, inventory_path, target) + + tries = 2 if args.retry_on_error else 1 + verbosity = args.verbosity + + cloud_environment = get_cloud_environment(args, target) + + try: + while tries: + tries -= 1 + + try: + if cloud_environment: + cloud_environment.setup_once() + + run_setup_targets(args, host_state, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path, + coverage_manager, False) + + start_time = time.time() + + if pre_target: + pre_target(target) + + run_setup_targets(args, host_state, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path, + coverage_manager, True) + + if not args.explain: + # create a fresh test directory for each test target + remove_tree(test_dir) + make_dirs(test_dir) + + try: + if target.script_path: + command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager) + else: + command_integration_role(args, host_state, target, start_at_task, test_dir, inventory_path, coverage_manager) + start_at_task = None + finally: + if post_target: + post_target(target) + + end_time = time.time() + + results[target.name] = dict( + name=target.name, + type=target.type, + aliases=target.aliases, + modules=target.modules, + run_time_seconds=int(end_time - start_time), + setup_once=target.setup_once, + setup_always=target.setup_always, + ) + + break + except SubprocessError: + if cloud_environment: + cloud_environment.on_failure(target, tries) + + if not tries: + raise + + if target.retry_never: + display.warning(f'Skipping retry of test target "{target.name}" since it has been excluded from retries.') + raise + + display.warning('Retrying test target "%s" with maximum verbosity.' % target.name) + display.verbosity = args.verbosity = 6 + + passed.append(target) + except Exception as ex: + failed.append(target) + + if args.continue_on_error: + display.error(str(ex)) + continue + + display.notice('To resume at this test target, use the option: --start-at %s' % target.name) + + next_target = next(targets_iter, None) + + if next_target: + display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name) + + raise + finally: + display.verbosity = args.verbosity = verbosity + + finally: + if not args.explain: + coverage_manager.teardown() + + result_name = '%s-%s.json' % ( + args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0)))) + + data = dict( + targets=results, + ) + + write_json_test_results(ResultType.DATA, result_name, data) + + if failed: + raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % ( + len(failed), len(passed) + len(failed), '\n'.join(target.name for target in failed))) + + +def command_integration_script( + args: IntegrationConfig, + host_state: HostState, + target: IntegrationTarget, + test_dir: str, + inventory_path: str, + coverage_manager: CoverageManager, +): + """Run an integration test script.""" + display.info('Running %s integration test script' % target.name) + + env_config = None + + if isinstance(args, PosixIntegrationConfig): + cloud_environment = get_cloud_environment(args, target) + + if cloud_environment: + env_config = cloud_environment.get_environment_config() + + if env_config: + display.info('>>> Environment Config\n%s' % json.dumps(dict( + env_vars=env_config.env_vars, + ansible_vars=env_config.ansible_vars, + callback_plugins=env_config.callback_plugins, + module_defaults=env_config.module_defaults, + ), indent=4, sort_keys=True), verbosity=3) + + with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment + cmd = ['./%s' % os.path.basename(target.script_path)] + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env) + cwd = os.path.join(test_env.targets_dir, target.relative_path) + + env.update(dict( + # support use of adhoc ansible commands in collections without specifying the fully qualified collection name + ANSIBLE_PLAYBOOK_DIR=cwd, + )) + + if env_config and env_config.env_vars: + env.update(env_config.env_vars) + + with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path: # type: t.Optional[str] + if config_path: + cmd += ['-e', '@%s' % config_path] + + env.update(coverage_manager.get_environment(target.name, target.aliases)) + cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False) + + +def command_integration_role( + args: IntegrationConfig, + host_state: HostState, + target: IntegrationTarget, + start_at_task: t.Optional[str], + test_dir: str, + inventory_path: str, + coverage_manager: CoverageManager, +): + """Run an integration test role.""" + display.info('Running %s integration test role' % target.name) + + env_config = None + + vars_files = [] + variables = dict( + output_dir=test_dir, + ) + + if isinstance(args, WindowsIntegrationConfig): + hosts = 'windows' + gather_facts = False + variables.update(dict( + win_output_dir=r'C:\ansible_testing', + )) + elif isinstance(args, NetworkIntegrationConfig): + hosts = target.network_platform + gather_facts = False + else: + hosts = 'testhost' + gather_facts = True + + if 'gather_facts/yes/' in target.aliases: + gather_facts = True + elif 'gather_facts/no/' in target.aliases: + gather_facts = False + + if not isinstance(args, NetworkIntegrationConfig): + cloud_environment = get_cloud_environment(args, target) + + if cloud_environment: + env_config = cloud_environment.get_environment_config() + + if env_config: + display.info('>>> Environment Config\n%s' % json.dumps(dict( + env_vars=env_config.env_vars, + ansible_vars=env_config.ansible_vars, + callback_plugins=env_config.callback_plugins, + module_defaults=env_config.module_defaults, + ), indent=4, sort_keys=True), verbosity=3) + + with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment + if os.path.exists(test_env.vars_file): + vars_files.append(os.path.relpath(test_env.vars_file, test_env.integration_dir)) + + play = dict( + hosts=hosts, + gather_facts=gather_facts, + vars_files=vars_files, + vars=variables, + roles=[ + target.name, + ], + ) + + if env_config: + if env_config.ansible_vars: + variables.update(env_config.ansible_vars) + + play.update(dict( + environment=env_config.env_vars, + module_defaults=env_config.module_defaults, + )) + + playbook = json.dumps([play], indent=4, sort_keys=True) + + with named_temporary_file(args=args, directory=test_env.integration_dir, prefix='%s-' % target.name, suffix='.yml', content=playbook) as playbook_path: + filename = os.path.basename(playbook_path) + + display.info('>>> Playbook: %s\n%s' % (filename, playbook.strip()), verbosity=3) + + cmd = ['ansible-playbook', filename, '-i', os.path.relpath(test_env.inventory_path, test_env.integration_dir)] + + if start_at_task: + cmd += ['--start-at-task', start_at_task] + + if args.tags: + cmd += ['--tags', args.tags] + + if args.skip_tags: + cmd += ['--skip-tags', args.skip_tags] + + if args.diff: + cmd += ['--diff'] + + if isinstance(args, NetworkIntegrationConfig): + if args.testcase: + cmd += ['-e', 'testcase=%s' % args.testcase] + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env) + cwd = test_env.integration_dir + + env.update(dict( + # support use of adhoc ansible commands in collections without specifying the fully qualified collection name + ANSIBLE_PLAYBOOK_DIR=cwd, + )) + + if env_config and env_config.env_vars: + env.update(env_config.env_vars) + + env['ANSIBLE_ROLES_PATH'] = test_env.targets_dir + + env.update(coverage_manager.get_environment(target.name, target.aliases)) + cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False) + + +def run_setup_targets( + args: IntegrationConfig, + host_state: HostState, + test_dir: str, + target_names: c.Sequence[str], + targets_dict: dict[str, IntegrationTarget], + targets_executed: set[str], + inventory_path: str, + coverage_manager: CoverageManager, + always: bool, +): + """Run setup targets.""" + for target_name in target_names: + if not always and target_name in targets_executed: + continue + + target = targets_dict[target_name] + + if not args.explain: + # create a fresh test directory for each test target + remove_tree(test_dir) + make_dirs(test_dir) + + if target.script_path: + command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager) + else: + command_integration_role(args, host_state, target, None, test_dir, inventory_path, coverage_manager) + + targets_executed.add(target_name) + + +def integration_environment( + args: IntegrationConfig, + target: IntegrationTarget, + test_dir: str, + inventory_path: str, + ansible_config: t.Optional[str], + env_config: t.Optional[CloudEnvironmentConfig], + test_env: IntegrationEnvironment, +) -> dict[str, str]: + """Return a dictionary of environment variables to use when running the given integration test target.""" + env = ansible_environment(args, ansible_config=ansible_config) + + callback_plugins = ['junit'] + (env_config.callback_plugins or [] if env_config else []) + + integration = dict( + JUNIT_OUTPUT_DIR=ResultType.JUNIT.path, + JUNIT_TASK_RELATIVE_PATH=test_env.test_dir, + JUNIT_REPLACE_OUT_OF_TREE_PATH='out-of-tree:', + ANSIBLE_CALLBACKS_ENABLED=','.join(sorted(set(callback_plugins))), + ANSIBLE_TEST_CI=args.metadata.ci_provider or get_ci_provider().code, + ANSIBLE_TEST_COVERAGE='check' if args.coverage_check else ('yes' if args.coverage else ''), + OUTPUT_DIR=test_dir, + INVENTORY_PATH=os.path.abspath(inventory_path), + ) + + if args.debug_strategy: + env.update(dict(ANSIBLE_STRATEGY='debug')) + + if 'non_local/' in target.aliases: + if args.coverage: + display.warning('Skipping coverage reporting on Ansible modules for non-local test: %s' % target.name) + + env.update(dict(ANSIBLE_TEST_REMOTE_INTERPRETER='')) + + env.update(integration) + + return env + + +class IntegrationEnvironment: + """Details about the integration environment.""" + def __init__(self, test_dir: str, integration_dir: str, targets_dir: str, inventory_path: str, ansible_config: str, vars_file: str) -> None: + self.test_dir = test_dir + self.integration_dir = integration_dir + self.targets_dir = targets_dir + self.inventory_path = inventory_path + self.ansible_config = ansible_config + self.vars_file = vars_file + + +class IntegrationCache(CommonCache): + """Integration cache.""" + @property + def integration_targets(self) -> list[IntegrationTarget]: + """The list of integration test targets.""" + return self.get('integration_targets', lambda: list(walk_integration_targets())) + + @property + def dependency_map(self) -> dict[str, set[IntegrationTarget]]: + """The dependency map of integration test targets.""" + return self.get('dependency_map', lambda: generate_dependency_map(self.integration_targets)) + + +def filter_profiles_for_target(args: IntegrationConfig, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Return a list of profiles after applying target filters.""" + if target.target_type == IntegrationTargetType.CONTROLLER: + profile_filter = get_target_filter(args, [args.controller], True) + elif target.target_type == IntegrationTargetType.TARGET: + profile_filter = get_target_filter(args, args.targets, False) + else: + raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}') + + profiles = profile_filter.filter_profiles(profiles, target) + + return profiles + + +def get_integration_filter(args: IntegrationConfig, targets: list[IntegrationTarget]) -> set[str]: + """Return a list of test targets to skip based on the host(s) that will be used to run the specified test targets.""" + invalid_targets = sorted(target.name for target in targets if target.target_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET)) + + if invalid_targets and not args.list_targets: + message = f'''Unable to determine context for the following test targets: {", ".join(invalid_targets)} + +Make sure the test targets are correctly named: + + - Modules - The target name should match the module name. + - Plugins - The target name should be "{{plugin_type}}_{{plugin_name}}". + +If necessary, context can be controlled by adding entries to the "aliases" file for a test target: + + - Add the name(s) of modules which are tested. + - Add "context/target" for module and module_utils tests (these will run on the target host). + - Add "context/controller" for other test types (these will run on the controller).''' + + raise ApplicationError(message) + + invalid_targets = sorted(target.name for target in targets if target.actual_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET)) + + if invalid_targets: + if data_context().content.is_ansible: + display.warning(f'Unable to determine context for the following test targets: {", ".join(invalid_targets)}') + else: + display.warning(f'Unable to determine context for the following test targets, they will be run on the target host: {", ".join(invalid_targets)}') + + exclude: set[str] = set() + + controller_targets = [target for target in targets if target.target_type == IntegrationTargetType.CONTROLLER] + target_targets = [target for target in targets if target.target_type == IntegrationTargetType.TARGET] + + controller_filter = get_target_filter(args, [args.controller], True) + target_filter = get_target_filter(args, args.targets, False) + + controller_filter.filter_targets(controller_targets, exclude) + target_filter.filter_targets(target_targets, exclude) + + return exclude + + +def command_integration_filter(args: TIntegrationConfig, + targets: c.Iterable[TIntegrationTarget], + ) -> tuple[HostState, tuple[TIntegrationTarget, ...]]: + """Filter the given integration test targets.""" + targets = tuple(target for target in targets if 'hidden/' not in target.aliases) + changes = get_changes_filter(args) + + # special behavior when the --changed-all-target target is selected based on changes + if args.changed_all_target in changes: + # act as though the --changed-all-target target was in the include list + if args.changed_all_mode == 'include' and args.changed_all_target not in args.include: + args.include.append(args.changed_all_target) + args.delegate_args += ['--include', args.changed_all_target] + # act as though the --changed-all-target target was in the exclude list + elif args.changed_all_mode == 'exclude' and args.changed_all_target not in args.exclude: + args.exclude.append(args.changed_all_target) + + require = args.require + changes + exclude = args.exclude + + internal_targets = walk_internal_targets(targets, args.include, exclude, require) + environment_exclude = get_integration_filter(args, list(internal_targets)) + + environment_exclude |= set(cloud_filter(args, internal_targets)) + + if environment_exclude: + exclude = sorted(set(exclude) | environment_exclude) + internal_targets = walk_internal_targets(targets, args.include, exclude, require) + + if not internal_targets: + raise AllTargetsSkipped() + + if args.start_at and not any(target.name == args.start_at for target in internal_targets): + raise ApplicationError('Start at target matches nothing: %s' % args.start_at) + + cloud_init(args, internal_targets) + + vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + + if os.path.exists(vars_file_src): + def integration_config_callback(files: list[tuple[str, str]]) -> None: + """ + Add the integration config vars file to the payload file list. + This will preserve the file during delegation even if the file is ignored by source control. + """ + files.append((vars_file_src, data_context().content.integration_vars_path)) + + data_context().register_payload_callback(integration_config_callback) + + if args.list_targets: + raise ListTargets([target.name for target in internal_targets]) + + # requirements are installed using a callback since the windows-integration and network-integration host status checks depend on them + host_state = prepare_profiles(args, targets_use_pypi=True, requirements=requirements) # integration, windows-integration, network-integration + + if args.delegate: + raise Delegate(host_state=host_state, require=require, exclude=exclude) + + return host_state, internal_targets + + +def requirements(host_profile: HostProfile) -> None: + """Install requirements after bootstrapping and delegation.""" + if isinstance(host_profile, ControllerHostProfile) and host_profile.controller: + configure_pypi_proxy(host_profile.args, host_profile) # integration, windows-integration, network-integration + install_requirements(host_profile.args, host_profile.python, ansible=True, command=True) # integration, windows-integration, network-integration + elif isinstance(host_profile, PosixProfile) and not isinstance(host_profile, ControllerProfile): + configure_pypi_proxy(host_profile.args, host_profile) # integration diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py new file mode 100644 index 0000000..0c078b9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py @@ -0,0 +1,389 @@ +"""Plugin system for cloud providers and environments for use in integration tests.""" +from __future__ import annotations + +import abc +import atexit +import datetime +import os +import re +import tempfile +import time +import typing as t + +from ....encoding import ( + to_bytes, +) + +from ....io import ( + read_text_file, +) + +from ....util import ( + ANSIBLE_TEST_CONFIG_ROOT, + ApplicationError, + display, + import_plugins, + load_plugins, + cache, +) + +from ....util_common import ( + ResultType, + write_json_test_results, +) + +from ....target import ( + IntegrationTarget, +) + +from ....config import ( + IntegrationConfig, + TestConfig, +) + +from ....ci import ( + get_ci_provider, +) + +from ....data import ( + data_context, +) + +from ....docker_util import ( + docker_available, +) + + +@cache +def get_cloud_plugins() -> tuple[dict[str, t.Type[CloudProvider]], dict[str, t.Type[CloudEnvironment]]]: + """Import cloud plugins and load them into the plugin dictionaries.""" + import_plugins('commands/integration/cloud') + + providers: dict[str, t.Type[CloudProvider]] = {} + environments: dict[str, t.Type[CloudEnvironment]] = {} + + load_plugins(CloudProvider, providers) + load_plugins(CloudEnvironment, environments) + + return providers, environments + + +@cache +def get_provider_plugins() -> dict[str, t.Type[CloudProvider]]: + """Return a dictionary of the available cloud provider plugins.""" + return get_cloud_plugins()[0] + + +@cache +def get_environment_plugins() -> dict[str, t.Type[CloudEnvironment]]: + """Return a dictionary of the available cloud environment plugins.""" + return get_cloud_plugins()[1] + + +def get_cloud_platforms(args: TestConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[str]: + """Return cloud platform names for the specified targets.""" + if isinstance(args, IntegrationConfig): + if args.list_targets: + return [] + + if targets is None: + cloud_platforms = set(args.metadata.cloud_config or []) + else: + cloud_platforms = set(get_cloud_platform(target) for target in targets) + + cloud_platforms.discard(None) + + return sorted(cloud_platforms) + + +def get_cloud_platform(target: IntegrationTarget) -> t.Optional[str]: + """Return the name of the cloud platform used for the given target, or None if no cloud platform is used.""" + cloud_platforms = set(a.split('/')[1] for a in target.aliases if a.startswith('cloud/') and a.endswith('/') and a != 'cloud/') + + if not cloud_platforms: + return None + + if len(cloud_platforms) == 1: + cloud_platform = cloud_platforms.pop() + + if cloud_platform not in get_provider_plugins(): + raise ApplicationError('Target %s aliases contains unknown cloud platform: %s' % (target.name, cloud_platform)) + + return cloud_platform + + raise ApplicationError('Target %s aliases contains multiple cloud platforms: %s' % (target.name, ', '.join(sorted(cloud_platforms)))) + + +def get_cloud_providers(args: IntegrationConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[CloudProvider]: + """Return a list of cloud providers for the given targets.""" + return [get_provider_plugins()[p](args) for p in get_cloud_platforms(args, targets)] + + +def get_cloud_environment(args: IntegrationConfig, target: IntegrationTarget) -> t.Optional[CloudEnvironment]: + """Return the cloud environment for the given target, or None if no cloud environment is used for the target.""" + cloud_platform = get_cloud_platform(target) + + if not cloud_platform: + return None + + return get_environment_plugins()[cloud_platform](args) + + +def cloud_filter(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> list[str]: + """Return a list of target names to exclude based on the given targets.""" + if args.metadata.cloud_config is not None: + return [] # cloud filter already performed prior to delegation + + exclude: list[str] = [] + + for provider in get_cloud_providers(args, targets): + provider.filter(targets, exclude) + + return exclude + + +def cloud_init(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> None: + """Initialize cloud plugins for the given targets.""" + if args.metadata.cloud_config is not None: + return # cloud configuration already established prior to delegation + + args.metadata.cloud_config = {} + + results = {} + + for provider in get_cloud_providers(args, targets): + if args.prime_containers and not provider.uses_docker: + continue + + args.metadata.cloud_config[provider.platform] = {} + + start_time = time.time() + provider.setup() + end_time = time.time() + + results[provider.platform] = dict( + platform=provider.platform, + setup_seconds=int(end_time - start_time), + targets=[target.name for target in targets], + ) + + if not args.explain and results: + result_name = '%s-%s.json' % ( + args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0)))) + + data = dict( + clouds=results, + ) + + write_json_test_results(ResultType.DATA, result_name, data) + + +class CloudBase(metaclass=abc.ABCMeta): + """Base class for cloud plugins.""" + _CONFIG_PATH = 'config_path' + _RESOURCE_PREFIX = 'resource_prefix' + _MANAGED = 'managed' + _SETUP_EXECUTED = 'setup_executed' + + def __init__(self, args: IntegrationConfig) -> None: + self.args = args + self.platform = self.__module__.rsplit('.', 1)[-1] + + def config_callback(files: list[tuple[str, str]]) -> None: + """Add the config file to the payload file list.""" + if self.platform not in self.args.metadata.cloud_config: + return # platform was initialized, but not used -- such as being skipped due to all tests being disabled + + if self._get_cloud_config(self._CONFIG_PATH, ''): + pair = (self.config_path, os.path.relpath(self.config_path, data_context().content.root)) + + if pair not in files: + display.info('Including %s config: %s -> %s' % (self.platform, pair[0], pair[1]), verbosity=3) + files.append(pair) + + data_context().register_payload_callback(config_callback) + + @property + def setup_executed(self) -> bool: + """True if setup has been executed, otherwise False.""" + return t.cast(bool, self._get_cloud_config(self._SETUP_EXECUTED, False)) + + @setup_executed.setter + def setup_executed(self, value: bool) -> None: + """True if setup has been executed, otherwise False.""" + self._set_cloud_config(self._SETUP_EXECUTED, value) + + @property + def config_path(self) -> str: + """Path to the configuration file.""" + return os.path.join(data_context().content.root, str(self._get_cloud_config(self._CONFIG_PATH))) + + @config_path.setter + def config_path(self, value: str) -> None: + """Path to the configuration file.""" + self._set_cloud_config(self._CONFIG_PATH, value) + + @property + def resource_prefix(self) -> str: + """Resource prefix.""" + return str(self._get_cloud_config(self._RESOURCE_PREFIX)) + + @resource_prefix.setter + def resource_prefix(self, value: str) -> None: + """Resource prefix.""" + self._set_cloud_config(self._RESOURCE_PREFIX, value) + + @property + def managed(self) -> bool: + """True if resources are managed by ansible-test, otherwise False.""" + return t.cast(bool, self._get_cloud_config(self._MANAGED)) + + @managed.setter + def managed(self, value: bool) -> None: + """True if resources are managed by ansible-test, otherwise False.""" + self._set_cloud_config(self._MANAGED, value) + + def _get_cloud_config(self, key: str, default: t.Optional[t.Union[str, int, bool]] = None) -> t.Union[str, int, bool]: + """Return the specified value from the internal configuration.""" + if default is not None: + return self.args.metadata.cloud_config[self.platform].get(key, default) + + return self.args.metadata.cloud_config[self.platform][key] + + def _set_cloud_config(self, key: str, value: t.Union[str, int, bool]) -> None: + """Set the specified key and value in the internal configuration.""" + self.args.metadata.cloud_config[self.platform][key] = value + + +class CloudProvider(CloudBase): + """Base class for cloud provider plugins. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig, config_extension: str = '.ini') -> None: + super().__init__(args) + + self.ci_provider = get_ci_provider() + self.remove_config = False + self.config_static_name = 'cloud-config-%s%s' % (self.platform, config_extension) + self.config_static_path = os.path.join(data_context().content.integration_path, self.config_static_name) + self.config_template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, '%s.template' % self.config_static_name) + self.config_extension = config_extension + + self.uses_config = False + self.uses_docker = False + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + if not self.uses_docker and not self.uses_config: + return + + if self.uses_docker and docker_available(): + return + + if self.uses_config and os.path.exists(self.config_static_path): + return + + skip = 'cloud/%s/' % self.platform + skipped = [target.name for target in targets if skip in target.aliases] + + if skipped: + exclude.append(skip) + + if not self.uses_docker and self.uses_config: + display.warning('Excluding tests marked "%s" which require a "%s" config file (see "%s"): %s' + % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped))) + elif self.uses_docker and not self.uses_config: + display.warning('Excluding tests marked "%s" which requires container support: %s' + % (skip.rstrip('/'), ', '.join(skipped))) + elif self.uses_docker and self.uses_config: + display.warning('Excluding tests marked "%s" which requires container support or a "%s" config file (see "%s"): %s' + % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped))) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + self.resource_prefix = self.ci_provider.generate_resource_prefix() + self.resource_prefix = re.sub(r'[^a-zA-Z0-9]+', '-', self.resource_prefix)[:63].lower().rstrip('-') + + atexit.register(self.cleanup) + + def cleanup(self) -> None: + """Clean up the cloud resource and any temporary configuration files after tests complete.""" + if self.remove_config: + os.remove(self.config_path) + + def _use_static_config(self) -> bool: + """Use a static config file if available. Returns True if static config is used, otherwise returns False.""" + if os.path.isfile(self.config_static_path): + display.info('Using existing %s cloud config: %s' % (self.platform, self.config_static_path), verbosity=1) + self.config_path = self.config_static_path + static = True + else: + static = False + + self.managed = not static + + return static + + def _write_config(self, content: str) -> None: + """Write the given content to the config file.""" + prefix = '%s-' % os.path.splitext(os.path.basename(self.config_static_path))[0] + + with tempfile.NamedTemporaryFile(dir=data_context().content.integration_path, prefix=prefix, suffix=self.config_extension, delete=False) as config_fd: + filename = os.path.join(data_context().content.integration_path, os.path.basename(config_fd.name)) + + self.config_path = filename + self.remove_config = True + + display.info('>>> Config: %s\n%s' % (filename, content.strip()), verbosity=3) + + config_fd.write(to_bytes(content)) + config_fd.flush() + + def _read_config_template(self) -> str: + """Read and return the configuration template.""" + lines = read_text_file(self.config_template_path).splitlines() + lines = [line for line in lines if not line.startswith('#')] + config = '\n'.join(lines).strip() + '\n' + return config + + @staticmethod + def _populate_config_template(template: str, values: dict[str, str]) -> str: + """Populate and return the given template with the provided values.""" + for key in sorted(values): + value = values[key] + template = template.replace('@%s' % key, value) + + return template + + +class CloudEnvironment(CloudBase): + """Base class for cloud environment plugins. Updates integration test environment after delegation.""" + def setup_once(self) -> None: + """Run setup if it has not already been run.""" + if self.setup_executed: + return + + self.setup() + self.setup_executed = True + + def setup(self) -> None: + """Setup which should be done once per environment instead of once per test target.""" + + @abc.abstractmethod + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + + def on_failure(self, target: IntegrationTarget, tries: int) -> None: + """Callback to run when an integration target fails.""" + + +class CloudEnvironmentConfig: + """Configuration for the environment.""" + def __init__(self, + env_vars: t.Optional[dict[str, str]] = None, + ansible_vars: t.Optional[dict[str, t.Any]] = None, + module_defaults: t.Optional[dict[str, dict[str, t.Any]]] = None, + callback_plugins: t.Optional[list[str]] = None, + ): + self.env_vars = env_vars + self.ansible_vars = ansible_vars + self.module_defaults = module_defaults + self.callback_plugins = callback_plugins diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py new file mode 100644 index 0000000..007d383 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py @@ -0,0 +1,79 @@ +"""ACME plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class ACMEProvider(CloudProvider): + """ACME plugin. Sets up cloud resources for tests.""" + DOCKER_SIMULATOR_NAME = 'acme-simulator' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + # The simulator must be pinned to a specific version to guarantee CI passes with the version used. + if os.environ.get('ANSIBLE_ACME_CONTAINER'): + self.image = os.environ.get('ANSIBLE_ACME_CONTAINER') + else: + self.image = 'quay.io/ansible/acme-test-container:2.1.0' + + self.uses_docker = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Create a ACME test container using docker.""" + ports = [ + 5000, # control port for flask app in container + 14000, # Pebble ACME CA + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('acme_host', self.DOCKER_SIMULATOR_NAME) + + def _setup_static(self) -> None: + raise NotImplementedError() + + +class ACMEEnvironment(CloudEnvironment): + """ACME environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + ansible_vars = dict( + acme_host=self._get_cloud_config('acme_host'), + ) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py new file mode 100644 index 0000000..234f311 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py @@ -0,0 +1,131 @@ +"""AWS plugin for integration tests.""" +from __future__ import annotations + +import os +import uuid +import configparser +import typing as t + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....target import ( + IntegrationTarget, +) + +from ....core_ci import ( + AnsibleCoreCI, + CloudResource, +) + +from ....host_configs import ( + OriginConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class AwsCloudProvider(CloudProvider): + """AWS cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + aci = self._create_ansible_core_ci() + + if aci.available: + return + + super().filter(targets, exclude) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + aws_config_path = os.path.expanduser('~/.aws') + + if os.path.exists(aws_config_path) and isinstance(self.args.controller, OriginConfig): + raise ApplicationError('Rename "%s" or use the --docker or --remote option to isolate tests.' % aws_config_path) + + if not self._use_static_config(): + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Request AWS credentials through the Ansible Core CI service.""" + display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1) + + config = self._read_config_template() + + aci = self._create_ansible_core_ci() + + response = aci.start() + + if not self.args.explain: + credentials = response['aws']['credentials'] + + values = dict( + ACCESS_KEY=credentials['access_key'], + SECRET_KEY=credentials['secret_key'], + SECURITY_TOKEN=credentials['session_token'], + REGION='us-east-1', + ) + + display.sensitive.add(values['SECRET_KEY']) + display.sensitive.add(values['SECURITY_TOKEN']) + + config = self._populate_config_template(config, values) + + self._write_config(config) + + def _create_ansible_core_ci(self) -> AnsibleCoreCI: + """Return an AWS instance of AnsibleCoreCI.""" + return AnsibleCoreCI(self.args, CloudResource(platform='aws')) + + +class AwsCloudEnvironment(CloudEnvironment): + """AWS cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + ansible_vars: dict[str, t.Any] = dict( + resource_prefix=self.resource_prefix, + tiny_prefix=uuid.uuid4().hex[0:12] + ) + + ansible_vars.update(dict(parser.items('default'))) + + display.sensitive.add(ansible_vars.get('aws_secret_key')) + display.sensitive.add(ansible_vars.get('security_token')) + + if 'aws_cleanup' not in ansible_vars: + ansible_vars['aws_cleanup'] = not self.managed + + env_vars = {'ANSIBLE_DEBUG_BOTOCORE_LOGS': 'True'} + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + callback_plugins=['aws_resource_actions'], + ) + + def on_failure(self, target: IntegrationTarget, tries: int) -> None: + """Callback to run when an integration target fails.""" + if not tries and self.managed: + display.notice('If %s failed due to permissions, the IAM test policy may need to be updated. ' + 'https://docs.ansible.com/ansible/devel/collections/amazon/aws/docsite/dev_guidelines.html#aws-permissions-for-integration-tests' + % target.name) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py new file mode 100644 index 0000000..dc5136a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py @@ -0,0 +1,166 @@ +"""Azure plugin for integration tests.""" +from __future__ import annotations + +import configparser +import typing as t + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....target import ( + IntegrationTarget, +) + +from ....core_ci import ( + AnsibleCoreCI, + CloudResource, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class AzureCloudProvider(CloudProvider): + """Azure cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.aci: t.Optional[AnsibleCoreCI] = None + + self.uses_config = True + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + aci = self._create_ansible_core_ci() + + if aci.available: + return + + super().filter(targets, exclude) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + self._setup_dynamic() + + get_config(self.config_path) # check required variables + + def cleanup(self) -> None: + """Clean up the cloud resource and any temporary configuration files after tests complete.""" + if self.aci: + self.aci.stop() + + super().cleanup() + + def _setup_dynamic(self) -> None: + """Request Azure credentials through ansible-core-ci.""" + display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1) + + config = self._read_config_template() + response = {} + + aci = self._create_ansible_core_ci() + + aci_result = aci.start() + + if not self.args.explain: + response = aci_result['azure'] + self.aci = aci + + if not self.args.explain: + values = dict( + AZURE_CLIENT_ID=response['clientId'], + AZURE_SECRET=response['clientSecret'], + AZURE_SUBSCRIPTION_ID=response['subscriptionId'], + AZURE_TENANT=response['tenantId'], + RESOURCE_GROUP=response['resourceGroupNames'][0], + RESOURCE_GROUP_SECONDARY=response['resourceGroupNames'][1], + ) + + display.sensitive.add(values['AZURE_SECRET']) + + config = '\n'.join('%s: %s' % (key, values[key]) for key in sorted(values)) + + config = '[default]\n' + config + + self._write_config(config) + + def _create_ansible_core_ci(self) -> AnsibleCoreCI: + """Return an Azure instance of AnsibleCoreCI.""" + return AnsibleCoreCI(self.args, CloudResource(platform='azure')) + + +class AzureCloudEnvironment(CloudEnvironment): + """Azure cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + env_vars = get_config(self.config_path) + + display.sensitive.add(env_vars.get('AZURE_SECRET')) + display.sensitive.add(env_vars.get('AZURE_PASSWORD')) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items())) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) + + def on_failure(self, target: IntegrationTarget, tries: int) -> None: + """Callback to run when an integration target fails.""" + if not tries and self.managed: + display.notice('If %s failed due to permissions, the test policy may need to be updated.' % target.name) + + +def get_config(config_path: str) -> dict[str, str]: + """Return a configuration dictionary parsed from the given configuration path.""" + parser = configparser.ConfigParser() + parser.read(config_path) + + config = dict((key.upper(), value) for key, value in parser.items('default')) + + rg_vars = ( + 'RESOURCE_GROUP', + 'RESOURCE_GROUP_SECONDARY', + ) + + sp_vars = ( + 'AZURE_CLIENT_ID', + 'AZURE_SECRET', + 'AZURE_SUBSCRIPTION_ID', + 'AZURE_TENANT', + ) + + ad_vars = ( + 'AZURE_AD_USER', + 'AZURE_PASSWORD', + 'AZURE_SUBSCRIPTION_ID', + ) + + rg_ok = all(var in config for var in rg_vars) + sp_ok = all(var in config for var in sp_vars) + ad_ok = all(var in config for var in ad_vars) + + if not rg_ok: + raise ApplicationError('Resource groups must be defined with: %s' % ', '.join(sorted(rg_vars))) + + if not sp_ok and not ad_ok: + raise ApplicationError('Credentials must be defined using either:\nService Principal: %s\nActive Directory: %s' % ( + ', '.join(sorted(sp_vars)), ', '.join(sorted(ad_vars)))) + + return config diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py new file mode 100644 index 0000000..f453ef3 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# +# (c) 2018, Gaudenz Steinlin <gaudenz.steinlin@cloudscale.ch> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +"""Cloudscale plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class CloudscaleCloudProvider(CloudProvider): + """Cloudscale cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class CloudscaleCloudEnvironment(CloudEnvironment): + """Cloudscale cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + CLOUDSCALE_API_TOKEN=parser.get('default', 'cloudscale_api_token'), + ) + + display.sensitive.add(env_vars['CLOUDSCALE_API_TOKEN']) + + ansible_vars = dict( + cloudscale_resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items())) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py new file mode 100644 index 0000000..0037b42 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py @@ -0,0 +1,174 @@ +"""CloudStack plugin for integration tests.""" +from __future__ import annotations + +import json +import configparser +import os +import urllib.parse +import typing as t + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....docker_util import ( + docker_exec, +) + +from ....containers import ( + CleanupMode, + run_support_container, + wait_for_file, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class CsCloudProvider(CloudProvider): + """CloudStack cloud provider plugin. Sets up cloud resources before delegation.""" + DOCKER_SIMULATOR_NAME = 'cloudstack-sim' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.image = os.environ.get('ANSIBLE_CLOUDSTACK_CONTAINER', 'quay.io/ansible/cloudstack-test-container:1.4.0') + self.host = '' + self.port = 0 + + self.uses_docker = True + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_static(self) -> None: + """Configure CloudStack tests for use with static configuration.""" + parser = configparser.ConfigParser() + parser.read(self.config_static_path) + + endpoint = parser.get('cloudstack', 'endpoint') + + parts = urllib.parse.urlparse(endpoint) + + self.host = parts.hostname + + if not self.host: + raise ApplicationError('Could not determine host from endpoint: %s' % endpoint) + + if parts.port: + self.port = parts.port + elif parts.scheme == 'http': + self.port = 80 + elif parts.scheme == 'https': + self.port = 443 + else: + raise ApplicationError('Could not determine port from endpoint: %s' % endpoint) + + display.info('Read cs host "%s" and port %d from config: %s' % (self.host, self.port, self.config_static_path), verbosity=1) + + def _setup_dynamic(self) -> None: + """Create a CloudStack simulator using docker.""" + config = self._read_config_template() + + self.port = 8888 + + ports = [ + self.port, + ] + + descriptor = run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + if not descriptor: + return + + # apply work-around for OverlayFS issue + # https://github.com/docker/for-linux/issues/72#issuecomment-319904698 + docker_exec(self.args, self.DOCKER_SIMULATOR_NAME, ['find', '/var/lib/mysql', '-type', 'f', '-exec', 'touch', '{}', ';'], capture=True) + + if self.args.explain: + values = dict( + HOST=self.host, + PORT=str(self.port), + ) + else: + credentials = self._get_credentials(self.DOCKER_SIMULATOR_NAME) + + values = dict( + HOST=self.DOCKER_SIMULATOR_NAME, + PORT=str(self.port), + KEY=credentials['apikey'], + SECRET=credentials['secretkey'], + ) + + display.sensitive.add(values['SECRET']) + + config = self._populate_config_template(config, values) + + self._write_config(config) + + def _get_credentials(self, container_name: str) -> dict[str, t.Any]: + """Wait for the CloudStack simulator to return credentials.""" + def check(value) -> bool: + """Return True if the given configuration is valid JSON, otherwise return False.""" + # noinspection PyBroadException + try: + json.loads(value) + except Exception: # pylint: disable=broad-except + return False # sometimes the file exists but is not yet valid JSON + + return True + + stdout = wait_for_file(self.args, container_name, '/var/www/html/admin.json', sleep=10, tries=30, check=check) + + return json.loads(stdout) + + +class CsCloudEnvironment(CloudEnvironment): + """CloudStack cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + config = dict(parser.items('default')) + + env_vars = dict( + CLOUDSTACK_ENDPOINT=config['endpoint'], + CLOUDSTACK_KEY=config['key'], + CLOUDSTACK_SECRET=config['secret'], + CLOUDSTACK_TIMEOUT=config['timeout'], + ) + + display.sensitive.add(env_vars['CLOUDSTACK_SECRET']) + + ansible_vars = dict( + cs_resource_prefix=self.resource_prefix, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py new file mode 100644 index 0000000..a46bf70 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py @@ -0,0 +1,55 @@ +"""DigitalOcean plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class DigitalOceanCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class DigitalOceanCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + DO_API_KEY=parser.get('default', 'key'), + ) + + display.sensitive.add(env_vars['DO_API_KEY']) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py new file mode 100644 index 0000000..c2413ee --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py @@ -0,0 +1,94 @@ +"""Foreman plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class ForemanProvider(CloudProvider): + """Foreman plugin. Sets up Foreman stub server for tests.""" + DOCKER_SIMULATOR_NAME = 'foreman-stub' + + # Default image to run Foreman stub from. + # + # The simulator must be pinned to a specific version + # to guarantee CI passes with the version used. + # + # It's source source itself resides at: + # https://github.com/ansible/foreman-test-container + DOCKER_IMAGE = 'quay.io/ansible/foreman-test-container:1.4.0' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.__container_from_env = os.environ.get('ANSIBLE_FRMNSIM_CONTAINER') + """ + Overrides target container, might be used for development. + + Use ANSIBLE_FRMNSIM_CONTAINER=whatever_you_want if you want + to use other image. Omit/empty otherwise. + """ + self.image = self.__container_from_env or self.DOCKER_IMAGE + + self.uses_docker = True + + def setup(self) -> None: + """Setup cloud resource before delegation and reg cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Spawn a Foreman stub within docker container.""" + foreman_port = 8080 + + ports = [ + foreman_port, + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('FOREMAN_HOST', self.DOCKER_SIMULATOR_NAME) + self._set_cloud_config('FOREMAN_PORT', str(foreman_port)) + + def _setup_static(self) -> None: + raise NotImplementedError() + + +class ForemanEnvironment(CloudEnvironment): + """Foreman environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + env_vars = dict( + FOREMAN_HOST=str(self._get_cloud_config('FOREMAN_HOST')), + FOREMAN_PORT=str(self._get_cloud_config('FOREMAN_PORT')), + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py new file mode 100644 index 0000000..e180a02 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py @@ -0,0 +1,168 @@ +"""Galaxy (ansible-galaxy) plugin for integration tests.""" +from __future__ import annotations + +import os +import tempfile + +from ....config import ( + IntegrationConfig, +) + +from ....docker_util import ( + docker_cp_to, +) + +from ....containers import ( + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +# We add BasicAuthentication, to make the tasks that deal with +# direct API access easier to deal with across galaxy_ng and pulp +SETTINGS = b''' +CONTENT_ORIGIN = 'http://ansible-ci-pulp:80' +ANSIBLE_API_HOSTNAME = 'http://ansible-ci-pulp:80' +ANSIBLE_CONTENT_HOSTNAME = 'http://ansible-ci-pulp:80/pulp/content' +TOKEN_AUTH_DISABLED = True +GALAXY_REQUIRE_CONTENT_APPROVAL = False +GALAXY_AUTHENTICATION_CLASSES = [ + "rest_framework.authentication.SessionAuthentication", + "rest_framework.authentication.TokenAuthentication", + "rest_framework.authentication.BasicAuthentication", +] +''' + +SET_ADMIN_PASSWORD = b'''#!/usr/bin/execlineb -S0 +foreground { + redirfd -w 1 /dev/null + redirfd -w 2 /dev/null + export DJANGO_SETTINGS_MODULE pulpcore.app.settings + export PULP_CONTENT_ORIGIN localhost + s6-setuidgid postgres + if { /usr/local/bin/django-admin reset-admin-password --password password } + if { /usr/local/bin/pulpcore-manager create-group system:partner-engineers --users admin } +} +''' + +# There are 2 overrides here: +# 1. Change the gunicorn bind address from 127.0.0.1 to 0.0.0.0 now that Galaxy NG does not allow us to access the +# Pulp API through it. +# 2. Grant access allowing us to DELETE a namespace in Galaxy NG. This is as CI deletes and recreates repos and +# distributions in Pulp which now breaks the namespace in Galaxy NG. Recreating it is the "simple" fix to get it +# working again. +# These may not be needed in the future, especially if 1 becomes configurable by an env var but for now they must be +# done. +OVERRIDES = b'''#!/usr/bin/execlineb -S0 +foreground { + sed -i "0,/\\"127.0.0.1:24817\\"/s//\\"0.0.0.0:24817\\"/" /etc/services.d/pulpcore-api/run +} + +# This sed calls changes the first occurrence to "allow" which is conveniently the delete operation for a namespace. +# https://github.com/ansible/galaxy_ng/blob/master/galaxy_ng/app/access_control/statements/standalone.py#L9-L11. +backtick NG_PREFIX { python -c "import galaxy_ng; print(galaxy_ng.__path__[0], end='')" } +importas ng_prefix NG_PREFIX +foreground { + sed -i "0,/\\"effect\\": \\"deny\\"/s//\\"effect\\": \\"allow\\"/" ${ng_prefix}/app/access_control/statements/standalone.py +}''' + + +class GalaxyProvider(CloudProvider): + """ + Galaxy plugin. Sets up pulp (ansible-galaxy) servers for tests. + The pulp source itself resides at: https://github.com/pulp/pulp-oci-images + """ + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + # Cannot use the latest container image as either galaxy_ng 4.2.0rc2 or pulp 0.5.0 has sporatic issues with + # dropping published collections in CI. Try running the tests multiple times when updating. Will also need to + # comment out the cache tests in 'test/integration/targets/ansible-galaxy-collection/tasks/install.yml' when + # the newer update is available. + self.pulp = os.environ.get( + 'ANSIBLE_PULP_CONTAINER', + 'quay.io/ansible/pulp-galaxy-ng:b79a7be64eff' + ) + + self.uses_docker = True + + def setup(self) -> None: + """Setup cloud resource before delegation and reg cleanup callback.""" + super().setup() + + galaxy_port = 80 + pulp_host = 'ansible-ci-pulp' + pulp_port = 24817 + + ports = [ + galaxy_port, + pulp_port, + ] + + # Create the container, don't run it, we need to inject configs before it starts + descriptor = run_support_container( + self.args, + self.platform, + self.pulp, + pulp_host, + ports, + start=False, + allow_existing=True, + ) + + if not descriptor: + return + + if not descriptor.running: + pulp_id = descriptor.container_id + + injected_files = { + '/etc/pulp/settings.py': SETTINGS, + '/etc/cont-init.d/111-postgres': SET_ADMIN_PASSWORD, + '/etc/cont-init.d/000-ansible-test-overrides': OVERRIDES, + } + for path, content in injected_files.items(): + with tempfile.NamedTemporaryFile() as temp_fd: + temp_fd.write(content) + temp_fd.flush() + docker_cp_to(self.args, pulp_id, temp_fd.name, path) + + descriptor.start(self.args) + + self._set_cloud_config('PULP_HOST', pulp_host) + self._set_cloud_config('PULP_PORT', str(pulp_port)) + self._set_cloud_config('GALAXY_PORT', str(galaxy_port)) + self._set_cloud_config('PULP_USER', 'admin') + self._set_cloud_config('PULP_PASSWORD', 'password') + + +class GalaxyEnvironment(CloudEnvironment): + """Galaxy environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + pulp_user = str(self._get_cloud_config('PULP_USER')) + pulp_password = str(self._get_cloud_config('PULP_PASSWORD')) + pulp_host = self._get_cloud_config('PULP_HOST') + galaxy_port = self._get_cloud_config('GALAXY_PORT') + pulp_port = self._get_cloud_config('PULP_PORT') + + return CloudEnvironmentConfig( + ansible_vars=dict( + pulp_user=pulp_user, + pulp_password=pulp_password, + pulp_api='http://%s:%s' % (pulp_host, pulp_port), + pulp_server='http://%s:%s/pulp_ansible/galaxy/' % (pulp_host, pulp_port), + galaxy_ng_server='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port), + ), + env_vars=dict( + PULP_USER=pulp_user, + PULP_PASSWORD=pulp_password, + PULP_SERVER='http://%s:%s/pulp_ansible/galaxy/api/' % (pulp_host, pulp_port), + GALAXY_NG_SERVER='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port), + ), + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py new file mode 100644 index 0000000..28ffb7b --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py @@ -0,0 +1,55 @@ +# Copyright: (c) 2018, Google Inc. +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +"""GCP plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class GcpCloudProvider(CloudProvider): + """GCP cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + display.notice( + 'static configuration could not be used. are you missing a template file?' + ) + + +class GcpCloudEnvironment(CloudEnvironment): + """GCP cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict(parser.items('default'))) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py new file mode 100644 index 0000000..4d75f22 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py @@ -0,0 +1,106 @@ +"""Hetzner Cloud plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....target import ( + IntegrationTarget, +) + +from ....core_ci import ( + AnsibleCoreCI, + CloudResource, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class HcloudCloudProvider(CloudProvider): + """Hetzner Cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + aci = self._create_ansible_core_ci() + + if aci.available: + return + + super().filter(targets, exclude) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Request Hetzner credentials through the Ansible Core CI service.""" + display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1) + + config = self._read_config_template() + + aci = self._create_ansible_core_ci() + + response = aci.start() + + if not self.args.explain: + token = response['hetzner']['token'] + + display.sensitive.add(token) + display.info('Hetzner Cloud Token: %s' % token, verbosity=1) + + values = dict( + TOKEN=token, + ) + + display.sensitive.add(values['TOKEN']) + + config = self._populate_config_template(config, values) + + self._write_config(config) + + def _create_ansible_core_ci(self) -> AnsibleCoreCI: + """Return a Heztner instance of AnsibleCoreCI.""" + return AnsibleCoreCI(self.args, CloudResource(platform='hetzner')) + + +class HcloudCloudEnvironment(CloudEnvironment): + """Hetzner Cloud cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + HCLOUD_TOKEN=parser.get('default', 'hcloud_api_token'), + ) + + display.sensitive.add(env_vars['HCLOUD_TOKEN']) + + ansible_vars = dict( + hcloud_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items())) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py new file mode 100644 index 0000000..e250eed --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py @@ -0,0 +1,92 @@ +"""HTTP Tester plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....util import ( + display, + generate_password, +) + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + +KRB5_PASSWORD_ENV = 'KRB5_PASSWORD' + + +class HttptesterProvider(CloudProvider): + """HTTP Tester provider plugin. Sets up resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.image = os.environ.get('ANSIBLE_HTTP_TEST_CONTAINER', 'quay.io/ansible/http-test-container:2.1.0') + + self.uses_docker = True + + def setup(self) -> None: + """Setup resources before delegation.""" + super().setup() + + ports = [ + 80, + 88, + 443, + 444, + 749, + ] + + aliases = [ + 'ansible.http.tests', + 'sni1.ansible.http.tests', + 'fail.ansible.http.tests', + 'self-signed.ansible.http.tests', + ] + + descriptor = run_support_container( + self.args, + self.platform, + self.image, + 'http-test-container', + ports, + aliases=aliases, + allow_existing=True, + cleanup=CleanupMode.YES, + env={ + KRB5_PASSWORD_ENV: generate_password(), + }, + ) + + if not descriptor: + return + + # Read the password from the container environment. + # This allows the tests to work when re-using an existing container. + # The password is marked as sensitive, since it may differ from the one we generated. + krb5_password = descriptor.details.container.env_dict()[KRB5_PASSWORD_ENV] + display.sensitive.add(krb5_password) + + self._set_cloud_config(KRB5_PASSWORD_ENV, krb5_password) + + +class HttptesterEnvironment(CloudEnvironment): + """HTTP Tester environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + return CloudEnvironmentConfig( + env_vars=dict( + HTTPTESTER='1', # backwards compatibility for tests intended to work with or without HTTP Tester + KRB5_PASSWORD=str(self._get_cloud_config(KRB5_PASSWORD_ENV)), + ) + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py new file mode 100644 index 0000000..df0ebb0 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py @@ -0,0 +1,97 @@ +"""NIOS plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class NiosProvider(CloudProvider): + """Nios plugin. Sets up NIOS mock server for tests.""" + DOCKER_SIMULATOR_NAME = 'nios-simulator' + + # Default image to run the nios simulator. + # + # The simulator must be pinned to a specific version + # to guarantee CI passes with the version used. + # + # It's source source itself resides at: + # https://github.com/ansible/nios-test-container + DOCKER_IMAGE = 'quay.io/ansible/nios-test-container:1.4.0' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.__container_from_env = os.environ.get('ANSIBLE_NIOSSIM_CONTAINER') + """ + Overrides target container, might be used for development. + + Use ANSIBLE_NIOSSIM_CONTAINER=whatever_you_want if you want + to use other image. Omit/empty otherwise. + """ + + self.image = self.__container_from_env or self.DOCKER_IMAGE + + self.uses_docker = True + + def setup(self) -> None: + """Setup cloud resource before delegation and reg cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Spawn a NIOS simulator within docker container.""" + nios_port = 443 + + ports = [ + nios_port, + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('NIOS_HOST', self.DOCKER_SIMULATOR_NAME) + + def _setup_static(self) -> None: + raise NotImplementedError() + + +class NiosEnvironment(CloudEnvironment): + """NIOS environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + ansible_vars = dict( + nios_provider=dict( + host=self._get_cloud_config('NIOS_HOST'), + username='admin', + password='infoblox', + ), + ) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py new file mode 100644 index 0000000..d005a3c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py @@ -0,0 +1,60 @@ +"""OpenNebula plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class OpenNebulaCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + self._setup_dynamic() + + self.uses_config = True + + def _setup_dynamic(self) -> None: + display.info('No config file provided, will run test from fixtures') + + config = self._read_config_template() + values = dict( + URL="http://localhost/RPC2", + USERNAME='oneadmin', + PASSWORD='onepass', + FIXTURES='true', + REPLAY='true', + ) + config = self._populate_config_template(config, values) + self._write_config(config) + + +class OpenNebulaCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict(parser.items('default'))) + + display.sensitive.add(ansible_vars.get('opennebula_password')) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py new file mode 100644 index 0000000..da930c0 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py @@ -0,0 +1,114 @@ +"""OpenShift plugin for integration tests.""" +from __future__ import annotations + +import re + +from ....io import ( + read_text_file, +) + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, + wait_for_file, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class OpenShiftCloudProvider(CloudProvider): + """OpenShift cloud provider plugin. Sets up cloud resources before delegation.""" + DOCKER_CONTAINER_NAME = 'openshift-origin' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args, config_extension='.kubeconfig') + + # The image must be pinned to a specific version to guarantee CI passes with the version used. + self.image = 'quay.io/ansible/openshift-origin:v3.9.0' + + self.uses_docker = True + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_static(self) -> None: + """Configure OpenShift tests for use with static configuration.""" + config = read_text_file(self.config_static_path) + + match = re.search(r'^ *server: (?P<server>.*)$', config, flags=re.MULTILINE) + + if not match: + display.warning('Could not find OpenShift endpoint in kubeconfig.') + + def _setup_dynamic(self) -> None: + """Create a OpenShift container using docker.""" + port = 8443 + + ports = [ + port, + ] + + cmd = ['start', 'master', '--listen', 'https://0.0.0.0:%d' % port] + + descriptor = run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_CONTAINER_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + cmd=cmd, + ) + + if not descriptor: + return + + if self.args.explain: + config = '# Unknown' + else: + config = self._get_config(self.DOCKER_CONTAINER_NAME, 'https://%s:%s/' % (self.DOCKER_CONTAINER_NAME, port)) + + self._write_config(config) + + def _get_config(self, container_name: str, server: str) -> str: + """Get OpenShift config from container.""" + stdout = wait_for_file(self.args, container_name, '/var/lib/origin/openshift.local.config/master/admin.kubeconfig', sleep=10, tries=30) + + config = stdout + config = re.sub(r'^( *)certificate-authority-data: .*$', r'\1insecure-skip-tls-verify: true', config, flags=re.MULTILINE) + config = re.sub(r'^( *)server: .*$', r'\1server: %s' % server, config, flags=re.MULTILINE) + + return config + + +class OpenShiftCloudEnvironment(CloudEnvironment): + """OpenShift cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + env_vars = dict( + K8S_AUTH_KUBECONFIG=self.config_path, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py new file mode 100644 index 0000000..04c2d89 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py @@ -0,0 +1,56 @@ +"""Scaleway plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class ScalewayCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class ScalewayCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + SCW_API_KEY=parser.get('default', 'key'), + SCW_ORG=parser.get('default', 'org') + ) + + display.sensitive.add(env_vars['SCW_API_KEY']) + + ansible_vars = dict( + scw_org=parser.get('default', 'org'), + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py new file mode 100644 index 0000000..df1651f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py @@ -0,0 +1,138 @@ +"""VMware vCenter plugin for integration tests.""" +from __future__ import annotations + +import configparser +import os + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class VcenterProvider(CloudProvider): + """VMware vcenter/esx plugin. Sets up cloud resources for tests.""" + DOCKER_SIMULATOR_NAME = 'vcenter-simulator' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + # The simulator must be pinned to a specific version to guarantee CI passes with the version used. + if os.environ.get('ANSIBLE_VCSIM_CONTAINER'): + self.image = os.environ.get('ANSIBLE_VCSIM_CONTAINER') + else: + self.image = 'quay.io/ansible/vcenter-test-container:1.7.0' + + # VMware tests can be run on govcsim or BYO with a static config file. + # The simulator is the default if no config is provided. + self.vmware_test_platform = os.environ.get('VMWARE_TEST_PLATFORM', 'govcsim') + + if self.vmware_test_platform == 'govcsim': + self.uses_docker = True + self.uses_config = False + elif self.vmware_test_platform == 'static': + self.uses_docker = False + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._set_cloud_config('vmware_test_platform', self.vmware_test_platform) + + if self.vmware_test_platform == 'govcsim': + self._setup_dynamic_simulator() + self.managed = True + elif self.vmware_test_platform == 'static': + self._use_static_config() + self._setup_static() + else: + raise ApplicationError('Unknown vmware_test_platform: %s' % self.vmware_test_platform) + + def _setup_dynamic_simulator(self) -> None: + """Create a vcenter simulator using docker.""" + ports = [ + 443, + 8080, + 8989, + 5000, # control port for flask app in simulator + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('vcenter_hostname', self.DOCKER_SIMULATOR_NAME) + + def _setup_static(self) -> None: + if not os.path.exists(self.config_static_path): + raise ApplicationError('Configuration file does not exist: %s' % self.config_static_path) + + +class VcenterEnvironment(CloudEnvironment): + """VMware vcenter/esx environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + try: + # We may be in a container, so we cannot just reach VMWARE_TEST_PLATFORM, + # We do a try/except instead + parser = configparser.ConfigParser() + parser.read(self.config_path) # static + + env_vars = {} + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + ansible_vars.update(dict(parser.items('DEFAULT', raw=True))) + except KeyError: # govcsim + env_vars = dict( + VCENTER_HOSTNAME=str(self._get_cloud_config('vcenter_hostname')), + VCENTER_USERNAME='user', + VCENTER_PASSWORD='pass', + ) + + ansible_vars = dict( + vcsim=str(self._get_cloud_config('vcenter_hostname')), + vcenter_hostname=str(self._get_cloud_config('vcenter_hostname')), + vcenter_username='user', + vcenter_password='pass', + ) + + for key, value in ansible_vars.items(): + if key.endswith('_password'): + display.sensitive.add(value) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + module_defaults={ + 'group/vmware': { + 'hostname': ansible_vars['vcenter_hostname'], + 'username': ansible_vars['vcenter_username'], + 'password': ansible_vars['vcenter_password'], + 'port': ansible_vars.get('vcenter_port', '443'), + 'validate_certs': ansible_vars.get('vmware_validate_certs', 'no'), + }, + }, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py new file mode 100644 index 0000000..1993cda --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py @@ -0,0 +1,55 @@ +"""Vultr plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class VultrCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class VultrCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + VULTR_API_KEY=parser.get('default', 'key'), + ) + + display.sensitive.add(env_vars['VULTR_API_KEY']) + + ansible_vars = dict( + vultr_resource_prefix=self.resource_prefix, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/coverage.py b/test/lib/ansible_test/_internal/commands/integration/coverage.py new file mode 100644 index 0000000..5a486e9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/coverage.py @@ -0,0 +1,417 @@ +"""Code coverage support for integration tests.""" +from __future__ import annotations + +import abc +import os +import shutil +import tempfile +import typing as t +import zipfile + +from ...io import ( + write_text_file, +) + +from ...ansible_util import ( + run_playbook, +) + +from ...config import ( + IntegrationConfig, +) + +from ...util import ( + COVERAGE_CONFIG_NAME, + MODE_DIRECTORY, + MODE_DIRECTORY_WRITE, + MODE_FILE, + SubprocessError, + cache, + display, + generate_name, + get_generic_type, + get_type_map, + remove_tree, + sanitize_host_name, + verified_chmod, +) + +from ...util_common import ( + ResultType, +) + +from ...coverage_util import ( + generate_coverage_config, + get_coverage_platform, +) + +from ...host_configs import ( + HostConfig, + PosixConfig, + WindowsConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ...data import ( + data_context, +) + +from ...host_profiles import ( + ControllerProfile, + HostProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + HostState, +) + +from ...connections import ( + LocalConnection, +) + +from ...inventory import ( + create_windows_inventory, + create_posix_inventory, +) + +THostConfig = t.TypeVar('THostConfig', bound=HostConfig) + + +class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta): + """Base class for configuring hosts for integration test code coverage.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + self.args = args + self.host_state = host_state + self.inventory_path = inventory_path + self.profiles = self.get_profiles() + + def get_profiles(self) -> list[HostProfile]: + """Return a list of profiles relevant for this handler.""" + profile_type = get_generic_type(type(self), HostConfig) + profiles = [profile for profile in self.host_state.target_profiles if isinstance(profile.config, profile_type)] + + return profiles + + @property + @abc.abstractmethod + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + + @abc.abstractmethod + def setup(self) -> None: + """Perform setup for code coverage.""" + + @abc.abstractmethod + def teardown(self) -> None: + """Perform teardown for code coverage.""" + + @abc.abstractmethod + def create_inventory(self) -> None: + """Create inventory, if needed.""" + + @abc.abstractmethod + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + def run_playbook(self, playbook: str, variables: dict[str, str]) -> None: + """Run the specified playbook using the current inventory.""" + self.create_inventory() + run_playbook(self.args, self.inventory_path, playbook, capture=False, variables=variables) + + +class PosixCoverageHandler(CoverageHandler[PosixConfig]): + """Configure integration test code coverage for POSIX hosts.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + super().__init__(args, host_state, inventory_path) + + # Common temporary directory used on all POSIX hosts that will be created world writeable. + self.common_temp_path = f'/tmp/ansible-test-{generate_name()}' + + def get_profiles(self) -> list[HostProfile]: + """Return a list of profiles relevant for this handler.""" + profiles = super().get_profiles() + profiles = [profile for profile in profiles if not isinstance(profile, ControllerProfile) or + profile.python.path != self.host_state.controller_profile.python.path] + + return profiles + + @property + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + return True + + @property + def target_profile(self) -> t.Optional[PosixProfile]: + """The POSIX target profile, if it uses a different Python interpreter than the controller, otherwise None.""" + return t.cast(PosixProfile, self.profiles[0]) if self.profiles else None + + def setup(self) -> None: + """Perform setup for code coverage.""" + self.setup_controller() + self.setup_target() + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + self.teardown_controller() + self.teardown_target() + + def setup_controller(self) -> None: + """Perform setup for code coverage on the controller.""" + coverage_config_path = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME) + coverage_output_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name) + + coverage_config = generate_coverage_config(self.args) + + write_text_file(coverage_config_path, coverage_config, create_directories=True) + + verified_chmod(coverage_config_path, MODE_FILE) + os.mkdir(coverage_output_path) + verified_chmod(coverage_output_path, MODE_DIRECTORY_WRITE) + + def setup_target(self) -> None: + """Perform setup for code coverage on the target.""" + if not self.target_profile: + return + + if isinstance(self.target_profile, ControllerProfile): + return + + self.run_playbook('posix_coverage_setup.yml', self.get_playbook_variables()) + + def teardown_controller(self) -> None: + """Perform teardown for code coverage on the controller.""" + coverage_temp_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name) + platform = get_coverage_platform(self.args.controller) + + for filename in os.listdir(coverage_temp_path): + shutil.copyfile(os.path.join(coverage_temp_path, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform))) + + remove_tree(self.common_temp_path) + + def teardown_target(self) -> None: + """Perform teardown for code coverage on the target.""" + if not self.target_profile: + return + + if isinstance(self.target_profile, ControllerProfile): + return + + profile = t.cast(SshTargetHostProfile, self.target_profile) + platform = get_coverage_platform(profile.config) + con = profile.get_controller_target_connections()[0] + + with tempfile.NamedTemporaryFile(prefix='ansible-test-coverage-', suffix='.tgz') as coverage_tgz: + try: + con.create_archive(chdir=self.common_temp_path, name=ResultType.COVERAGE.name, dst=coverage_tgz) + except SubprocessError as ex: + display.warning(f'Failed to download coverage results: {ex}') + else: + coverage_tgz.seek(0) + + with tempfile.TemporaryDirectory() as temp_dir: + local_con = LocalConnection(self.args) + local_con.extract_archive(chdir=temp_dir, src=coverage_tgz) + + base_dir = os.path.join(temp_dir, ResultType.COVERAGE.name) + + for filename in os.listdir(base_dir): + shutil.copyfile(os.path.join(base_dir, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform))) + + self.run_playbook('posix_coverage_teardown.yml', self.get_playbook_variables()) + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + # Enable code coverage collection on Ansible modules (both local and remote). + # Used by the AnsiballZ wrapper generator in lib/ansible/executor/module_common.py to support code coverage. + config_file = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME) + + # Include the command, target and platform marker so the remote host can create a filename with that info. + # The generated AnsiballZ wrapper is responsible for adding '=python-{X.Y}=coverage.{hostname}.{pid}.{id}' + coverage_file = os.path.join(self.common_temp_path, ResultType.COVERAGE.name, '='.join((self.args.command, target_name, 'platform'))) + + if self.args.coverage_check: + # cause the 'coverage' module to be found, but not imported or enabled + coverage_file = '' + + variables = dict( + _ANSIBLE_COVERAGE_CONFIG=config_file, + _ANSIBLE_COVERAGE_OUTPUT=coverage_file, + ) + + return variables + + def create_inventory(self) -> None: + """Create inventory.""" + create_posix_inventory(self.args, self.inventory_path, self.host_state.target_profiles) + + def get_playbook_variables(self) -> dict[str, str]: + """Return a dictionary of variables for setup and teardown of POSIX coverage.""" + return dict( + common_temp_dir=self.common_temp_path, + coverage_config=generate_coverage_config(self.args), + coverage_config_path=os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME), + coverage_output_path=os.path.join(self.common_temp_path, ResultType.COVERAGE.name), + mode_directory=f'{MODE_DIRECTORY:04o}', + mode_directory_write=f'{MODE_DIRECTORY_WRITE:04o}', + mode_file=f'{MODE_FILE:04o}', + ) + + +class WindowsCoverageHandler(CoverageHandler[WindowsConfig]): + """Configure integration test code coverage for Windows hosts.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + super().__init__(args, host_state, inventory_path) + + # Common temporary directory used on all Windows hosts that will be created writable by everyone. + self.remote_temp_path = f'C:\\ansible_test_coverage_{generate_name()}' + + @property + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + return bool(self.profiles) and not self.args.coverage_check + + def setup(self) -> None: + """Perform setup for code coverage.""" + self.run_playbook('windows_coverage_setup.yml', self.get_playbook_variables()) + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + with tempfile.TemporaryDirectory() as local_temp_path: + variables = self.get_playbook_variables() + variables.update( + local_temp_path=local_temp_path, + ) + + self.run_playbook('windows_coverage_teardown.yml', variables) + + for filename in os.listdir(local_temp_path): + if all(isinstance(profile.config, WindowsRemoteConfig) for profile in self.profiles): + prefix = 'remote' + elif all(isinstance(profile.config, WindowsInventoryConfig) for profile in self.profiles): + prefix = 'inventory' + else: + raise NotImplementedError() + + platform = f'{prefix}-{sanitize_host_name(os.path.splitext(filename)[0])}' + + with zipfile.ZipFile(os.path.join(local_temp_path, filename)) as coverage_zip: + for item in coverage_zip.infolist(): + if item.is_dir(): + raise Exception(f'Unexpected directory in zip file: {item.filename}') + + item.filename = update_coverage_filename(item.filename, platform) + + coverage_zip.extract(item, ResultType.COVERAGE.path) + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + # Include the command, target and platform marker so the remote host can create a filename with that info. + # The remote is responsible for adding '={language-version}=coverage.{hostname}.{pid}.{id}' + coverage_name = '='.join((self.args.command, target_name, 'platform')) + + variables = dict( + _ANSIBLE_COVERAGE_REMOTE_OUTPUT=os.path.join(self.remote_temp_path, coverage_name), + _ANSIBLE_COVERAGE_REMOTE_PATH_FILTER=os.path.join(data_context().content.root, '*'), + ) + + return variables + + def create_inventory(self) -> None: + """Create inventory.""" + create_windows_inventory(self.args, self.inventory_path, self.host_state.target_profiles) + + def get_playbook_variables(self) -> dict[str, str]: + """Return a dictionary of variables for setup and teardown of Windows coverage.""" + return dict( + remote_temp_path=self.remote_temp_path, + ) + + +class CoverageManager: + """Manager for code coverage configuration and state.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + self.args = args + self.host_state = host_state + self.inventory_path = inventory_path + + if self.args.coverage: + handler_types = set(get_handler_type(type(profile.config)) for profile in host_state.profiles) + handler_types.discard(None) + else: + handler_types = set() + + handlers = [handler_type(args=args, host_state=host_state, inventory_path=inventory_path) for handler_type in handler_types] + + self.handlers = [handler for handler in handlers if handler.is_active] + + def setup(self) -> None: + """Perform setup for code coverage.""" + if not self.args.coverage: + return + + for handler in self.handlers: + handler.setup() + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + if not self.args.coverage: + return + + for handler in self.handlers: + handler.teardown() + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + if not self.args.coverage or 'non_local/' in aliases: + return {} + + env = {} + + for handler in self.handlers: + env.update(handler.get_environment(target_name, aliases)) + + return env + + +@cache +def get_config_handler_type_map() -> dict[t.Type[HostConfig], t.Type[CoverageHandler]]: + """Create and return a mapping of HostConfig types to CoverageHandler types.""" + return get_type_map(CoverageHandler, HostConfig) + + +def get_handler_type(config_type: t.Type[HostConfig]) -> t.Optional[t.Type[CoverageHandler]]: + """Return the coverage handler type associated with the given host config type if found, otherwise return None.""" + queue = [config_type] + type_map = get_config_handler_type_map() + + while queue: + config_type = queue.pop(0) + handler_type = type_map.get(config_type) + + if handler_type: + return handler_type + + queue.extend(config_type.__bases__) + + return None + + +def update_coverage_filename(original_filename: str, platform: str) -> str: + """Validate the given filename and insert the specified platform, then return the result.""" + parts = original_filename.split('=') + + if original_filename != os.path.basename(original_filename) or len(parts) != 5 or parts[2] != 'platform': + raise Exception(f'Unexpected coverage filename: {original_filename}') + + parts[2] = platform + + updated_filename = '='.join(parts) + + display.info(f'Coverage file for platform "{platform}": {original_filename} -> {updated_filename}', verbosity=3) + + return updated_filename diff --git a/test/lib/ansible_test/_internal/commands/integration/filters.py b/test/lib/ansible_test/_internal/commands/integration/filters.py new file mode 100644 index 0000000..be03d7f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/filters.py @@ -0,0 +1,279 @@ +"""Logic for filtering out integration test targets which are unsupported for the currently provided arguments and available hosts.""" +from __future__ import annotations + +import abc +import typing as t + +from ...config import ( + IntegrationConfig, +) + +from ...util import ( + cache, + detect_architecture, + display, + get_type_map, +) + +from ...target import ( + IntegrationTarget, +) + +from ...host_configs import ( + ControllerConfig, + DockerConfig, + FallbackReason, + HostConfig, + NetworkInventoryConfig, + NetworkRemoteConfig, + OriginConfig, + PosixConfig, + PosixRemoteConfig, + PosixSshConfig, + RemoteConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ...host_profiles import ( + HostProfile, +) + +THostConfig = t.TypeVar('THostConfig', bound=HostConfig) +TPosixConfig = t.TypeVar('TPosixConfig', bound=PosixConfig) +TRemoteConfig = t.TypeVar('TRemoteConfig', bound=RemoteConfig) +THostProfile = t.TypeVar('THostProfile', bound=HostProfile) + + +class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta): + """Base class for target filters.""" + def __init__(self, args: IntegrationConfig, configs: list[THostConfig], controller: bool) -> None: + self.args = args + self.configs = configs + self.controller = controller + self.host_type = 'controller' if controller else 'target' + + # values which are not host specific + self.include_targets = args.include + self.allow_root = args.allow_root + self.allow_destructive = args.allow_destructive + + @property + def config(self) -> THostConfig: + """The configuration to filter. Only valid when there is a single config.""" + if len(self.configs) != 1: + raise Exception() + + return self.configs[0] + + def skip( + self, + skip: str, + reason: str, + targets: list[IntegrationTarget], + exclude: set[str], + override: t.Optional[list[str]] = None, + ) -> None: + """Apply the specified skip rule to the given targets by updating the provided exclude list.""" + if skip.startswith('skip/'): + skipped = [target.name for target in targets if skip in target.skips and (not override or target.name not in override)] + else: + skipped = [target.name for target in targets if f'{skip}/' in target.aliases and (not override or target.name not in override)] + + self.apply_skip(f'"{skip}"', reason, skipped, exclude) + + def apply_skip(self, marked: str, reason: str, skipped: list[str], exclude: set[str]) -> None: + """Apply the provided skips to the given exclude list.""" + if not skipped: + return + + exclude.update(skipped) + display.warning(f'Excluding {self.host_type} tests marked {marked} {reason}: {", ".join(skipped)}') + + def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Filter the list of profiles, returning only those which are not skipped for the given target.""" + del target + return profiles + + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + if self.controller and self.args.host_settings.controller_fallback and targets: + affected_targets = [target.name for target in targets] + reason = self.args.host_settings.controller_fallback.reason + + if reason == FallbackReason.ENVIRONMENT: + exclude.update(affected_targets) + display.warning(f'Excluding {self.host_type} tests since a fallback controller is in use: {", ".join(affected_targets)}') + elif reason == FallbackReason.PYTHON: + display.warning(f'Some {self.host_type} tests may be redundant since a fallback python is in use: {", ".join(affected_targets)}') + + if not self.allow_destructive and not self.config.is_managed: + override_destructive = set(target for target in self.include_targets if target.startswith('destructive/')) + override = [target.name for target in targets if override_destructive & set(target.aliases)] + + self.skip('destructive', 'which require --allow-destructive or prefixing with "destructive/" to run on unmanaged hosts', targets, exclude, override) + + if not self.args.allow_disabled: + override_disabled = set(target for target in self.args.include if target.startswith('disabled/')) + override = [target.name for target in targets if override_disabled & set(target.aliases)] + + self.skip('disabled', 'which require --allow-disabled or prefixing with "disabled/"', targets, exclude, override) + + if not self.args.allow_unsupported: + override_unsupported = set(target for target in self.args.include if target.startswith('unsupported/')) + override = [target.name for target in targets if override_unsupported & set(target.aliases)] + + self.skip('unsupported', 'which require --allow-unsupported or prefixing with "unsupported/"', targets, exclude, override) + + if not self.args.allow_unstable: + override_unstable = set(target for target in self.args.include if target.startswith('unstable/')) + + if self.args.allow_unstable_changed: + override_unstable |= set(self.args.metadata.change_description.focused_targets or []) + + override = [target.name for target in targets if override_unstable & set(target.aliases)] + + self.skip('unstable', 'which require --allow-unstable or prefixing with "unstable/"', targets, exclude, override) + + +class PosixTargetFilter(TargetFilter[TPosixConfig]): + """Target filter for POSIX hosts.""" + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + if not self.allow_root and not self.config.have_root: + self.skip('needs/root', 'which require --allow-root or running as root', targets, exclude) + + self.skip(f'skip/python{self.config.python.version}', f'which are not supported by Python {self.config.python.version}', targets, exclude) + self.skip(f'skip/python{self.config.python.major_version}', f'which are not supported by Python {self.config.python.major_version}', targets, exclude) + + +class DockerTargetFilter(PosixTargetFilter[DockerConfig]): + """Target filter for docker hosts.""" + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + self.skip('skip/docker', 'which cannot run under docker', targets, exclude) + + if not self.config.privileged: + self.skip('needs/privileged', 'which require --docker-privileged to run under docker', targets, exclude) + + +class PosixSshTargetFilter(PosixTargetFilter[PosixSshConfig]): + """Target filter for POSIX SSH hosts.""" + + +class RemoteTargetFilter(TargetFilter[TRemoteConfig]): + """Target filter for remote Ansible Core CI managed hosts.""" + def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Filter the list of profiles, returning only those which are not skipped for the given target.""" + profiles = super().filter_profiles(profiles, target) + + skipped_profiles = [profile for profile in profiles if any(skip in target.skips for skip in get_remote_skip_aliases(profile.config))] + + if skipped_profiles: + configs: list[TRemoteConfig] = [profile.config for profile in skipped_profiles] + display.warning(f'Excluding skipped hosts from inventory: {", ".join(config.name for config in configs)}') + + profiles = [profile for profile in profiles if profile not in skipped_profiles] + + return profiles + + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + if len(self.configs) > 1: + host_skips = {host.name: get_remote_skip_aliases(host) for host in self.configs} + + # Skip only targets which skip all hosts. + # Targets that skip only some hosts will be handled during inventory generation. + skipped = [target.name for target in targets if all(any(skip in target.skips for skip in skips) for skips in host_skips.values())] + + if skipped: + exclude.update(skipped) + display.warning(f'Excluding tests which do not support {", ".join(host_skips.keys())}: {", ".join(skipped)}') + else: + skips = get_remote_skip_aliases(self.config) + + for skip, reason in skips.items(): + self.skip(skip, reason, targets, exclude) + + +class PosixRemoteTargetFilter(PosixTargetFilter[PosixRemoteConfig], RemoteTargetFilter[PosixRemoteConfig]): + """Target filter for POSIX remote hosts.""" + + +class WindowsRemoteTargetFilter(RemoteTargetFilter[WindowsRemoteConfig]): + """Target filter for remote Windows hosts.""" + + +class WindowsInventoryTargetFilter(TargetFilter[WindowsInventoryConfig]): + """Target filter for Windows inventory.""" + + +class NetworkRemoteTargetFilter(RemoteTargetFilter[NetworkRemoteConfig]): + """Target filter for remote network hosts.""" + + +class NetworkInventoryTargetFilter(TargetFilter[NetworkInventoryConfig]): + """Target filter for network inventory.""" + + +class OriginTargetFilter(PosixTargetFilter[OriginConfig]): + """Target filter for localhost.""" + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + arch = detect_architecture(self.config.python.path) + + if arch: + self.skip(f'skip/{arch}', f'which are not supported by {arch}', targets, exclude) + + +@cache +def get_host_target_type_map() -> dict[t.Type[HostConfig], t.Type[TargetFilter]]: + """Create and return a mapping of HostConfig types to TargetFilter types.""" + return get_type_map(TargetFilter, HostConfig) + + +def get_target_filter(args: IntegrationConfig, configs: list[HostConfig], controller: bool) -> TargetFilter: + """Return an integration test target filter instance for the provided host configurations.""" + target_type = type(configs[0]) + + if issubclass(target_type, ControllerConfig): + target_type = type(args.controller) + configs = [args.controller] + + filter_type = get_host_target_type_map()[target_type] + filter_instance = filter_type(args, configs, controller) + + return filter_instance + + +def get_remote_skip_aliases(config: RemoteConfig) -> dict[str, str]: + """Return a dictionary of skip aliases and the reason why they apply.""" + return get_platform_skip_aliases(config.platform, config.version, config.arch) + + +def get_platform_skip_aliases(platform: str, version: str, arch: t.Optional[str]) -> dict[str, str]: + """Return a dictionary of skip aliases and the reason why they apply.""" + skips = { + f'skip/{platform}': platform, + f'skip/{platform}/{version}': f'{platform} {version}', + f'skip/{platform}{version}': f'{platform} {version}', # legacy syntax, use above format + } + + if arch: + skips.update({ + f'skip/{arch}': arch, + f'skip/{arch}/{platform}': f'{platform} on {arch}', + f'skip/{arch}/{platform}/{version}': f'{platform} {version} on {arch}', + }) + + skips = {alias: f'which are not supported by {description}' for alias, description in skips.items()} + + return skips diff --git a/test/lib/ansible_test/_internal/commands/integration/network.py b/test/lib/ansible_test/_internal/commands/integration/network.py new file mode 100644 index 0000000..d28416c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/network.py @@ -0,0 +1,77 @@ +"""Network integration testing.""" +from __future__ import annotations + +import os + +from ...util import ( + ApplicationError, + ANSIBLE_TEST_CONFIG_ROOT, +) + +from ...util_common import ( + handle_layout_messages, +) + +from ...target import ( + walk_network_integration_targets, +) + +from ...config import ( + NetworkIntegrationConfig, +) + +from . import ( + command_integration_filter, + command_integration_filtered, + get_inventory_absolute_path, + get_inventory_relative_path, + check_inventory, + delegate_inventory, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + NetworkInventoryConfig, + NetworkRemoteConfig, +) + + +def command_network_integration(args: NetworkIntegrationConfig) -> None: + """Entry point for the `network-integration` command.""" + handle_layout_messages(data_context().content.integration_messages) + + inventory_relative_path = get_inventory_relative_path(args) + template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template' + + if issubclass(args.target_type, NetworkInventoryConfig): + target = args.only_target(NetworkInventoryConfig) + inventory_path = get_inventory_absolute_path(args, target) + + if args.delegate or not target.path: + target.path = inventory_relative_path + else: + inventory_path = os.path.join(data_context().content.root, inventory_relative_path) + + if args.no_temp_workdir: + # temporary solution to keep DCI tests working + inventory_exists = os.path.exists(inventory_path) + else: + inventory_exists = os.path.isfile(inventory_path) + + if not args.explain and not issubclass(args.target_type, NetworkRemoteConfig) and not inventory_exists: + raise ApplicationError( + 'Inventory not found: %s\n' + 'Use --inventory to specify the inventory path.\n' + 'Use --platform to provision resources and generate an inventory file.\n' + 'See also inventory template: %s' % (inventory_path, template_path) + ) + + check_inventory(args, inventory_path) + delegate_inventory(args, inventory_path) + + all_targets = tuple(walk_network_integration_targets(include_hidden=True)) + host_state, internal_targets = command_integration_filter(args, all_targets) + command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path) diff --git a/test/lib/ansible_test/_internal/commands/integration/posix.py b/test/lib/ansible_test/_internal/commands/integration/posix.py new file mode 100644 index 0000000..d4c50d3 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/posix.py @@ -0,0 +1,48 @@ +"""POSIX integration testing.""" +from __future__ import annotations + +import os + +from ...util_common import ( + handle_layout_messages, +) + +from ...containers import ( + create_container_hooks, + local_ssh, + root_ssh, +) + +from ...target import ( + walk_posix_integration_targets, +) + +from ...config import ( + PosixIntegrationConfig, +) + +from . import ( + command_integration_filter, + command_integration_filtered, + get_inventory_relative_path, +) + +from ...data import ( + data_context, +) + + +def command_posix_integration(args: PosixIntegrationConfig) -> None: + """Entry point for the `integration` command.""" + handle_layout_messages(data_context().content.integration_messages) + + inventory_relative_path = get_inventory_relative_path(args) + inventory_path = os.path.join(data_context().content.root, inventory_relative_path) + + all_targets = tuple(walk_posix_integration_targets(include_hidden=True)) + host_state, internal_targets = command_integration_filter(args, all_targets) + control_connections = [local_ssh(args, host_state.controller_profile.python)] + managed_connections = [root_ssh(ssh) for ssh in host_state.get_controller_target_connections()] + pre_target, post_target = create_container_hooks(args, control_connections, managed_connections) + + command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target) diff --git a/test/lib/ansible_test/_internal/commands/integration/windows.py b/test/lib/ansible_test/_internal/commands/integration/windows.py new file mode 100644 index 0000000..aa201c4 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/windows.py @@ -0,0 +1,81 @@ +"""Windows integration testing.""" +from __future__ import annotations + +import os + +from ...util import ( + ApplicationError, + ANSIBLE_TEST_CONFIG_ROOT, +) + +from ...util_common import ( + handle_layout_messages, +) + +from ...containers import ( + create_container_hooks, + local_ssh, + root_ssh, +) + +from ...target import ( + walk_windows_integration_targets, +) + +from ...config import ( + WindowsIntegrationConfig, +) + +from ...host_configs import ( + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from . import ( + command_integration_filter, + command_integration_filtered, + get_inventory_absolute_path, + get_inventory_relative_path, + check_inventory, + delegate_inventory, +) + +from ...data import ( + data_context, +) + + +def command_windows_integration(args: WindowsIntegrationConfig) -> None: + """Entry point for the `windows-integration` command.""" + handle_layout_messages(data_context().content.integration_messages) + + inventory_relative_path = get_inventory_relative_path(args) + template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template' + + if issubclass(args.target_type, WindowsInventoryConfig): + target = args.only_target(WindowsInventoryConfig) + inventory_path = get_inventory_absolute_path(args, target) + + if args.delegate or not target.path: + target.path = inventory_relative_path + else: + inventory_path = os.path.join(data_context().content.root, inventory_relative_path) + + if not args.explain and not issubclass(args.target_type, WindowsRemoteConfig) and not os.path.isfile(inventory_path): + raise ApplicationError( + 'Inventory not found: %s\n' + 'Use --inventory to specify the inventory path.\n' + 'Use --windows to provision resources and generate an inventory file.\n' + 'See also inventory template: %s' % (inventory_path, template_path) + ) + + check_inventory(args, inventory_path) + delegate_inventory(args, inventory_path) + + all_targets = tuple(walk_windows_integration_targets(include_hidden=True)) + host_state, internal_targets = command_integration_filter(args, all_targets) + control_connections = [local_ssh(args, host_state.controller_profile.python)] + managed_connections = [root_ssh(ssh) for ssh in host_state.get_controller_target_connections()] + pre_target, post_target = create_container_hooks(args, control_connections, managed_connections) + + command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target) |