1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import concurrent.futures as futures
import json
import logging
import sys
from slugid import nice as slugid
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.taskcluster import CONCURRENCY, get_session
from taskgraph.util.time import current_json_time
logger = logging.getLogger(__name__)
# this is set to true for `mach taskgraph action-callback --test`
testing = False
def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
taskid_to_label = {t: l for l, t in label_to_taskid.items()}
# when running as an actual decision task, we use the decision task's
# taskId as the taskGroupId. The process that created the decision task
# helpfully placed it in this same taskGroup. If there is no $TASK_ID,
# fall back to a slugid
scheduler_id = "{}-level-{}".format(graph_config["trust-domain"], params["level"])
# Add the taskGroupId, schedulerId and optionally the decision task
# dependency
for task_id in taskgraph.graph.nodes:
task_def = taskgraph.tasks[task_id].task
# if this task has no dependencies *within* this taskgraph, make it
# depend on this decision task. If it has another dependency within
# the taskgraph, then it already implicitly depends on the decision
# task. The result is that tasks do not start immediately. if this
# loop fails halfway through, none of the already-created tasks run.
if not any(t in taskgraph.tasks for t in task_def.get("dependencies", [])):
task_def.setdefault("dependencies", []).append(decision_task_id)
task_def["taskGroupId"] = decision_task_id
task_def["schedulerId"] = scheduler_id
# If `testing` is True, then run without parallelization
concurrency = CONCURRENCY if not testing else 1
session = get_session()
with futures.ThreadPoolExecutor(concurrency) as e:
fs = {}
# We can't submit a task until its dependencies have been submitted.
# So our strategy is to walk the graph and submit tasks once all
# their dependencies have been submitted.
tasklist = set(taskgraph.graph.visit_postorder())
alltasks = tasklist.copy()
def schedule_tasks():
# bail out early if any futures have failed
if any(f.done() and f.exception() for f in fs.values()):
return
to_remove = set()
new = set()
def submit(task_id, label, task_def):
fut = e.submit(create_task, session, task_id, label, task_def)
new.add(fut)
fs[task_id] = fut
for task_id in tasklist:
task_def = taskgraph.tasks[task_id].task
# If we haven't finished submitting all our dependencies yet,
# come back to this later.
# Some dependencies aren't in our graph, so make sure to filter
# those out
deps = set(task_def.get("dependencies", [])) & alltasks
if any((d not in fs or not fs[d].done()) for d in deps):
continue
submit(task_id, taskid_to_label[task_id], task_def)
to_remove.add(task_id)
# Schedule tasks as many times as task_duplicates indicates
attributes = taskgraph.tasks[task_id].attributes
for i in range(1, attributes.get("task_duplicates", 1)):
# We use slugid() since we want a distinct task id
submit(slugid(), taskid_to_label[task_id], task_def)
tasklist.difference_update(to_remove)
# as each of those futures complete, try to schedule more tasks
for f in futures.as_completed(new):
schedule_tasks()
# start scheduling tasks and run until everything is scheduled
schedule_tasks()
# check the result of each future, raising an exception if it failed
for f in futures.as_completed(fs.values()):
f.result()
def create_task(session, task_id, label, task_def):
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
# with credentials appropriate to this job.
# Resolve timestamps
now = current_json_time(datetime_format=True)
task_def = resolve_timestamps(now, task_def)
if testing:
json.dump(
[task_id, task_def],
sys.stdout,
sort_keys=True,
indent=4,
separators=(",", ": "),
)
# add a newline
print("")
return
logger.info(f"Creating task with taskId {task_id} for {label}")
res = session.put(f"http://taskcluster/queue/v1/task/{task_id}", json=task_def)
if res.status_code != 200:
try:
logger.error(res.json()["message"])
except Exception:
logger.error(res.text)
res.raise_for_status()
|