summaryrefslogtreecommitdiffstats
path: root/taskcluster/taskgraph/create.py
blob: 07cd1ce4f38a2026febcba1341c972ca86ec7d7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import json
import sys
import logging

import six

from slugid import nice as slugid
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.time import current_json_time
from taskgraph.util.taskcluster import get_session, CONCURRENCY

logger = logging.getLogger(__name__)

# this is set to true for `mach taskgraph action-callback --test`
testing = False


def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
    taskid_to_label = {t: l for l, t in six.iteritems(label_to_taskid)}

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId.  The process that created the decision task
    # helpfully placed it in this same taskGroup.  If there is no $TASK_ID,
    # fall back to a slugid
    scheduler_id = "{}-level-{}".format(graph_config["trust-domain"], params["level"])

    # Add the taskGroupId, schedulerId and optionally the decision task
    # dependency
    for task_id in taskgraph.graph.nodes:
        task_def = taskgraph.tasks[task_id].task

        # if this task has no dependencies *within* this taskgraph, make it
        # depend on this decision task. If it has another dependency within
        # the taskgraph, then it already implicitly depends on the decision
        # task.  The result is that tasks do not start immediately. if this
        # loop fails halfway through, none of the already-created tasks run.
        if not any(t in taskgraph.tasks for t in task_def.get("dependencies", [])):
            task_def.setdefault("dependencies", []).append(decision_task_id)

        task_def["taskGroupId"] = decision_task_id
        task_def["schedulerId"] = scheduler_id

    # If `testing` is True, then run without parallelization
    concurrency = CONCURRENCY if not testing else 1
    session = get_session()
    with futures.ThreadPoolExecutor(concurrency) as e:
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        tasklist = set(taskgraph.graph.visit_postorder())
        alltasks = tasklist.copy()

        def schedule_tasks():
            # bail out early if any futures have failed
            if any(f.done() and f.exception() for f in fs.values()):
                return

            to_remove = set()
            new = set()

            def submit(task_id, label, task_def):
                fut = e.submit(create_task, session, task_id, label, task_def)
                new.add(fut)
                fs[task_id] = fut

            for task_id in tasklist:
                task_def = taskgraph.tasks[task_id].task
                # If we haven't finished submitting all our dependencies yet,
                # come back to this later.
                # Some dependencies aren't in our graph, so make sure to filter
                # those out
                deps = set(task_def.get("dependencies", [])) & alltasks
                if any((d not in fs or not fs[d].done()) for d in deps):
                    continue

                submit(task_id, taskid_to_label[task_id], task_def)
                to_remove.add(task_id)

                # Schedule tasks as many times as task_duplicates indicates
                attributes = taskgraph.tasks[task_id].attributes
                for i in range(1, attributes.get("task_duplicates", 1)):
                    # We use slugid() since we want a distinct task id
                    submit(slugid().decode("ascii"), taskid_to_label[task_id], task_def)
            tasklist.difference_update(to_remove)

            # as each of those futures complete, try to schedule more tasks
            for f in futures.as_completed(new):
                schedule_tasks()

        # start scheduling tasks and run until everything is scheduled
        schedule_tasks()

        # check the result of each future, raising an exception if it failed
        for f in futures.as_completed(fs.values()):
            f.result()


def create_task(session, task_id, label, task_def):
    # create the task using 'http://taskcluster/queue', which is proxied to the queue service
    # with credentials appropriate to this job.

    # Resolve timestamps
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    if testing:
        json.dump(
            [task_id, task_def],
            sys.stdout,
            sort_keys=True,
            indent=4,
            separators=(",", ": "),
        )
        # add a newline
        print("")
        return

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put(
        "http://taskcluster/queue/v1/task/{}".format(task_id), data=json.dumps(task_def)
    )
    if res.status_code != 200:
        try:
            logger.error(res.json()["message"])
        except Exception:
            logger.error(res.text)
        res.raise_for_status()