summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster_taskgraph/taskgraph/transforms/chunking.py
blob: 31d7eff82c5a04696eaf6ddc19fc43057476f31e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import copy
from textwrap import dedent

from voluptuous import ALLOW_EXTRA, Optional, Required

from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import Schema
from taskgraph.util.templates import substitute

# Schema for the optional `chunk` key consumed by `chunk_tasks`. All other
# task keys are accepted unchanged (extra=ALLOW_EXTRA).
#
# Each description is dedented first and only then left-stripped: stripping
# the raw literal removes the first line's indentation, which leaves `dedent`
# with no common margin to remove and keeps the source indentation on every
# following line of the description.
CHUNK_SCHEMA = Schema(
    {
        # Optional, so it can be used for a subset of tasks in a kind
        Optional(
            "chunk",
            description=dedent(
                """
            `chunk` can be used to split one task into `total-chunks`
            tasks, substituting `this_chunk` and `total_chunks` into any
            fields in `substitution-fields`.
            """
            ).lstrip(),
        ): {
            Required(
                "total-chunks",
                description=dedent(
                    """
                The total number of chunks to split the task into.
                """
                ).lstrip(),
            ): int,
            Optional(
                "substitution-fields",
                description=dedent(
                    """
                A list of fields that need to have `{this_chunk}` and/or
                `{total_chunks}` replaced in them.
                """
                ).lstrip(),
            ): [str],
        }
    },
    extra=ALLOW_EXTRA,
)

# Transform sequence for this module. CHUNK_SCHEMA is handed to
# `add_validate` before `chunk_tasks` is registered below; presumably this
# validates each incoming task against the schema ahead of chunking --
# confirm against `TransformSequence`.
transforms = TransformSequence()
transforms.add_validate(CHUNK_SCHEMA)


@transforms.add
def chunk_tasks(config, tasks):
    """Expand every task carrying a `chunk` key into `total-chunks` copies.

    Tasks without a (truthy) `chunk` key are yielded as-is, minus the key.
    For a chunked task, one deep copy is yielded per chunk number from 1 to
    `total-chunks` inclusive; a `total-chunks` below 1 therefore yields
    nothing for that task. Each copy gets `this_chunk` and `total_chunks`
    merged into its `attributes` dict, and every entry listed in
    `substitution-fields` is passed through `substitute` with those two
    values.

    Args:
        config: Not used by this transform.
        tasks: Iterable of task dicts; the `chunk` key is popped from each.

    Yields:
        The untouched task, or one modified deep copy per chunk.

    Raises:
        KeyError: If a dotted path in `substitution-fields` does not exist
            in the task.
    """
    for task in tasks:
        chunk_config = task.pop("chunk", None)
        if not chunk_config:
            yield task
            continue

        total_chunks = chunk_config["total-chunks"]
        # `substitution-fields` is Optional in CHUNK_SCHEMA, so a task that
        # only sets `total-chunks` must not fail with KeyError here.
        substitution_fields = chunk_config.get("substitution-fields", [])

        for this_chunk in range(1, total_chunks + 1):
            subtask = copy.deepcopy(task)

            subs = {
                "this_chunk": this_chunk,
                "total_chunks": total_chunks,
            }
            subtask.setdefault("attributes", {}).update(subs)

            for field in substitution_fields:
                # Walk the dotted path down to the dict that holds the final
                # path component.
                *parents, leaf = field.split(".")
                container = subtask
                for key in parents:
                    container = container[key]

                value = copy.deepcopy(container[leaf])
                # The key itself also goes through `substitute`, so the
                # result is stored under the substituted name. NOTE(review):
                # if that changes the key, the original entry is left in
                # place as well -- confirm this is intended.
                leaf = substitute(leaf, **subs)
                container[leaf] = substitute(value, **subs)

            yield subtask