summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/coverage/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/coverage/__init__.py')
-rw-r--r--test/lib/ansible_test/_internal/commands/coverage/__init__.py370
1 files changed, 370 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/coverage/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/__init__.py
new file mode 100644
index 0000000..139cf3c
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/coverage/__init__.py
@@ -0,0 +1,370 @@
+"""Common logic for the coverage subcommand."""
+from __future__ import annotations
+
+import collections.abc as c
+import json
+import os
+import re
+import typing as t
+
+from ...encoding import (
+ to_bytes,
+)
+
+from ...io import (
+ read_text_file,
+ read_json_file,
+)
+
+from ...util import (
+ ApplicationError,
+ common_environment,
+ display,
+ ANSIBLE_TEST_DATA_ROOT,
+)
+
+from ...util_common import (
+ intercept_python,
+ ResultType,
+)
+
+from ...config import (
+ EnvironmentConfig,
+)
+
+from ...python_requirements import (
+ install_requirements,
+)
+
+from ...target import (
+ walk_module_targets,
+)
+
+from ...data import (
+ data_context,
+)
+
+from ...pypi_proxy import (
+ configure_pypi_proxy,
+)
+
+from ...provisioning import (
+ HostState,
+)
+
+from ...coverage_util import (
+ get_coverage_file_schema_version,
+ CoverageError,
+ CONTROLLER_COVERAGE_VERSION,
+)
+
+if t.TYPE_CHECKING:
+ import coverage as coverage_module
+
+COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
+COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
+COVERAGE_OUTPUT_FILE_NAME = 'coverage'
+
+
+class CoverageConfig(EnvironmentConfig):
+ """Configuration for the coverage command."""
+ def __init__(self, args: t.Any) -> None:
+ super().__init__(args, 'coverage')
+
+
+def initialize_coverage(args: CoverageConfig, host_state: HostState) -> coverage_module:
+ """Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available."""
+ configure_pypi_proxy(args, host_state.controller_profile) # coverage
+ install_requirements(args, host_state.controller_profile.python, coverage=True) # coverage
+
+ try:
+ import coverage
+ except ImportError:
+ coverage = None
+
+ coverage_required_version = CONTROLLER_COVERAGE_VERSION.coverage_version
+
+ if not coverage:
+ raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module must be installed to use this command.')
+
+ if coverage.__version__ != coverage_required_version:
+ raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module is required. Version {coverage.__version__} was found.')
+
+ return coverage
+
+
+def run_coverage(args: CoverageConfig, host_state: HostState, output_file: str, command: str, cmd: list[str]) -> None:
+ """Run the coverage cli tool with the specified options."""
+ env = common_environment()
+ env.update(dict(COVERAGE_FILE=output_file))
+
+ cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd
+
+ stdout, stderr = intercept_python(args, host_state.controller_profile.python, cmd, env, capture=True)
+
+ stdout = (stdout or '').strip()
+ stderr = (stderr or '').strip()
+
+ if stdout:
+ display.info(stdout)
+
+ if stderr:
+ display.warning(stderr)
+
+
+def get_all_coverage_files() -> list[str]:
+ """Return a list of all coverage file paths."""
+ return get_python_coverage_files() + get_powershell_coverage_files()
+
+
+def get_python_coverage_files(path: t.Optional[str] = None) -> list[str]:
+ """Return the list of Python coverage file paths."""
+ return get_coverage_files('python', path)
+
+
+def get_powershell_coverage_files(path: t.Optional[str] = None) -> list[str]:
+ """Return the list of PowerShell coverage file paths."""
+ return get_coverage_files('powershell', path)
+
+
+def get_coverage_files(language: str, path: t.Optional[str] = None) -> list[str]:
+ """Return the list of coverage file paths for the given language."""
+ coverage_dir = path or ResultType.COVERAGE.path
+
+ try:
+ coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir)
+ if '=coverage.' in f and '=%s' % language in f]
+ except FileNotFoundError:
+ return []
+
+ return coverage_files
+
+
+def get_collection_path_regexes() -> tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]]:
+ """Return a pair of regexes used for identifying and manipulating collection paths."""
+ if data_context().content.collection:
+ collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory)
+ collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory)
+ else:
+ collection_search_re = None
+ collection_sub_re = None
+
+ return collection_search_re, collection_sub_re
+
+
+def get_python_modules() -> dict[str, str]:
+ """Return a dictionary of Ansible module names and their paths."""
+ return dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py'))
+
+
+def enumerate_python_arcs(
+ path: str,
+ coverage: coverage_module,
+ modules: dict[str, str],
+ collection_search_re: t.Optional[t.Pattern],
+ collection_sub_re: t.Optional[t.Pattern],
+) -> c.Generator[tuple[str, set[tuple[int, int]]], None, None]:
+ """Enumerate Python code coverage arcs in the given file."""
+ if os.path.getsize(path) == 0:
+ display.warning('Empty coverage file: %s' % path, verbosity=2)
+ return
+
+ try:
+ arc_data = read_python_coverage(path, coverage)
+ except CoverageError as ex:
+ display.error(str(ex))
+ return
+
+ for filename, arcs in arc_data.items():
+ if not arcs:
+ # This is most likely due to using an unsupported version of coverage.
+ display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path))
+ continue
+
+ filename = sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re)
+
+ if not filename:
+ continue
+
+ yield filename, set(arcs)
+
+
+PythonArcs = dict[str, list[tuple[int, int]]]
+"""Python coverage arcs."""
+
+
+def read_python_coverage(path: str, coverage: coverage_module) -> PythonArcs:
+ """Return coverage arcs from the specified coverage file. Raises a CoverageError exception if coverage cannot be read."""
+ try:
+ return read_python_coverage_native(path, coverage)
+ except CoverageError as ex:
+ schema_version = get_coverage_file_schema_version(path)
+
+ if schema_version == CONTROLLER_COVERAGE_VERSION.schema_version:
+ raise CoverageError(path, f'Unexpected failure reading supported schema version {schema_version}.') from ex
+
+ if schema_version == 0:
+ return read_python_coverage_legacy(path)
+
+ raise CoverageError(path, f'Unsupported schema version: {schema_version}')
+
+
+def read_python_coverage_native(path: str, coverage: coverage_module) -> PythonArcs:
+ """Return coverage arcs from the specified coverage file using the coverage API."""
+ try:
+ data = coverage.CoverageData(path)
+ data.read()
+ arcs = {filename: data.arcs(filename) for filename in data.measured_files()}
+ except Exception as ex:
+ raise CoverageError(path, f'Error reading coverage file using coverage API: {ex}') from ex
+
+ return arcs
+
+
+def read_python_coverage_legacy(path: str) -> PythonArcs:
+ """Return coverage arcs from the specified coverage file, which must be in the legacy JSON format."""
+ try:
+ contents = read_text_file(path)
+ contents = re.sub(r'''^!coverage.py: This is a private format, don't read it directly!''', '', contents)
+ data = json.loads(contents)
+ arcs: PythonArcs = {filename: [t.cast(tuple[int, int], tuple(arc)) for arc in arc_list] for filename, arc_list in data['arcs'].items()}
+ except Exception as ex:
+ raise CoverageError(path, f'Error reading JSON coverage file: {ex}') from ex
+
+ return arcs
+
+
+def enumerate_powershell_lines(
+ path: str,
+ collection_search_re: t.Optional[t.Pattern],
+ collection_sub_re: t.Optional[t.Pattern],
+) -> c.Generator[tuple[str, dict[int, int]], None, None]:
+ """Enumerate PowerShell code coverage lines in the given file."""
+ if os.path.getsize(path) == 0:
+ display.warning('Empty coverage file: %s' % path, verbosity=2)
+ return
+
+ try:
+ coverage_run = read_json_file(path)
+ except Exception as ex: # pylint: disable=locally-disabled, broad-except
+ display.error('%s' % ex)
+ return
+
+ for filename, hits in coverage_run.items():
+ filename = sanitize_filename(filename, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re)
+
+ if not filename:
+ continue
+
+ if isinstance(hits, dict) and not hits.get('Line'):
+ # Input data was previously aggregated and thus uses the standard ansible-test output format for PowerShell coverage.
+ # This format differs from the more verbose format of raw coverage data from the remote Windows hosts.
+ hits = dict((int(key), value) for key, value in hits.items())
+
+ yield filename, hits
+ continue
+
+ # PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that
+ if not isinstance(hits, list):
+ hits = [hits]
+
+ hits = dict((hit['Line'], hit['HitCount']) for hit in hits if hit)
+
+ yield filename, hits
+
+
+def sanitize_filename(
+ filename: str,
+ modules: t.Optional[dict[str, str]] = None,
+ collection_search_re: t.Optional[t.Pattern] = None,
+ collection_sub_re: t.Optional[t.Pattern] = None,
+) -> t.Optional[str]:
+ """Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid."""
+ ansible_path = os.path.abspath('lib/ansible/') + '/'
+ root_path = data_context().content.root + '/'
+ integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep
+
+ if modules is None:
+ modules = {}
+
+ if '/ansible_modlib.zip/ansible/' in filename:
+ # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
+ new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+ elif collection_search_re and collection_search_re.search(filename):
+ new_name = os.path.abspath(collection_sub_re.sub('', filename))
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+ elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
+ # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
+ new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+ elif '/ansible_module_' in filename:
+ # Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
+ module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename)
+ if module_name not in modules:
+ display.warning('Skipping coverage of unknown module: %s' % module_name)
+ return None
+ new_name = os.path.abspath(modules[module_name])
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+ elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
+ # Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
+ # AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
+ # AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
+ module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
+ '\\g<module>', filename).rstrip('_')
+ if module_name not in modules:
+ display.warning('Skipping coverage of unknown module: %s' % module_name)
+ return None
+ new_name = os.path.abspath(modules[module_name])
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+ elif re.search('^(/.*?)?/root/ansible/', filename):
+ # Rewrite the path of code running on a remote host or in a docker container as root.
+ new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+ elif integration_temp_path in filename:
+ # Rewrite the path of code running from an integration test temporary directory.
+ new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)
+ display.info('%s -> %s' % (filename, new_name), verbosity=3)
+ filename = new_name
+
+ filename = os.path.abspath(filename) # make sure path is absolute (will be relative if previously exported)
+
+ return filename
+
+
+class PathChecker:
+ """Checks code coverage paths to verify they are valid and reports on the findings."""
+ def __init__(self, args: CoverageConfig, collection_search_re: t.Optional[t.Pattern] = None) -> None:
+ self.args = args
+ self.collection_search_re = collection_search_re
+ self.invalid_paths: list[str] = []
+ self.invalid_path_chars = 0
+
+ def check_path(self, path: str) -> bool:
+ """Return True if the given coverage path is valid, otherwise display a warning and return False."""
+ if os.path.isfile(to_bytes(path)):
+ return True
+
+ if self.collection_search_re and self.collection_search_re.search(path) and os.path.basename(path) == '__init__.py':
+ # the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk
+ # coverage is still reported for these non-existent files, but warnings are not needed
+ return False
+
+ self.invalid_paths.append(path)
+ self.invalid_path_chars += len(path)
+
+ if self.args.verbosity > 1:
+ display.warning('Invalid coverage path: %s' % path)
+
+ return False
+
+ def report(self) -> None:
+ """Display a warning regarding invalid paths if any were found."""
+ if self.invalid_paths:
+ display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths)))