summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/coverage_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/coverage_util.py')
-rw-r--r--test/lib/ansible_test/_internal/coverage_util.py314
1 files changed, 314 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/coverage_util.py b/test/lib/ansible_test/_internal/coverage_util.py
new file mode 100644
index 0000000..0f44505
--- /dev/null
+++ b/test/lib/ansible_test/_internal/coverage_util.py
@@ -0,0 +1,314 @@
+"""Utility code for facilitating collection of code coverage when running tests."""
+from __future__ import annotations
+
+import atexit
+import dataclasses
+import os
+import sqlite3
+import tempfile
+import typing as t
+
+from .config import (
+ IntegrationConfig,
+ SanityConfig,
+ TestConfig,
+)
+
+from .io import (
+ write_text_file,
+ make_dirs,
+ open_binary_file,
+)
+
+from .util import (
+ ApplicationError,
+ InternalError,
+ COVERAGE_CONFIG_NAME,
+ remove_tree,
+ sanitize_host_name,
+ str_to_version,
+)
+
+from .data import (
+ data_context,
+)
+
+from .util_common import (
+ intercept_python,
+ ResultType,
+)
+
+from .host_configs import (
+ DockerConfig,
+ HostConfig,
+ OriginConfig,
+ PosixRemoteConfig,
+ PosixSshConfig,
+ PythonConfig,
+)
+
+from .constants import (
+ SUPPORTED_PYTHON_VERSIONS,
+ CONTROLLER_PYTHON_VERSIONS,
+)
+
+from .thread import (
+ mutex,
+)
+
+
+@dataclasses.dataclass(frozen=True)
+class CoverageVersion:
+ """Details about a coverage version and its supported Python versions."""
+ coverage_version: str
+ schema_version: int
+ min_python: tuple[int, int]
+ max_python: tuple[int, int]
+
+
+COVERAGE_VERSIONS = (
+ # IMPORTANT: Keep this in sync with the ansible-test.txt requirements file.
+ CoverageVersion('6.5.0', 7, (3, 7), (3, 11)),
+ CoverageVersion('4.5.4', 0, (2, 6), (3, 6)),
+)
+"""
+This tuple specifies the coverage version to use for Python version ranges.
+"""
+
+CONTROLLER_COVERAGE_VERSION = COVERAGE_VERSIONS[0]
+"""The coverage version supported on the controller."""
+
+
+class CoverageError(ApplicationError):
+ """Exception caused while attempting to read a coverage file."""
+ def __init__(self, path: str, message: str) -> None:
+ self.path = path
+ self.message = message
+
+ super().__init__(f'Error reading coverage file "{os.path.relpath(path)}": {message}')
+
+
+def get_coverage_version(version: str) -> CoverageVersion:
+ """Return the coverage version to use with the specified Python version."""
+ python_version = str_to_version(version)
+ supported_versions = [entry for entry in COVERAGE_VERSIONS if entry.min_python <= python_version <= entry.max_python]
+
+ if not supported_versions:
+ raise InternalError(f'Python {version} has no matching entry in COVERAGE_VERSIONS.')
+
+ if len(supported_versions) > 1:
+ raise InternalError(f'Python {version} has multiple matching entries in COVERAGE_VERSIONS.')
+
+ coverage_version = supported_versions[0]
+
+ return coverage_version
+
+
+def get_coverage_file_schema_version(path: str) -> int:
+ """
+ Return the schema version from the specified coverage file.
+ SQLite based files report schema version 1 or later.
+ JSON based files are reported as schema version 0.
+ An exception is raised if the file is not recognized or the schema version cannot be determined.
+ """
+ with open_binary_file(path) as file_obj:
+ header = file_obj.read(16)
+
+ if header.startswith(b'!coverage.py: '):
+ return 0
+
+ if header.startswith(b'SQLite'):
+ return get_sqlite_schema_version(path)
+
+ raise CoverageError(path, f'Unknown header: {header!r}')
+
+
+def get_sqlite_schema_version(path: str) -> int:
+ """Return the schema version from a SQLite based coverage file."""
+ try:
+ with sqlite3.connect(path) as connection:
+ cursor = connection.cursor()
+ cursor.execute('select version from coverage_schema')
+ schema_version = cursor.fetchmany(1)[0][0]
+ except Exception as ex:
+ raise CoverageError(path, f'SQLite error: {ex}') from ex
+
+ if not isinstance(schema_version, int):
+ raise CoverageError(path, f'Schema version is {type(schema_version)} instead of {int}: {schema_version}')
+
+ if schema_version < 1:
+ raise CoverageError(path, f'Schema version is out-of-range: {schema_version}')
+
+ return schema_version
+
+
+def cover_python(
+ args: TestConfig,
+ python: PythonConfig,
+ cmd: list[str],
+ target_name: str,
+ env: dict[str, str],
+ capture: bool,
+ data: t.Optional[str] = None,
+ cwd: t.Optional[str] = None,
+) -> tuple[t.Optional[str], t.Optional[str]]:
+ """Run a command while collecting Python code coverage."""
+ if args.coverage:
+ env.update(get_coverage_environment(args, target_name, python.version))
+
+ return intercept_python(args, python, cmd, env, capture, data, cwd)
+
+
+def get_coverage_platform(config: HostConfig) -> str:
+ """Return the platform label for the given host config."""
+ if isinstance(config, PosixRemoteConfig):
+ platform = f'remote-{sanitize_host_name(config.name)}'
+ elif isinstance(config, DockerConfig):
+ platform = f'docker-{sanitize_host_name(config.name)}'
+ elif isinstance(config, PosixSshConfig):
+ platform = f'ssh-{sanitize_host_name(config.host)}'
+ elif isinstance(config, OriginConfig):
+ platform = 'origin' # previous versions of ansible-test used "local-{python_version}"
+ else:
+ raise NotImplementedError(f'Coverage platform label not defined for type: {type(config)}')
+
+ return platform
+
+
+def get_coverage_environment(
+ args: TestConfig,
+ target_name: str,
+ version: str,
+) -> dict[str, str]:
+ """Return environment variables needed to collect code coverage."""
+ # unit tests, sanity tests and other special cases (localhost only)
+ # config is in a temporary directory
+ # results are in the source tree
+ config_file = get_coverage_config(args)
+ coverage_name = '='.join((args.command, target_name, get_coverage_platform(args.controller), f'python-{version}', 'coverage'))
+ coverage_dir = os.path.join(data_context().content.root, data_context().content.results_path, ResultType.COVERAGE.name)
+ coverage_file = os.path.join(coverage_dir, coverage_name)
+
+ make_dirs(coverage_dir)
+
+ if args.coverage_check:
+ # cause the 'coverage' module to be found, but not imported or enabled
+ coverage_file = ''
+
+ # Enable code coverage collection on local Python programs (this does not include Ansible modules).
+ # Used by the injectors to support code coverage.
+ # Used by the pytest unit test plugin to support code coverage.
+ # The COVERAGE_FILE variable is also used directly by the 'coverage' module.
+ env = dict(
+ COVERAGE_CONF=config_file,
+ COVERAGE_FILE=coverage_file,
+ )
+
+ return env
+
+
+@mutex
+def get_coverage_config(args: TestConfig) -> str:
+ """Return the path to the coverage config, creating the config if it does not already exist."""
+ try:
+ return get_coverage_config.path # type: ignore[attr-defined]
+ except AttributeError:
+ pass
+
+ coverage_config = generate_coverage_config(args)
+
+ if args.explain:
+ temp_dir = '/tmp/coverage-temp-dir'
+ else:
+ temp_dir = tempfile.mkdtemp()
+ atexit.register(lambda: remove_tree(temp_dir))
+
+ path = os.path.join(temp_dir, COVERAGE_CONFIG_NAME)
+
+ if not args.explain:
+ write_text_file(path, coverage_config)
+
+ get_coverage_config.path = path # type: ignore[attr-defined]
+
+ return path
+
+
+def generate_coverage_config(args: TestConfig) -> str:
+ """Generate code coverage configuration for tests."""
+ if data_context().content.collection:
+ coverage_config = generate_collection_coverage_config(args)
+ else:
+ coverage_config = generate_ansible_coverage_config()
+
+ return coverage_config
+
+
+def generate_ansible_coverage_config() -> str:
+ """Generate code coverage configuration for Ansible tests."""
+ coverage_config = '''
+[run]
+branch = True
+concurrency = multiprocessing
+parallel = True
+
+omit =
+ */python*/dist-packages/*
+ */python*/site-packages/*
+ */python*/distutils/*
+ */pyshared/*
+ */pytest
+ */AnsiballZ_*.py
+ */test/results/*
+'''
+
+ return coverage_config
+
+
+def generate_collection_coverage_config(args: TestConfig) -> str:
+ """Generate code coverage configuration for Ansible Collection tests."""
+ coverage_config = '''
+[run]
+branch = True
+concurrency = multiprocessing
+parallel = True
+disable_warnings =
+ no-data-collected
+'''
+
+ if isinstance(args, IntegrationConfig):
+ coverage_config += '''
+include =
+ %s/*
+ */%s/*
+''' % (data_context().content.root, data_context().content.collection.directory)
+ elif isinstance(args, SanityConfig):
+ # temporary work-around for import sanity test
+ coverage_config += '''
+include =
+ %s/*
+
+omit =
+ %s/*
+''' % (data_context().content.root, os.path.join(data_context().content.root, data_context().content.results_path))
+ else:
+ coverage_config += '''
+include =
+ %s/*
+''' % data_context().content.root
+
+ return coverage_config
+
+
+def self_check() -> None:
+ """Check for internal errors due to incorrect code changes."""
+ # Verify all supported Python versions have a coverage version.
+ for version in SUPPORTED_PYTHON_VERSIONS:
+ get_coverage_version(version)
+
+ # Verify all controller Python versions are mapped to the latest coverage version.
+ for version in CONTROLLER_PYTHON_VERSIONS:
+ if get_coverage_version(version) != CONTROLLER_COVERAGE_VERSION:
+ raise InternalError(f'Controller Python version {version} is not mapped to the latest coverage version.')
+
+
+self_check()