diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/coverage')
13 files changed, 1862 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/coverage/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/__init__.py new file mode 100644 index 0000000..139cf3c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/__init__.py @@ -0,0 +1,370 @@ +"""Common logic for the coverage subcommand.""" +from __future__ import annotations + +import collections.abc as c +import json +import os +import re +import typing as t + +from ...encoding import ( + to_bytes, +) + +from ...io import ( + read_text_file, + read_json_file, +) + +from ...util import ( + ApplicationError, + common_environment, + display, + ANSIBLE_TEST_DATA_ROOT, +) + +from ...util_common import ( + intercept_python, + ResultType, +) + +from ...config import ( + EnvironmentConfig, +) + +from ...python_requirements import ( + install_requirements, +) + +from ...target import ( + walk_module_targets, +) + +from ...data import ( + data_context, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...provisioning import ( + HostState, +) + +from ...coverage_util import ( + get_coverage_file_schema_version, + CoverageError, + CONTROLLER_COVERAGE_VERSION, +) + +if t.TYPE_CHECKING: + import coverage as coverage_module + +COVERAGE_GROUPS = ('command', 'target', 'environment', 'version') +COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc') +COVERAGE_OUTPUT_FILE_NAME = 'coverage' + + +class CoverageConfig(EnvironmentConfig): + """Configuration for the coverage command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args, 'coverage') + + +def initialize_coverage(args: CoverageConfig, host_state: HostState) -> coverage_module: + """Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available.""" + configure_pypi_proxy(args, host_state.controller_profile) # coverage + install_requirements(args, host_state.controller_profile.python, coverage=True) # coverage + + try: + import coverage + except ImportError: + coverage = None + + coverage_required_version = CONTROLLER_COVERAGE_VERSION.coverage_version + + if not coverage: + raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module must be installed to use this command.') + + if coverage.__version__ != coverage_required_version: + raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module is required. Version {coverage.__version__} was found.') + + return coverage + + +def run_coverage(args: CoverageConfig, host_state: HostState, output_file: str, command: str, cmd: list[str]) -> None: + """Run the coverage cli tool with the specified options.""" + env = common_environment() + env.update(dict(COVERAGE_FILE=output_file)) + + cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd + + stdout, stderr = intercept_python(args, host_state.controller_profile.python, cmd, env, capture=True) + + stdout = (stdout or '').strip() + stderr = (stderr or '').strip() + + if stdout: + display.info(stdout) + + if stderr: + display.warning(stderr) + + +def get_all_coverage_files() -> list[str]: + """Return a list of all coverage file paths.""" + return get_python_coverage_files() + get_powershell_coverage_files() + + +def get_python_coverage_files(path: t.Optional[str] = None) -> list[str]: + """Return the list of Python coverage file paths.""" + return get_coverage_files('python', path) + + +def get_powershell_coverage_files(path: t.Optional[str] = None) -> list[str]: + """Return the list of PowerShell coverage file paths.""" + return get_coverage_files('powershell', path) + + +def get_coverage_files(language: str, path: t.Optional[str] = None) -> list[str]: + """Return the list of coverage file paths for the given language.""" + coverage_dir = path or ResultType.COVERAGE.path + + try: + coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir) + if '=coverage.' in f and '=%s' % language in f] + except FileNotFoundError: + return [] + + return coverage_files + + +def get_collection_path_regexes() -> tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]]: + """Return a pair of regexes used for identifying and manipulating collection paths.""" + if data_context().content.collection: + collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory) + collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory) + else: + collection_search_re = None + collection_sub_re = None + + return collection_search_re, collection_sub_re + + +def get_python_modules() -> dict[str, str]: + """Return a dictionary of Ansible module names and their paths.""" + return dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py')) + + +def enumerate_python_arcs( + path: str, + coverage: coverage_module, + modules: dict[str, str], + collection_search_re: t.Optional[t.Pattern], + collection_sub_re: t.Optional[t.Pattern], +) -> c.Generator[tuple[str, set[tuple[int, int]]], None, None]: + """Enumerate Python code coverage arcs in the given file.""" + if os.path.getsize(path) == 0: + display.warning('Empty coverage file: %s' % path, verbosity=2) + return + + try: + arc_data = read_python_coverage(path, coverage) + except CoverageError as ex: + display.error(str(ex)) + return + + for filename, arcs in arc_data.items(): + if not arcs: + # This is most likely due to using an unsupported version of coverage. + display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path)) + continue + + filename = sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re) + + if not filename: + continue + + yield filename, set(arcs) + + +PythonArcs = dict[str, list[tuple[int, int]]] +"""Python coverage arcs.""" + + +def read_python_coverage(path: str, coverage: coverage_module) -> PythonArcs: + """Return coverage arcs from the specified coverage file. Raises a CoverageError exception if coverage cannot be read.""" + try: + return read_python_coverage_native(path, coverage) + except CoverageError as ex: + schema_version = get_coverage_file_schema_version(path) + + if schema_version == CONTROLLER_COVERAGE_VERSION.schema_version: + raise CoverageError(path, f'Unexpected failure reading supported schema version {schema_version}.') from ex + + if schema_version == 0: + return read_python_coverage_legacy(path) + + raise CoverageError(path, f'Unsupported schema version: {schema_version}') + + +def read_python_coverage_native(path: str, coverage: coverage_module) -> PythonArcs: + """Return coverage arcs from the specified coverage file using the coverage API.""" + try: + data = coverage.CoverageData(path) + data.read() + arcs = {filename: data.arcs(filename) for filename in data.measured_files()} + except Exception as ex: + raise CoverageError(path, f'Error reading coverage file using coverage API: {ex}') from ex + + return arcs + + +def read_python_coverage_legacy(path: str) -> PythonArcs: + """Return coverage arcs from the specified coverage file, which must be in the legacy JSON format.""" + try: + contents = read_text_file(path) + contents = re.sub(r'''^!coverage.py: This is a private format, don't read it directly!''', '', contents) + data = json.loads(contents) + arcs: PythonArcs = {filename: [t.cast(tuple[int, int], tuple(arc)) for arc in arc_list] for filename, arc_list in data['arcs'].items()} + except Exception as ex: + raise CoverageError(path, f'Error reading JSON coverage file: {ex}') from ex + + return arcs + + +def enumerate_powershell_lines( + path: str, + collection_search_re: t.Optional[t.Pattern], + collection_sub_re: t.Optional[t.Pattern], +) -> c.Generator[tuple[str, dict[int, int]], None, None]: + """Enumerate PowerShell code coverage lines in the given file.""" + if os.path.getsize(path) == 0: + display.warning('Empty coverage file: %s' % path, verbosity=2) + return + + try: + coverage_run = read_json_file(path) + except Exception as ex: # pylint: disable=locally-disabled, broad-except + display.error('%s' % ex) + return + + for filename, hits in coverage_run.items(): + filename = sanitize_filename(filename, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re) + + if not filename: + continue + + if isinstance(hits, dict) and not hits.get('Line'): + # Input data was previously aggregated and thus uses the standard ansible-test output format for PowerShell coverage. + # This format differs from the more verbose format of raw coverage data from the remote Windows hosts. + hits = dict((int(key), value) for key, value in hits.items()) + + yield filename, hits + continue + + # PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that + if not isinstance(hits, list): + hits = [hits] + + hits = dict((hit['Line'], hit['HitCount']) for hit in hits if hit) + + yield filename, hits + + +def sanitize_filename( + filename: str, + modules: t.Optional[dict[str, str]] = None, + collection_search_re: t.Optional[t.Pattern] = None, + collection_sub_re: t.Optional[t.Pattern] = None, +) -> t.Optional[str]: + """Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid.""" + ansible_path = os.path.abspath('lib/ansible/') + '/' + root_path = data_context().content.root + '/' + integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep + + if modules is None: + modules = {} + + if '/ansible_modlib.zip/ansible/' in filename: + # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier. + new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif collection_search_re and collection_search_re.search(filename): + new_name = os.path.abspath(collection_sub_re.sub('', filename)) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename): + # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later. + new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif '/ansible_module_' in filename: + # Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier. + module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename) + if module_name not in modules: + display.warning('Skipping coverage of unknown module: %s' % module_name) + return None + new_name = os.path.abspath(modules[module_name]) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename): + # Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later. + # AnsiballZ versions using zipimporter will match the `.zip` portion of the regex. + # AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex. + module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$', + '\\g<module>', filename).rstrip('_') + if module_name not in modules: + display.warning('Skipping coverage of unknown module: %s' % module_name) + return None + new_name = os.path.abspath(modules[module_name]) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif re.search('^(/.*?)?/root/ansible/', filename): + # Rewrite the path of code running on a remote host or in a docker container as root. + new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif integration_temp_path in filename: + # Rewrite the path of code running from an integration test temporary directory. + new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + + filename = os.path.abspath(filename) # make sure path is absolute (will be relative if previously exported) + + return filename + + +class PathChecker: + """Checks code coverage paths to verify they are valid and reports on the findings.""" + def __init__(self, args: CoverageConfig, collection_search_re: t.Optional[t.Pattern] = None) -> None: + self.args = args + self.collection_search_re = collection_search_re + self.invalid_paths: list[str] = [] + self.invalid_path_chars = 0 + + def check_path(self, path: str) -> bool: + """Return True if the given coverage path is valid, otherwise display a warning and return False.""" + if os.path.isfile(to_bytes(path)): + return True + + if self.collection_search_re and self.collection_search_re.search(path) and os.path.basename(path) == '__init__.py': + # the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk + # coverage is still reported for these non-existent files, but warnings are not needed + return False + + self.invalid_paths.append(path) + self.invalid_path_chars += len(path) + + if self.args.verbosity > 1: + display.warning('Invalid coverage path: %s' % path) + + return False + + def report(self) -> None: + """Display a warning regarding invalid paths if any were found.""" + if self.invalid_paths: + display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths))) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py new file mode 100644 index 0000000..37859e8 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py @@ -0,0 +1,17 @@ +"""Common logic for the `coverage analyze` subcommand.""" +from __future__ import annotations +import typing as t + +from .. import ( + CoverageConfig, +) + + +class CoverageAnalyzeConfig(CoverageConfig): + """Configuration for the `coverage analyze` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + # avoid mixing log messages with file output when using `/dev/stdout` for the output file on commands + # this may be worth considering as the default behavior in the future, instead of being dependent on the command or options used + self.display_stderr = True diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py new file mode 100644 index 0000000..ad6cf86 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py @@ -0,0 +1,154 @@ +"""Analyze integration test target code coverage.""" +from __future__ import annotations + +import collections.abc as c +import os +import typing as t + +from .....io import ( + read_json_file, + write_json_file, +) + +from .....util import ( + ApplicationError, + display, +) + +from .. import ( + CoverageAnalyzeConfig, +) + +TargetKey = t.TypeVar('TargetKey', int, tuple[int, int]) +TFlexKey = t.TypeVar('TFlexKey', int, tuple[int, int], str) +NamedPoints = dict[str, dict[TargetKey, set[str]]] +IndexedPoints = dict[str, dict[TargetKey, set[int]]] +Arcs = dict[str, dict[tuple[int, int], set[int]]] +Lines = dict[str, dict[int, set[int]]] +TargetIndexes = dict[str, int] +TargetSetIndexes = dict[frozenset[int], int] + + +class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig): + """Configuration for the `coverage analyze targets` command.""" + + +def make_report(target_indexes: TargetIndexes, arcs: Arcs, lines: Lines) -> dict[str, t.Any]: + """Condense target indexes, arcs and lines into a compact report.""" + set_indexes: TargetSetIndexes = {} + arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items()) + line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items()) + + report = dict( + targets=[name for name, index in sorted(target_indexes.items(), key=lambda kvp: kvp[1])], + target_sets=[sorted(data) for data, index in sorted(set_indexes.items(), key=lambda kvp: kvp[1])], + arcs=arc_refs, + lines=line_refs, + ) + + return report + + +def load_report(report: dict[str, t.Any]) -> tuple[list[str], Arcs, Lines]: + """Extract target indexes, arcs and lines from an existing report.""" + try: + target_indexes: list[str] = report['targets'] + target_sets: list[list[int]] = report['target_sets'] + arc_data: dict[str, dict[str, int]] = report['arcs'] + line_data: dict[str, dict[int, int]] = report['lines'] + except KeyError as ex: + raise ApplicationError('Document is missing key "%s".' % ex.args) + except TypeError: + raise ApplicationError('Document is type "%s" instead of "dict".' % type(report).__name__) + + arcs = dict((path, dict((parse_arc(arc), set(target_sets[index])) for arc, index in data.items())) for path, data in arc_data.items()) + lines = dict((path, dict((int(line), set(target_sets[index])) for line, index in data.items())) for path, data in line_data.items()) + + return target_indexes, arcs, lines + + +def read_report(path: str) -> tuple[list[str], Arcs, Lines]: + """Read a JSON report from disk.""" + try: + report = read_json_file(path) + except Exception as ex: + raise ApplicationError('File "%s" is not valid JSON: %s' % (path, ex)) + + try: + return load_report(report) + except ApplicationError as ex: + raise ApplicationError('File "%s" is not an aggregated coverage data file. %s' % (path, ex)) + + +def write_report(args: CoverageAnalyzeTargetsConfig, report: dict[str, t.Any], path: str) -> None: + """Write a JSON report to disk.""" + if args.explain: + return + + write_json_file(path, report, formatted=False) + + display.info('Generated %d byte report with %d targets covering %d files.' % ( + os.path.getsize(path), len(report['targets']), len(set(report['arcs'].keys()) | set(report['lines'].keys())), + ), verbosity=1) + + +def format_line(value: int) -> str: + """Format line as a string.""" + return str(value) # putting this in a function keeps both pylint and mypy happy + + +def format_arc(value: tuple[int, int]) -> str: + """Format an arc tuple as a string.""" + return '%d:%d' % value + + +def parse_arc(value: str) -> tuple[int, int]: + """Parse an arc string into a tuple.""" + first, last = tuple(map(int, value.split(':'))) + return first, last + + +def get_target_set_index(data: set[int], target_set_indexes: TargetSetIndexes) -> int: + """Find or add the target set in the result set and return the target set index.""" + return target_set_indexes.setdefault(frozenset(data), len(target_set_indexes)) + + +def get_target_index(name: str, target_indexes: TargetIndexes) -> int: + """Find or add the target in the result set and return the target index.""" + return target_indexes.setdefault(name, len(target_indexes)) + + +def expand_indexes( + source_data: IndexedPoints, + source_index: list[str], + format_func: c.Callable[[TargetKey], TFlexKey], +) -> dict[str, dict[TFlexKey, set[str]]]: + """Expand indexes from the source into target names for easier processing of the data (arcs or lines).""" + combined_data: dict[str, dict[TFlexKey, set[str]]] = {} + + for covered_path, covered_points in source_data.items(): + combined_points = combined_data.setdefault(covered_path, {}) + + for covered_point, covered_target_indexes in covered_points.items(): + combined_point = combined_points.setdefault(format_func(covered_point), set()) + + for covered_target_index in covered_target_indexes: + combined_point.add(source_index[covered_target_index]) + + return combined_data + + +def generate_indexes(target_indexes: TargetIndexes, data: NamedPoints) -> IndexedPoints: + """Return an indexed version of the given data (arcs or points).""" + results: IndexedPoints = {} + + for path, points in data.items(): + result_points = results[path] = {} + + for point, target_names in points.items(): + result_point = result_points[point] = set() + + for target_name in target_names: + result_point.add(get_target_index(target_name, target_indexes)) + + return results diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py new file mode 100644 index 0000000..e3782ce --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py @@ -0,0 +1,74 @@ +"""Combine integration test target code coverage reports.""" +from __future__ import annotations +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + read_report, + write_report, +) + +from . import ( + Arcs, + IndexedPoints, + Lines, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets combine` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_files: list[str] = args.input_file + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombineConfig) -> None: + """Combine integration test target code coverage reports.""" + host_state = prepare_profiles(args) # coverage analyze targets combine + + if args.delegate: + raise Delegate(host_state=host_state) + + combined_target_indexes: TargetIndexes = {} + combined_path_arcs: Arcs = {} + combined_path_lines: Lines = {} + + for report_path in args.input_files: + covered_targets, covered_path_arcs, covered_path_lines = read_report(report_path) + + merge_indexes(covered_path_arcs, covered_targets, combined_path_arcs, combined_target_indexes) + merge_indexes(covered_path_lines, covered_targets, combined_path_lines, combined_target_indexes) + + report = make_report(combined_target_indexes, combined_path_arcs, combined_path_lines) + + write_report(args, report, args.output_file) + + +def merge_indexes( + source_data: IndexedPoints, + source_index: list[str], + combined_data: IndexedPoints, + combined_index: TargetIndexes, +) -> None: + """Merge indexes from the source into the combined data set (arcs or lines).""" + for covered_path, covered_points in source_data.items(): + combined_points = combined_data.setdefault(covered_path, {}) + + for covered_point, covered_target_indexes in covered_points.items(): + combined_point = combined_points.setdefault(covered_point, set()) + + for covered_target_index in covered_target_indexes: + combined_point.add(get_target_index(source_index[covered_target_index], combined_index)) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py new file mode 100644 index 0000000..ba90387 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py @@ -0,0 +1,51 @@ +"""Expand target names in an aggregated coverage file.""" +from __future__ import annotations +import typing as t + +from .....io import ( + SortedSetEncoder, + write_json_file, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + format_arc, + format_line, + read_report, +) + + +class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets expand` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_expand(args: CoverageAnalyzeTargetsExpandConfig) -> None: + """Expand target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets expand + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + report = dict( + arcs=expand_indexes(covered_path_arcs, covered_targets, format_arc), + lines=expand_indexes(covered_path_lines, covered_targets, format_line), + ) + + if not args.explain: + write_json_file(args.output_file, report, encoder=SortedSetEncoder) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py new file mode 100644 index 0000000..29a8ee5 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py @@ -0,0 +1,122 @@ +"""Filter an aggregated coverage file, keeping only the specified targets.""" +from __future__ import annotations + +import collections.abc as c +import re +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + generate_indexes, + make_report, + read_report, + write_report, +) + +from . import ( + NamedPoints, + TargetKey, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets filter` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + self.include_targets: list[str] = args.include_targets + self.exclude_targets: list[str] = args.exclude_targets + self.include_path: t.Optional[str] = args.include_path + self.exclude_path: t.Optional[str] = args.exclude_path + + +def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterConfig) -> None: + """Filter target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets filter + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + def pass_target_key(value: TargetKey) -> TargetKey: + """Return the given target key unmodified.""" + return value + + filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, pass_target_key) + filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, pass_target_key) + + include_targets = set(args.include_targets) if args.include_targets else None + exclude_targets = set(args.exclude_targets) if args.exclude_targets else None + + include_path = re.compile(args.include_path) if args.include_path else None + exclude_path = re.compile(args.exclude_path) if args.exclude_path else None + + def path_filter_func(path: str) -> bool: + """Return True if the given path should be included, otherwise return False.""" + if include_path and not re.search(include_path, path): + return False + + if exclude_path and re.search(exclude_path, path): + return False + + return True + + def target_filter_func(targets: set[str]) -> set[str]: + """Filter the given targets and return the result based on the defined includes and excludes.""" + if include_targets: + targets &= include_targets + + if exclude_targets: + targets -= exclude_targets + + return targets + + filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func) + filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func) + + target_indexes: TargetIndexes = {} + indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs) + indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines) + + report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines) + + write_report(args, report, args.output_file) + + +def filter_data( + data: NamedPoints, + path_filter_func: c.Callable[[str], bool], + target_filter_func: c.Callable[[set[str]], set[str]], +) -> NamedPoints: + """Filter the data set using the specified filter function.""" + result: NamedPoints = {} + + for src_path, src_points in data.items(): + if not path_filter_func(src_path): + continue + + dst_points = {} + + for src_point, src_targets in src_points.items(): + dst_targets = target_filter_func(src_targets) + + if dst_targets: + dst_points[src_point] = dst_targets + + if dst_points: + result[src_path] = dst_points + + return result diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py new file mode 100644 index 0000000..127b5b7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py @@ -0,0 +1,158 @@ +"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" +from __future__ import annotations + +import os +import typing as t + +from .....encoding import ( + to_text, +) + +from .....data import ( + data_context, +) + +from .....util_common import ( + ResultType, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, + HostState, +) + +from ... import ( + enumerate_powershell_lines, + enumerate_python_arcs, + get_collection_path_regexes, + get_powershell_coverage_files, + get_python_coverage_files, + get_python_modules, + initialize_coverage, + PathChecker, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + write_report, +) + +from . import ( + Arcs, + Lines, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets generate` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_dir: str = args.input_dir or ResultType.COVERAGE.path + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenerateConfig) -> None: + """Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" + host_state = prepare_profiles(args) # coverage analyze targets generate + + if args.delegate: + raise Delegate(host_state) + + root = data_context().content.root + target_indexes: TargetIndexes = {} + arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items()) + lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items()) + report = make_report(target_indexes, arcs, lines) + write_report(args, report, args.output_file) + + +def analyze_python_coverage( + args: CoverageAnalyzeTargetsGenerateConfig, + host_state: HostState, + path: str, + target_indexes: TargetIndexes, +) -> Arcs: + """Analyze Python code coverage.""" + results: Arcs = {} + collection_search_re, collection_sub_re = get_collection_path_regexes() + modules = get_python_modules() + python_files = get_python_coverage_files(path) + coverage = initialize_coverage(args, host_state) + + for python_file in python_files: + if not is_integration_coverage_file(python_file): + continue + + target_name = get_target_name(python_file) + target_index = get_target_index(target_name, target_indexes) + + for filename, covered_arcs in enumerate_python_arcs(python_file, coverage, modules, collection_search_re, collection_sub_re): + arcs = results.setdefault(filename, {}) + + for covered_arc in covered_arcs: + arc = arcs.setdefault(covered_arc, set()) + arc.add(target_index) + + prune_invalid_filenames(args, results, collection_search_re=collection_search_re) + + return results + + +def analyze_powershell_coverage( + args: CoverageAnalyzeTargetsGenerateConfig, + path: str, + target_indexes: TargetIndexes, +) -> Lines: + """Analyze PowerShell code coverage""" + results: Lines = {} + collection_search_re, collection_sub_re = get_collection_path_regexes() + powershell_files = get_powershell_coverage_files(path) + + for powershell_file in powershell_files: + if not is_integration_coverage_file(powershell_file): + continue + + target_name = get_target_name(powershell_file) + target_index = get_target_index(target_name, target_indexes) + + for filename, hits in enumerate_powershell_lines(powershell_file, collection_search_re, collection_sub_re): + lines = results.setdefault(filename, {}) + + for covered_line in hits: + line = lines.setdefault(covered_line, set()) + line.add(target_index) + + prune_invalid_filenames(args, results) + + return results + + +def prune_invalid_filenames( + args: CoverageAnalyzeTargetsGenerateConfig, + results: dict[str, t.Any], + collection_search_re: t.Optional[t.Pattern] = None, +) -> None: + """Remove invalid filenames from the given result set.""" + path_checker = PathChecker(args, collection_search_re) + + for path in list(results.keys()): + if not path_checker.check_path(path): + del results[path] + + +def get_target_name(path: str) -> str: + """Extract the test target name from the given coverage path.""" + return to_text(os.path.basename(path).split('=')[1]) + + +def is_integration_coverage_file(path: str) -> bool: + """Returns True if the coverage file came from integration tests, otherwise False.""" + return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration') diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py new file mode 100644 index 0000000..c1c77e7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py @@ -0,0 +1,119 @@ +"""Identify aggregated coverage in one file missing from another.""" +from __future__ import annotations + +import os +import typing as t + +from .....encoding import ( + to_bytes, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + read_report, + write_report, +) + +from . import ( + TargetIndexes, + IndexedPoints, +) + + +class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets missing` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.from_file: str = args.from_file + self.to_file: str = args.to_file + self.output_file: str = args.output_file + + self.only_gaps: bool = args.only_gaps + self.only_exists: bool = args.only_exists + + +def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissingConfig) -> None: + """Identify aggregated coverage in one file missing from another.""" + host_state = prepare_profiles(args) # coverage analyze targets missing + + if args.delegate: + raise Delegate(host_state=host_state) + + from_targets, from_path_arcs, from_path_lines = read_report(args.from_file) + to_targets, to_path_arcs, to_path_lines = read_report(args.to_file) + target_indexes: TargetIndexes = {} + + if args.only_gaps: + arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists) + lines = find_gaps(from_path_lines, from_targets, to_path_lines, target_indexes, args.only_exists) + else: + arcs = find_missing(from_path_arcs, from_targets, to_path_arcs, to_targets, target_indexes, args.only_exists) + lines = find_missing(from_path_lines, from_targets, to_path_lines, to_targets, target_indexes, args.only_exists) + + report = make_report(target_indexes, arcs, lines) + write_report(args, report, args.output_file) + + +def find_gaps( + from_data: IndexedPoints, + from_index: list[str], + to_data: IndexedPoints, + target_indexes: TargetIndexes, + only_exists: bool, +) -> IndexedPoints: + """Find gaps in coverage between the from and to data sets.""" + target_data: IndexedPoints = {} + + for from_path, from_points in from_data.items(): + if only_exists and not os.path.isfile(to_bytes(from_path)): + continue + + to_points = to_data.get(from_path, {}) + + gaps = set(from_points.keys()) - set(to_points.keys()) + + if gaps: + gap_points = dict((key, value) for key, value in from_points.items() if key in gaps) + target_data[from_path] = dict((gap, set(get_target_index(from_index[i], target_indexes) for i in indexes)) for gap, indexes in gap_points.items()) + + return target_data + + +def find_missing( + from_data: IndexedPoints, + from_index: list[str], + to_data: IndexedPoints, + to_index: list[str], + target_indexes: TargetIndexes, + only_exists: bool, +) -> IndexedPoints: + """Find coverage in from_data not present in to_data (arcs or lines).""" + target_data: IndexedPoints = {} + + for from_path, from_points in from_data.items(): + if only_exists and not os.path.isfile(to_bytes(from_path)): + continue + + to_points = to_data.get(from_path, {}) + + for from_point, from_target_indexes in from_points.items(): + to_target_indexes = to_points.get(from_point, set()) + + remaining_targets = set(from_index[i] for i in from_target_indexes) - set(to_index[i] for i in to_target_indexes) + + if remaining_targets: + target_index = target_data.setdefault(from_path, {}).setdefault(from_point, set()) + target_index.update(get_target_index(name, target_indexes) for name in remaining_targets) + + return target_data diff --git a/test/lib/ansible_test/_internal/commands/coverage/combine.py b/test/lib/ansible_test/_internal/commands/coverage/combine.py new file mode 100644 index 0000000..66210c7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/combine.py @@ -0,0 +1,362 @@ +"""Combine code coverage files.""" +from __future__ import annotations + +import collections.abc as c +import os +import json +import typing as t + +from ...target import ( + walk_compile_targets, + walk_powershell_targets, +) + +from ...io import ( + read_text_file, +) + +from ...util import ( + ANSIBLE_TEST_TOOLS_ROOT, + display, + ApplicationError, + raw_command, +) + +from ...util_common import ( + ResultType, + write_json_file, + write_json_test_results, +) + +from ...executor import ( + Delegate, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + DockerConfig, + RemoteConfig, +) + +from ...provisioning import ( + HostState, + prepare_profiles, +) + +from . import ( + enumerate_python_arcs, + enumerate_powershell_lines, + get_collection_path_regexes, + get_all_coverage_files, + get_python_coverage_files, + get_python_modules, + get_powershell_coverage_files, + initialize_coverage, + COVERAGE_OUTPUT_FILE_NAME, + COVERAGE_GROUPS, + CoverageConfig, + PathChecker, +) + +TValue = t.TypeVar('TValue') + + +def command_coverage_combine(args: CoverageCombineConfig) -> None: + """Patch paths in coverage files and merge into a single file.""" + host_state = prepare_profiles(args) # coverage combine + combine_coverage_files(args, host_state) + + +def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -> list[str]: + """Combine coverage and return a list of the resulting files.""" + if args.delegate: + if isinstance(args.controller, (DockerConfig, RemoteConfig)): + paths = get_all_coverage_files() + exported_paths = [path for path in paths if os.path.basename(path).split('=')[-1].split('.')[:2] == ['coverage', 'combined']] + + if not exported_paths: + raise ExportedCoverageDataNotFound() + + pairs = [(path, os.path.relpath(path, data_context().content.root)) for path in exported_paths] + + def coverage_callback(files: list[tuple[str, str]]) -> None: + """Add the coverage files to the payload file list.""" + display.info('Including %d exported coverage file(s) in payload.' % len(pairs), verbosity=1) + files.extend(pairs) + + data_context().register_payload_callback(coverage_callback) + + raise Delegate(host_state=host_state) + + paths = _command_coverage_combine_powershell(args) + _command_coverage_combine_python(args, host_state) + + for path in paths: + display.info('Generated combined output: %s' % path, verbosity=1) + + return paths + + +class ExportedCoverageDataNotFound(ApplicationError): + """Exception when no combined coverage data is present yet is required.""" + def __init__(self) -> None: + super().__init__( + 'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n' + 'Export coverage with `ansible-test coverage combine` using the `--export` option.\n' + 'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path) + + +def _command_coverage_combine_python(args: CoverageCombineConfig, host_state: HostState) -> list[str]: + """Combine Python coverage files and return a list of the output files.""" + coverage = initialize_coverage(args, host_state) + + modules = get_python_modules() + + coverage_files = get_python_coverage_files() + + def _default_stub_value(source_paths: list[str]) -> dict[str, set[tuple[int, int]]]: + return {path: set() for path in source_paths} + + counter = 0 + sources = _get_coverage_targets(args, walk_compile_targets) + groups = _build_stub_groups(args, sources, _default_stub_value) + + collection_search_re, collection_sub_re = get_collection_path_regexes() + + for coverage_file in coverage_files: + counter += 1 + display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2) + + group = get_coverage_group(args, coverage_file) + + if group is None: + display.warning('Unexpected name for coverage file: %s' % coverage_file) + continue + + for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re): + if args.export: + filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems + + if group not in groups: + groups[group] = {} + + arc_data = groups[group] + + if filename not in arc_data: + arc_data[filename] = set() + + arc_data[filename].update(arcs) + + output_files = [] + + if args.export: + coverage_file = os.path.join(args.export, '') + suffix = '=coverage.combined' + else: + coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME) + suffix = '' + + path_checker = PathChecker(args, collection_search_re) + + for group in sorted(groups): + arc_data = groups[group] + output_file = coverage_file + group + suffix + + if args.explain: + continue + + updated = coverage.CoverageData(output_file) + + for filename in arc_data: + if not path_checker.check_path(filename): + continue + + updated.add_arcs({filename: list(arc_data[filename])}) + + if args.all: + updated.add_arcs(dict((source[0], []) for source in sources)) + + updated.write() # always write files to make sure stale files do not exist + + if updated: + # only report files which are non-empty to prevent coverage from reporting errors + output_files.append(output_file) + + path_checker.report() + + return sorted(output_files) + + +def _command_coverage_combine_powershell(args: CoverageCombineConfig) -> list[str]: + """Combine PowerShell coverage files and return a list of the output files.""" + coverage_files = get_powershell_coverage_files() + + def _default_stub_value(source_paths: list[str]) -> dict[str, dict[int, int]]: + cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')] + cmd.extend(source_paths) + + stubs = json.loads(raw_command(cmd, capture=True)[0]) + + return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs) + + counter = 0 + sources = _get_coverage_targets(args, walk_powershell_targets) + groups = _build_stub_groups(args, sources, _default_stub_value) + + collection_search_re, collection_sub_re = get_collection_path_regexes() + + for coverage_file in coverage_files: + counter += 1 + display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2) + + group = get_coverage_group(args, coverage_file) + + if group is None: + display.warning('Unexpected name for coverage file: %s' % coverage_file) + continue + + for filename, hits in enumerate_powershell_lines(coverage_file, collection_search_re, collection_sub_re): + if args.export: + filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems + + if group not in groups: + groups[group] = {} + + coverage_data = groups[group] + + if filename not in coverage_data: + coverage_data[filename] = {} + + file_coverage = coverage_data[filename] + + for line_no, hit_count in hits.items(): + file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count + + output_files = [] + + path_checker = PathChecker(args) + + for group in sorted(groups): + coverage_data = dict((filename, data) for filename, data in groups[group].items() if path_checker.check_path(filename)) + + if args.all: + missing_sources = [source for source, _source_line_count in sources if source not in coverage_data] + coverage_data.update(_default_stub_value(missing_sources)) + + if not args.explain: + if args.export: + output_file = os.path.join(args.export, group + '=coverage.combined') + write_json_file(output_file, coverage_data, formatted=False) + output_files.append(output_file) + continue + + output_file = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell' + + write_json_test_results(ResultType.COVERAGE, output_file, coverage_data, formatted=False) + + output_files.append(os.path.join(ResultType.COVERAGE.path, output_file)) + + path_checker.report() + + return sorted(output_files) + + +def _get_coverage_targets(args: CoverageCombineConfig, walk_func: c.Callable) -> list[tuple[str, int]]: + """Return a list of files to cover and the number of lines in each file, using the given function as the source of the files.""" + sources = [] + + if args.all or args.stub: + # excludes symlinks of regular files to avoid reporting on the same file multiple times + # in the future it would be nice to merge any coverage for symlinks into the real files + for target in walk_func(include_symlinks=False): + target_path = os.path.abspath(target.path) + + target_lines = len(read_text_file(target_path).splitlines()) + + sources.append((target_path, target_lines)) + + sources.sort() + + return sources + + +def _build_stub_groups( + args: CoverageCombineConfig, + sources: list[tuple[str, int]], + default_stub_value: c.Callable[[list[str]], dict[str, TValue]], +) -> dict[str, dict[str, TValue]]: + """ + Split the given list of sources with line counts into groups, maintaining a maximum line count for each group. + Each group consists of a dictionary of sources and default coverage stubs generated by the provided default_stub_value function. + """ + groups = {} + + if args.stub: + stub_group: list[str] = [] + stub_groups = [stub_group] + stub_line_limit = 500000 + stub_line_count = 0 + + for source, source_line_count in sources: + stub_group.append(source) + stub_line_count += source_line_count + + if stub_line_count > stub_line_limit: + stub_line_count = 0 + stub_group = [] + stub_groups.append(stub_group) + + for stub_index, stub_group in enumerate(stub_groups): + if not stub_group: + continue + + groups['=stub-%02d' % (stub_index + 1)] = default_stub_value(stub_group) + + return groups + + +def get_coverage_group(args: CoverageCombineConfig, coverage_file: str) -> t.Optional[str]: + """Return the name of the coverage group for the specified coverage file, or None if no group was found.""" + parts = os.path.basename(coverage_file).split('=', 4) + + if len(parts) != 5 or not parts[4].startswith('coverage.'): + return None + + names = dict( + command=parts[0], + target=parts[1], + environment=parts[2], + version=parts[3], + ) + + export_names = dict( + version=parts[3], + ) + + group = '' + + for part in COVERAGE_GROUPS: + if part in args.group_by: + group += '=%s' % names[part] + elif args.export: + group += '=%s' % export_names.get(part, 'various') + + if args.export: + group = group.lstrip('=') + + return group + + +class CoverageCombineConfig(CoverageConfig): + """Configuration for the coverage combine command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.group_by: frozenset[str] = frozenset(args.group_by) if args.group_by else frozenset() + self.all: bool = args.all + self.stub: bool = args.stub + + # only available to coverage combine + self.export: str = args.export if 'export' in args else False diff --git a/test/lib/ansible_test/_internal/commands/coverage/erase.py b/test/lib/ansible_test/_internal/commands/coverage/erase.py new file mode 100644 index 0000000..70b685c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/erase.py @@ -0,0 +1,43 @@ +"""Erase code coverage files.""" +from __future__ import annotations + +import os + +from ...util_common import ( + ResultType, +) + +from ...executor import ( + Delegate, +) + +from ...provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageConfig, +) + + +def command_coverage_erase(args: CoverageEraseConfig) -> None: + """Erase code coverage data files collected during test runs.""" + host_state = prepare_profiles(args) # coverage erase + + if args.delegate: + raise Delegate(host_state=host_state) + + coverage_dir = ResultType.COVERAGE.path + + for name in os.listdir(coverage_dir): + if not name.startswith('coverage') and '=coverage.' not in name: + continue + + path = os.path.join(coverage_dir, name) + + if not args.explain: + os.remove(path) + + +class CoverageEraseConfig(CoverageConfig): + """Configuration for the coverage erase command.""" diff --git a/test/lib/ansible_test/_internal/commands/coverage/html.py b/test/lib/ansible_test/_internal/commands/coverage/html.py new file mode 100644 index 0000000..e3063c0 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/html.py @@ -0,0 +1,51 @@ +"""Generate HTML code coverage reports.""" +from __future__ import annotations + +import os + +from ...io import ( + make_dirs, +) + +from ...util import ( + display, +) + +from ...util_common import ( + ResultType, +) + +from ...provisioning import ( + prepare_profiles, +) + +from .combine import ( + combine_coverage_files, + CoverageCombineConfig, +) + +from . import ( + run_coverage, +) + + +def command_coverage_html(args: CoverageHtmlConfig) -> None: + """Generate an HTML coverage report.""" + host_state = prepare_profiles(args) # coverage html + output_files = combine_coverage_files(args, host_state) + + for output_file in output_files: + if output_file.endswith('-powershell'): + # coverage.py does not support non-Python files so we just skip the local html report. + display.info("Skipping output file %s in html generation" % output_file, verbosity=3) + continue + + dir_name = os.path.join(ResultType.REPORTS.path, os.path.basename(output_file)) + make_dirs(dir_name) + run_coverage(args, host_state, output_file, 'html', ['-i', '-d', dir_name]) + + display.info('HTML report generated: file:///%s' % os.path.join(dir_name, 'index.html')) + + +class CoverageHtmlConfig(CoverageCombineConfig): + """Configuration for the coverage html command.""" diff --git a/test/lib/ansible_test/_internal/commands/coverage/report.py b/test/lib/ansible_test/_internal/commands/coverage/report.py new file mode 100644 index 0000000..fadc13f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/report.py @@ -0,0 +1,152 @@ +"""Generate console code coverage reports.""" +from __future__ import annotations + +import os +import typing as t + +from ...io import ( + read_json_file, +) + +from ...util import ( + display, +) + +from ...data import ( + data_context, +) + +from ...provisioning import ( + prepare_profiles, +) + +from .combine import ( + combine_coverage_files, + CoverageCombineConfig, +) + +from . import ( + run_coverage, +) + + +def command_coverage_report(args: CoverageReportConfig) -> None: + """Generate a console coverage report.""" + host_state = prepare_profiles(args) # coverage report + output_files = combine_coverage_files(args, host_state) + + for output_file in output_files: + if args.group_by or args.stub: + display.info('>>> Coverage Group: %s' % ' '.join(os.path.basename(output_file).split('=')[1:])) + + if output_file.endswith('-powershell'): + display.info(_generate_powershell_output_report(args, output_file)) + else: + options = [] + + if args.show_missing: + options.append('--show-missing') + + if args.include: + options.extend(['--include', args.include]) + + if args.omit: + options.extend(['--omit', args.omit]) + + run_coverage(args, host_state, output_file, 'report', options) + + +def _generate_powershell_output_report(args: CoverageReportConfig, coverage_file: str) -> str: + """Generate and return a PowerShell coverage report for the given coverage file.""" + coverage_info = read_json_file(coverage_file) + + root_path = data_context().content.root + '/' + + name_padding = 7 + cover_padding = 8 + + file_report = [] + total_stmts = 0 + total_miss = 0 + + for filename in sorted(coverage_info.keys()): + hit_info = coverage_info[filename] + + if filename.startswith(root_path): + filename = filename[len(root_path):] + + if args.omit and filename in args.omit: + continue + if args.include and filename not in args.include: + continue + + stmts = len(hit_info) + miss = len([hit for hit in hit_info.values() if hit == 0]) + + name_padding = max(name_padding, len(filename) + 3) + + total_stmts += stmts + total_miss += miss + + cover = "{0}%".format(int((stmts - miss) / stmts * 100)) + + missing = [] + current_missing = None + sorted_lines = sorted([int(x) for x in hit_info.keys()]) + for idx, line in enumerate(sorted_lines): + hit = hit_info[str(line)] + if hit == 0 and current_missing is None: + current_missing = line + elif hit != 0 and current_missing is not None: + end_line = sorted_lines[idx - 1] + if current_missing == end_line: + missing.append(str(current_missing)) + else: + missing.append('%s-%s' % (current_missing, end_line)) + current_missing = None + + if current_missing is not None: + end_line = sorted_lines[-1] + if current_missing == end_line: + missing.append(str(current_missing)) + else: + missing.append('%s-%s' % (current_missing, end_line)) + + file_report.append({'name': filename, 'stmts': stmts, 'miss': miss, 'cover': cover, 'missing': missing}) + + if total_stmts == 0: + return '' + + total_percent = '{0}%'.format(int((total_stmts - total_miss) / total_stmts * 100)) + stmts_padding = max(8, len(str(total_stmts))) + miss_padding = max(7, len(str(total_miss))) + + line_length = name_padding + stmts_padding + miss_padding + cover_padding + + header = 'Name'.ljust(name_padding) + 'Stmts'.rjust(stmts_padding) + 'Miss'.rjust(miss_padding) + \ + 'Cover'.rjust(cover_padding) + + if args.show_missing: + header += 'Lines Missing'.rjust(16) + line_length += 16 + + line_break = '-' * line_length + lines = ['%s%s%s%s%s' % (f['name'].ljust(name_padding), str(f['stmts']).rjust(stmts_padding), + str(f['miss']).rjust(miss_padding), f['cover'].rjust(cover_padding), + ' ' + ', '.join(f['missing']) if args.show_missing else '') + for f in file_report] + totals = 'TOTAL'.ljust(name_padding) + str(total_stmts).rjust(stmts_padding) + \ + str(total_miss).rjust(miss_padding) + total_percent.rjust(cover_padding) + + report = '{0}\n{1}\n{2}\n{1}\n{3}'.format(header, line_break, "\n".join(lines), totals) + return report + + +class CoverageReportConfig(CoverageCombineConfig): + """Configuration for the coverage report command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.show_missing: bool = args.show_missing + self.include: str = args.include + self.omit: str = args.omit diff --git a/test/lib/ansible_test/_internal/commands/coverage/xml.py b/test/lib/ansible_test/_internal/commands/coverage/xml.py new file mode 100644 index 0000000..243c9a9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/xml.py @@ -0,0 +1,189 @@ +"""Generate XML code coverage reports.""" +from __future__ import annotations + +import os +import time + +from xml.etree.ElementTree import ( + Comment, + Element, + SubElement, + tostring, +) + +from xml.dom import ( + minidom, +) + +from ...io import ( + make_dirs, + read_json_file, +) + +from ...util_common import ( + ResultType, + write_text_test_results, +) + +from ...util import ( + get_ansible_version, +) + +from ...data import ( + data_context, +) + +from ...provisioning import ( + prepare_profiles, +) + +from .combine import ( + combine_coverage_files, + CoverageCombineConfig, +) + +from . import ( + run_coverage, +) + + +def command_coverage_xml(args: CoverageXmlConfig) -> None: + """Generate an XML coverage report.""" + host_state = prepare_profiles(args) # coverage xml + output_files = combine_coverage_files(args, host_state) + + for output_file in output_files: + xml_name = '%s.xml' % os.path.basename(output_file) + if output_file.endswith('-powershell'): + report = _generate_powershell_xml(output_file) + + rough_string = tostring(report, 'utf-8') + reparsed = minidom.parseString(rough_string) + pretty = reparsed.toprettyxml(indent=' ') + + write_text_test_results(ResultType.REPORTS, xml_name, pretty) + else: + xml_path = os.path.join(ResultType.REPORTS.path, xml_name) + make_dirs(ResultType.REPORTS.path) + run_coverage(args, host_state, output_file, 'xml', ['-i', '-o', xml_path]) + + +def _generate_powershell_xml(coverage_file: str) -> Element: + """Generate a PowerShell coverage report XML element from the specified coverage file and return it.""" + coverage_info = read_json_file(coverage_file) + + content_root = data_context().content.root + is_ansible = data_context().content.is_ansible + + packages: dict[str, dict[str, dict[str, int]]] = {} + for path, results in coverage_info.items(): + filename = os.path.splitext(os.path.basename(path))[0] + + if filename.startswith('Ansible.ModuleUtils'): + package = 'ansible.module_utils' + elif is_ansible: + package = 'ansible.modules' + else: + rel_path = path[len(content_root) + 1:] + plugin_type = "modules" if rel_path.startswith("plugins/modules") else "module_utils" + package = 'ansible_collections.%splugins.%s' % (data_context().content.collection.prefix, plugin_type) + + if package not in packages: + packages[package] = {} + + packages[package][path] = results + + elem_coverage = Element('coverage') + elem_coverage.append( + Comment(' Generated by ansible-test from the Ansible project: https://www.ansible.com/ ')) + elem_coverage.append( + Comment(' Based on https://raw.githubusercontent.com/cobertura/web/master/htdocs/xml/coverage-04.dtd ')) + + elem_sources = SubElement(elem_coverage, 'sources') + + elem_source = SubElement(elem_sources, 'source') + elem_source.text = data_context().content.root + + elem_packages = SubElement(elem_coverage, 'packages') + + total_lines_hit = 0 + total_line_count = 0 + + for package_name, package_data in packages.items(): + lines_hit, line_count = _add_cobertura_package(elem_packages, package_name, package_data) + + total_lines_hit += lines_hit + total_line_count += line_count + + elem_coverage.attrib.update({ + 'branch-rate': '0', + 'branches-covered': '0', + 'branches-valid': '0', + 'complexity': '0', + 'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0", + 'lines-covered': str(total_line_count), + 'lines-valid': str(total_lines_hit), + 'timestamp': str(int(time.time())), + 'version': get_ansible_version(), + }) + + return elem_coverage + + +def _add_cobertura_package(packages: Element, package_name: str, package_data: dict[str, dict[str, int]]) -> tuple[int, int]: + """Add a package element to the given packages element.""" + elem_package = SubElement(packages, 'package') + elem_classes = SubElement(elem_package, 'classes') + + total_lines_hit = 0 + total_line_count = 0 + + for path, results in package_data.items(): + lines_hit = len([True for hits in results.values() if hits]) + line_count = len(results) + + total_lines_hit += lines_hit + total_line_count += line_count + + elem_class = SubElement(elem_classes, 'class') + + class_name = os.path.splitext(os.path.basename(path))[0] + if class_name.startswith("Ansible.ModuleUtils"): + class_name = class_name[20:] + + content_root = data_context().content.root + filename = path + if filename.startswith(content_root): + filename = filename[len(content_root) + 1:] + + elem_class.attrib.update({ + 'branch-rate': '0', + 'complexity': '0', + 'filename': filename, + 'line-rate': str(round(lines_hit / line_count, 4)) if line_count else "0", + 'name': class_name, + }) + + SubElement(elem_class, 'methods') + + elem_lines = SubElement(elem_class, 'lines') + + for number, hits in results.items(): + elem_line = SubElement(elem_lines, 'line') + elem_line.attrib.update( + hits=str(hits), + number=str(number), + ) + + elem_package.attrib.update({ + 'branch-rate': '0', + 'complexity': '0', + 'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0", + 'name': package_name, + }) + + return total_lines_hit, total_line_count + + +class CoverageXmlConfig(CoverageCombineConfig): + """Configuration for the coverage xml command.""" |