diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py')
-rw-r--r-- | test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py new file mode 100644 index 0000000..29a8ee5 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py @@ -0,0 +1,122 @@ +"""Filter an aggregated coverage file, keeping only the specified targets.""" +from __future__ import annotations + +import collections.abc as c +import re +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + generate_indexes, + make_report, + read_report, + write_report, +) + +from . import ( + NamedPoints, + TargetKey, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets filter` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + self.include_targets: list[str] = args.include_targets + self.exclude_targets: list[str] = args.exclude_targets + self.include_path: t.Optional[str] = args.include_path + self.exclude_path: t.Optional[str] = args.exclude_path + + +def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterConfig) -> None: + """Filter target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets filter + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + def pass_target_key(value: TargetKey) -> TargetKey: + """Return the given target key unmodified.""" + return value + + filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, pass_target_key) + filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, pass_target_key) + + include_targets = set(args.include_targets) if args.include_targets else None + exclude_targets = set(args.exclude_targets) if args.exclude_targets else None + + include_path = re.compile(args.include_path) if args.include_path else None + exclude_path = re.compile(args.exclude_path) if args.exclude_path else None + + def path_filter_func(path: str) -> bool: + """Return True if the given path should be included, otherwise return False.""" + if include_path and not re.search(include_path, path): + return False + + if exclude_path and re.search(exclude_path, path): + return False + + return True + + def target_filter_func(targets: set[str]) -> set[str]: + """Filter the given targets and return the result based on the defined includes and excludes.""" + if include_targets: + targets &= include_targets + + if exclude_targets: + targets -= exclude_targets + + return targets + + filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func) + filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func) + + target_indexes: TargetIndexes = {} + indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs) + indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines) + + report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines) + + write_report(args, report, args.output_file) + + +def filter_data( + data: NamedPoints, + path_filter_func: c.Callable[[str], bool], + target_filter_func: c.Callable[[set[str]], set[str]], +) -> NamedPoints: + """Filter the data set using the specified filter function.""" + result: NamedPoints = {} + + for src_path, src_points in data.items(): + if not path_filter_func(src_path): + continue + + dst_points = {} + + for src_point, src_targets in src_points.items(): + dst_targets = target_filter_func(src_targets) + + if dst_targets: + dst_points[src_point] = dst_targets + + if dst_points: + result[src_path] = dst_points + + return result |