summaryrefslogtreecommitdiffstats
path: root/taskcluster/taskgraph/create.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskcluster/taskgraph/create.py')
-rw-r--r--taskcluster/taskgraph/create.py136
1 files changed, 136 insertions, 0 deletions
diff --git a/taskcluster/taskgraph/create.py b/taskcluster/taskgraph/create.py
new file mode 100644
index 0000000000..07cd1ce4f3
--- /dev/null
+++ b/taskcluster/taskgraph/create.py
@@ -0,0 +1,136 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import concurrent.futures as futures
+import json
+import sys
+import logging
+
+import six
+
+from slugid import nice as slugid
+from taskgraph.util.parameterization import resolve_timestamps
+from taskgraph.util.time import current_json_time
+from taskgraph.util.taskcluster import get_session, CONCURRENCY
+
+logger = logging.getLogger(__name__)
+
+# this is set to true for `mach taskgraph action-callback --test`
+testing = False
+
+
+def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
+ taskid_to_label = {t: l for l, t in six.iteritems(label_to_taskid)}
+
+ # when running as an actual decision task, we use the decision task's
+ # taskId as the taskGroupId. The process that created the decision task
+ # helpfully placed it in this same taskGroup. If there is no $TASK_ID,
+ # fall back to a slugid
+ scheduler_id = "{}-level-{}".format(graph_config["trust-domain"], params["level"])
+
+ # Add the taskGroupId, schedulerId and optionally the decision task
+ # dependency
+ for task_id in taskgraph.graph.nodes:
+ task_def = taskgraph.tasks[task_id].task
+
+ # if this task has no dependencies *within* this taskgraph, make it
+ # depend on this decision task. If it has another dependency within
+ # the taskgraph, then it already implicitly depends on the decision
+ # task. The result is that tasks do not start immediately. if this
+ # loop fails halfway through, none of the already-created tasks run.
+ if not any(t in taskgraph.tasks for t in task_def.get("dependencies", [])):
+ task_def.setdefault("dependencies", []).append(decision_task_id)
+
+ task_def["taskGroupId"] = decision_task_id
+ task_def["schedulerId"] = scheduler_id
+
+ # If `testing` is True, then run without parallelization
+ concurrency = CONCURRENCY if not testing else 1
+ session = get_session()
+ with futures.ThreadPoolExecutor(concurrency) as e:
+ fs = {}
+
+ # We can't submit a task until its dependencies have been submitted.
+ # So our strategy is to walk the graph and submit tasks once all
+ # their dependencies have been submitted.
+ tasklist = set(taskgraph.graph.visit_postorder())
+ alltasks = tasklist.copy()
+
+ def schedule_tasks():
+ # bail out early if any futures have failed
+ if any(f.done() and f.exception() for f in fs.values()):
+ return
+
+ to_remove = set()
+ new = set()
+
+ def submit(task_id, label, task_def):
+ fut = e.submit(create_task, session, task_id, label, task_def)
+ new.add(fut)
+ fs[task_id] = fut
+
+ for task_id in tasklist:
+ task_def = taskgraph.tasks[task_id].task
+ # If we haven't finished submitting all our dependencies yet,
+ # come back to this later.
+ # Some dependencies aren't in our graph, so make sure to filter
+ # those out
+ deps = set(task_def.get("dependencies", [])) & alltasks
+ if any((d not in fs or not fs[d].done()) for d in deps):
+ continue
+
+ submit(task_id, taskid_to_label[task_id], task_def)
+ to_remove.add(task_id)
+
+ # Schedule tasks as many times as task_duplicates indicates
+ attributes = taskgraph.tasks[task_id].attributes
+ for i in range(1, attributes.get("task_duplicates", 1)):
+ # We use slugid() since we want a distinct task id
+ submit(slugid().decode("ascii"), taskid_to_label[task_id], task_def)
+ tasklist.difference_update(to_remove)
+
+ # as each of those futures complete, try to schedule more tasks
+ for f in futures.as_completed(new):
+ schedule_tasks()
+
+ # start scheduling tasks and run until everything is scheduled
+ schedule_tasks()
+
+ # check the result of each future, raising an exception if it failed
+ for f in futures.as_completed(fs.values()):
+ f.result()
+
+
+def create_task(session, task_id, label, task_def):
+ # create the task using 'http://taskcluster/queue', which is proxied to the queue service
+ # with credentials appropriate to this job.
+
+ # Resolve timestamps
+ now = current_json_time(datetime_format=True)
+ task_def = resolve_timestamps(now, task_def)
+
+ if testing:
+ json.dump(
+ [task_id, task_def],
+ sys.stdout,
+ sort_keys=True,
+ indent=4,
+ separators=(",", ": "),
+ )
+ # add a newline
+ print("")
+ return
+
+ logger.debug("Creating task with taskId {} for {}".format(task_id, label))
+ res = session.put(
+ "http://taskcluster/queue/v1/task/{}".format(task_id), data=json.dumps(task_def)
+ )
+ if res.status_code != 200:
+ try:
+ logger.error(res.json()["message"])
+ except Exception:
+ logger.error(res.text)
+ res.raise_for_status()