1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# mypy: allow-untyped-defs
import argparse
import os
import re
from ..wpt.testfiles import branch_point, files_changed
from tools import localpaths # noqa: F401
# Two directory levels above the directory that contains this file.
wpt_root = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)

# Common exclusions between affected_tests and stability jobs.
# Files in these dirs would trigger the execution of too many tests.
EXCLUDES = [
    "!tools/",
    "!docs/",
    "!conformance-checkers/",
    "!.*/OWNERS",
    "!.*/META.yml",
    "!.*/tools/",
    "!.*/README",
    "!css/[^/]*$",
]

# Rules are just regex on the path, with a leading ! indicating a regex that
# must not match for the job. Paths should be kept in sync with
# update-built-tests.sh.
job_path_map = {
    "affected_tests": [".*/.*", "!resources/(?!idlharness.js)", *EXCLUDES],
    "stability": [".*/.*", "!resources/.*", *EXCLUDES],
    "lint": [".*"],
    "manifest_upload": [".*"],
    "resources_unittest": ["resources/", "tools/"],
    "tools_unittest": ["tools/"],
    "wptrunner_unittest": ["tools/"],
    "update_built": [
        "update-built-tests\\.sh",
        "conformance-checkers/",
        "css/css-ui/",
        "css/css-writing-modes/",
        "html/",
        "infrastructure/",
        "mimesniff/",
    ],
    "wpt_integration": ["tools/"],
    "wptrunner_infrastructure": [
        "infrastructure/",
        "tools/",
        "resources/",
        "webdriver/tests/support",
    ],
}
def _path_norm(path):
"""normalize a path for both case and slashes (to /)"""
path = os.path.normcase(path)
if os.path.sep != "/":
# this must be after the normcase call as that does slash normalization
path = path.replace(os.path.sep, "/")
return path
class Ruleset:
    """A set of include/exclude regexes that decides whether a path matches.

    A rule starting with ``!`` is an exclusion; every other rule is an
    inclusion. Each rule is compiled with a leading ``^`` and tested with
    ``match``, so it applies from the start of the path.
    """

    def __init__(self, rules):
        """Normalise each entry of *rules* and register it."""
        self.include = []
        self.exclude = []
        for raw_rule in rules:
            self.add_rule(_path_norm(raw_rule))

    def add_rule(self, rule):
        """Compile *rule* and append it to the include or exclude list."""
        negated = rule.startswith("!")
        pattern = rule[1:] if negated else rule
        bucket = self.exclude if negated else self.include
        bucket.append(re.compile("^%s" % pattern))

    def __call__(self, path):
        """Return True if *path* hits an include rule and no exclude rule."""
        candidate = _path_norm(path)
        # Exclusions are checked first, so they always win over inclusions.
        if any(regex.match(candidate) for regex in self.exclude):
            return False
        return any(regex.match(candidate) for regex in self.include)

    def __repr__(self):
        included = ",".join(regex.pattern for regex in self.include)
        excluded = ",".join(regex.pattern for regex in self.exclude)
        return "Rules<include:[%s] exclude:[%s]>" % (included, excluded)
def get_paths(**kwargs):
    """Return the set of changed paths, expressed relative to ``wpt_root``.

    ``kwargs["revish"]`` selects the commits to inspect; when it is None the
    range ``<branch_point()>..HEAD`` is used instead. The changed files are
    obtained from ``files_changed`` with an empty ``ignore_rules`` list.
    """
    revish = kwargs["revish"]
    if revish is None:
        revish = "%s..HEAD" % branch_point()

    changed, _ = files_changed(revish, ignore_rules=[])
    return {os.path.relpath(changed_path, wpt_root) for changed_path in changed}
def get_jobs(paths, **kwargs):
    """Return the set of job names triggered by *paths*.

    Keyword options read from *kwargs*:

    * ``all`` -- when truthy, every key of ``job_path_map`` is returned.
    * ``includes`` -- when not None, only the listed jobs are checked
      against the paths.

    When *paths* is empty, every job whose rule list contains the literal
    ``".*"`` rule is added to the result.
    """
    if kwargs.get("all"):
        return set(job_path_map.keys())

    wanted = kwargs.get("includes")
    if wanted is not None:
        wanted = set(wanted)

    # Jobs that have not matched any path yet, keyed by job name.
    pending = {
        name: Ruleset(patterns)
        for name, patterns in job_path_map.items()
        if wanted is None or name in wanted
    }

    jobs = set()
    for path in paths:
        matched = [name for name, ruleset in pending.items() if ruleset(path)]
        for name in matched:
            del pending[name]
        jobs.update(matched)
        # Stop scanning as soon as no candidate job is left.
        if not pending:
            break

    # Default jobs should run even if there were no changes
    if not paths:
        jobs.update(
            name for name, patterns in job_path_map.items() if ".*" in patterns
        )

    return jobs
def create_parser():
    """Build the command-line parser: optional ``revish``, ``--all``, ``--includes``."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "revish",
        nargs="?",
        default=None,
        help="Commits to consider. Defaults to the commits on the current branch",
    )
    cli.add_argument(
        "--all",
        action="store_true",
        help="List all jobs unconditionally.",
    )
    cli.add_argument(
        "--includes",
        nargs="*",
        default=None,
        help="Jobs to check for. Return code is 0 if all jobs are found, otherwise 1",
    )
    return cli
def run(**kwargs):
    """Compute the jobs for the changed paths and report them.

    When ``kwargs["includes"]`` is non-empty, return 0 if every listed job is
    among the computed jobs and 1 otherwise. When it is empty or None, print
    the computed job names in sorted order, one per line, and return None.
    """
    changed_paths = get_paths(**kwargs)
    jobs = get_jobs(changed_paths, **kwargs)

    requested = kwargs["includes"]
    if requested:
        missing = set(requested) - jobs
        return 1 if missing else 0

    for name in sorted(jobs):
        print(name)
|