summaryrefslogtreecommitdiffstats
path: root/drivers/gpu/drm/ci/xfails/update-xfails.py
blob: e9f0ec7fed8d7cf1c4b3aa7d7f00e0c789555bfd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3

import argparse
from collections import defaultdict
import difflib
import os
import re
from glcollate import Collate
from termcolor import colored
from urllib.parse import urlparse


def get_canonical_name(job_name):
    """Return job_name cut off before its first " N/M" counter.

    The counter is a space followed by digits, a slash and more digits.
    When job_name contains no such counter it is returned unchanged.
    """
    counter = re.search(r" \d+/\d+", job_name)
    if counter is None:
        return job_name
    return job_name[:counter.start()]


def get_xfails_file_path(job_name, suffix):
    """Return the path of the "<job>-<suffix>.txt" file next to this script.

    <job> is the canonical job name with every ":" replaced by "-"; the
    directory is the one containing this script file.
    """
    file_stem = get_canonical_name(job_name).replace(":", "-")
    here = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(here, f"{file_stem}-{suffix}.txt")


def get_unit_test_name_and_results(unit_test):
    """Split a "name,result" failure line into its two fields.

    Returns (None, None) when the line is empty or whitespace-only, or when
    it carries the "Artifact results/failures.csv not found" message.
    Otherwise returns (name, result) as strings.

    Raises ValueError when a non-empty line contains no comma at all.
    """
    # Strip before testing for emptiness so that a bare newline or a line of
    # spaces is treated like an empty entry instead of failing the unpack.
    entry = unit_test.strip()
    if not entry or "Artifact results/failures.csv not found" in entry:
        return None, None
    # The result is the last field; splitting on the last comma only keeps
    # any comma inside the test name intact.
    unit_test_name, unit_test_result = entry.rsplit(",", 1)
    return unit_test_name, unit_test_result


def read_file(file_path):
    """Return the lines of file_path as a list, or [] if the file is missing.

    The last line is stripped of surrounding whitespace and given exactly one
    trailing newline, so later appends start on a fresh line.
    """
    try:
        with open(file_path, "r") as handle:
            lines = list(handle)
    except FileNotFoundError:
        return []

    if lines:
        lines[-1] = f"{lines[-1].strip()}\n"
    return lines


def save_file(content, file_path):
    """Write the lines in content to file_path.

    When content is empty, or holds only falsy items, nothing is written and
    an existing file at file_path is deleted instead.
    """
    has_content = bool(content) and any(content)
    if has_content:
        with open(file_path, "w") as out:
            out.writelines(content)
        return

    # delete the file if content is empty
    if os.path.exists(file_path):
        os.remove(file_path)


def is_test_present_on_file(file_content, unit_test_name):
    """Return True if unit_test_name occurs as a substring of any line."""
    for entry in file_content:
        if unit_test_name in entry:
            return True
    return False


def is_unit_test_present_in_other_jobs(unit_test, job_ids):
    """Return True if unit_test is contained in the entry of every job id.

    job_ids maps a job id to a container of unit test entries; an empty
    mapping yields True.
    """
    for job_id in job_ids:
        if unit_test not in job_ids[job_id]:
            return False
    return True


def remove_unit_test_if_present(lines, unit_test_name):
    """Drop, in place, every line of lines that contains unit_test_name.

    lines is left untouched when no line contains the name.
    """
    if is_test_present_on_file(lines, unit_test_name):
        kept = [entry for entry in lines if unit_test_name not in entry]
        # Slice assignment keeps the caller's list object and swaps its contents.
        lines[:] = kept


def add_unit_test_if_not_present(lines, unit_test_name, file_name):
    """Append unit_test_name as a new line unless some line already contains it.

    Names containing "core_getversion" are never added; a warning naming the
    base name of file_name is printed instead.
    """
    # core_getversion is mandatory
    if "core_getversion" in unit_test_name:
        print("WARNING: core_getversion should pass, not adding it to", os.path.basename(file_name))
        return

    already_listed = any(unit_test_name in entry for entry in lines)
    if not already_listed:
        lines.append(unit_test_name + "\n")


def update_unit_test_result_in_fails_txt(fails_txt, unit_test):
    """Replace the first line of fails_txt that mentions the test with unit_test.

    unit_test is a "name,result" entry. The first line containing the name is
    overwritten in place with unit_test plus a newline. Nothing happens when
    no line contains the name, or when unit_test yields no name.
    """
    unit_test_name, _ = get_unit_test_name_and_results(unit_test)
    if unit_test_name is None:
        return

    # The matched line is overwritten without being parsed: its old result is
    # not needed, and parsing it would raise on a line that has no comma.
    for i, line in enumerate(fails_txt):
        if unit_test_name in line:
            fails_txt[i] = unit_test + "\n"
            return


def add_unit_test_or_update_result_to_fails_if_present(fails_txt, unit_test, fails_txt_path):
    """Make fails_txt list unit_test ("name,result") with its current result.

    If no line mentions the test name, the entry is added. If the name is
    listed but the full "name,result" text is not, the existing line is
    updated. Otherwise fails_txt is left alone.
    """
    name, _ = get_unit_test_name_and_results(unit_test)

    name_listed = is_test_present_on_file(fails_txt, name)
    if not name_listed:
        add_unit_test_if_not_present(fails_txt, unit_test, fails_txt_path)
        return

    # if it is present but not with the same result
    same_result_listed = is_test_present_on_file(fails_txt, unit_test)
    if not same_result_listed:
        update_unit_test_result_in_fails_txt(fails_txt, unit_test)


def split_unit_test_from_collate(xfails):
    """Turn each job's artifact text into a list of lines, in place.

    xfails maps job name -> {job id -> artifact text}. Job ids whose text
    contains "not found" are deleted; every other text is stripped and split
    on newlines.
    """
    for job_name in xfails.keys():
        jobs = xfails[job_name]
        # Iterate over a snapshot of the ids because entries are deleted below.
        for job_id in tuple(jobs.keys()):
            artifact = jobs[job_id]
            if "not found" in artifact:
                del jobs[job_id]
            else:
                jobs[job_id] = artifact.strip().split("\n")


def get_xfails_from_pipeline_url(pipeline_url):
    """Collect the "results/failures.csv" artifacts of one pipeline.

    The first two components of the URL path are used as namespace and
    project, and the last component as the pipeline id. The collated result
    is passed through split_unit_test_from_collate() before being returned.
    """
    url_path = urlparse(pipeline_url).path
    parts = url_path.strip("/").split("/")

    namespace, project = parts[0], parts[1]
    pipeline_id = parts[-1]

    print("Collating from:", namespace, project, pipeline_id)
    collate = Collate(namespace=namespace, project=project)
    pipeline = collate.from_pipeline(pipeline_id)
    xfails = pipeline.get_artifact("results/failures.csv")

    split_unit_test_from_collate(xfails)
    return xfails


def get_xfails_from_pipeline_urls(pipelines_urls):
    """Merge the failures of several pipelines into one mapping.

    Returns a defaultdict keyed by job name. Each value is a dict updated
    with the entries collected for that job from every URL, in the order
    given, so a later pipeline overrides an earlier one on equal keys.
    """
    merged = defaultdict(dict)

    for pipeline_url in pipelines_urls:
        pipeline_xfails = get_xfails_from_pipeline_url(pipeline_url)
        for job_name in pipeline_xfails:
            jobs = merged[job_name]
            jobs.update(pipeline_xfails[job_name])

    return merged


def print_diff(old_content, new_content, file_name):
    """Print a colored unified diff between old_content and new_content.

    Lines starting with "+" are colored green and lines starting with "-"
    red. The first three diff lines are joined with newlines; the remaining
    lines are joined without a separator.
    """
    def paint(line):
        if line.startswith("+"):
            return colored(line, "green")
        if line.startswith("-"):
            return colored(line, "red")
        return line

    delta = difflib.unified_diff(old_content, new_content, fromfile=file_name, tofile=file_name, lineterm="")
    painted = list(map(paint, delta))
    header, body = painted[:3], painted[3:]
    print("\n".join(header))
    print("".join(body))


def main(pipelines_urls, only_flakes):
    """Update the per-job *-fails.txt and *-flakes.txt files from pipelines.

    For every job found in the given pipelines, each reported unit test is
    classified and the two lists are edited in memory:

    - with only_flakes set, the test is removed from fails and added to flakes;
    - otherwise it is first dropped from flakes, then
      - added to (or updated in) fails when every run of the job reports the
        same entry and the result is not "UnexpectedPass";
      - else removed from fails, and added to flakes when the runs disagree.

    Both lists are then sorted. A list that differs from what was read is
    saved and its diff printed.
    """
    xfails = get_xfails_from_pipeline_urls(pipelines_urls)

    for job_name, job_runs in xfails.items():
        fails_txt_path = get_xfails_file_path(job_name, "fails")
        flakes_txt_path = get_xfails_file_path(job_name, "flakes")

        fails_txt = read_file(fails_txt_path)
        flakes_txt = read_file(flakes_txt_path)

        # Snapshots used at the end to decide whether anything changed.
        fails_txt_original = list(fails_txt)
        flakes_txt_original = list(flakes_txt)

        for reported_tests in job_runs.values():
            for unit_test in reported_tests:
                name, result = get_unit_test_name_and_results(unit_test)
                if not name:
                    continue

                if only_flakes:
                    remove_unit_test_if_present(fails_txt, name)
                    add_unit_test_if_not_present(flakes_txt, name, flakes_txt_path)
                    continue

                # drop it from flakes if it is present to analyze it again
                remove_unit_test_if_present(flakes_txt, name)

                # True when every run of this job reported this exact entry.
                consistent = is_unit_test_present_in_other_jobs(unit_test, job_runs)

                # consistent result
                if consistent and result != "UnexpectedPass":
                    add_unit_test_or_update_result_to_fails_if_present(fails_txt, unit_test,
                                                                       fails_txt_path)
                    continue

                # unexpected pass and/or flake result: not an expected failure
                remove_unit_test_if_present(fails_txt, name)
                if not consistent:
                    add_unit_test_if_not_present(flakes_txt, name, flakes_txt_path)

        fails_txt.sort()
        flakes_txt.sort()

        pending = (
            (fails_txt, fails_txt_original, fails_txt_path),
            (flakes_txt, flakes_txt_original, flakes_txt_path),
        )
        for current, original, path in pending:
            if current != original:
                save_file(current, path)
                print_diff(original, current, os.path.basename(path))


if __name__ == "__main__":
    # Command-line entry point: read the pipeline URLs and the flake-only
    # switch, then hand them to main().
    cli = argparse.ArgumentParser(description="Update xfails from a given pipeline.")
    cli.add_argument(
        "pipeline_urls",
        nargs="+",
        type=str,
        help="URLs to the pipelines to analyze the failures.",
    )
    cli.add_argument(
        "--only-flakes",
        action="store_true",
        help="Treat every detected failure as a flake, edit *-flakes.txt only.",
    )

    options = cli.parse_args()

    main(options.pipeline_urls, options.only_flakes)
    print("Done.")