summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py
blob: 127b5b7f1fd7d0243b2fb06d4bffab5881a13f98 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
from __future__ import annotations

import os
import typing as t

from .....encoding import (
    to_text,
)

from .....data import (
    data_context,
)

from .....util_common import (
    ResultType,
)

from .....executor import (
    Delegate,
)

from .....provisioning import (
    prepare_profiles,
    HostState,
)

from ... import (
    enumerate_powershell_lines,
    enumerate_python_arcs,
    get_collection_path_regexes,
    get_powershell_coverage_files,
    get_python_coverage_files,
    get_python_modules,
    initialize_coverage,
    PathChecker,
)

from . import (
    CoverageAnalyzeTargetsConfig,
    get_target_index,
    make_report,
    write_report,
)

from . import (
    Arcs,
    Lines,
    TargetIndexes,
)


class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
    """Configuration for the `coverage analyze targets generate` command."""
    def __init__(self, args: t.Any) -> None:
        super().__init__(args)

        self.input_dir: str = args.input_dir or ResultType.COVERAGE.path
        self.output_file: str = args.output_file


def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenerateConfig) -> None:
    """Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
    host_state = prepare_profiles(args)  # coverage analyze targets generate

    if args.delegate:
        raise Delegate(host_state)

    root = data_context().content.root
    target_indexes: TargetIndexes = {}
    arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items())
    lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items())
    report = make_report(target_indexes, arcs, lines)
    write_report(args, report, args.output_file)


def analyze_python_coverage(
    args: CoverageAnalyzeTargetsGenerateConfig,
    host_state: HostState,
    path: str,
    target_indexes: TargetIndexes,
) -> Arcs:
    """Analyze Python code coverage."""
    results: Arcs = {}
    collection_search_re, collection_sub_re = get_collection_path_regexes()
    modules = get_python_modules()
    python_files = get_python_coverage_files(path)
    coverage = initialize_coverage(args, host_state)

    for python_file in python_files:
        if not is_integration_coverage_file(python_file):
            continue

        target_name = get_target_name(python_file)
        target_index = get_target_index(target_name, target_indexes)

        for filename, covered_arcs in enumerate_python_arcs(python_file, coverage, modules, collection_search_re, collection_sub_re):
            arcs = results.setdefault(filename, {})

            for covered_arc in covered_arcs:
                arc = arcs.setdefault(covered_arc, set())
                arc.add(target_index)

    prune_invalid_filenames(args, results, collection_search_re=collection_search_re)

    return results


def analyze_powershell_coverage(
    args: CoverageAnalyzeTargetsGenerateConfig,
    path: str,
    target_indexes: TargetIndexes,
) -> Lines:
    """Analyze PowerShell code coverage"""
    results: Lines = {}
    collection_search_re, collection_sub_re = get_collection_path_regexes()
    powershell_files = get_powershell_coverage_files(path)

    for powershell_file in powershell_files:
        if not is_integration_coverage_file(powershell_file):
            continue

        target_name = get_target_name(powershell_file)
        target_index = get_target_index(target_name, target_indexes)

        for filename, hits in enumerate_powershell_lines(powershell_file, collection_search_re, collection_sub_re):
            lines = results.setdefault(filename, {})

            for covered_line in hits:
                line = lines.setdefault(covered_line, set())
                line.add(target_index)

    prune_invalid_filenames(args, results)

    return results


def prune_invalid_filenames(
    args: CoverageAnalyzeTargetsGenerateConfig,
    results: dict[str, t.Any],
    collection_search_re: t.Optional[t.Pattern] = None,
) -> None:
    """Remove invalid filenames from the given result set."""
    path_checker = PathChecker(args, collection_search_re)

    for path in list(results.keys()):
        if not path_checker.check_path(path):
            del results[path]


def get_target_name(path: str) -> str:
    """Extract the test target name from the given coverage path."""
    return to_text(os.path.basename(path).split('=')[1])


def is_integration_coverage_file(path: str) -> bool:
    """Returns True if the coverage file came from integration tests, otherwise False."""
    return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration')