summaryrefslogtreecommitdiffstats
path: root/taskcluster/taskgraph/transforms/job/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskcluster/taskgraph/transforms/job/__init__.py')
-rw-r--r--taskcluster/taskgraph/transforms/job/__init__.py464
1 files changed, 464 insertions, 0 deletions
diff --git a/taskcluster/taskgraph/transforms/job/__init__.py b/taskcluster/taskgraph/transforms/job/__init__.py
new file mode 100644
index 0000000000..e0fd3e8dd3
--- /dev/null
+++ b/taskcluster/taskgraph/transforms/job/__init__.py
@@ -0,0 +1,464 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+"""
+Convert a job description into a task description.
+
+Jobs descriptions are similar to task descriptions, but they specify how to run
+the job at a higher level, using a "run" field that can be interpreted by
+run-using handlers in `taskcluster/taskgraph/transforms/job`.
+"""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import copy
+import logging
+import json
+import six
+from six import text_type
+
+import mozpack.path as mozpath
+
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.transforms.cached_tasks import order_tasks
+from taskgraph.util.schema import (
+ validate_schema,
+ Schema,
+)
+from taskgraph.util.python_path import import_sibling_modules
+from taskgraph.util.taskcluster import get_artifact_prefix
+from taskgraph.util.workertypes import worker_type_implementation
+from taskgraph.transforms.task import task_description_schema
+from voluptuous import (
+ Extra,
+ Optional,
+ Required,
+ Exclusive,
+)
+
+logger = logging.getLogger(__name__)
+
+# Schema for a build description
+job_description_schema = Schema(
+ {
+ # The name of the job and the job's label. At least one must be specified,
+ # and the label will be generated from the name if necessary, by prepending
+ # the kind.
+ Optional("name"): text_type,
+ Optional("label"): text_type,
+ # the following fields are passed directly through to the task description,
+ # possibly modified by the run implementation. See
+ # taskcluster/taskgraph/transforms/task.py for the schema details.
+ Required("description"): task_description_schema["description"],
+ Optional("attributes"): task_description_schema["attributes"],
+ Optional("job-from"): task_description_schema["job-from"],
+ Optional("dependencies"): task_description_schema["dependencies"],
+ Optional("if-dependencies"): task_description_schema["if-dependencies"],
+ Optional("soft-dependencies"): task_description_schema["soft-dependencies"],
+ Optional("if-dependencies"): task_description_schema["if-dependencies"],
+ Optional("requires"): task_description_schema["requires"],
+ Optional("expires-after"): task_description_schema["expires-after"],
+ Optional("routes"): task_description_schema["routes"],
+ Optional("scopes"): task_description_schema["scopes"],
+ Optional("tags"): task_description_schema["tags"],
+ Optional("extra"): task_description_schema["extra"],
+ Optional("treeherder"): task_description_schema["treeherder"],
+ Optional("index"): task_description_schema["index"],
+ Optional("run-on-projects"): task_description_schema["run-on-projects"],
+ Optional("shipping-phase"): task_description_schema["shipping-phase"],
+ Optional("shipping-product"): task_description_schema["shipping-product"],
+ Optional("always-target"): task_description_schema["always-target"],
+ Exclusive("optimization", "optimization"): task_description_schema[
+ "optimization"
+ ],
+ Optional("use-sccache"): task_description_schema["use-sccache"],
+ Optional("release-artifacts"): task_description_schema["release-artifacts"],
+ Optional("priority"): task_description_schema["priority"],
+ # The "when" section contains descriptions of the circumstances under which
+ # this task should be included in the task graph. This will be converted
+ # into an optimization, so it cannot be specified in a job description that
+ # also gives 'optimization'.
+ Exclusive("when", "optimization"): {
+ # This task only needs to be run if a file matching one of the given
+ # patterns has changed in the push. The patterns use the mozpack
+ # match function (python/mozbuild/mozpack/path.py).
+ Optional("files-changed"): [text_type],
+ },
+ # A list of artifacts to install from 'fetch' tasks.
+ Optional("fetches"): {
+ text_type: [
+ text_type,
+ {
+ Required("artifact"): text_type,
+ Optional("dest"): text_type,
+ Optional("extract"): bool,
+ Optional("verify-hash"): bool,
+ },
+ ],
+ },
+ # A description of how to run this job.
+ "run": {
+ # The key to a job implementation in a peer module to this one
+ "using": text_type,
+ # Base work directory used to set up the task.
+ Optional("workdir"): text_type,
+ # Any remaining content is verified against that job implementation's
+ # own schema.
+ Extra: object,
+ },
+ Required("worker-type"): task_description_schema["worker-type"],
+ # This object will be passed through to the task description, with additions
+ # provided by the job's run-using function
+ Optional("worker"): dict,
+ }
+)
+
+transforms = TransformSequence()
+transforms.add_validate(job_description_schema)
+
+
+@transforms.add
+def rewrite_when_to_optimization(config, jobs):
+ for job in jobs:
+ when = job.pop("when", {})
+ if not when:
+ yield job
+ continue
+
+ files_changed = when.get("files-changed")
+
+ # implicitly add task config directory.
+ files_changed.append("{}/**".format(config.path))
+
+ # "only when files changed" implies "skip if files have not changed"
+ job["optimization"] = {"skip-unless-changed": files_changed}
+
+ assert "when" not in job
+ yield job
+
+
+@transforms.add
+def set_implementation(config, jobs):
+ for job in jobs:
+ impl, os = worker_type_implementation(config.graph_config, job["worker-type"])
+ if os:
+ job.setdefault("tags", {})["os"] = os
+ if impl:
+ job.setdefault("tags", {})["worker-implementation"] = impl
+ worker = job.setdefault("worker", {})
+ assert "implementation" not in worker
+ worker["implementation"] = impl
+ if os:
+ worker["os"] = os
+ yield job
+
+
+@transforms.add
+def set_label(config, jobs):
+ for job in jobs:
+ if "label" not in job:
+ if "name" not in job:
+ raise Exception("job has neither a name nor a label")
+ job["label"] = "{}-{}".format(config.kind, job["name"])
+ if job.get("name"):
+ del job["name"]
+ yield job
+
+
+@transforms.add
+def add_resource_monitor(config, jobs):
+ for job in jobs:
+ if job.get("attributes", {}).get("resource-monitor"):
+ worker_implementation, worker_os = worker_type_implementation(
+ config.graph_config, job["worker-type"]
+ )
+ # Normalise worker os so that linux-bitbar and similar use linux tools.
+ worker_os = worker_os.split("-")[0]
+ # We don't currently support an Arm worker, due to gopsutil's indirect
+ # dependencies (go-ole)
+ if "aarch64" in job["worker-type"]:
+ yield job
+ continue
+ elif "win7" in job["worker-type"]:
+ arch = "32"
+ else:
+ arch = "64"
+ job.setdefault("fetches", {})
+ job["fetches"].setdefault("toolchain", [])
+ job["fetches"]["toolchain"].append(
+ "{}{}-resource-monitor".format(worker_os, arch)
+ )
+
+ if worker_implementation == "docker-worker":
+ artifact_source = "/builds/worker/monitoring/resource-monitor.json"
+ else:
+ artifact_source = "monitoring/resource-monitor.json"
+ job["worker"].setdefault("artifacts", [])
+ job["worker"]["artifacts"].append(
+ {
+ "name": "public/monitoring/resource-monitor.json",
+ "type": "file",
+ "path": artifact_source,
+ }
+ )
+ # Set env for output file
+ job["worker"].setdefault("env", {})
+ job["worker"]["env"]["RESOURCE_MONITOR_OUTPUT"] = artifact_source
+
+ yield job
+
+
+def get_attribute(dict, key, attributes, attribute_name):
+ """Get `attribute_name` from the given `attributes` dict, and if there
+ is a corresponding value, set `key` in `dict` to that value."""
+ value = attributes.get(attribute_name)
+ if value:
+ dict[key] = value
+
+
+@transforms.add
+def use_fetches(config, jobs):
+ artifact_names = {}
+ aliases = {}
+
+ if config.kind in ("toolchain", "fetch"):
+ jobs = list(jobs)
+ for job in jobs:
+ run = job.get("run", {})
+ label = job["label"]
+ get_attribute(artifact_names, label, run, "toolchain-artifact")
+ value = run.get("{}-alias".format(config.kind))
+ if value:
+ aliases["{}-{}".format(config.kind, value)] = label
+
+ for task in config.kind_dependencies_tasks.values():
+ if task.kind in ("fetch", "toolchain"):
+ get_attribute(
+ artifact_names,
+ task.label,
+ task.attributes,
+ "{kind}-artifact".format(kind=task.kind),
+ )
+ value = task.attributes.get("{}-alias".format(task.kind))
+ if value:
+ aliases["{}-{}".format(task.kind, value)] = task.label
+
+ artifact_prefixes = {}
+ for job in order_tasks(config, jobs):
+ artifact_prefixes[job["label"]] = get_artifact_prefix(job)
+
+ fetches = job.pop("fetches", None)
+ if not fetches:
+ yield job
+ continue
+
+ job_fetches = []
+ name = job.get("name", job.get("label"))
+ dependencies = job.setdefault("dependencies", {})
+ worker = job.setdefault("worker", {})
+ prefix = get_artifact_prefix(job)
+ has_sccache = False
+ for kind, artifacts in fetches.items():
+ if kind in ("fetch", "toolchain"):
+ for fetch_name in artifacts:
+ label = "{kind}-{name}".format(kind=kind, name=fetch_name)
+ label = aliases.get(label, label)
+ if label not in artifact_names:
+ raise Exception(
+ "Missing fetch job for {kind}-{name}: {fetch}".format(
+ kind=config.kind, name=name, fetch=fetch_name
+ )
+ )
+
+ path = artifact_names[label]
+
+ dependencies[label] = label
+ job_fetches.append(
+ {
+ "artifact": path,
+ "task": "<{label}>".format(label=label),
+ "extract": True,
+ }
+ )
+
+ if kind == "toolchain" and fetch_name.endswith("-sccache"):
+ has_sccache = True
+ else:
+ if kind not in dependencies:
+ raise Exception(
+ "{name} can't fetch {kind} artifacts because "
+ "it has no {kind} dependencies!".format(name=name, kind=kind)
+ )
+ dep_label = dependencies[kind]
+ if dep_label in artifact_prefixes:
+ prefix = artifact_prefixes[dep_label]
+ else:
+ if dep_label not in config.kind_dependencies_tasks:
+ raise Exception(
+ "{name} can't fetch {kind} artifacts because "
+ "there are no tasks with label {label} in kind dependencies!".format(
+ name=name,
+ kind=kind,
+ label=dependencies[kind],
+ )
+ )
+
+ prefix = get_artifact_prefix(
+ config.kind_dependencies_tasks[dep_label]
+ )
+
+ for artifact in artifacts:
+ if isinstance(artifact, text_type):
+ path = artifact
+ dest = None
+ extract = True
+ verify_hash = False
+ else:
+ path = artifact["artifact"]
+ dest = artifact.get("dest")
+ extract = artifact.get("extract", True)
+ verify_hash = artifact.get("verify-hash", False)
+
+ fetch = {
+ "artifact": "{prefix}/{path}".format(prefix=prefix, path=path)
+ if not path.startswith("/")
+ else path[1:],
+ "task": "<{dep}>".format(dep=kind),
+ "extract": extract,
+ }
+ if dest is not None:
+ fetch["dest"] = dest
+ if verify_hash:
+ fetch["verify-hash"] = verify_hash
+ job_fetches.append(fetch)
+
+ if job.get("use-sccache") and not has_sccache:
+ raise Exception("Must provide an sccache toolchain if using sccache.")
+
+ job_artifact_prefixes = {
+ mozpath.dirname(fetch["artifact"])
+ for fetch in job_fetches
+ if not fetch["artifact"].startswith("public/")
+ }
+ if job_artifact_prefixes:
+ # Use taskcluster-proxy and request appropriate scope. For example, add
+ # 'scopes: [queue:get-artifact:path/to/*]' for 'path/to/artifact.tar.xz'.
+ worker["taskcluster-proxy"] = True
+ for prefix in sorted(job_artifact_prefixes):
+ scope = "queue:get-artifact:{}/*".format(prefix)
+ if scope not in job.setdefault("scopes", []):
+ job["scopes"].append(scope)
+
+ env = worker.setdefault("env", {})
+ env["MOZ_FETCHES"] = {
+ "task-reference": six.ensure_text(
+ json.dumps(
+ sorted(job_fetches, key=lambda x: sorted(x.items())), sort_keys=True
+ )
+ )
+ }
+ # The path is normalized to an absolute path in run-task
+ env.setdefault("MOZ_FETCHES_DIR", "fetches")
+
+ yield job
+
+
+@transforms.add
+def make_task_description(config, jobs):
+ """Given a build description, create a task description"""
+ # import plugin modules first, before iterating over jobs
+ import_sibling_modules(exceptions=("common.py",))
+
+ for job in jobs:
+ # only docker-worker uses a fixed absolute path to find directories
+ if job["worker"]["implementation"] == "docker-worker":
+ job["run"].setdefault("workdir", "/builds/worker")
+
+ taskdesc = copy.deepcopy(job)
+
+ # fill in some empty defaults to make run implementations easier
+ taskdesc.setdefault("attributes", {})
+ taskdesc.setdefault("dependencies", {})
+ taskdesc.setdefault("if-dependencies", [])
+ taskdesc.setdefault("soft-dependencies", [])
+ taskdesc.setdefault("routes", [])
+ taskdesc.setdefault("scopes", [])
+ taskdesc.setdefault("extra", {})
+
+ # give the function for job.run.using on this worker implementation a
+ # chance to set up the task description.
+ configure_taskdesc_for_run(
+ config, job, taskdesc, job["worker"]["implementation"]
+ )
+ del taskdesc["run"]
+
+ # yield only the task description, discarding the job description
+ yield taskdesc
+
+
+# A registry of all functions decorated with run_job_using
+registry = {}
+
+
+def run_job_using(worker_implementation, run_using, schema=None, defaults={}):
+ """Register the decorated function as able to set up a task description for
+ jobs with the given worker implementation and `run.using` property. If
+ `schema` is given, the job's run field will be verified to match it.
+
+ The decorated function should have the signature `using_foo(config, job, taskdesc)`
+ and should modify the task description in-place. The skeleton of
+ the task description is already set up, but without a payload."""
+
+ def wrap(func):
+ for_run_using = registry.setdefault(run_using, {})
+ if worker_implementation in for_run_using:
+ raise Exception(
+ "run_job_using({!r}, {!r}) already exists: {!r}".format(
+ run_using, worker_implementation, for_run_using[run_using]
+ )
+ )
+ for_run_using[worker_implementation] = (func, schema, defaults)
+ return func
+
+ return wrap
+
+
+@run_job_using(
+ "always-optimized", "always-optimized", Schema({"using": "always-optimized"})
+)
+def always_optimized(config, job, taskdesc):
+ pass
+
+
+def configure_taskdesc_for_run(config, job, taskdesc, worker_implementation):
+ """
+ Run the appropriate function for this job against the given task
+ description.
+
+ This will raise an appropriate error if no function exists, or if the job's
+ run is not valid according to the schema.
+ """
+ run_using = job["run"]["using"]
+ if run_using not in registry:
+ raise Exception("no functions for run.using {!r}".format(run_using))
+
+ if worker_implementation not in registry[run_using]:
+ raise Exception(
+ "no functions for run.using {!r} on {!r}".format(
+ run_using, worker_implementation
+ )
+ )
+
+ func, schema, defaults = registry[run_using][worker_implementation]
+ for k, v in defaults.items():
+ job["run"].setdefault(k, v)
+
+ if schema:
+ validate_schema(
+ schema,
+ job["run"],
+ "In job.run using {!r}/{!r} for job {!r}:".format(
+ job["run"]["using"], worker_implementation, job["label"]
+ ),
+ )
+ func(config, job, taskdesc)