1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import json
import os
import subprocess
import pytest
from gecko_taskgraph import GECKO
from mozunit import main
from taskgraph.taskgraph import TaskGraph
# Apply the `slow` marker to every test collected from this module.
pytestmark = pytest.mark.slow
# Directory of parameter files: handed to mach through `--parameters` and
# listed to build the parametrization of the test below.
PARAMS_DIR = os.path.join(GECKO, "taskcluster", "test", "params")
@pytest.fixture(scope="module")
def get_graph_from_spec(tmpdir_factory):
    """Generate the morphed graphs once per module and return a loader.

    ``./mach taskgraph morphed`` is run from ``GECKO`` against ``PARAMS_DIR``
    and writes into a fresh temporary directory. The returned callable takes
    a parameter spec name, reads ``graph_<param_spec>.json`` from that
    directory and returns the second element of ``TaskGraph.from_json`` for
    its contents, or the raw file text when parsing raises ``ValueError``.
    """
    graph_dir = tmpdir_factory.mktemp("graphs")

    # Going through a mach subprocess lets us benefit from its automatic
    # parallelization over the parameters when it is given a directory.
    mach_cmd = ["./mach", "taskgraph", "morphed", "--json"]
    mach_cmd.append(f"--parameters={PARAMS_DIR}")
    mach_cmd.append(f"--output-file={graph_dir}/graph.json")
    subprocess.run(mach_cmd, cwd=GECKO)

    generated = graph_dir.listdir()
    assert len(generated) > 0

    def load_graph(param_spec):
        graph_path = f"{graph_dir}/graph_{param_spec}.json"
        with open(graph_path) as stream:
            raw = stream.read()

        try:
            parsed = TaskGraph.from_json(json.loads(raw))
            return parsed[1]
        except ValueError:
            # Unparseable output is handed back verbatim for the caller to
            # inspect.
            return raw

    return load_graph
@pytest.mark.parametrize(
    "param_spec",
    # os.listdir() yields entries in arbitrary order; sort the names so the
    # test IDs and collection order are the same on every run and worker.
    sorted(os.path.splitext(p)[0] for p in os.listdir(PARAMS_DIR)),
)
def test_generate_graphs(get_graph_from_spec, param_spec):
    """Check that a non-empty task graph was produced for ``param_spec``.

    Args:
        get_graph_from_spec: Fixture returning a callable that maps a spec
            name to the loaded graph, or to the raw output text when that
            output could not be parsed.
        param_spec: Name (extension stripped) of an entry in ``PARAMS_DIR``.
    """
    ret = get_graph_from_spec(param_spec)
    if isinstance(ret, str):
        # A string result is the unparsed output; print it so it shows up in
        # the captured output of the failing test.
        print(ret)
        pytest.fail("An exception was raised during graph generation!")

    assert isinstance(ret, TaskGraph)
    assert len(ret.tasks) > 0
# When executed directly as a script, hand control to mozunit's `main`.
if __name__ == "__main__":
    main()
|