From 36d22d82aa202bb199967e9512281e9a53db42c9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 21:33:14 +0200 Subject: Adding upstream version 115.7.0esr. Signed-off-by: Daniel Baumann --- .../gecko_taskgraph/transforms/job/__init__.py | 504 +++++++++++++++++++++ 1 file changed, 504 insertions(+) create mode 100644 taskcluster/gecko_taskgraph/transforms/job/__init__.py (limited to 'taskcluster/gecko_taskgraph/transforms/job/__init__.py') diff --git a/taskcluster/gecko_taskgraph/transforms/job/__init__.py b/taskcluster/gecko_taskgraph/transforms/job/__init__.py new file mode 100644 index 0000000000..9b6924b605 --- /dev/null +++ b/taskcluster/gecko_taskgraph/transforms/job/__init__.py @@ -0,0 +1,504 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +""" +Convert a job description into a task description. + +Jobs descriptions are similar to task descriptions, but they specify how to run +the job at a higher level, using a "run" field that can be interpreted by +run-using handlers in `taskcluster/gecko_taskgraph/transforms/job`. +""" + + +import json +import logging + +import mozpack.path as mozpath +from taskgraph.transforms.base import TransformSequence +from taskgraph.util.python_path import import_sibling_modules +from taskgraph.util.schema import Schema, validate_schema +from taskgraph.util.taskcluster import get_artifact_prefix +from voluptuous import Any, Exclusive, Extra, Optional, Required + +from gecko_taskgraph.transforms.cached_tasks import order_tasks +from gecko_taskgraph.transforms.task import task_description_schema +from gecko_taskgraph.util.copy_task import copy_task +from gecko_taskgraph.util.workertypes import worker_type_implementation + +logger = logging.getLogger(__name__) + +# Schema for a build description +job_description_schema = Schema( + { + # The name of the job and the job's label. At least one must be specified, + # and the label will be generated from the name if necessary, by prepending + # the kind. + Optional("name"): str, + Optional("label"): str, + # the following fields are passed directly through to the task description, + # possibly modified by the run implementation. See + # taskcluster/gecko_taskgraph/transforms/task.py for the schema details. + Required("description"): task_description_schema["description"], + Optional("attributes"): task_description_schema["attributes"], + Optional("job-from"): task_description_schema["job-from"], + Optional("dependencies"): task_description_schema["dependencies"], + Optional("if-dependencies"): task_description_schema["if-dependencies"], + Optional("soft-dependencies"): task_description_schema["soft-dependencies"], + Optional("if-dependencies"): task_description_schema["if-dependencies"], + Optional("requires"): task_description_schema["requires"], + Optional("expires-after"): task_description_schema["expires-after"], + Optional("expiration-policy"): task_description_schema["expiration-policy"], + Optional("routes"): task_description_schema["routes"], + Optional("scopes"): task_description_schema["scopes"], + Optional("tags"): task_description_schema["tags"], + Optional("extra"): task_description_schema["extra"], + Optional("treeherder"): task_description_schema["treeherder"], + Optional("index"): task_description_schema["index"], + Optional("run-on-projects"): task_description_schema["run-on-projects"], + Optional("shipping-phase"): task_description_schema["shipping-phase"], + Optional("shipping-product"): task_description_schema["shipping-product"], + Optional("always-target"): task_description_schema["always-target"], + Exclusive("optimization", "optimization"): task_description_schema[ + "optimization" + ], + Optional("use-sccache"): task_description_schema["use-sccache"], + Optional("use-system-python"): bool, + Optional("priority"): task_description_schema["priority"], + # The "when" section contains descriptions of the circumstances under which + # this task should be included in the task graph. This will be converted + # into an optimization, so it cannot be specified in a job description that + # also gives 'optimization'. + Exclusive("when", "optimization"): Any( + None, + { + # This task only needs to be run if a file matching one of the given + # patterns has changed in the push. The patterns use the mozpack + # match function (python/mozbuild/mozpack/path.py). + Optional("files-changed"): [str], + }, + ), + # A list of artifacts to install from 'fetch' tasks. + Optional("fetches"): { + str: [ + str, + { + Required("artifact"): str, + Optional("dest"): str, + Optional("extract"): bool, + Optional("verify-hash"): bool, + }, + ], + }, + # A description of how to run this job. + "run": { + # The key to a job implementation in a peer module to this one + "using": str, + # Base work directory used to set up the task. + Optional("workdir"): str, + # Any remaining content is verified against that job implementation's + # own schema. + Extra: object, + }, + Required("worker-type"): task_description_schema["worker-type"], + # This object will be passed through to the task description, with additions + # provided by the job's run-using function + Optional("worker"): dict, + } +) + +transforms = TransformSequence() +transforms.add_validate(job_description_schema) + + +@transforms.add +def rewrite_when_to_optimization(config, jobs): + for job in jobs: + when = job.pop("when", {}) + if not when: + yield job + continue + + files_changed = when.get("files-changed") + + # implicitly add task config directory. + files_changed.append(f"{config.path}/**") + + # "only when files changed" implies "skip if files have not changed" + job["optimization"] = {"skip-unless-changed": files_changed} + + assert "when" not in job + yield job + + +@transforms.add +def set_implementation(config, jobs): + for job in jobs: + impl, os = worker_type_implementation( + config.graph_config, config.params, job["worker-type"] + ) + if os: + job.setdefault("tags", {})["os"] = os + if impl: + job.setdefault("tags", {})["worker-implementation"] = impl + worker = job.setdefault("worker", {}) + assert "implementation" not in worker + worker["implementation"] = impl + if os: + worker["os"] = os + yield job + + +@transforms.add +def set_label(config, jobs): + for job in jobs: + if "label" not in job: + if "name" not in job: + raise Exception("job has neither a name nor a label") + job["label"] = "{}-{}".format(config.kind, job["name"]) + if job.get("name"): + del job["name"] + yield job + + +@transforms.add +def add_resource_monitor(config, jobs): + for job in jobs: + if job.get("attributes", {}).get("resource-monitor"): + worker_implementation, worker_os = worker_type_implementation( + config.graph_config, config.params, job["worker-type"] + ) + # Normalise worker os so that linux-bitbar and similar use linux tools. + worker_os = worker_os.split("-")[0] + # We don't currently support an Arm worker, due to gopsutil's indirect + # dependencies (go-ole) + if "aarch64" in job["worker-type"]: + yield job + continue + elif "win7" in job["worker-type"]: + arch = "32" + else: + arch = "64" + job.setdefault("fetches", {}) + job["fetches"].setdefault("toolchain", []) + job["fetches"]["toolchain"].append(f"{worker_os}{arch}-resource-monitor") + + if worker_implementation == "docker-worker": + artifact_source = "/builds/worker/monitoring/resource-monitor.json" + else: + artifact_source = "monitoring/resource-monitor.json" + job["worker"].setdefault("artifacts", []) + job["worker"]["artifacts"].append( + { + "name": "public/monitoring/resource-monitor.json", + "type": "file", + "path": artifact_source, + } + ) + # Set env for output file + job["worker"].setdefault("env", {}) + job["worker"]["env"]["RESOURCE_MONITOR_OUTPUT"] = artifact_source + + yield job + + +@transforms.add +def make_task_description(config, jobs): + """Given a build description, create a task description""" + # import plugin modules first, before iterating over jobs + import_sibling_modules(exceptions=("common.py",)) + + for job in jobs: + # only docker-worker uses a fixed absolute path to find directories + if job["worker"]["implementation"] == "docker-worker": + job["run"].setdefault("workdir", "/builds/worker") + + taskdesc = copy_task(job) + + # fill in some empty defaults to make run implementations easier + taskdesc.setdefault("attributes", {}) + taskdesc.setdefault("dependencies", {}) + taskdesc.setdefault("if-dependencies", []) + taskdesc.setdefault("soft-dependencies", []) + taskdesc.setdefault("routes", []) + taskdesc.setdefault("scopes", []) + taskdesc.setdefault("extra", {}) + + # give the function for job.run.using on this worker implementation a + # chance to set up the task description. + configure_taskdesc_for_run( + config, job, taskdesc, job["worker"]["implementation"] + ) + del taskdesc["run"] + + # yield only the task description, discarding the job description + yield taskdesc + + +def get_attribute(dict, key, attributes, attribute_name): + """Get `attribute_name` from the given `attributes` dict, and if there + is a corresponding value, set `key` in `dict` to that value.""" + value = attributes.get(attribute_name) + if value: + dict[key] = value + + +@transforms.add +def use_system_python(config, jobs): + for job in jobs: + if job.pop("use-system-python", True): + yield job + else: + fetches = job.setdefault("fetches", {}) + toolchain = fetches.setdefault("toolchain", []) + + moz_python_home = mozpath.join("fetches", "python") + if "win" in job["worker"]["os"]: + platform = "win64" + elif "linux" in job["worker"]["os"]: + platform = "linux64" + elif "macosx" in job["worker"]["os"]: + platform = "macosx64" + else: + raise ValueError("unexpected worker.os value {}".format(platform)) + + toolchain.append("{}-python".format(platform)) + + worker = job.setdefault("worker", {}) + env = worker.setdefault("env", {}) + env["MOZ_PYTHON_HOME"] = moz_python_home + + yield job + + +@transforms.add +def use_fetches(config, jobs): + artifact_names = {} + extra_env = {} + aliases = {} + tasks = [] + + if config.kind in ("toolchain", "fetch"): + jobs = list(jobs) + tasks.extend((config.kind, j) for j in jobs) + + tasks.extend( + (task.kind, task.__dict__) + for task in config.kind_dependencies_tasks.values() + if task.kind in ("fetch", "toolchain") + ) + for (kind, task) in tasks: + get_attribute( + artifact_names, task["label"], task["attributes"], f"{kind}-artifact" + ) + get_attribute(extra_env, task["label"], task["attributes"], f"{kind}-env") + value = task["attributes"].get(f"{kind}-alias") + if not value: + value = [] + elif isinstance(value, str): + value = [value] + for alias in value: + fully_qualified = f"{kind}-{alias}" + label = task["label"] + if fully_qualified == label: + raise Exception(f"The alias {alias} of task {label} points to itself!") + aliases[fully_qualified] = label + + artifact_prefixes = {} + for job in order_tasks(config, jobs): + artifact_prefixes[job["label"]] = get_artifact_prefix(job) + + fetches = job.pop("fetches", None) + if not fetches: + yield job + continue + + job_fetches = [] + name = job.get("name") or job.get("label").replace(f"{config.kind}-", "") + dependencies = job.setdefault("dependencies", {}) + worker = job.setdefault("worker", {}) + env = worker.setdefault("env", {}) + prefix = get_artifact_prefix(job) + has_sccache = False + for kind, artifacts in fetches.items(): + if kind in ("fetch", "toolchain"): + for fetch_name in artifacts: + label = f"{kind}-{fetch_name}" + label = aliases.get(label, label) + if label not in artifact_names: + raise Exception( + "Missing fetch job for {kind}-{name}: {fetch}".format( + kind=config.kind, name=name, fetch=fetch_name + ) + ) + if label in extra_env: + env.update(extra_env[label]) + + path = artifact_names[label] + + dependencies[label] = label + job_fetches.append( + { + "artifact": path, + "task": f"<{label}>", + "extract": True, + } + ) + + if kind == "toolchain" and fetch_name.endswith("-sccache"): + has_sccache = True + else: + if kind not in dependencies: + raise Exception( + "{name} can't fetch {kind} artifacts because " + "it has no {kind} dependencies!".format(name=name, kind=kind) + ) + dep_label = dependencies[kind] + if dep_label in artifact_prefixes: + prefix = artifact_prefixes[dep_label] + else: + if dep_label not in config.kind_dependencies_tasks: + raise Exception( + "{name} can't fetch {kind} artifacts because " + "there are no tasks with label {label} in kind dependencies!".format( + name=name, + kind=kind, + label=dependencies[kind], + ) + ) + + prefix = get_artifact_prefix( + config.kind_dependencies_tasks[dep_label] + ) + + for artifact in artifacts: + if isinstance(artifact, str): + path = artifact + dest = None + extract = True + verify_hash = False + else: + path = artifact["artifact"] + dest = artifact.get("dest") + extract = artifact.get("extract", True) + verify_hash = artifact.get("verify-hash", False) + + fetch = { + "artifact": f"{prefix}/{path}" + if not path.startswith("/") + else path[1:], + "task": f"<{kind}>", + "extract": extract, + } + if dest is not None: + fetch["dest"] = dest + if verify_hash: + fetch["verify-hash"] = verify_hash + job_fetches.append(fetch) + + if job.get("use-sccache") and not has_sccache: + raise Exception("Must provide an sccache toolchain if using sccache.") + + job_artifact_prefixes = { + mozpath.dirname(fetch["artifact"]) + for fetch in job_fetches + if not fetch["artifact"].startswith("public/") + } + if job_artifact_prefixes: + # Use taskcluster-proxy and request appropriate scope. For example, add + # 'scopes: [queue:get-artifact:path/to/*]' for 'path/to/artifact.tar.xz'. + worker["taskcluster-proxy"] = True + for prefix in sorted(job_artifact_prefixes): + scope = f"queue:get-artifact:{prefix}/*" + if scope not in job.setdefault("scopes", []): + job["scopes"].append(scope) + + artifacts = {} + for f in job_fetches: + _, __, artifact = f["artifact"].rpartition("/") + if "dest" in f: + artifact = f"{f['dest']}/{artifact}" + task = f["task"][1:-1] + if artifact in artifacts: + raise Exception( + f"Task {name} depends on {artifacts[artifact]} and {task} " + f"that both provide {artifact}" + ) + artifacts[artifact] = task + + env["MOZ_FETCHES"] = { + "task-reference": json.dumps( + sorted(job_fetches, key=lambda x: sorted(x.items())), sort_keys=True + ) + } + # The path is normalized to an absolute path in run-task + env.setdefault("MOZ_FETCHES_DIR", "fetches") + + yield job + + +# A registry of all functions decorated with run_job_using +registry = {} + + +def run_job_using(worker_implementation, run_using, schema=None, defaults={}): + """Register the decorated function as able to set up a task description for + jobs with the given worker implementation and `run.using` property. If + `schema` is given, the job's run field will be verified to match it. + + The decorated function should have the signature `using_foo(config, job, taskdesc)` + and should modify the task description in-place. The skeleton of + the task description is already set up, but without a payload.""" + + def wrap(func): + for_run_using = registry.setdefault(run_using, {}) + if worker_implementation in for_run_using: + raise Exception( + "run_job_using({!r}, {!r}) already exists: {!r}".format( + run_using, worker_implementation, for_run_using[run_using] + ) + ) + for_run_using[worker_implementation] = (func, schema, defaults) + return func + + return wrap + + +@run_job_using( + "always-optimized", "always-optimized", Schema({"using": "always-optimized"}) +) +def always_optimized(config, job, taskdesc): + pass + + +def configure_taskdesc_for_run(config, job, taskdesc, worker_implementation): + """ + Run the appropriate function for this job against the given task + description. + + This will raise an appropriate error if no function exists, or if the job's + run is not valid according to the schema. + """ + run_using = job["run"]["using"] + if run_using not in registry: + raise Exception(f"no functions for run.using {run_using!r}") + + if worker_implementation not in registry[run_using]: + raise Exception( + "no functions for run.using {!r} on {!r}".format( + run_using, worker_implementation + ) + ) + + func, schema, defaults = registry[run_using][worker_implementation] + for k, v in defaults.items(): + job["run"].setdefault(k, v) + + if schema: + validate_schema( + schema, + job["run"], + "In job.run using {!r}/{!r} for job {!r}:".format( + job["run"]["using"], worker_implementation, job["label"] + ), + ) + func(config, job, taskdesc) -- cgit v1.2.3