diff options
Diffstat (limited to 'taskcluster/taskgraph/decision.py')
-rw-r--r-- | taskcluster/taskgraph/decision.py | 534 |
1 files changed, 534 insertions, 0 deletions
diff --git a/taskcluster/taskgraph/decision.py b/taskcluster/taskgraph/decision.py new file mode 100644 index 0000000000..4f1d3c37dc --- /dev/null +++ b/taskcluster/taskgraph/decision.py @@ -0,0 +1,534 @@ +# -*- coding: utf-8 -*- +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import, print_function, unicode_literals + +import os +import json +import logging +import time +import sys +from collections import defaultdict + +import six +from six import text_type +from redo import retry +import yaml + +from . import GECKO +from .actions import render_actions_json +from .create import create_tasks +from .generator import TaskGraphGenerator +from .parameters import Parameters, get_version, get_app_version +from .taskgraph import TaskGraph +from taskgraph.util.python_path import find_object +from .try_option_syntax import parse_message +from .util.backstop import is_backstop +from .util.bugbug import push_schedules +from .util.chunking import resolver +from .util.hg import get_hg_revision_branch, get_hg_commit_message +from .util.partials import populate_release_history +from .util.schema import validate_schema, Schema +from .util.taskcluster import get_artifact, insert_index +from .util.taskgraph import find_decision_task, find_existing_tasks_from_previous_kinds +from .util.yaml import load_yaml +from voluptuous import Required, Optional, Any + + +logger = logging.getLogger(__name__) + +ARTIFACTS_DIR = "artifacts" + +# For each project, this gives a set of parameters specific to the project. +# See `taskcluster/docs/parameters.rst` for information on parameters. +PER_PROJECT_PARAMETERS = { + "try": { + "target_tasks_method": "try_tasks", + }, + "try-comm-central": { + "target_tasks_method": "try_tasks", + }, + "kaios-try": { + "target_tasks_method": "try_tasks", + }, + "ash": { + "target_tasks_method": "default", + }, + "cedar": { + "target_tasks_method": "default", + }, + "oak": { + "target_tasks_method": "nightly_desktop", + "release_type": "nightly-oak", + }, + "graphics": { + "target_tasks_method": "graphics_tasks", + }, + "autoland": { + "optimize_strategies": "taskgraph.optimize:project.autoland", + "target_tasks_method": "autoland_tasks", + "test_manifest_loader": "bugbug", # Remove this line to disable "manifest scheduling". + }, + "mozilla-central": { + "target_tasks_method": "mozilla_central_tasks", + "release_type": "nightly", + }, + "mozilla-beta": { + "target_tasks_method": "mozilla_beta_tasks", + "release_type": "beta", + }, + "mozilla-release": { + "target_tasks_method": "mozilla_release_tasks", + "release_type": "release", + }, + "mozilla-esr78": { + "target_tasks_method": "mozilla_esr78_tasks", + "release_type": "esr78", + }, + "comm-central": { + "target_tasks_method": "default", + "release_type": "nightly", + }, + "comm-beta": { + "target_tasks_method": "mozilla_beta_tasks", + "release_type": "beta", + }, + "comm-esr78": { + "target_tasks_method": "mozilla_esr78_tasks", + "release_type": "release", + }, + "pine": { + "target_tasks_method": "pine_tasks", + }, + "kaios": { + "target_tasks_method": "kaios_tasks", + }, + # the default parameters are used for projects that do not match above. + "default": { + "target_tasks_method": "default", + }, +} + +try_task_config_schema = Schema( + { + Required("tasks"): [text_type], + Optional("browsertime"): bool, + Optional("chemspill-prio"): bool, + Optional("disable-pgo"): bool, + Optional("env"): {text_type: text_type}, + Optional("gecko-profile"): bool, + Optional( + "perftest-options", + description="Options passed from `mach perftest` to try.", + ): object, + Optional( + "optimize-strategies", + description="Alternative optimization strategies to use instead of the default. " + "A module path pointing to a dict to be use as the `strategy_override` " + "argument in `taskgraph.optimize.optimize_task_graph`.", + ): text_type, + Optional("rebuild"): int, + Optional("tasks-regex"): { + "include": Any(None, [text_type]), + "exclude": Any(None, [text_type]), + }, + Optional("use-artifact-builds"): bool, + Optional( + "worker-overrides", + description="Mapping of worker alias to worker pools to use for those aliases.", + ): {text_type: text_type}, + Optional("routes"): [text_type], + } +) +""" +Schema for try_task_config.json files. +""" + +try_task_config_schema_v2 = Schema( + { + Optional("parameters"): {text_type: object}, + } +) + + +def full_task_graph_to_runnable_jobs(full_task_json): + runnable_jobs = {} + for label, node in six.iteritems(full_task_json): + if not ("extra" in node["task"] and "treeherder" in node["task"]["extra"]): + continue + + th = node["task"]["extra"]["treeherder"] + runnable_jobs[label] = {"symbol": th["symbol"]} + + for i in ("groupName", "groupSymbol", "collection"): + if i in th: + runnable_jobs[label][i] = th[i] + if th.get("machine", {}).get("platform"): + runnable_jobs[label]["platform"] = th["machine"]["platform"] + return runnable_jobs + + +def full_task_graph_to_manifests_by_task(full_task_json): + manifests_by_task = defaultdict(list) + for label, node in six.iteritems(full_task_json): + manifests = node["attributes"].get("test_manifests") + if not manifests: + continue + + manifests_by_task[label].extend(manifests) + return manifests_by_task + + +def try_syntax_from_message(message): + """ + Parse the try syntax out of a commit message, returning '' if none is + found. + """ + try_idx = message.find("try:") + if try_idx == -1: + return "" + return message[try_idx:].split("\n", 1)[0] + + +def taskgraph_decision(options, parameters=None): + """ + Run the decision task. This function implements `mach taskgraph decision`, + and is responsible for + + * processing decision task command-line options into parameters + * running task-graph generation exactly the same way the other `mach + taskgraph` commands do + * generating a set of artifacts to memorialize the graph + * calling TaskCluster APIs to create the graph + """ + + parameters = parameters or ( + lambda graph_config: get_decision_parameters(graph_config, options) + ) + + decision_task_id = os.environ["TASK_ID"] + + # create a TaskGraphGenerator instance + tgg = TaskGraphGenerator( + root_dir=options.get("root"), + parameters=parameters, + decision_task_id=decision_task_id, + write_artifacts=True, + ) + + # set additional index paths for the decision task + set_decision_indexes(decision_task_id, tgg.parameters, tgg.graph_config) + + # write out the parameters used to generate this graph + write_artifact("parameters.yml", dict(**tgg.parameters)) + + # write out the public/actions.json file + write_artifact( + "actions.json", + render_actions_json(tgg.parameters, tgg.graph_config, decision_task_id), + ) + + # write out the full graph for reference + full_task_json = tgg.full_task_graph.to_json() + write_artifact("full-task-graph.json", full_task_json) + + # write out the public/runnable-jobs.json file + write_artifact( + "runnable-jobs.json", full_task_graph_to_runnable_jobs(full_task_json) + ) + + # write out the public/manifests-by-task.json file + write_artifact( + "manifests-by-task.json.gz", + full_task_graph_to_manifests_by_task(full_task_json), + ) + + # write out the public/tests-by-manifest.json file + write_artifact("tests-by-manifest.json.gz", resolver.tests_by_manifest) + + # this is just a test to check whether the from_json() function is working + _, _ = TaskGraph.from_json(full_task_json) + + # write out the target task set to allow reproducing this as input + write_artifact("target-tasks.json", list(tgg.target_task_set.tasks.keys())) + + # write out the optimized task graph to describe what will actually happen, + # and the map of labels to taskids + write_artifact("task-graph.json", tgg.morphed_task_graph.to_json()) + write_artifact("label-to-taskid.json", tgg.label_to_taskid) + + # write bugbug scheduling information if it was invoked + if len(push_schedules) > 0: + write_artifact("bugbug-push-schedules.json", push_schedules.popitem()[1]) + + # actually create the graph + create_tasks( + tgg.graph_config, + tgg.morphed_task_graph, + tgg.label_to_taskid, + tgg.parameters, + decision_task_id=decision_task_id, + ) + + +def get_decision_parameters(graph_config, options): + """ + Load parameters from the command-line options for 'taskgraph decision'. + This also applies per-project parameters, based on the given project. + + """ + product_dir = graph_config["product-dir"] + + parameters = { + n: options[n] + for n in [ + "base_repository", + "head_repository", + "head_rev", + "head_ref", + "project", + "pushlog_id", + "pushdate", + "owner", + "level", + "target_tasks_method", + "tasks_for", + ] + if n in options + } + + for n in ( + "comm_base_repository", + "comm_head_repository", + "comm_head_rev", + "comm_head_ref", + ): + if n in options and options[n] is not None: + parameters[n] = options[n] + + commit_message = get_hg_commit_message(os.path.join(GECKO, product_dir)) + + # Define default filter list, as most configurations shouldn't need + # custom filters. + parameters["filters"] = [ + "target_tasks_method", + ] + parameters["existing_tasks"] = {} + parameters["do_not_optimize"] = [] + parameters["build_number"] = 1 + parameters["version"] = get_version(product_dir) + parameters["app_version"] = get_app_version(product_dir) + parameters["message"] = try_syntax_from_message(commit_message) + parameters["hg_branch"] = get_hg_revision_branch( + GECKO, revision=parameters["head_rev"] + ) + parameters["next_version"] = None + parameters["optimize_strategies"] = None + parameters["optimize_target_tasks"] = True + parameters["phabricator_diff"] = None + parameters["release_type"] = "" + parameters["release_eta"] = "" + parameters["release_enable_partner_repack"] = False + parameters["release_enable_partner_attribution"] = False + parameters["release_partners"] = [] + parameters["release_partner_config"] = {} + parameters["release_partner_build_number"] = 1 + parameters["release_enable_emefree"] = False + parameters["release_product"] = None + parameters["required_signoffs"] = [] + parameters["signoff_urls"] = {} + parameters["test_manifest_loader"] = "default" + parameters["try_mode"] = None + parameters["try_task_config"] = {} + parameters["try_options"] = None + + # owner must be an email, but sometimes (e.g., for ffxbld) it is not, in which + # case, fake it + if "@" not in parameters["owner"]: + parameters["owner"] += "@noreply.mozilla.org" + + # use the pushdate as build_date if given, else use current time + parameters["build_date"] = parameters["pushdate"] or int(time.time()) + # moz_build_date is the build identifier based on build_date + parameters["moz_build_date"] = six.ensure_text( + time.strftime("%Y%m%d%H%M%S", time.gmtime(parameters["build_date"])) + ) + + project = parameters["project"] + try: + parameters.update(PER_PROJECT_PARAMETERS[project]) + except KeyError: + logger.warning( + "using default project parameters; add {} to " + "PER_PROJECT_PARAMETERS in {} to customize behavior " + "for this project".format(project, __file__) + ) + parameters.update(PER_PROJECT_PARAMETERS["default"]) + + # `target_tasks_method` has higher precedence than `project` parameters + if options.get("target_tasks_method"): + parameters["target_tasks_method"] = options["target_tasks_method"] + + # ..but can be overridden by the commit message: if it contains the special + # string "DONTBUILD" and this is an on-push decision task, then use the + # special 'nothing' target task method. + if "DONTBUILD" in commit_message and options["tasks_for"] == "hg-push": + parameters["target_tasks_method"] = "nothing" + + if options.get("include_push_tasks"): + get_existing_tasks(options.get("rebuild_kinds", []), parameters, graph_config) + + # If the target method is nightly, we should build partials. This means + # knowing what has been released previously. + # An empty release_history is fine, it just means no partials will be built + parameters.setdefault("release_history", dict()) + if "nightly" in parameters.get("target_tasks_method", ""): + parameters["release_history"] = populate_release_history("Firefox", project) + + if options.get("try_task_config_file"): + task_config_file = os.path.abspath(options.get("try_task_config_file")) + else: + # if try_task_config.json is present, load it + task_config_file = os.path.join(os.getcwd(), "try_task_config.json") + + # load try settings + if "try" in project and options["tasks_for"] == "hg-push": + set_try_config(parameters, task_config_file) + + if options.get("optimize_target_tasks") is not None: + parameters["optimize_target_tasks"] = options["optimize_target_tasks"] + + if "decision-parameters" in graph_config["taskgraph"]: + find_object(graph_config["taskgraph"]["decision-parameters"])( + graph_config, parameters + ) + + # Determine if this should be a backstop push. + parameters["backstop"] = is_backstop(parameters) + + result = Parameters(**parameters) + result.check() + return result + + +def get_existing_tasks(rebuild_kinds, parameters, graph_config): + """ + Find the decision task corresponding to the on-push graph, and return + a mapping of labels to task-ids from it. This will skip the kinds specificed + by `rebuild_kinds`. + """ + try: + decision_task = retry( + find_decision_task, + args=(parameters, graph_config), + attempts=4, + sleeptime=5 * 60, + ) + except Exception: + logger.exception("Didn't find existing push task.") + sys.exit(1) + _, task_graph = TaskGraph.from_json( + get_artifact(decision_task, "public/full-task-graph.json") + ) + parameters["existing_tasks"] = find_existing_tasks_from_previous_kinds( + task_graph, [decision_task], rebuild_kinds + ) + + +def set_try_config(parameters, task_config_file): + if os.path.isfile(task_config_file): + logger.info("using try tasks from {}".format(task_config_file)) + with open(task_config_file, "r") as fh: + task_config = json.load(fh) + task_config_version = task_config.pop("version", 1) + if task_config_version == 1: + validate_schema( + try_task_config_schema, + task_config, + "Invalid v1 `try_task_config.json`.", + ) + parameters["try_mode"] = "try_task_config" + parameters["try_task_config"] = task_config + elif task_config_version == 2: + validate_schema( + try_task_config_schema_v2, + task_config, + "Invalid v2 `try_task_config.json`.", + ) + parameters.update(task_config["parameters"]) + return + else: + raise Exception( + "Unknown `try_task_config.json` version: {}".format(task_config_version) + ) + + if "try:" in parameters["message"]: + parameters["try_mode"] = "try_option_syntax" + parameters.update(parse_message(parameters["message"])) + else: + parameters["try_options"] = None + + if parameters["try_mode"] == "try_task_config": + # The user has explicitly requested a set of jobs, so run them all + # regardless of optimization. Their dependencies can be optimized, + # though. + parameters["optimize_target_tasks"] = False + else: + # For a try push with no task selection, apply the default optimization + # process to all of the tasks. + parameters["optimize_target_tasks"] = True + + +def set_decision_indexes(decision_task_id, params, graph_config): + index_paths = [] + if params["backstop"]: + index_paths.append("{trust-domain}.v2.{project}.latest.taskgraph.backstop") + + subs = params.copy() + subs["trust-domain"] = graph_config["trust-domain"] + + index_paths = [i.format(**subs) for i in index_paths] + for index_path in index_paths: + insert_index(index_path, decision_task_id, use_proxy=True) + + +def write_artifact(filename, data): + logger.info("writing artifact file `{}`".format(filename)) + if not os.path.isdir(ARTIFACTS_DIR): + os.mkdir(ARTIFACTS_DIR) + path = os.path.join(ARTIFACTS_DIR, filename) + if filename.endswith(".yml"): + with open(path, "w") as f: + yaml.safe_dump(data, f, allow_unicode=True, default_flow_style=False) + elif filename.endswith(".json"): + with open(path, "w") as f: + json.dump(data, f, sort_keys=True, indent=2, separators=(",", ": ")) + elif filename.endswith(".json.gz"): + import gzip + + with gzip.open(path, "wb") as f: + f.write(json.dumps(data).encode("utf-8")) + else: + raise TypeError("Don't know how to write to {}".format(filename)) + + +def read_artifact(filename): + path = os.path.join(ARTIFACTS_DIR, filename) + if filename.endswith(".yml"): + return load_yaml(path, filename) + elif filename.endswith(".json"): + with open(path, "r") as f: + return json.load(f) + elif filename.endswith(".json.gz"): + import gzip + + with gzip.open(path, "rb") as f: + return json.load(f.decode("utf-8")) + else: + raise TypeError("Don't know how to read {}".format(filename)) + + +def rename_artifact(src, dest): + os.rename(os.path.join(ARTIFACTS_DIR, src), os.path.join(ARTIFACTS_DIR, dest)) |