# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. from __future__ import absolute_import, print_function, unicode_literals import concurrent.futures as futures import json import sys import logging import six from slugid import nice as slugid from taskgraph.util.parameterization import resolve_timestamps from taskgraph.util.time import current_json_time from taskgraph.util.taskcluster import get_session, CONCURRENCY logger = logging.getLogger(__name__) # this is set to true for `mach taskgraph action-callback --test` testing = False def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id): taskid_to_label = {t: l for l, t in six.iteritems(label_to_taskid)} # when running as an actual decision task, we use the decision task's # taskId as the taskGroupId. The process that created the decision task # helpfully placed it in this same taskGroup. If there is no $TASK_ID, # fall back to a slugid scheduler_id = "{}-level-{}".format(graph_config["trust-domain"], params["level"]) # Add the taskGroupId, schedulerId and optionally the decision task # dependency for task_id in taskgraph.graph.nodes: task_def = taskgraph.tasks[task_id].task # if this task has no dependencies *within* this taskgraph, make it # depend on this decision task. If it has another dependency within # the taskgraph, then it already implicitly depends on the decision # task. The result is that tasks do not start immediately. if this # loop fails halfway through, none of the already-created tasks run. if not any(t in taskgraph.tasks for t in task_def.get("dependencies", [])): task_def.setdefault("dependencies", []).append(decision_task_id) task_def["taskGroupId"] = decision_task_id task_def["schedulerId"] = scheduler_id # If `testing` is True, then run without parallelization concurrency = CONCURRENCY if not testing else 1 session = get_session() with futures.ThreadPoolExecutor(concurrency) as e: fs = {} # We can't submit a task until its dependencies have been submitted. # So our strategy is to walk the graph and submit tasks once all # their dependencies have been submitted. tasklist = set(taskgraph.graph.visit_postorder()) alltasks = tasklist.copy() def schedule_tasks(): # bail out early if any futures have failed if any(f.done() and f.exception() for f in fs.values()): return to_remove = set() new = set() def submit(task_id, label, task_def): fut = e.submit(create_task, session, task_id, label, task_def) new.add(fut) fs[task_id] = fut for task_id in tasklist: task_def = taskgraph.tasks[task_id].task # If we haven't finished submitting all our dependencies yet, # come back to this later. # Some dependencies aren't in our graph, so make sure to filter # those out deps = set(task_def.get("dependencies", [])) & alltasks if any((d not in fs or not fs[d].done()) for d in deps): continue submit(task_id, taskid_to_label[task_id], task_def) to_remove.add(task_id) # Schedule tasks as many times as task_duplicates indicates attributes = taskgraph.tasks[task_id].attributes for i in range(1, attributes.get("task_duplicates", 1)): # We use slugid() since we want a distinct task id submit(slugid().decode("ascii"), taskid_to_label[task_id], task_def) tasklist.difference_update(to_remove) # as each of those futures complete, try to schedule more tasks for f in futures.as_completed(new): schedule_tasks() # start scheduling tasks and run until everything is scheduled schedule_tasks() # check the result of each future, raising an exception if it failed for f in futures.as_completed(fs.values()): f.result() def create_task(session, task_id, label, task_def): # create the task using 'http://taskcluster/queue', which is proxied to the queue service # with credentials appropriate to this job. # Resolve timestamps now = current_json_time(datetime_format=True) task_def = resolve_timestamps(now, task_def) if testing: json.dump( [task_id, task_def], sys.stdout, sort_keys=True, indent=4, separators=(",", ": "), ) # add a newline print("") return logger.debug("Creating task with taskId {} for {}".format(task_id, label)) res = session.put( "http://taskcluster/queue/v1/task/{}".format(task_id), data=json.dumps(task_def) ) if res.status_code != 200: try: logger.error(res.json()["message"]) except Exception: logger.error(res.text) res.raise_for_status()