diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /test/lib/ansible_test/_internal/commands/integration/coverage.py | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream.tar.xz ansible-core-upstream.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/integration/coverage.py')
-rw-r--r-- | test/lib/ansible_test/_internal/commands/integration/coverage.py | 417 |
1 files changed, 417 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/integration/coverage.py b/test/lib/ansible_test/_internal/commands/integration/coverage.py new file mode 100644 index 0000000..5a486e9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/coverage.py @@ -0,0 +1,417 @@ +"""Code coverage support for integration tests.""" +from __future__ import annotations + +import abc +import os +import shutil +import tempfile +import typing as t +import zipfile + +from ...io import ( + write_text_file, +) + +from ...ansible_util import ( + run_playbook, +) + +from ...config import ( + IntegrationConfig, +) + +from ...util import ( + COVERAGE_CONFIG_NAME, + MODE_DIRECTORY, + MODE_DIRECTORY_WRITE, + MODE_FILE, + SubprocessError, + cache, + display, + generate_name, + get_generic_type, + get_type_map, + remove_tree, + sanitize_host_name, + verified_chmod, +) + +from ...util_common import ( + ResultType, +) + +from ...coverage_util import ( + generate_coverage_config, + get_coverage_platform, +) + +from ...host_configs import ( + HostConfig, + PosixConfig, + WindowsConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ...data import ( + data_context, +) + +from ...host_profiles import ( + ControllerProfile, + HostProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + HostState, +) + +from ...connections import ( + LocalConnection, +) + +from ...inventory import ( + create_windows_inventory, + create_posix_inventory, +) + +THostConfig = t.TypeVar('THostConfig', bound=HostConfig) + + +class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta): + """Base class for configuring hosts for integration test code coverage.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + self.args = args + self.host_state = host_state + self.inventory_path = inventory_path + self.profiles = self.get_profiles() + + def get_profiles(self) -> list[HostProfile]: + """Return a list of profiles relevant for this handler.""" + profile_type = get_generic_type(type(self), HostConfig) + profiles = [profile for profile in self.host_state.target_profiles if isinstance(profile.config, profile_type)] + + return profiles + + @property + @abc.abstractmethod + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + + @abc.abstractmethod + def setup(self) -> None: + """Perform setup for code coverage.""" + + @abc.abstractmethod + def teardown(self) -> None: + """Perform teardown for code coverage.""" + + @abc.abstractmethod + def create_inventory(self) -> None: + """Create inventory, if needed.""" + + @abc.abstractmethod + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + def run_playbook(self, playbook: str, variables: dict[str, str]) -> None: + """Run the specified playbook using the current inventory.""" + self.create_inventory() + run_playbook(self.args, self.inventory_path, playbook, capture=False, variables=variables) + + +class PosixCoverageHandler(CoverageHandler[PosixConfig]): + """Configure integration test code coverage for POSIX hosts.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + super().__init__(args, host_state, inventory_path) + + # Common temporary directory used on all POSIX hosts that will be created world writeable. + self.common_temp_path = f'/tmp/ansible-test-{generate_name()}' + + def get_profiles(self) -> list[HostProfile]: + """Return a list of profiles relevant for this handler.""" + profiles = super().get_profiles() + profiles = [profile for profile in profiles if not isinstance(profile, ControllerProfile) or + profile.python.path != self.host_state.controller_profile.python.path] + + return profiles + + @property + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + return True + + @property + def target_profile(self) -> t.Optional[PosixProfile]: + """The POSIX target profile, if it uses a different Python interpreter than the controller, otherwise None.""" + return t.cast(PosixProfile, self.profiles[0]) if self.profiles else None + + def setup(self) -> None: + """Perform setup for code coverage.""" + self.setup_controller() + self.setup_target() + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + self.teardown_controller() + self.teardown_target() + + def setup_controller(self) -> None: + """Perform setup for code coverage on the controller.""" + coverage_config_path = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME) + coverage_output_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name) + + coverage_config = generate_coverage_config(self.args) + + write_text_file(coverage_config_path, coverage_config, create_directories=True) + + verified_chmod(coverage_config_path, MODE_FILE) + os.mkdir(coverage_output_path) + verified_chmod(coverage_output_path, MODE_DIRECTORY_WRITE) + + def setup_target(self) -> None: + """Perform setup for code coverage on the target.""" + if not self.target_profile: + return + + if isinstance(self.target_profile, ControllerProfile): + return + + self.run_playbook('posix_coverage_setup.yml', self.get_playbook_variables()) + + def teardown_controller(self) -> None: + """Perform teardown for code coverage on the controller.""" + coverage_temp_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name) + platform = get_coverage_platform(self.args.controller) + + for filename in os.listdir(coverage_temp_path): + shutil.copyfile(os.path.join(coverage_temp_path, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform))) + + remove_tree(self.common_temp_path) + + def teardown_target(self) -> None: + """Perform teardown for code coverage on the target.""" + if not self.target_profile: + return + + if isinstance(self.target_profile, ControllerProfile): + return + + profile = t.cast(SshTargetHostProfile, self.target_profile) + platform = get_coverage_platform(profile.config) + con = profile.get_controller_target_connections()[0] + + with tempfile.NamedTemporaryFile(prefix='ansible-test-coverage-', suffix='.tgz') as coverage_tgz: + try: + con.create_archive(chdir=self.common_temp_path, name=ResultType.COVERAGE.name, dst=coverage_tgz) + except SubprocessError as ex: + display.warning(f'Failed to download coverage results: {ex}') + else: + coverage_tgz.seek(0) + + with tempfile.TemporaryDirectory() as temp_dir: + local_con = LocalConnection(self.args) + local_con.extract_archive(chdir=temp_dir, src=coverage_tgz) + + base_dir = os.path.join(temp_dir, ResultType.COVERAGE.name) + + for filename in os.listdir(base_dir): + shutil.copyfile(os.path.join(base_dir, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform))) + + self.run_playbook('posix_coverage_teardown.yml', self.get_playbook_variables()) + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + # Enable code coverage collection on Ansible modules (both local and remote). + # Used by the AnsiballZ wrapper generator in lib/ansible/executor/module_common.py to support code coverage. + config_file = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME) + + # Include the command, target and platform marker so the remote host can create a filename with that info. + # The generated AnsiballZ wrapper is responsible for adding '=python-{X.Y}=coverage.{hostname}.{pid}.{id}' + coverage_file = os.path.join(self.common_temp_path, ResultType.COVERAGE.name, '='.join((self.args.command, target_name, 'platform'))) + + if self.args.coverage_check: + # cause the 'coverage' module to be found, but not imported or enabled + coverage_file = '' + + variables = dict( + _ANSIBLE_COVERAGE_CONFIG=config_file, + _ANSIBLE_COVERAGE_OUTPUT=coverage_file, + ) + + return variables + + def create_inventory(self) -> None: + """Create inventory.""" + create_posix_inventory(self.args, self.inventory_path, self.host_state.target_profiles) + + def get_playbook_variables(self) -> dict[str, str]: + """Return a dictionary of variables for setup and teardown of POSIX coverage.""" + return dict( + common_temp_dir=self.common_temp_path, + coverage_config=generate_coverage_config(self.args), + coverage_config_path=os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME), + coverage_output_path=os.path.join(self.common_temp_path, ResultType.COVERAGE.name), + mode_directory=f'{MODE_DIRECTORY:04o}', + mode_directory_write=f'{MODE_DIRECTORY_WRITE:04o}', + mode_file=f'{MODE_FILE:04o}', + ) + + +class WindowsCoverageHandler(CoverageHandler[WindowsConfig]): + """Configure integration test code coverage for Windows hosts.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + super().__init__(args, host_state, inventory_path) + + # Common temporary directory used on all Windows hosts that will be created writable by everyone. + self.remote_temp_path = f'C:\\ansible_test_coverage_{generate_name()}' + + @property + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + return bool(self.profiles) and not self.args.coverage_check + + def setup(self) -> None: + """Perform setup for code coverage.""" + self.run_playbook('windows_coverage_setup.yml', self.get_playbook_variables()) + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + with tempfile.TemporaryDirectory() as local_temp_path: + variables = self.get_playbook_variables() + variables.update( + local_temp_path=local_temp_path, + ) + + self.run_playbook('windows_coverage_teardown.yml', variables) + + for filename in os.listdir(local_temp_path): + if all(isinstance(profile.config, WindowsRemoteConfig) for profile in self.profiles): + prefix = 'remote' + elif all(isinstance(profile.config, WindowsInventoryConfig) for profile in self.profiles): + prefix = 'inventory' + else: + raise NotImplementedError() + + platform = f'{prefix}-{sanitize_host_name(os.path.splitext(filename)[0])}' + + with zipfile.ZipFile(os.path.join(local_temp_path, filename)) as coverage_zip: + for item in coverage_zip.infolist(): + if item.is_dir(): + raise Exception(f'Unexpected directory in zip file: {item.filename}') + + item.filename = update_coverage_filename(item.filename, platform) + + coverage_zip.extract(item, ResultType.COVERAGE.path) + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + # Include the command, target and platform marker so the remote host can create a filename with that info. + # The remote is responsible for adding '={language-version}=coverage.{hostname}.{pid}.{id}' + coverage_name = '='.join((self.args.command, target_name, 'platform')) + + variables = dict( + _ANSIBLE_COVERAGE_REMOTE_OUTPUT=os.path.join(self.remote_temp_path, coverage_name), + _ANSIBLE_COVERAGE_REMOTE_PATH_FILTER=os.path.join(data_context().content.root, '*'), + ) + + return variables + + def create_inventory(self) -> None: + """Create inventory.""" + create_windows_inventory(self.args, self.inventory_path, self.host_state.target_profiles) + + def get_playbook_variables(self) -> dict[str, str]: + """Return a dictionary of variables for setup and teardown of Windows coverage.""" + return dict( + remote_temp_path=self.remote_temp_path, + ) + + +class CoverageManager: + """Manager for code coverage configuration and state.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + self.args = args + self.host_state = host_state + self.inventory_path = inventory_path + + if self.args.coverage: + handler_types = set(get_handler_type(type(profile.config)) for profile in host_state.profiles) + handler_types.discard(None) + else: + handler_types = set() + + handlers = [handler_type(args=args, host_state=host_state, inventory_path=inventory_path) for handler_type in handler_types] + + self.handlers = [handler for handler in handlers if handler.is_active] + + def setup(self) -> None: + """Perform setup for code coverage.""" + if not self.args.coverage: + return + + for handler in self.handlers: + handler.setup() + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + if not self.args.coverage: + return + + for handler in self.handlers: + handler.teardown() + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + if not self.args.coverage or 'non_local/' in aliases: + return {} + + env = {} + + for handler in self.handlers: + env.update(handler.get_environment(target_name, aliases)) + + return env + + +@cache +def get_config_handler_type_map() -> dict[t.Type[HostConfig], t.Type[CoverageHandler]]: + """Create and return a mapping of HostConfig types to CoverageHandler types.""" + return get_type_map(CoverageHandler, HostConfig) + + +def get_handler_type(config_type: t.Type[HostConfig]) -> t.Optional[t.Type[CoverageHandler]]: + """Return the coverage handler type associated with the given host config type if found, otherwise return None.""" + queue = [config_type] + type_map = get_config_handler_type_map() + + while queue: + config_type = queue.pop(0) + handler_type = type_map.get(config_type) + + if handler_type: + return handler_type + + queue.extend(config_type.__bases__) + + return None + + +def update_coverage_filename(original_filename: str, platform: str) -> str: + """Validate the given filename and insert the specified platform, then return the result.""" + parts = original_filename.split('=') + + if original_filename != os.path.basename(original_filename) or len(parts) != 5 or parts[2] != 'platform': + raise Exception(f'Unexpected coverage filename: {original_filename}') + + parts[2] = platform + + updated_filename = '='.join(parts) + + display.info(f'Coverage file for platform "{platform}": {original_filename} -> {updated_filename}', verbosity=3) + + return updated_filename |