summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/coverage/combine.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/coverage/combine.py')
-rw-r--r--test/lib/ansible_test/_internal/commands/coverage/combine.py362
1 files changed, 362 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/coverage/combine.py b/test/lib/ansible_test/_internal/commands/coverage/combine.py
new file mode 100644
index 0000000..66210c7
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/coverage/combine.py
@@ -0,0 +1,362 @@
+"""Combine code coverage files."""
+from __future__ import annotations
+
+import collections.abc as c
+import os
+import json
+import typing as t
+
+from ...target import (
+ walk_compile_targets,
+ walk_powershell_targets,
+)
+
+from ...io import (
+ read_text_file,
+)
+
+from ...util import (
+ ANSIBLE_TEST_TOOLS_ROOT,
+ display,
+ ApplicationError,
+ raw_command,
+)
+
+from ...util_common import (
+ ResultType,
+ write_json_file,
+ write_json_test_results,
+)
+
+from ...executor import (
+ Delegate,
+)
+
+from ...data import (
+ data_context,
+)
+
+from ...host_configs import (
+ DockerConfig,
+ RemoteConfig,
+)
+
+from ...provisioning import (
+ HostState,
+ prepare_profiles,
+)
+
+from . import (
+ enumerate_python_arcs,
+ enumerate_powershell_lines,
+ get_collection_path_regexes,
+ get_all_coverage_files,
+ get_python_coverage_files,
+ get_python_modules,
+ get_powershell_coverage_files,
+ initialize_coverage,
+ COVERAGE_OUTPUT_FILE_NAME,
+ COVERAGE_GROUPS,
+ CoverageConfig,
+ PathChecker,
+)
+
+TValue = t.TypeVar('TValue')
+
+
+def command_coverage_combine(args: CoverageCombineConfig) -> None:
+ """Patch paths in coverage files and merge into a single file."""
+ host_state = prepare_profiles(args) # coverage combine
+ combine_coverage_files(args, host_state)
+
+
+def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
+ """Combine coverage and return a list of the resulting files."""
+ if args.delegate:
+ if isinstance(args.controller, (DockerConfig, RemoteConfig)):
+ paths = get_all_coverage_files()
+ exported_paths = [path for path in paths if os.path.basename(path).split('=')[-1].split('.')[:2] == ['coverage', 'combined']]
+
+ if not exported_paths:
+ raise ExportedCoverageDataNotFound()
+
+ pairs = [(path, os.path.relpath(path, data_context().content.root)) for path in exported_paths]
+
+ def coverage_callback(files: list[tuple[str, str]]) -> None:
+ """Add the coverage files to the payload file list."""
+ display.info('Including %d exported coverage file(s) in payload.' % len(pairs), verbosity=1)
+ files.extend(pairs)
+
+ data_context().register_payload_callback(coverage_callback)
+
+ raise Delegate(host_state=host_state)
+
+ paths = _command_coverage_combine_powershell(args) + _command_coverage_combine_python(args, host_state)
+
+ for path in paths:
+ display.info('Generated combined output: %s' % path, verbosity=1)
+
+ return paths
+
+
+class ExportedCoverageDataNotFound(ApplicationError):
+ """Exception when no combined coverage data is present yet is required."""
+ def __init__(self) -> None:
+ super().__init__(
+ 'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n'
+ 'Export coverage with `ansible-test coverage combine` using the `--export` option.\n'
+ 'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path)
+
+
+def _command_coverage_combine_python(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
+ """Combine Python coverage files and return a list of the output files."""
+ coverage = initialize_coverage(args, host_state)
+
+ modules = get_python_modules()
+
+ coverage_files = get_python_coverage_files()
+
+ def _default_stub_value(source_paths: list[str]) -> dict[str, set[tuple[int, int]]]:
+ return {path: set() for path in source_paths}
+
+ counter = 0
+ sources = _get_coverage_targets(args, walk_compile_targets)
+ groups = _build_stub_groups(args, sources, _default_stub_value)
+
+ collection_search_re, collection_sub_re = get_collection_path_regexes()
+
+ for coverage_file in coverage_files:
+ counter += 1
+ display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)
+
+ group = get_coverage_group(args, coverage_file)
+
+ if group is None:
+ display.warning('Unexpected name for coverage file: %s' % coverage_file)
+ continue
+
+ for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re):
+ if args.export:
+ filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems
+
+ if group not in groups:
+ groups[group] = {}
+
+ arc_data = groups[group]
+
+ if filename not in arc_data:
+ arc_data[filename] = set()
+
+ arc_data[filename].update(arcs)
+
+ output_files = []
+
+ if args.export:
+ coverage_file = os.path.join(args.export, '')
+ suffix = '=coverage.combined'
+ else:
+ coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME)
+ suffix = ''
+
+ path_checker = PathChecker(args, collection_search_re)
+
+ for group in sorted(groups):
+ arc_data = groups[group]
+ output_file = coverage_file + group + suffix
+
+ if args.explain:
+ continue
+
+ updated = coverage.CoverageData(output_file)
+
+ for filename in arc_data:
+ if not path_checker.check_path(filename):
+ continue
+
+ updated.add_arcs({filename: list(arc_data[filename])})
+
+ if args.all:
+ updated.add_arcs(dict((source[0], []) for source in sources))
+
+ updated.write() # always write files to make sure stale files do not exist
+
+ if updated:
+ # only report files which are non-empty to prevent coverage from reporting errors
+ output_files.append(output_file)
+
+ path_checker.report()
+
+ return sorted(output_files)
+
+
+def _command_coverage_combine_powershell(args: CoverageCombineConfig) -> list[str]:
+ """Combine PowerShell coverage files and return a list of the output files."""
+ coverage_files = get_powershell_coverage_files()
+
+ def _default_stub_value(source_paths: list[str]) -> dict[str, dict[int, int]]:
+ cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')]
+ cmd.extend(source_paths)
+
+ stubs = json.loads(raw_command(cmd, capture=True)[0])
+
+ return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs)
+
+ counter = 0
+ sources = _get_coverage_targets(args, walk_powershell_targets)
+ groups = _build_stub_groups(args, sources, _default_stub_value)
+
+ collection_search_re, collection_sub_re = get_collection_path_regexes()
+
+ for coverage_file in coverage_files:
+ counter += 1
+ display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)
+
+ group = get_coverage_group(args, coverage_file)
+
+ if group is None:
+ display.warning('Unexpected name for coverage file: %s' % coverage_file)
+ continue
+
+ for filename, hits in enumerate_powershell_lines(coverage_file, collection_search_re, collection_sub_re):
+ if args.export:
+ filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems
+
+ if group not in groups:
+ groups[group] = {}
+
+ coverage_data = groups[group]
+
+ if filename not in coverage_data:
+ coverage_data[filename] = {}
+
+ file_coverage = coverage_data[filename]
+
+ for line_no, hit_count in hits.items():
+ file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count
+
+ output_files = []
+
+ path_checker = PathChecker(args)
+
+ for group in sorted(groups):
+ coverage_data = dict((filename, data) for filename, data in groups[group].items() if path_checker.check_path(filename))
+
+ if args.all:
+ missing_sources = [source for source, _source_line_count in sources if source not in coverage_data]
+ coverage_data.update(_default_stub_value(missing_sources))
+
+ if not args.explain:
+ if args.export:
+ output_file = os.path.join(args.export, group + '=coverage.combined')
+ write_json_file(output_file, coverage_data, formatted=False)
+ output_files.append(output_file)
+ continue
+
+ output_file = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell'
+
+ write_json_test_results(ResultType.COVERAGE, output_file, coverage_data, formatted=False)
+
+ output_files.append(os.path.join(ResultType.COVERAGE.path, output_file))
+
+ path_checker.report()
+
+ return sorted(output_files)
+
+
+def _get_coverage_targets(args: CoverageCombineConfig, walk_func: c.Callable) -> list[tuple[str, int]]:
+ """Return a list of files to cover and the number of lines in each file, using the given function as the source of the files."""
+ sources = []
+
+ if args.all or args.stub:
+ # excludes symlinks of regular files to avoid reporting on the same file multiple times
+ # in the future it would be nice to merge any coverage for symlinks into the real files
+ for target in walk_func(include_symlinks=False):
+ target_path = os.path.abspath(target.path)
+
+ target_lines = len(read_text_file(target_path).splitlines())
+
+ sources.append((target_path, target_lines))
+
+ sources.sort()
+
+ return sources
+
+
+def _build_stub_groups(
+ args: CoverageCombineConfig,
+ sources: list[tuple[str, int]],
+ default_stub_value: c.Callable[[list[str]], dict[str, TValue]],
+) -> dict[str, dict[str, TValue]]:
+ """
+ Split the given list of sources with line counts into groups, maintaining a maximum line count for each group.
+ Each group consists of a dictionary of sources and default coverage stubs generated by the provided default_stub_value function.
+ """
+ groups = {}
+
+ if args.stub:
+ stub_group: list[str] = []
+ stub_groups = [stub_group]
+ stub_line_limit = 500000
+ stub_line_count = 0
+
+ for source, source_line_count in sources:
+ stub_group.append(source)
+ stub_line_count += source_line_count
+
+ if stub_line_count > stub_line_limit:
+ stub_line_count = 0
+ stub_group = []
+ stub_groups.append(stub_group)
+
+ for stub_index, stub_group in enumerate(stub_groups):
+ if not stub_group:
+ continue
+
+ groups['=stub-%02d' % (stub_index + 1)] = default_stub_value(stub_group)
+
+ return groups
+
+
+def get_coverage_group(args: CoverageCombineConfig, coverage_file: str) -> t.Optional[str]:
+ """Return the name of the coverage group for the specified coverage file, or None if no group was found."""
+ parts = os.path.basename(coverage_file).split('=', 4)
+
+ if len(parts) != 5 or not parts[4].startswith('coverage.'):
+ return None
+
+ names = dict(
+ command=parts[0],
+ target=parts[1],
+ environment=parts[2],
+ version=parts[3],
+ )
+
+ export_names = dict(
+ version=parts[3],
+ )
+
+ group = ''
+
+ for part in COVERAGE_GROUPS:
+ if part in args.group_by:
+ group += '=%s' % names[part]
+ elif args.export:
+ group += '=%s' % export_names.get(part, 'various')
+
+ if args.export:
+ group = group.lstrip('=')
+
+ return group
+
+
+class CoverageCombineConfig(CoverageConfig):
+ """Configuration for the coverage combine command."""
+ def __init__(self, args: t.Any) -> None:
+ super().__init__(args)
+
+ self.group_by: frozenset[str] = frozenset(args.group_by) if args.group_by else frozenset()
+ self.all: bool = args.all
+ self.stub: bool = args.stub
+
+ # only available to coverage combine
+ self.export: str = args.export if 'export' in args else False