diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/coverage/analyze')
7 files changed, 695 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py new file mode 100644 index 0000000..37859e8 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py @@ -0,0 +1,17 @@ +"""Common logic for the `coverage analyze` subcommand.""" +from __future__ import annotations +import typing as t + +from .. import ( + CoverageConfig, +) + + +class CoverageAnalyzeConfig(CoverageConfig): + """Configuration for the `coverage analyze` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + # avoid mixing log messages with file output when using `/dev/stdout` for the output file on commands + # this may be worth considering as the default behavior in the future, instead of being dependent on the command or options used + self.display_stderr = True diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py new file mode 100644 index 0000000..ad6cf86 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py @@ -0,0 +1,154 @@ +"""Analyze integration test target code coverage.""" +from __future__ import annotations + +import collections.abc as c +import os +import typing as t + +from .....io import ( + read_json_file, + write_json_file, +) + +from .....util import ( + ApplicationError, + display, +) + +from .. import ( + CoverageAnalyzeConfig, +) + +TargetKey = t.TypeVar('TargetKey', int, tuple[int, int]) +TFlexKey = t.TypeVar('TFlexKey', int, tuple[int, int], str) +NamedPoints = dict[str, dict[TargetKey, set[str]]] +IndexedPoints = dict[str, dict[TargetKey, set[int]]] +Arcs = dict[str, dict[tuple[int, int], set[int]]] +Lines = dict[str, dict[int, set[int]]] +TargetIndexes = dict[str, int] +TargetSetIndexes = dict[frozenset[int], int] + + +class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig): + """Configuration for the `coverage analyze targets` command.""" + + +def make_report(target_indexes: TargetIndexes, arcs: Arcs, lines: Lines) -> dict[str, t.Any]: + """Condense target indexes, arcs and lines into a compact report.""" + set_indexes: TargetSetIndexes = {} + arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items()) + line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items()) + + report = dict( + targets=[name for name, index in sorted(target_indexes.items(), key=lambda kvp: kvp[1])], + target_sets=[sorted(data) for data, index in sorted(set_indexes.items(), key=lambda kvp: kvp[1])], + arcs=arc_refs, + lines=line_refs, + ) + + return report + + +def load_report(report: dict[str, t.Any]) -> tuple[list[str], Arcs, Lines]: + """Extract target indexes, arcs and lines from an existing report.""" + try: + target_indexes: list[str] = report['targets'] + target_sets: list[list[int]] = report['target_sets'] + arc_data: dict[str, dict[str, int]] = report['arcs'] + line_data: dict[str, dict[int, int]] = report['lines'] + except KeyError as ex: + raise ApplicationError('Document is missing key "%s".' % ex.args) + except TypeError: + raise ApplicationError('Document is type "%s" instead of "dict".' % type(report).__name__) + + arcs = dict((path, dict((parse_arc(arc), set(target_sets[index])) for arc, index in data.items())) for path, data in arc_data.items()) + lines = dict((path, dict((int(line), set(target_sets[index])) for line, index in data.items())) for path, data in line_data.items()) + + return target_indexes, arcs, lines + + +def read_report(path: str) -> tuple[list[str], Arcs, Lines]: + """Read a JSON report from disk.""" + try: + report = read_json_file(path) + except Exception as ex: + raise ApplicationError('File "%s" is not valid JSON: %s' % (path, ex)) + + try: + return load_report(report) + except ApplicationError as ex: + raise ApplicationError('File "%s" is not an aggregated coverage data file. %s' % (path, ex)) + + +def write_report(args: CoverageAnalyzeTargetsConfig, report: dict[str, t.Any], path: str) -> None: + """Write a JSON report to disk.""" + if args.explain: + return + + write_json_file(path, report, formatted=False) + + display.info('Generated %d byte report with %d targets covering %d files.' % ( + os.path.getsize(path), len(report['targets']), len(set(report['arcs'].keys()) | set(report['lines'].keys())), + ), verbosity=1) + + +def format_line(value: int) -> str: + """Format line as a string.""" + return str(value) # putting this in a function keeps both pylint and mypy happy + + +def format_arc(value: tuple[int, int]) -> str: + """Format an arc tuple as a string.""" + return '%d:%d' % value + + +def parse_arc(value: str) -> tuple[int, int]: + """Parse an arc string into a tuple.""" + first, last = tuple(map(int, value.split(':'))) + return first, last + + +def get_target_set_index(data: set[int], target_set_indexes: TargetSetIndexes) -> int: + """Find or add the target set in the result set and return the target set index.""" + return target_set_indexes.setdefault(frozenset(data), len(target_set_indexes)) + + +def get_target_index(name: str, target_indexes: TargetIndexes) -> int: + """Find or add the target in the result set and return the target index.""" + return target_indexes.setdefault(name, len(target_indexes)) + + +def expand_indexes( + source_data: IndexedPoints, + source_index: list[str], + format_func: c.Callable[[TargetKey], TFlexKey], +) -> dict[str, dict[TFlexKey, set[str]]]: + """Expand indexes from the source into target names for easier processing of the data (arcs or lines).""" + combined_data: dict[str, dict[TFlexKey, set[str]]] = {} + + for covered_path, covered_points in source_data.items(): + combined_points = combined_data.setdefault(covered_path, {}) + + for covered_point, covered_target_indexes in covered_points.items(): + combined_point = combined_points.setdefault(format_func(covered_point), set()) + + for covered_target_index in covered_target_indexes: + combined_point.add(source_index[covered_target_index]) + + return combined_data + + +def generate_indexes(target_indexes: TargetIndexes, data: NamedPoints) -> IndexedPoints: + """Return an indexed version of the given data (arcs or points).""" + results: IndexedPoints = {} + + for path, points in data.items(): + result_points = results[path] = {} + + for point, target_names in points.items(): + result_point = result_points[point] = set() + + for target_name in target_names: + result_point.add(get_target_index(target_name, target_indexes)) + + return results diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py new file mode 100644 index 0000000..e3782ce --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py @@ -0,0 +1,74 @@ +"""Combine integration test target code coverage reports.""" +from __future__ import annotations +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + read_report, + write_report, +) + +from . import ( + Arcs, + IndexedPoints, + Lines, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets combine` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_files: list[str] = args.input_file + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombineConfig) -> None: + """Combine integration test target code coverage reports.""" + host_state = prepare_profiles(args) # coverage analyze targets combine + + if args.delegate: + raise Delegate(host_state=host_state) + + combined_target_indexes: TargetIndexes = {} + combined_path_arcs: Arcs = {} + combined_path_lines: Lines = {} + + for report_path in args.input_files: + covered_targets, covered_path_arcs, covered_path_lines = read_report(report_path) + + merge_indexes(covered_path_arcs, covered_targets, combined_path_arcs, combined_target_indexes) + merge_indexes(covered_path_lines, covered_targets, combined_path_lines, combined_target_indexes) + + report = make_report(combined_target_indexes, combined_path_arcs, combined_path_lines) + + write_report(args, report, args.output_file) + + +def merge_indexes( + source_data: IndexedPoints, + source_index: list[str], + combined_data: IndexedPoints, + combined_index: TargetIndexes, +) -> None: + """Merge indexes from the source into the combined data set (arcs or lines).""" + for covered_path, covered_points in source_data.items(): + combined_points = combined_data.setdefault(covered_path, {}) + + for covered_point, covered_target_indexes in covered_points.items(): + combined_point = combined_points.setdefault(covered_point, set()) + + for covered_target_index in covered_target_indexes: + combined_point.add(get_target_index(source_index[covered_target_index], combined_index)) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py new file mode 100644 index 0000000..ba90387 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py @@ -0,0 +1,51 @@ +"""Expand target names in an aggregated coverage file.""" +from __future__ import annotations +import typing as t + +from .....io import ( + SortedSetEncoder, + write_json_file, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + format_arc, + format_line, + read_report, +) + + +class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets expand` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_expand(args: CoverageAnalyzeTargetsExpandConfig) -> None: + """Expand target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets expand + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + report = dict( + arcs=expand_indexes(covered_path_arcs, covered_targets, format_arc), + lines=expand_indexes(covered_path_lines, covered_targets, format_line), + ) + + if not args.explain: + write_json_file(args.output_file, report, encoder=SortedSetEncoder) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py new file mode 100644 index 0000000..29a8ee5 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py @@ -0,0 +1,122 @@ +"""Filter an aggregated coverage file, keeping only the specified targets.""" +from __future__ import annotations + +import collections.abc as c +import re +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + generate_indexes, + make_report, + read_report, + write_report, +) + +from . import ( + NamedPoints, + TargetKey, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets filter` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + self.include_targets: list[str] = args.include_targets + self.exclude_targets: list[str] = args.exclude_targets + self.include_path: t.Optional[str] = args.include_path + self.exclude_path: t.Optional[str] = args.exclude_path + + +def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterConfig) -> None: + """Filter target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets filter + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + def pass_target_key(value: TargetKey) -> TargetKey: + """Return the given target key unmodified.""" + return value + + filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, pass_target_key) + filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, pass_target_key) + + include_targets = set(args.include_targets) if args.include_targets else None + exclude_targets = set(args.exclude_targets) if args.exclude_targets else None + + include_path = re.compile(args.include_path) if args.include_path else None + exclude_path = re.compile(args.exclude_path) if args.exclude_path else None + + def path_filter_func(path: str) -> bool: + """Return True if the given path should be included, otherwise return False.""" + if include_path and not re.search(include_path, path): + return False + + if exclude_path and re.search(exclude_path, path): + return False + + return True + + def target_filter_func(targets: set[str]) -> set[str]: + """Filter the given targets and return the result based on the defined includes and excludes.""" + if include_targets: + targets &= include_targets + + if exclude_targets: + targets -= exclude_targets + + return targets + + filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func) + filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func) + + target_indexes: TargetIndexes = {} + indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs) + indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines) + + report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines) + + write_report(args, report, args.output_file) + + +def filter_data( + data: NamedPoints, + path_filter_func: c.Callable[[str], bool], + target_filter_func: c.Callable[[set[str]], set[str]], +) -> NamedPoints: + """Filter the data set using the specified filter function.""" + result: NamedPoints = {} + + for src_path, src_points in data.items(): + if not path_filter_func(src_path): + continue + + dst_points = {} + + for src_point, src_targets in src_points.items(): + dst_targets = target_filter_func(src_targets) + + if dst_targets: + dst_points[src_point] = dst_targets + + if dst_points: + result[src_path] = dst_points + + return result diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py new file mode 100644 index 0000000..127b5b7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py @@ -0,0 +1,158 @@ +"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" +from __future__ import annotations + +import os +import typing as t + +from .....encoding import ( + to_text, +) + +from .....data import ( + data_context, +) + +from .....util_common import ( + ResultType, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, + HostState, +) + +from ... import ( + enumerate_powershell_lines, + enumerate_python_arcs, + get_collection_path_regexes, + get_powershell_coverage_files, + get_python_coverage_files, + get_python_modules, + initialize_coverage, + PathChecker, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + write_report, +) + +from . import ( + Arcs, + Lines, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets generate` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_dir: str = args.input_dir or ResultType.COVERAGE.path + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenerateConfig) -> None: + """Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" + host_state = prepare_profiles(args) # coverage analyze targets generate + + if args.delegate: + raise Delegate(host_state) + + root = data_context().content.root + target_indexes: TargetIndexes = {} + arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items()) + lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items()) + report = make_report(target_indexes, arcs, lines) + write_report(args, report, args.output_file) + + +def analyze_python_coverage( + args: CoverageAnalyzeTargetsGenerateConfig, + host_state: HostState, + path: str, + target_indexes: TargetIndexes, +) -> Arcs: + """Analyze Python code coverage.""" + results: Arcs = {} + collection_search_re, collection_sub_re = get_collection_path_regexes() + modules = get_python_modules() + python_files = get_python_coverage_files(path) + coverage = initialize_coverage(args, host_state) + + for python_file in python_files: + if not is_integration_coverage_file(python_file): + continue + + target_name = get_target_name(python_file) + target_index = get_target_index(target_name, target_indexes) + + for filename, covered_arcs in enumerate_python_arcs(python_file, coverage, modules, collection_search_re, collection_sub_re): + arcs = results.setdefault(filename, {}) + + for covered_arc in covered_arcs: + arc = arcs.setdefault(covered_arc, set()) + arc.add(target_index) + + prune_invalid_filenames(args, results, collection_search_re=collection_search_re) + + return results + + +def analyze_powershell_coverage( + args: CoverageAnalyzeTargetsGenerateConfig, + path: str, + target_indexes: TargetIndexes, +) -> Lines: + """Analyze PowerShell code coverage""" + results: Lines = {} + collection_search_re, collection_sub_re = get_collection_path_regexes() + powershell_files = get_powershell_coverage_files(path) + + for powershell_file in powershell_files: + if not is_integration_coverage_file(powershell_file): + continue + + target_name = get_target_name(powershell_file) + target_index = get_target_index(target_name, target_indexes) + + for filename, hits in enumerate_powershell_lines(powershell_file, collection_search_re, collection_sub_re): + lines = results.setdefault(filename, {}) + + for covered_line in hits: + line = lines.setdefault(covered_line, set()) + line.add(target_index) + + prune_invalid_filenames(args, results) + + return results + + +def prune_invalid_filenames( + args: CoverageAnalyzeTargetsGenerateConfig, + results: dict[str, t.Any], + collection_search_re: t.Optional[t.Pattern] = None, +) -> None: + """Remove invalid filenames from the given result set.""" + path_checker = PathChecker(args, collection_search_re) + + for path in list(results.keys()): + if not path_checker.check_path(path): + del results[path] + + +def get_target_name(path: str) -> str: + """Extract the test target name from the given coverage path.""" + return to_text(os.path.basename(path).split('=')[1]) + + +def is_integration_coverage_file(path: str) -> bool: + """Returns True if the coverage file came from integration tests, otherwise False.""" + return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration') diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py new file mode 100644 index 0000000..c1c77e7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py @@ -0,0 +1,119 @@ +"""Identify aggregated coverage in one file missing from another.""" +from __future__ import annotations + +import os +import typing as t + +from .....encoding import ( + to_bytes, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + read_report, + write_report, +) + +from . import ( + TargetIndexes, + IndexedPoints, +) + + +class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets missing` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.from_file: str = args.from_file + self.to_file: str = args.to_file + self.output_file: str = args.output_file + + self.only_gaps: bool = args.only_gaps + self.only_exists: bool = args.only_exists + + +def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissingConfig) -> None: + """Identify aggregated coverage in one file missing from another.""" + host_state = prepare_profiles(args) # coverage analyze targets missing + + if args.delegate: + raise Delegate(host_state=host_state) + + from_targets, from_path_arcs, from_path_lines = read_report(args.from_file) + to_targets, to_path_arcs, to_path_lines = read_report(args.to_file) + target_indexes: TargetIndexes = {} + + if args.only_gaps: + arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists) + lines = find_gaps(from_path_lines, from_targets, to_path_lines, target_indexes, args.only_exists) + else: + arcs = find_missing(from_path_arcs, from_targets, to_path_arcs, to_targets, target_indexes, args.only_exists) + lines = find_missing(from_path_lines, from_targets, to_path_lines, to_targets, target_indexes, args.only_exists) + + report = make_report(target_indexes, arcs, lines) + write_report(args, report, args.output_file) + + +def find_gaps( + from_data: IndexedPoints, + from_index: list[str], + to_data: IndexedPoints, + target_indexes: TargetIndexes, + only_exists: bool, +) -> IndexedPoints: + """Find gaps in coverage between the from and to data sets.""" + target_data: IndexedPoints = {} + + for from_path, from_points in from_data.items(): + if only_exists and not os.path.isfile(to_bytes(from_path)): + continue + + to_points = to_data.get(from_path, {}) + + gaps = set(from_points.keys()) - set(to_points.keys()) + + if gaps: + gap_points = dict((key, value) for key, value in from_points.items() if key in gaps) + target_data[from_path] = dict((gap, set(get_target_index(from_index[i], target_indexes) for i in indexes)) for gap, indexes in gap_points.items()) + + return target_data + + +def find_missing( + from_data: IndexedPoints, + from_index: list[str], + to_data: IndexedPoints, + to_index: list[str], + target_indexes: TargetIndexes, + only_exists: bool, +) -> IndexedPoints: + """Find coverage in from_data not present in to_data (arcs or lines).""" + target_data: IndexedPoints = {} + + for from_path, from_points in from_data.items(): + if only_exists and not os.path.isfile(to_bytes(from_path)): + continue + + to_points = to_data.get(from_path, {}) + + for from_point, from_target_indexes in from_points.items(): + to_target_indexes = to_points.get(from_point, set()) + + remaining_targets = set(from_index[i] for i in from_target_indexes) - set(to_index[i] for i in to_target_indexes) + + if remaining_targets: + target_index = target_data.setdefault(from_path, {}).setdefault(from_point, set()) + target_index.update(get_target_index(name, target_indexes) for name in remaining_targets) + + return target_data |