diff options
Diffstat (limited to 'taskcluster/gecko_taskgraph/actions')
21 files changed, 3438 insertions, 0 deletions
diff --git a/taskcluster/gecko_taskgraph/actions/__init__.py b/taskcluster/gecko_taskgraph/actions/__init__.py new file mode 100644 index 0000000000..590a957282 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/__init__.py @@ -0,0 +1,16 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +from .registry import ( + register_callback_action, + render_actions_json, + trigger_action_callback, +) + +__all__ = [ + "register_callback_action", + "render_actions_json", + "trigger_action_callback", +] diff --git a/taskcluster/gecko_taskgraph/actions/add_new_jobs.py b/taskcluster/gecko_taskgraph/actions/add_new_jobs.py new file mode 100644 index 0000000000..39200cff68 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/add_new_jobs.py @@ -0,0 +1,59 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +from .registry import register_callback_action +from .util import combine_task_graph_files, create_tasks, fetch_graph_and_labels + + +@register_callback_action( + name="add-new-jobs", + title="Add new jobs", + symbol="add-new", + description="Add new jobs using task labels.", + order=100, + context=[], + schema={ + "type": "object", + "properties": { + "tasks": { + "type": "array", + "description": "An array of task labels", + "items": {"type": "string"}, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + }, + }, +) +def add_new_jobs_action(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + to_run = [] + for elem in input["tasks"]: + if elem in full_task_graph.tasks: + to_run.append(elem) + else: + raise Exception(f"{elem} was not found in the task-graph") + + times = input.get("times", 1) + for i in range(times): + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + i, + ) + combine_task_graph_files(list(range(times))) diff --git a/taskcluster/gecko_taskgraph/actions/add_talos.py b/taskcluster/gecko_taskgraph/actions/add_talos.py new file mode 100644 index 0000000000..56b0c49cc9 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/add_talos.py @@ -0,0 +1,59 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging + +from ..target_tasks import standard_filter +from .registry import register_callback_action +from .util import create_tasks, fetch_graph_and_labels + +logger = logging.getLogger(__name__) + + +@register_callback_action( + name="run-all-talos", + title="Run All Talos Tests", + symbol="raT", + description="Add all Talos tasks to a push.", + order=150, + context=[], + schema={ + "type": "object", + "properties": { + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 6, + "title": "Times", + "description": "How many times to run each task.", + } + }, + "additionalProperties": False, + }, +) +def add_all_talos(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + times = input.get("times", 1) + for i in range(times): + to_run = [ + label + for label, entry in full_task_graph.tasks.items() + if "talos_try_name" in entry.attributes + and standard_filter(entry, parameters) + ] + + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + ) + logger.info(f"Scheduled {len(to_run)} talos tasks (time {i + 1}/{times})") diff --git a/taskcluster/gecko_taskgraph/actions/backfill.py b/taskcluster/gecko_taskgraph/actions/backfill.py new file mode 100644 index 0000000000..4ec9b2e3c4 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/backfill.py @@ -0,0 +1,427 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import json +import logging +import re +import sys +from functools import partial + +from taskgraph.util.taskcluster import get_task_definition + +from .registry import register_callback_action +from .util import ( + combine_task_graph_files, + create_tasks, + fetch_graph_and_labels, + get_decision_task_id, + get_pushes, + get_pushes_from_params_input, + trigger_action, +) + +logger = logging.getLogger(__name__) +SYMBOL_REGEX = re.compile("^(.*)-[a-z0-9]{11}-bk$") +GROUP_SYMBOL_REGEX = re.compile("^(.*)-bk$") + + +def input_for_support_action(revision, task, times=1, retrigger=True): + """Generate input for action to be scheduled. + + Define what label to schedule with 'label'. + If it is a test task that uses explicit manifests add that information. + """ + input = { + "label": task["metadata"]["name"], + "revision": revision, + "times": times, + # We want the backfilled tasks to share the same symbol as the originating task + "symbol": task["extra"]["treeherder"]["symbol"], + "retrigger": retrigger, + } + + # Support tasks that are using manifest based scheduling + if task["payload"].get("env", {}).get("MOZHARNESS_TEST_PATHS"): + input["test_manifests"] = json.loads( + task["payload"]["env"]["MOZHARNESS_TEST_PATHS"] + ) + + return input + + +@register_callback_action( + title="Backfill", + name="backfill", + permission="backfill", + symbol="Bk", + description=("Given a task schedule it on previous pushes in the same project."), + order=200, + context=[{}], # This will be available for all tasks + schema={ + "type": "object", + "properties": { + "depth": { + "type": "integer", + "default": 19, + "minimum": 1, + "maximum": 25, + "title": "Depth", + "description": ( + "The number of previous pushes before the current " + "push to attempt to trigger this task on." + ), + }, + "inclusive": { + "type": "boolean", + "default": False, + "title": "Inclusive Range", + "description": ( + "If true, the backfill will also retrigger the task " + "on the selected push." + ), + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 10, + "title": "Times", + "description": ( + "The number of times to execute each job you are backfilling." + ), + }, + "retrigger": { + "type": "boolean", + "default": True, + "title": "Retrigger", + "description": ( + "If False, the task won't retrigger on pushes that have already " + "ran it." + ), + }, + }, + "additionalProperties": False, + }, + available=lambda parameters: True, +) +def backfill_action(parameters, graph_config, input, task_group_id, task_id): + """ + This action takes a task ID and schedules it on previous pushes (via support action). + + To execute this action locally follow the documentation here: + https://firefox-source-docs.mozilla.org/taskcluster/actions.html#testing-the-action-locally + """ + task = get_task_definition(task_id) + pushes = get_pushes_from_params_input(parameters, input) + failed = False + input_for_action = input_for_support_action( + revision=parameters["head_rev"], + task=task, + times=input.get("times", 1), + retrigger=input.get("retrigger", True), + ) + + for push_id in pushes: + try: + # The Gecko decision task can sometimes fail on a push and we need to handle + # the exception that this call will produce + push_decision_task_id = get_decision_task_id(parameters["project"], push_id) + except Exception: + logger.warning(f"Could not find decision task for push {push_id}") + # The decision task may have failed, this is common enough that we + # don't want to report an error for it. + continue + + try: + trigger_action( + action_name="backfill-task", + # This lets the action know on which push we want to add a new task + decision_task_id=push_decision_task_id, + input=input_for_action, + ) + except Exception: + logger.exception(f"Failed to trigger action for {push_id}") + failed = True + + if failed: + sys.exit(1) + + +def add_backfill_suffix(regex, symbol, suffix): + m = regex.match(symbol) + if m is None: + symbol += suffix + return symbol + + +def backfill_modifier(task, input): + if task.label != input["label"]: + return task + + logger.debug(f"Modifying test_manifests for {task.label}") + times = input.get("times", 1) + + # Set task duplicates based on 'times' value. + if times > 1: + task.attributes["task_duplicates"] = times + + # If the original task has defined test paths + test_manifests = input.get("test_manifests") + if test_manifests: + revision = input.get("revision") + + task.attributes["test_manifests"] = test_manifests + task.task["payload"]["env"]["MOZHARNESS_TEST_PATHS"] = json.dumps( + test_manifests + ) + # The name/label might have been modify in new_label, thus, change it here as well + task.task["metadata"]["name"] = task.label + th_info = task.task["extra"]["treeherder"] + # Use a job symbol of the originating task as defined in the backfill action + th_info["symbol"] = add_backfill_suffix( + SYMBOL_REGEX, th_info["symbol"], f"-{revision[0:11]}-bk" + ) + if th_info.get("groupSymbol"): + # Group all backfilled tasks together + th_info["groupSymbol"] = add_backfill_suffix( + GROUP_SYMBOL_REGEX, th_info["groupSymbol"], "-bk" + ) + task.task["tags"]["action"] = "backfill-task" + return task + + +def do_not_modify(task): + return task + + +def new_label(label, tasks): + """This is to handle the case when a previous push does not contain a specific task label + and we try to find a label we can reuse. + + For instance, we try to backfill chunk #3, however, a previous push does not contain such + chunk, thus, we try to reuse another task/label. + """ + begining_label, ending = label.rsplit("-", 1) + if ending.isdigit(): + # We assume that the taskgraph has chunk #1 OR unnumbered chunk and we hijack it + if begining_label in tasks: + return begining_label + elif begining_label + "-1" in tasks: + return begining_label + "-1" + else: + raise Exception(f"New label ({label}) was not found in the task-graph") + else: + raise Exception(f"{label} was not found in the task-graph") + + +@register_callback_action( + name="backfill-task", + title="Backfill task on a push.", + permission="backfill", + symbol="backfill-task", + description="This action is normally scheduled by the backfill action. " + "The intent is to schedule a task on previous pushes.", + order=500, + context=[], + schema={ + "type": "object", + "properties": { + "label": {"type": "string", "description": "A task label"}, + "revision": { + "type": "string", + "description": "Revision of the original push from where we backfill.", + }, + "symbol": { + "type": "string", + "description": "Symbol to be used by the scheduled task.", + }, + "test_manifests": { + "type": "array", + "default": [], + "description": "An array of test manifest paths", + "items": {"type": "string"}, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 10, + "title": "Times", + "description": ( + "The number of times to execute each job " "you are backfilling." + ), + }, + "retrigger": { + "type": "boolean", + "default": True, + "title": "Retrigger", + "description": ( + "If False, the task won't retrigger on pushes that have already " + "ran it." + ), + }, + }, + }, +) +def add_task_with_original_manifests( + parameters, graph_config, input, task_group_id, task_id +): + """ + This action is normally scheduled by the backfill action. The intent is to schedule a test + task with the test manifests from the original task (if available). + + The push in which we want to schedule a new task is defined by the parameters object. + + To execute this action locally follow the documentation here: + https://firefox-source-docs.mozilla.org/taskcluster/actions.html#testing-the-action-locally + """ + # This step takes a lot of time when executed locally + logger.info("Retreving the full task graph and labels.") + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + label = input.get("label") + if not input.get("retrigger") and label in label_to_taskid: + logger.info( + f"Skipping push with decision task ID {decision_task_id} as it already has this test." + ) + return + + if label not in full_task_graph.tasks: + label = new_label(label, full_task_graph.tasks) + + to_run = [label] + + logger.info("Creating tasks...") + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + suffix="0", + modifier=partial(backfill_modifier, input=input), + ) + + # TODO Implement a way to write out artifacts without assuming there's + # multiple sets of them so we can stop passing in "suffix". + combine_task_graph_files(["0"]) + + +@register_callback_action( + title="Backfill all browsertime", + name="backfill-all-browsertime", + permission="backfill", + symbol="baB", + description=( + "Schedule all browsertime tests for the current and previous push in the same project." + ), + order=800, + context=[], # This will be available for all tasks + available=lambda parameters: True, +) +def backfill_all_browsertime(parameters, graph_config, input, task_group_id, task_id): + """ + This action takes a revision and schedules it on previous pushes (via support action). + + To execute this action locally follow the documentation here: + https://firefox-source-docs.mozilla.org/taskcluster/actions.html#testing-the-action-locally + """ + pushes = get_pushes( + project=parameters["head_repository"], + end_id=int(parameters["pushlog_id"]), + depth=2, + ) + + for push_id in pushes: + try: + # The Gecko decision task can sometimes fail on a push and we need to handle + # the exception that this call will produce + push_decision_task_id = get_decision_task_id(parameters["project"], push_id) + except Exception: + logger.warning(f"Could not find decision task for push {push_id}") + # The decision task may have failed, this is common enough that we + # don't want to report an error for it. + continue + + try: + trigger_action( + action_name="add-all-browsertime", + # This lets the action know on which push we want to add a new task + decision_task_id=push_decision_task_id, + ) + except Exception: + logger.exception(f"Failed to trigger action for {push_id}") + sys.exit(1) + + +def filter_raptor_jobs(full_task_graph, label_to_taskid): + to_run = [] + for label, entry in full_task_graph.tasks.items(): + if entry.kind != "test": + continue + if entry.task.get("extra", {}).get("suite", "") != "raptor": + continue + if "browsertime" not in entry.attributes.get("raptor_try_name", ""): + continue + if not entry.attributes.get("test_platform", "").endswith("shippable-qr/opt"): + continue + if "android" in entry.attributes.get("test_platform", ""): + # Bug 1786254 - The backfill bot is scheduling too many tests atm + continue + exceptions = ("live", "profiling", "youtube-playback") + if any(e in entry.attributes.get("raptor_try_name", "") for e in exceptions): + continue + if "firefox" in entry.attributes.get( + "raptor_try_name", "" + ) and entry.attributes.get("test_platform", "").endswith("64-shippable-qr/opt"): + # add the browsertime test + if label not in label_to_taskid: + to_run.append(label) + if "geckoview" in entry.attributes.get("raptor_try_name", ""): + # add the pageload test + if label not in label_to_taskid: + to_run.append(label) + return to_run + + +@register_callback_action( + name="add-all-browsertime", + title="Add All Browsertime Tests.", + permission="backfill", + symbol="aaB", + description="This action is normally scheduled by the backfill-all-browsertime action. " + "The intent is to schedule all browsertime tests on a specific pushe.", + order=900, + context=[], +) +def add_all_browsertime(parameters, graph_config, input, task_group_id, task_id): + """ + This action is normally scheduled by the backfill-all-browsertime action. The intent is to + trigger all browsertime tasks for the current revision. + + The push in which we want to schedule a new task is defined by the parameters object. + + To execute this action locally follow the documentation here: + https://firefox-source-docs.mozilla.org/taskcluster/actions.html#testing-the-action-locally + """ + logger.info("Retreving the full task graph and labels.") + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + to_run = filter_raptor_jobs(full_task_graph, label_to_taskid) + + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + ) + logger.info(f"Scheduled {len(to_run)} raptor tasks (time 1)") diff --git a/taskcluster/gecko_taskgraph/actions/cancel.py b/taskcluster/gecko_taskgraph/actions/cancel.py new file mode 100644 index 0000000000..d895781395 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/cancel.py @@ -0,0 +1,36 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging + +import requests +from taskgraph.util.taskcluster import cancel_task + +from .registry import register_callback_action + +logger = logging.getLogger(__name__) + + +@register_callback_action( + title="Cancel Task", + name="cancel", + symbol="cx", + description=("Cancel the given task"), + order=350, + context=[{}], +) +def cancel_action(parameters, graph_config, input, task_group_id, task_id): + # Note that this is limited by the scopes afforded to generic actions to + # only cancel tasks with the level-specific schedulerId. + try: + cancel_task(task_id, use_proxy=True) + except requests.HTTPError as e: + if e.response.status_code == 409: + # A 409 response indicates that this task is past its deadline. It + # cannot be cancelled at this time, but it's also not running + # anymore, so we can ignore this error. + logger.info(f"Task {task_id} is past its deadline and cannot be cancelled.") + return + raise diff --git a/taskcluster/gecko_taskgraph/actions/cancel_all.py b/taskcluster/gecko_taskgraph/actions/cancel_all.py new file mode 100644 index 0000000000..d74b83b7d8 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/cancel_all.py @@ -0,0 +1,60 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import concurrent.futures as futures +import logging +import os + +import requests +from taskgraph.util.taskcluster import CONCURRENCY, cancel_task + +from gecko_taskgraph.util.taskcluster import list_task_group_incomplete_task_ids + +from .registry import register_callback_action + +logger = logging.getLogger(__name__) + + +@register_callback_action( + title="Cancel All", + name="cancel-all", + symbol="cAll", + description=( + "Cancel all running and pending tasks created by the decision task " + "this action task is associated with." + ), + order=400, + context=[], +) +def cancel_all_action(parameters, graph_config, input, task_group_id, task_id): + def do_cancel_task(task_id): + logger.info(f"Cancelling task {task_id}") + try: + cancel_task(task_id, use_proxy=True) + except requests.HTTPError as e: + if e.response.status_code == 409: + # A 409 response indicates that this task is past its deadline. It + # cannot be cancelled at this time, but it's also not running + # anymore, so we can ignore this error. + logger.info( + "Task {} is past its deadline and cannot be cancelled.".format( + task_id + ) + ) + return + raise + + own_task_id = os.environ.get("TASK_ID", "") + to_cancel = [ + t + for t in list_task_group_incomplete_task_ids(task_group_id) + if t != own_task_id + ] + + logger.info(f"Cancelling {len(to_cancel)} tasks") + with futures.ThreadPoolExecutor(CONCURRENCY) as e: + cancel_futs = [e.submit(do_cancel_task, t) for t in to_cancel] + for f in futures.as_completed(cancel_futs): + f.result() diff --git a/taskcluster/gecko_taskgraph/actions/create_interactive.py b/taskcluster/gecko_taskgraph/actions/create_interactive.py new file mode 100644 index 0000000000..9f7bd0203b --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/create_interactive.py @@ -0,0 +1,193 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging +import os +import re + +import taskcluster_urls +from taskgraph.util.taskcluster import get_root_url, get_task_definition, send_email + +from gecko_taskgraph.actions.registry import register_callback_action +from gecko_taskgraph.actions.util import create_tasks, fetch_graph_and_labels + +logger = logging.getLogger(__name__) + +EMAIL_SUBJECT = "Your Interactive Task for {label}" +EMAIL_CONTENT = """\ +As you requested, Firefox CI has created an interactive task to run {label} +on revision {revision} in {repo}. Click the button below to connect to the +task. You may need to wait for it to begin running. +""" + +### +# Security Concerns +# +# An "interactive task" is, quite literally, shell access to a worker. That +# is limited by being in a Docker container, but we assume that Docker has +# bugs so we do not want to rely on container isolation exclusively. +# +# Interactive tasks should never be allowed on hosts that build binaries +# leading to a release -- level 3 builders. +# +# Users must not be allowed to create interactive tasks for tasks above +# their own level. +# +# Interactive tasks must not have any routes that might make them appear +# in the index to be used by other production tasks. +# +# Interactive tasks should not be able to write to any docker-worker caches. + +SCOPE_WHITELIST = [ + # these are not actually secrets, and just about everything needs them + re.compile(r"^secrets:get:project/taskcluster/gecko/(hgfingerprint|hgmointernal)$"), + # public downloads are OK + re.compile(r"^docker-worker:relengapi-proxy:tooltool.download.public$"), + re.compile(r"^project:releng:services/tooltool/api/download/public$"), + # internal downloads are OK + re.compile(r"^docker-worker:relengapi-proxy:tooltool.download.internal$"), + re.compile(r"^project:releng:services/tooltool/api/download/internal$"), + # private toolchain artifacts from tasks + re.compile(r"^queue:get-artifact:project/gecko/.*$"), + # level-appropriate secrets are generally necessary to run a task; these + # also are "not that secret" - most of them are built into the resulting + # binary and could be extracted by someone with `strings`. + re.compile(r"^secrets:get:project/releng/gecko/build/level-[0-9]/\*"), + # ptracing is generally useful for interactive tasks, too! + re.compile(r"^docker-worker:feature:allowPtrace$"), + # docker-worker capabilities include loopback devices + re.compile(r"^docker-worker:capability:device:.*$"), + re.compile(r"^docker-worker:capability:privileged$"), + re.compile(r"^docker-worker:cache:gecko-level-1-checkouts.*$"), + re.compile(r"^docker-worker:cache:gecko-level-1-tooltool-cache.*$"), +] + + +def context(params): + # available for any docker-worker tasks at levels 1, 2; and for + # test tasks on level 3 (level-3 builders are firewalled off) + if int(params["level"]) < 3: + return [{"worker-implementation": "docker-worker"}] + else: + return [{"worker-implementation": "docker-worker", "kind": "test"}] + # Windows is not supported by one-click loaners yet. See + # https://wiki.mozilla.org/ReleaseEngineering/How_To/Self_Provision_a_TaskCluster_Windows_Instance + # for instructions for using them. + + +@register_callback_action( + title="Create Interactive Task", + name="create-interactive", + symbol="create-inter", + description=("Create a a copy of the task that you can interact with"), + order=50, + context=context, + schema={ + "type": "object", + "properties": { + "notify": { + "type": "string", + "format": "email", + "title": "Who to notify of the pending interactive task", + "description": ( + "Enter your email here to get an email containing a link " + "to interact with the task" + ), + # include a default for ease of users' editing + "default": "noreply@noreply.mozilla.org", + }, + }, + "additionalProperties": False, + }, +) +def create_interactive_action(parameters, graph_config, input, task_group_id, task_id): + # fetch the original task definition from the taskgraph, to avoid + # creating interactive copies of unexpected tasks. Note that this only applies + # to docker-worker tasks, so we can assume the docker-worker payload format. + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + task = get_task_definition(task_id) + label = task["metadata"]["name"] + + def edit(task): + if task.label != label: + return task + task_def = task.task + + # drop task routes (don't index this!) + task_def["routes"] = [] + + # only try this once + task_def["retries"] = 0 + + # short expirations, at least 3 hour maxRunTime + task_def["deadline"] = {"relative-datestamp": "12 hours"} + task_def["created"] = {"relative-datestamp": "0 hours"} + task_def["expires"] = {"relative-datestamp": "1 day"} + + # filter scopes with the SCOPE_WHITELIST + task.task["scopes"] = [ + s + for s in task.task.get("scopes", []) + if any(p.match(s) for p in SCOPE_WHITELIST) + ] + + payload = task_def["payload"] + + # make sure the task runs for long enough.. + payload["maxRunTime"] = max(3600 * 3, payload.get("maxRunTime", 0)) + + # no caches or artifacts + payload["cache"] = {} + payload["artifacts"] = {} + + # enable interactive mode + payload.setdefault("features", {})["interactive"] = True + payload.setdefault("env", {})["TASKCLUSTER_INTERACTIVE"] = "true" + + for key in task_def["payload"]["env"].keys(): + payload["env"][key] = task_def["payload"]["env"].get(key, "") + + return task + + # Create the task and any of its dependencies. This uses a new taskGroupId to avoid + # polluting the existing taskGroup with interactive tasks. + action_task_id = os.environ.get("TASK_ID") + label_to_taskid = create_tasks( + graph_config, + [label], + full_task_graph, + label_to_taskid, + parameters, + decision_task_id=action_task_id, + modifier=edit, + ) + + taskId = label_to_taskid[label] + logger.info(f"Created interactive task {taskId}; sending notification") + + if input and "notify" in input: + email = input["notify"] + # no point sending to a noreply address! + if email == "noreply@noreply.mozilla.org": + return + + info = { + "url": taskcluster_urls.ui(get_root_url(False), f"tasks/{taskId}/connect"), + "label": label, + "revision": parameters["head_rev"], + "repo": parameters["head_repository"], + } + send_email( + email, + subject=EMAIL_SUBJECT.format(**info), + content=EMAIL_CONTENT.format(**info), + link={ + "text": "Connect", + "href": info["url"], + }, + use_proxy=True, + ) diff --git a/taskcluster/gecko_taskgraph/actions/gecko_profile.py b/taskcluster/gecko_taskgraph/actions/gecko_profile.py new file mode 100644 index 0000000000..ce4394e77c --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/gecko_profile.py @@ -0,0 +1,138 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging + +import requests +from requests.exceptions import HTTPError +from taskgraph.taskgraph import TaskGraph +from taskgraph.util.taskcluster import get_artifact_from_index, get_task_definition + +from gecko_taskgraph.util.taskgraph import find_decision_task + +from .registry import register_callback_action +from .util import combine_task_graph_files, create_tasks + +PUSHLOG_TMPL = "{}/json-pushes?version=2&startID={}&endID={}" +INDEX_TMPL = "gecko.v2.{}.pushlog-id.{}.decision" + +logger = logging.getLogger(__name__) + + +@register_callback_action( + title="GeckoProfile", + name="geckoprofile", + symbol="Gp", + description=( + "Take the label of the current task, " + "and trigger the task with that label " + "on previous pushes in the same project " + "while adding the --gecko-profile cmd arg." + ), + order=200, + context=[{"test-type": "talos"}, {"test-type": "raptor"}], + schema={}, + available=lambda parameters: True, +) +def geckoprofile_action(parameters, graph_config, input, task_group_id, task_id): + task = get_task_definition(task_id) + label = task["metadata"]["name"] + pushes = [] + depth = 2 + end_id = int(parameters["pushlog_id"]) + + while True: + start_id = max(end_id - depth, 0) + pushlog_url = PUSHLOG_TMPL.format( + parameters["head_repository"], start_id, end_id + ) + r = requests.get(pushlog_url) + r.raise_for_status() + pushes = pushes + list(r.json()["pushes"].keys()) + if len(pushes) >= depth: + break + + end_id = start_id - 1 + start_id -= depth + if start_id < 0: + break + + pushes = sorted(pushes)[-depth:] + backfill_pushes = [] + + for push in pushes: + try: + full_task_graph = get_artifact_from_index( + INDEX_TMPL.format(parameters["project"], push), + "public/full-task-graph.json", + ) + _, full_task_graph = TaskGraph.from_json(full_task_graph) + label_to_taskid = get_artifact_from_index( + INDEX_TMPL.format(parameters["project"], push), + "public/label-to-taskid.json", + ) + push_params = get_artifact_from_index( + INDEX_TMPL.format(parameters["project"], push), "public/parameters.yml" + ) + push_decision_task_id = find_decision_task(push_params, graph_config) + except HTTPError as e: + logger.info(f"Skipping {push} due to missing index artifacts! Error: {e}") + continue + + if label in full_task_graph.tasks.keys(): + + def modifier(task): + if task.label != label: + return task + + cmd = task.task["payload"]["command"] + task.task["payload"]["command"] = add_args_to_perf_command( + cmd, ["--gecko-profile"] + ) + task.task["extra"]["treeherder"]["symbol"] += "-p" + task.task["extra"]["treeherder"]["groupName"] += " (profiling)" + return task + + create_tasks( + graph_config, + [label], + full_task_graph, + label_to_taskid, + push_params, + push_decision_task_id, + push, + modifier=modifier, + ) + backfill_pushes.append(push) + else: + logging.info(f"Could not find {label} on {push}. Skipping.") + combine_task_graph_files(backfill_pushes) + + +def add_args_to_perf_command(payload_commands, extra_args=[]): + """ + Add custom command line args to a given command. + args: + payload_commands: the raw command as seen by taskcluster + extra_args: array of args we want to inject + """ + perf_command_idx = -1 # currently, it's the last (or only) command + perf_command = payload_commands[perf_command_idx] + + command_form = "default" + if isinstance(perf_command, str): + # windows has a single command, in long string form + perf_command = perf_command.split(" ") + command_form = "string" + # osx & linux have an array of subarrays + + perf_command.extend(extra_args) + + if command_form == "string": + # pack it back to list + perf_command = " ".join(perf_command) + + payload_commands[perf_command_idx] = perf_command + return payload_commands diff --git a/taskcluster/gecko_taskgraph/actions/isolate_test.py b/taskcluster/gecko_taskgraph/actions/isolate_test.py new file mode 100644 index 0000000000..b1c6ced703 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/isolate_test.py @@ -0,0 +1,254 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import copy +import json +import logging +import os +import re + +from taskgraph.util.parameterization import resolve_task_references +from taskgraph.util.taskcluster import get_artifact, get_task_definition, list_artifacts + +from .registry import register_callback_action +from .util import add_args_to_command, create_task_from_def, fetch_graph_and_labels + +logger = logging.getLogger(__name__) + + +def get_failures(task_id): + """Returns a dict containing properties containing a list of + directories containing test failures and a separate list of + individual test failures from the errorsummary.log artifact for + the task. + + Calls the helper function munge_test_path to attempt to find an + appropriate test path to pass to the task in + MOZHARNESS_TEST_PATHS. If no appropriate test path can be + determined, nothing is returned. + """ + + def re_compile_list(*lst): + # Ideally we'd just use rb"" literals and avoid the encode, but + # this file needs to be importable in python2 for now. + return [re.compile(s.encode("utf-8")) for s in lst] + + re_bad_tests = re_compile_list( + r"Last test finished", + r"LeakSanitizer", + r"Main app process exited normally", + r"ShutdownLeaks", + r"[(]SimpleTest/TestRunner.js[)]", + r"automation.py", + r"https?://localhost:\d+/\d+/\d+/.*[.]html", + r"jsreftest", + r"leakcheck", + r"mozrunner-startup", + r"pid: ", + r"RemoteProcessMonitor", + r"unknown test url", + ) + re_extract_tests = re_compile_list( + r'"test": "(?:[^:]+:)?(?:https?|file):[^ ]+/reftest/tests/([^ "]+)', + r'"test": "(?:[^:]+:)?(?:https?|file):[^:]+:[0-9]+/tests/([^ "]+)', + r'xpcshell-?[^ "]*\.ini:([^ "]+)', + r'/tests/([^ "]+) - finished .*', + r'"test": "([^ "]+)"', + r'"message": "Error running command run_test with arguments ' + r"[(]<wptrunner[.]wpttest[.]TestharnessTest ([^>]+)>", + r'"message": "TEST-[^ ]+ [|] ([^ "]+)[^|]*[|]', + ) + + def munge_test_path(line): + test_path = None + for r in re_bad_tests: + if r.search(line): + return None + for r in re_extract_tests: + m = r.search(line) + if m: + test_path = m.group(1) + break + return test_path + + dirs = set() + tests = set() + artifacts = list_artifacts(task_id) + for artifact in artifacts: + if "name" not in artifact or not artifact["name"].endswith("errorsummary.log"): + continue + + stream = get_artifact(task_id, artifact["name"]) + if not stream: + continue + + # The number of tasks created is determined by the + # `times` value and the number of distinct tests and + # directories as: times * (1 + len(tests) + len(dirs)). + # Since the maximum value of `times` specifiable in the + # Treeherder UI is 100, the number of tasks created can + # reach a very large value depending on the number of + # unique tests. During testing, it was found that 10 + # distinct tests were sufficient to cause the action task + # to exceed the maxRunTime of 1800 seconds resulting in it + # being aborted. We limit the number of distinct tests + # and thereby the number of distinct test directories to a + # maximum of 5 to keep the action task from timing out. + + # We handle the stream as raw bytes because it may contain invalid + # UTF-8 characters in portions other than those containing the error + # messages we're looking for. + for line in stream.read().split(b"\n"): + test_path = munge_test_path(line.strip()) + + if test_path: + tests.add(test_path.decode("utf-8")) + test_dir = os.path.dirname(test_path) + if test_dir: + dirs.add(test_dir.decode("utf-8")) + + if len(tests) > 4: + break + + return {"dirs": sorted(dirs), "tests": sorted(tests)} + + +def create_isolate_failure_tasks(task_definition, failures, level, times): + """ + Create tasks to re-run the original task plus tasks to test + each failing test directory and individual path. + + """ + logger.info(f"Isolate task:\n{json.dumps(task_definition, indent=2)}") + + # Operate on a copy of the original task_definition + task_definition = copy.deepcopy(task_definition) + + task_name = task_definition["metadata"]["name"] + repeatable_task = False + if ( + "crashtest" in task_name + or "mochitest" in task_name + or "reftest" in task_name + and "jsreftest" not in task_name + ): + repeatable_task = True + + th_dict = task_definition["extra"]["treeherder"] + symbol = th_dict["symbol"] + is_windows = "windows" in th_dict["machine"]["platform"] + + suite = task_definition["extra"]["suite"] + if "-coverage" in suite: + suite = suite[: suite.index("-coverage")] + is_wpt = "web-platform-tests" in suite + + # command is a copy of task_definition['payload']['command'] from the original task. + # It is used to create the new version containing the + # task_definition['payload']['command'] with repeat_args which is updated every time + # through the failure_group loop. + + command = copy.deepcopy(task_definition["payload"]["command"]) + + th_dict["groupSymbol"] = th_dict["groupSymbol"] + "-I" + th_dict["tier"] = 3 + + for i in range(times): + create_task_from_def(task_definition, level) + + if repeatable_task: + task_definition["payload"]["maxRunTime"] = 3600 * 3 + + for failure_group in failures: + if len(failures[failure_group]) == 0: + continue + if failure_group == "dirs": + failure_group_suffix = "-id" + # execute 5 total loops + repeat_args = ["--repeat=4"] if repeatable_task else [] + elif failure_group == "tests": + failure_group_suffix = "-it" + # execute 20 total loops + repeat_args = ["--repeat=19"] if repeatable_task else [] + else: + logger.error( + "create_isolate_failure_tasks: Unknown failure_group {}".format( + failure_group + ) + ) + continue + + if repeat_args: + task_definition["payload"]["command"] = add_args_to_command( + command, extra_args=repeat_args + ) + else: + task_definition["payload"]["command"] = command + + for failure_path in failures[failure_group]: + th_dict["symbol"] = symbol + failure_group_suffix + if is_wpt: + failure_path = "testing/web-platform/tests" + failure_path + if is_windows: + failure_path = "\\".join(failure_path.split("/")) + task_definition["payload"]["env"]["MOZHARNESS_TEST_PATHS"] = json.dumps( + {suite: [failure_path]}, sort_keys=True + ) + + logger.info( + "Creating task for path {} with command {}".format( + failure_path, task_definition["payload"]["command"] + ) + ) + for i in range(times): + create_task_from_def(task_definition, level) + + +@register_callback_action( + name="isolate-test-failures", + title="Isolate test failures in job", + symbol="it", + description="Re-run Tests for original manifest, directories and tests for failing tests.", + order=150, + context=[{"kind": "test"}], + schema={ + "type": "object", + "properties": { + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + } + }, + "additionalProperties": False, + }, +) +def isolate_test_failures(parameters, graph_config, input, task_group_id, task_id): + task = get_task_definition(task_id) + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + pre_task = full_task_graph.tasks[task["metadata"]["name"]] + + # fix up the task's dependencies, similar to how optimization would + # have done in the decision + dependencies = { + name: label_to_taskid[label] for name, label in pre_task.dependencies.items() + } + + task_definition = resolve_task_references( + pre_task.label, pre_task.task, task_id, decision_task_id, dependencies + ) + task_definition.setdefault("dependencies", []).extend(dependencies.values()) + + failures = get_failures(task_id) + logger.info("isolate_test_failures: %s" % failures) + create_isolate_failure_tasks( + task_definition, failures, parameters["level"], input["times"] + ) diff --git a/taskcluster/gecko_taskgraph/actions/merge_automation.py b/taskcluster/gecko_taskgraph/actions/merge_automation.py new file mode 100644 index 0000000000..8b9455536b --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/merge_automation.py @@ -0,0 +1,99 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from taskgraph.parameters import Parameters + +from gecko_taskgraph.actions.registry import register_callback_action +from gecko_taskgraph.decision import taskgraph_decision +from gecko_taskgraph.util.attributes import RELEASE_PROMOTION_PROJECTS + + +def is_release_promotion_available(parameters): + return parameters["project"] in RELEASE_PROMOTION_PROJECTS + + +@register_callback_action( + name="merge-automation", + title="Merge Day Automation", + symbol="${input.behavior}", + description="Merge repository branches.", + permission="merge-automation", + order=500, + context=[], + available=is_release_promotion_available, + schema=lambda graph_config: { + "type": "object", + "properties": { + "force-dry-run": { + "type": "boolean", + "description": "Override other options and do not push changes", + "default": True, + }, + "push": { + "type": "boolean", + "description": "Push changes using to_repo and to_branch", + "default": False, + }, + "behavior": { + "type": "string", + "description": "The type of release promotion to perform.", + "enum": sorted(graph_config["merge-automation"]["behaviors"].keys()), + "default": "central-to-beta", + }, + "from-repo": { + "type": "string", + "description": "The URI of the source repository", + }, + "to-repo": { + "type": "string", + "description": "The push URI of the target repository", + }, + "from-branch": { + "type": "string", + "description": "The fx head of the source, such as central", + }, + "to-branch": { + "type": "string", + "description": "The fx head of the target, such as beta", + }, + "ssh-user-alias": { + "type": "string", + "description": "The alias of an ssh account to use when pushing changes.", + }, + "fetch-version-from": { + "type": "string", + "description": "Path to file used when querying current version.", + }, + }, + "required": ["behavior"], + }, +) +def merge_automation_action(parameters, graph_config, input, task_group_id, task_id): + + # make parameters read-write + parameters = dict(parameters) + + parameters["target_tasks_method"] = "merge_automation" + parameters["merge_config"] = { + "force-dry-run": input.get("force-dry-run", False), + "behavior": input["behavior"], + } + + for field in [ + "from-repo", + "from-branch", + "to-repo", + "to-branch", + "ssh-user-alias", + "push", + "fetch-version-from", + ]: + if input.get(field): + parameters["merge_config"][field] = input[field] + parameters["tasks_for"] = "action" + + # make parameters read-only + parameters = Parameters(**parameters) + + taskgraph_decision({"root": graph_config.root_dir}, parameters=parameters) diff --git a/taskcluster/gecko_taskgraph/actions/openh264.py b/taskcluster/gecko_taskgraph/actions/openh264.py new file mode 100644 index 0000000000..226817f696 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/openh264.py @@ -0,0 +1,33 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +from .registry import register_callback_action +from .util import create_tasks, fetch_graph_and_labels + + +@register_callback_action( + name="openh264", + title="OpenH264 Binaries", + symbol="h264", + description="Action to prepare openh264 binaries for shipping", + context=[], +) +def openh264_action(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + to_run = [ + label + for label, entry in full_task_graph.tasks.items() + if "openh264" in entry.kind + ] + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + ) diff --git a/taskcluster/gecko_taskgraph/actions/purge_caches.py b/taskcluster/gecko_taskgraph/actions/purge_caches.py new file mode 100644 index 0000000000..4905526f6c --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/purge_caches.py @@ -0,0 +1,34 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging + +from taskgraph.util.taskcluster import get_task_definition, purge_cache + +from .registry import register_callback_action + +logger = logging.getLogger(__name__) + + +@register_callback_action( + title="Purge Worker Caches", + name="purge-cache", + symbol="purge-cache", + description=( + "Purge any caches associated with this task " + "across all workers of the same workertype as the task." + ), + order=450, + context=[{"worker-implementation": "docker-worker"}], +) +def purge_caches_action(parameters, graph_config, input, task_group_id, task_id): + task = get_task_definition(task_id) + if task["payload"].get("cache"): + for cache in task["payload"]["cache"]: + purge_cache( + task["provisionerId"], task["workerType"], cache, use_proxy=True + ) + else: + logger.info("Task has no caches. Will not clear anything!") diff --git a/taskcluster/gecko_taskgraph/actions/rebuild_cached_tasks.py b/taskcluster/gecko_taskgraph/actions/rebuild_cached_tasks.py new file mode 100644 index 0000000000..9d8906caf1 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/rebuild_cached_tasks.py @@ -0,0 +1,37 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +from .registry import register_callback_action +from .util import create_tasks, fetch_graph_and_labels + + +@register_callback_action( + name="rebuild-cached-tasks", + title="Rebuild Cached Tasks", + symbol="rebuild-cached", + description="Rebuild cached tasks.", + order=1000, + context=[], +) +def rebuild_cached_tasks_action( + parameters, graph_config, input, task_group_id, task_id +): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + cached_tasks = [ + label + for label, task in full_task_graph.tasks.items() + if task.attributes.get("cached_task", False) + ] + if cached_tasks: + create_tasks( + graph_config, + cached_tasks, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + ) diff --git a/taskcluster/gecko_taskgraph/actions/registry.py b/taskcluster/gecko_taskgraph/actions/registry.py new file mode 100644 index 0000000000..fd3937707a --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/registry.py @@ -0,0 +1,369 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import json +import re +from collections import namedtuple +from types import FunctionType + +from mozbuild.util import memoize +from taskgraph import create +from taskgraph.config import load_graph_config +from taskgraph.parameters import Parameters +from taskgraph.util import taskcluster, yaml +from taskgraph.util.python_path import import_sibling_modules + +from gecko_taskgraph.util import hash + +actions = [] +callbacks = {} + +Action = namedtuple("Action", ["order", "cb_name", "permission", "action_builder"]) + + +def is_json(data): + """Return ``True``, if ``data`` is a JSON serializable data structure.""" + try: + json.dumps(data) + except ValueError: + return False + return True + + +@memoize +def read_taskcluster_yml(filename): + """Load and parse .taskcluster.yml, memoized to save some time""" + return yaml.load_yaml(filename) + + +@memoize +def hash_taskcluster_yml(filename): + """ + Generate a hash of the given .taskcluster.yml. This is the first 10 digits + of the sha256 of the file's content, and is used by administrative scripts + to create a hook based on this content. + """ + return hash.hash_path(filename)[:10] + + +def register_callback_action( + name, + title, + symbol, + description, + order=10000, + context=[], + available=lambda parameters: True, + schema=None, + permission="generic", + cb_name=None, +): + """ + Register an action callback that can be triggered from supporting + user interfaces, such as Treeherder. + + This function is to be used as a decorator for a callback that takes + parameters as follows: + + ``parameters``: + Decision task parameters, see ``taskgraph.parameters.Parameters``. + ``input``: + Input matching specified JSON schema, ``None`` if no ``schema`` + parameter is given to ``register_callback_action``. + ``task_group_id``: + The id of the task-group this was triggered for. + ``task_id`` and `task``: + task identifier and task definition for task the action was triggered + for, ``None`` if no ``context`` parameters was given to + ``register_callback_action``. + + Parameters + ---------- + name : str + An identifier for this action, used by UIs to find the action. + title : str + A human readable title for the action to be used as label on a button + or text on a link for triggering the action. + symbol : str + Treeherder symbol for the action callback, this is the symbol that the + task calling your callback will be displayed as. This is usually 1-3 + letters abbreviating the action title. + description : str + A human readable description of the action in **markdown**. + This will be display as tooltip and in dialog window when the action + is triggered. This is a good place to describe how to use the action. + order : int + Order of the action in menus, this is relative to the ``order`` of + other actions declared. + context : list of dict + List of tag-sets specifying which tasks the action is can take as input. + If no tag-sets is specified as input the action is related to the + entire task-group, and won't be triggered with a given task. + + Otherwise, if ``context = [{'k': 'b', 'p': 'l'}, {'k': 't'}]`` will only + be displayed in the context menu for tasks that has + ``task.tags.k == 'b' && task.tags.p = 'l'`` or ``task.tags.k = 't'``. + Esentially, this allows filtering on ``task.tags``. + + If this is a function, it is given the decision parameters and must return + a value of the form described above. + available : function + An optional function that given decision parameters decides if the + action is available. Defaults to a function that always returns ``True``. + schema : dict + JSON schema specifying input accepted by the action. + This is optional and can be left ``null`` if no input is taken. + permission : string + This defaults to ``generic` and needs to be set for actions that need + additional permissions. It appears appears in ci-configuration and + various role and hook + names. + cb_name : string + The name under which this function should be registered, defaulting to + `name`. Unlike `name`, which can appear multiple times, cb_name must be + unique among all registered callbacks. + + Returns + ------- + function + To be used as decorator for the callback function. + """ + mem = {"registered": False} # workaround nonlocal missing in 2.x + + assert isinstance(title, str), "title must be a string" + assert isinstance(description, str), "description must be a string" + title = title.strip() + description = description.strip() + + if not cb_name: + cb_name = name + + # ensure that context is callable + if not callable(context): + context_value = context + + def context(params): + return context_value # noqa + + def register_callback(cb): + assert isinstance(name, str), "name must be a string" + assert isinstance(order, int), "order must be an integer" + assert callable(schema) or is_json( + schema + ), "schema must be a JSON compatible object" + assert isinstance(cb, FunctionType), "callback must be a function" + # Allow for json-e > 25 chars in the symbol. + if "$" not in symbol: + assert 1 <= len(symbol) <= 25, "symbol must be between 1 and 25 characters" + assert isinstance(symbol, str), "symbol must be a string" + + assert not mem[ + "registered" + ], "register_callback_action must be used as decorator" + assert cb_name not in callbacks, "callback name {} is not unique".format( + cb_name + ) + + def action_builder(parameters, graph_config, decision_task_id): + if not available(parameters): + return None + + # gather up the common decision-task-supplied data for this action + repo_param = "{}head_repository".format( + graph_config["project-repo-param-prefix"] + ) + repository = { + "url": parameters[repo_param], + "project": parameters["project"], + "level": parameters["level"], + } + + revision = parameters[ + "{}head_rev".format(graph_config["project-repo-param-prefix"]) + ] + base_revision = parameters[ + "{}base_rev".format(graph_config["project-repo-param-prefix"]) + ] + push = { + "owner": "mozilla-taskcluster-maintenance@mozilla.com", + "pushlog_id": parameters["pushlog_id"], + "revision": revision, + "base_revision": base_revision, + } + + match = re.match( + r"https://(hg.mozilla.org)/(.*?)/?$", parameters[repo_param] + ) + if not match: + raise Exception(f"Unrecognized {repo_param}") + action = { + "name": name, + "title": title, + "description": description, + # target taskGroupId (the task group this decision task is creating) + "taskGroupId": decision_task_id, + "cb_name": cb_name, + "symbol": symbol, + } + + rv = { + "name": name, + "title": title, + "description": description, + "context": context(parameters), + } + if schema: + rv["schema"] = ( + schema(graph_config=graph_config) if callable(schema) else schema + ) + + trustDomain = graph_config["trust-domain"] + level = parameters["level"] + tcyml_hash = hash_taskcluster_yml(graph_config.taskcluster_yml) + + # the tcyml_hash is prefixed with `/` in the hookId, so users will be granted + # hooks:trigger-hook:project-gecko/in-tree-action-3-myaction/*; if another + # action was named `myaction/release`, then the `*` in the scope would also + # match that action. To prevent such an accident, we prohibit `/` in hook + # names. + if "/" in permission: + raise Exception("`/` is not allowed in action names; use `-`") + + rv.update( + { + "kind": "hook", + "hookGroupId": f"project-{trustDomain}", + "hookId": "in-tree-action-{}-{}/{}".format( + level, permission, tcyml_hash + ), + "hookPayload": { + # provide the decision-task parameters as context for triggerHook + "decision": { + "action": action, + "repository": repository, + "push": push, + }, + # and pass everything else through from our own context + "user": { + "input": {"$eval": "input"}, + "taskId": {"$eval": "taskId"}, # target taskId (or null) + "taskGroupId": { + "$eval": "taskGroupId" + }, # target task group + }, + }, + "extra": { + "actionPerm": permission, + }, + } + ) + + return rv + + actions.append(Action(order, cb_name, permission, action_builder)) + + mem["registered"] = True + callbacks[cb_name] = cb + return cb + + return register_callback + + +def render_actions_json(parameters, graph_config, decision_task_id): + """ + Render JSON object for the ``public/actions.json`` artifact. + + Parameters + ---------- + parameters : taskgraph.parameters.Parameters + Decision task parameters. + + Returns + ------- + dict + JSON object representation of the ``public/actions.json`` artifact. + """ + assert isinstance(parameters, Parameters), "requires instance of Parameters" + actions = [] + for action in sorted(_get_actions(graph_config), key=lambda action: action.order): + action = action.action_builder(parameters, graph_config, decision_task_id) + if action: + assert is_json(action), "action must be a JSON compatible object" + actions.append(action) + return { + "version": 1, + "variables": {}, + "actions": actions, + } + + +def sanity_check_task_scope(callback, parameters, graph_config): + """ + If this action is not generic, then verify that this task has the necessary + scope to run the action. This serves as a backstop preventing abuse by + running non-generic actions using generic hooks. While scopes should + prevent serious damage from such abuse, it's never a valid thing to do. + """ + for action in _get_actions(graph_config): + if action.cb_name == callback: + break + else: + raise Exception(f"No action with cb_name {callback}") + + repo_param = "{}head_repository".format(graph_config["project-repo-param-prefix"]) + head_repository = parameters[repo_param] + assert head_repository.startswith("https://hg.mozilla.org/") + expected_scope = "assume:repo:{}:action:{}".format( + head_repository[8:], action.permission + ) + + # the scope should appear literally; no need for a satisfaction check. The use of + # get_current_scopes here calls the auth service through the Taskcluster Proxy, giving + # the precise scopes available to this task. + if expected_scope not in taskcluster.get_current_scopes(): + raise Exception(f"Expected task scope {expected_scope} for this action") + + +def trigger_action_callback( + task_group_id, task_id, input, callback, parameters, root, test=False +): + """ + Trigger action callback with the given inputs. If `test` is true, then run + the action callback in testing mode, without actually creating tasks. + """ + graph_config = load_graph_config(root) + graph_config.register() + callbacks = _get_callbacks(graph_config) + cb = callbacks.get(callback, None) + if not cb: + raise Exception( + "Unknown callback: {}. Known callbacks: {}".format( + callback, ", ".join(callbacks) + ) + ) + + if test: + create.testing = True + taskcluster.testing = True + + if not test: + sanity_check_task_scope(callback, parameters, graph_config) + + cb(Parameters(**parameters), graph_config, input, task_group_id, task_id) + + +def _load(graph_config): + # Load all modules from this folder, relying on the side-effects of register_ + # functions to populate the action registry. + import_sibling_modules(exceptions=("util.py",)) + return callbacks, actions + + +def _get_callbacks(graph_config): + return _load(graph_config)[0] + + +def _get_actions(graph_config): + return _load(graph_config)[1] diff --git a/taskcluster/gecko_taskgraph/actions/release_promotion.py b/taskcluster/gecko_taskgraph/actions/release_promotion.py new file mode 100644 index 0000000000..0d7ef2228f --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/release_promotion.py @@ -0,0 +1,424 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import json +import os + +import requests +from taskgraph.parameters import Parameters +from taskgraph.taskgraph import TaskGraph +from taskgraph.util.taskcluster import get_artifact, list_task_group_incomplete_tasks + +from gecko_taskgraph.actions.registry import register_callback_action +from gecko_taskgraph.decision import taskgraph_decision +from gecko_taskgraph.util.attributes import RELEASE_PROMOTION_PROJECTS, release_level +from gecko_taskgraph.util.partials import populate_release_history +from gecko_taskgraph.util.partners import ( + fix_partner_config, + get_partner_config_by_url, + get_partner_url_config, + get_token, +) +from gecko_taskgraph.util.taskgraph import ( + find_decision_task, + find_existing_tasks_from_previous_kinds, +) + +RELEASE_PROMOTION_SIGNOFFS = ("mar-signing",) + + +def is_release_promotion_available(parameters): + return parameters["project"] in RELEASE_PROMOTION_PROJECTS + + +def get_partner_config(partner_url_config, github_token): + partner_config = {} + for kind, url in partner_url_config.items(): + if url: + partner_config[kind] = get_partner_config_by_url(url, kind, github_token) + return partner_config + + +def get_signoff_properties(): + props = {} + for signoff in RELEASE_PROMOTION_SIGNOFFS: + props[signoff] = { + "type": "string", + } + return props + + +def get_required_signoffs(input, parameters): + input_signoffs = set(input.get("required_signoffs", [])) + params_signoffs = set(parameters["required_signoffs"] or []) + return sorted(list(input_signoffs | params_signoffs)) + + +def get_signoff_urls(input, parameters): + signoff_urls = parameters["signoff_urls"] + signoff_urls.update(input.get("signoff_urls", {})) + return signoff_urls + + +def get_flavors(graph_config, param): + """ + Get all flavors with the given parameter enabled. + """ + promotion_flavors = graph_config["release-promotion"]["flavors"] + return sorted( + flavor + for (flavor, config) in promotion_flavors.items() + if config.get(param, False) + ) + + +@register_callback_action( + name="release-promotion", + title="Release Promotion", + symbol="${input.release_promotion_flavor}", + description="Promote a release.", + permission="release-promotion", + order=500, + context=[], + available=is_release_promotion_available, + schema=lambda graph_config: { + "type": "object", + "properties": { + "build_number": { + "type": "integer", + "default": 1, + "minimum": 1, + "title": "The release build number", + "description": ( + "The release build number. Starts at 1 per " + "release version, and increments on rebuild." + ), + }, + "do_not_optimize": { + "type": "array", + "description": ( + "Optional: a list of labels to avoid optimizing out " + "of the graph (to force a rerun of, say, " + "funsize docker-image tasks)." + ), + "items": { + "type": "string", + }, + }, + "revision": { + "type": "string", + "title": "Optional: revision to promote", + "description": ( + "Optional: the revision to promote. If specified, " + "and `previous_graph_kinds is not specified, find the " + "push graph to promote based on the revision." + ), + }, + "release_promotion_flavor": { + "type": "string", + "description": "The flavor of release promotion to perform.", + "enum": sorted(graph_config["release-promotion"]["flavors"].keys()), + }, + "rebuild_kinds": { + "type": "array", + "description": ( + "Optional: an array of kinds to ignore from the previous " + "graph(s)." + ), + "items": { + "type": "string", + }, + }, + "previous_graph_ids": { + "type": "array", + "description": ( + "Optional: an array of taskIds of decision or action " + "tasks from the previous graph(s) to use to populate " + "our `previous_graph_kinds`." + ), + "items": { + "type": "string", + }, + }, + "version": { + "type": "string", + "description": ( + "Optional: override the version for release promotion. " + "Occasionally we'll land a taskgraph fix in a later " + "commit, but want to act on a build from a previous " + "commit. If a version bump has landed in the meantime, " + "relying on the in-tree version will break things." + ), + "default": "", + }, + "next_version": { + "type": "string", + "description": ( + "Next version. Required in the following flavors: " + "{}".format(get_flavors(graph_config, "version-bump")) + ), + "default": "", + }, + # Example: + # 'partial_updates': { + # '38.0': { + # 'buildNumber': 1, + # 'locales': ['de', 'en-GB', 'ru', 'uk', 'zh-TW'] + # }, + # '37.0': { + # 'buildNumber': 2, + # 'locales': ['de', 'en-GB', 'ru', 'uk'] + # } + # } + "partial_updates": { + "type": "object", + "description": ( + "Partial updates. Required in the following flavors: " + "{}".format(get_flavors(graph_config, "partial-updates")) + ), + "default": {}, + "additionalProperties": { + "type": "object", + "properties": { + "buildNumber": { + "type": "number", + }, + "locales": { + "type": "array", + "items": { + "type": "string", + }, + }, + }, + "required": [ + "buildNumber", + "locales", + ], + "additionalProperties": False, + }, + }, + "release_eta": { + "type": "string", + "default": "", + }, + "release_enable_partner_repack": { + "type": "boolean", + "description": "Toggle for creating partner repacks", + }, + "release_enable_partner_attribution": { + "type": "boolean", + "description": "Toggle for creating partner attribution", + }, + "release_partner_build_number": { + "type": "integer", + "default": 1, + "minimum": 1, + "description": ( + "The partner build number. This translates to, e.g. " + "`v1` in the path. We generally only have to " + "bump this on off-cycle partner rebuilds." + ), + }, + "release_partners": { + "type": "array", + "description": ( + "A list of partners to repack, or if null or empty then use " + "the current full set" + ), + "items": { + "type": "string", + }, + }, + "release_partner_config": { + "type": "object", + "description": "Partner configuration to use for partner repacks.", + "properties": {}, + "additionalProperties": True, + }, + "release_enable_emefree": { + "type": "boolean", + "description": "Toggle for creating EME-free repacks", + }, + "required_signoffs": { + "type": "array", + "description": ("The flavor of release promotion to perform."), + "items": { + "enum": RELEASE_PROMOTION_SIGNOFFS, + }, + }, + "signoff_urls": { + "type": "object", + "default": {}, + "additionalProperties": False, + "properties": get_signoff_properties(), + }, + }, + "required": ["release_promotion_flavor", "build_number"], + }, +) +def release_promotion_action(parameters, graph_config, input, task_group_id, task_id): + release_promotion_flavor = input["release_promotion_flavor"] + promotion_config = graph_config["release-promotion"]["flavors"][ + release_promotion_flavor + ] + release_history = {} + product = promotion_config["product"] + + next_version = str(input.get("next_version") or "") + if promotion_config.get("version-bump", False): + # We force str() the input, hence the 'None' + if next_version in ["", "None"]: + raise Exception( + "`next_version` property needs to be provided for `{}` " + "target.".format(release_promotion_flavor) + ) + + if promotion_config.get("partial-updates", False): + partial_updates = input.get("partial_updates", {}) + if not partial_updates and release_level(parameters["project"]) == "production": + raise Exception( + "`partial_updates` property needs to be provided for `{}`" + "target.".format(release_promotion_flavor) + ) + balrog_prefix = product.title() + os.environ["PARTIAL_UPDATES"] = json.dumps(partial_updates, sort_keys=True) + release_history = populate_release_history( + balrog_prefix, parameters["project"], partial_updates=partial_updates + ) + + target_tasks_method = promotion_config["target-tasks-method"].format( + project=parameters["project"] + ) + rebuild_kinds = input.get( + "rebuild_kinds", promotion_config.get("rebuild-kinds", []) + ) + do_not_optimize = input.get( + "do_not_optimize", promotion_config.get("do-not-optimize", []) + ) + + # Make sure no pending tasks remain from a previous run + own_task_id = os.environ.get("TASK_ID", "") + try: + for t in list_task_group_incomplete_tasks(own_task_id): + if t == own_task_id: + continue + raise Exception( + "task group has unexpected pre-existing incomplete tasks (e.g. {})".format( + t + ) + ) + except requests.exceptions.HTTPError as e: + # 404 means the task group doesn't exist yet, and we're fine + if e.response.status_code != 404: + raise + + # Build previous_graph_ids from ``previous_graph_ids``, ``revision``, + # or the action parameters. + previous_graph_ids = input.get("previous_graph_ids") + if not previous_graph_ids: + revision = input.get("revision") + if revision: + head_rev_param = "{}head_rev".format( + graph_config["project-repo-param-prefix"] + ) + push_parameters = { + head_rev_param: revision, + "project": parameters["project"], + } + else: + push_parameters = parameters + previous_graph_ids = [find_decision_task(push_parameters, graph_config)] + + # Download parameters from the first decision task + parameters = get_artifact(previous_graph_ids[0], "public/parameters.yml") + # Download and combine full task graphs from each of the previous_graph_ids. + # Sometimes previous relpro action tasks will add tasks, like partials, + # that didn't exist in the first full_task_graph, so combining them is + # important. The rightmost graph should take precedence in the case of + # conflicts. + combined_full_task_graph = {} + for graph_id in previous_graph_ids: + full_task_graph = get_artifact(graph_id, "public/full-task-graph.json") + combined_full_task_graph.update(full_task_graph) + _, combined_full_task_graph = TaskGraph.from_json(combined_full_task_graph) + parameters["existing_tasks"] = find_existing_tasks_from_previous_kinds( + combined_full_task_graph, previous_graph_ids, rebuild_kinds + ) + parameters["do_not_optimize"] = do_not_optimize + parameters["target_tasks_method"] = target_tasks_method + parameters["build_number"] = int(input["build_number"]) + parameters["next_version"] = next_version + parameters["release_history"] = release_history + if promotion_config.get("is-rc"): + parameters["release_type"] += "-rc" + parameters["release_eta"] = input.get("release_eta", "") + parameters["release_product"] = product + # When doing staging releases on try, we still want to re-use tasks from + # previous graphs. + parameters["optimize_target_tasks"] = True + + if release_promotion_flavor == "promote_firefox_partner_repack": + release_enable_partner_repack = True + release_enable_partner_attribution = False + release_enable_emefree = False + elif release_promotion_flavor == "promote_firefox_partner_attribution": + release_enable_partner_repack = False + release_enable_partner_attribution = True + release_enable_emefree = False + else: + # for promotion or ship phases, we use the action input to turn the repacks/attribution off + release_enable_partner_repack = input.get("release_enable_partner_repack", True) + release_enable_partner_attribution = input.get( + "release_enable_partner_attribution", True + ) + release_enable_emefree = input.get("release_enable_emefree", True) + + partner_url_config = get_partner_url_config(parameters, graph_config) + if ( + release_enable_partner_repack + and not partner_url_config["release-partner-repack"] + ): + raise Exception("Can't enable partner repacks when no config url found") + if ( + release_enable_partner_attribution + and not partner_url_config["release-partner-attribution"] + ): + raise Exception("Can't enable partner attribution when no config url found") + if release_enable_emefree and not partner_url_config["release-eme-free-repack"]: + raise Exception("Can't enable EMEfree repacks when no config url found") + parameters["release_enable_partner_repack"] = release_enable_partner_repack + parameters[ + "release_enable_partner_attribution" + ] = release_enable_partner_attribution + parameters["release_enable_emefree"] = release_enable_emefree + + partner_config = input.get("release_partner_config") + if not partner_config and any( + [ + release_enable_partner_repack, + release_enable_partner_attribution, + release_enable_emefree, + ] + ): + github_token = get_token(parameters) + partner_config = get_partner_config(partner_url_config, github_token) + if partner_config: + parameters["release_partner_config"] = fix_partner_config(partner_config) + parameters["release_partners"] = input.get("release_partners") + if input.get("release_partner_build_number"): + parameters["release_partner_build_number"] = input[ + "release_partner_build_number" + ] + + if input["version"]: + parameters["version"] = input["version"] + + parameters["required_signoffs"] = get_required_signoffs(input, parameters) + parameters["signoff_urls"] = get_signoff_urls(input, parameters) + + # make parameters read-only + parameters = Parameters(**parameters) + + taskgraph_decision({"root": graph_config.root_dir}, parameters=parameters) diff --git a/taskcluster/gecko_taskgraph/actions/retrigger.py b/taskcluster/gecko_taskgraph/actions/retrigger.py new file mode 100644 index 0000000000..fc8c05c83f --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/retrigger.py @@ -0,0 +1,301 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging +import sys +import textwrap + +from taskgraph.util.taskcluster import get_task_definition, rerun_task + +from gecko_taskgraph.util.taskcluster import state_task + +from .registry import register_callback_action +from .util import ( + combine_task_graph_files, + create_task_from_def, + create_tasks, + fetch_graph_and_labels, + get_tasks_with_downstream, + relativize_datestamps, +) + +logger = logging.getLogger(__name__) + +RERUN_STATES = ("exception", "failed") + + +def _should_retrigger(task_graph, label): + """ + Return whether a given task in the taskgraph should be retriggered. + + This handles the case where the task isn't there by assuming it should not be. + """ + if label not in task_graph: + logger.info( + "Task {} not in full taskgraph, assuming task should not be retriggered.".format( + label + ) + ) + return False + return task_graph[label].attributes.get("retrigger", False) + + +@register_callback_action( + title="Retrigger", + name="retrigger", + symbol="rt", + cb_name="retrigger-decision", + description=textwrap.dedent( + """\ + Create a clone of the task (retriggering decision, action, and cron tasks requires + special scopes).""" + ), + order=11, + context=[ + {"kind": "decision-task"}, + {"kind": "action-callback"}, + {"kind": "cron-task"}, + {"action": "backfill-task"}, + ], +) +def retrigger_decision_action(parameters, graph_config, input, task_group_id, task_id): + """For a single task, we try to just run exactly the same task once more. + It's quite possible that we don't have the scopes to do so (especially for + an action), but this is best-effort.""" + + # make all of the timestamps relative; they will then be turned back into + # absolute timestamps relative to the current time. + task = get_task_definition(task_id) + task = relativize_datestamps(task) + create_task_from_def( + task, parameters["level"], action_tag="retrigger-decision-task" + ) + + +@register_callback_action( + title="Retrigger", + name="retrigger", + symbol="rt", + description=("Create a clone of the task."), + order=19, # must be greater than other orders in this file, as this is the fallback version + context=[{"retrigger": "true"}], + schema={ + "type": "object", + "properties": { + "downstream": { + "type": "boolean", + "description": ( + "If true, downstream tasks from this one will be cloned as well. " + "The dependencies will be updated to work with the new task at the root." + ), + "default": False, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + }, + }, +) +@register_callback_action( + title="Retrigger (disabled)", + name="retrigger", + cb_name="retrigger-disabled", + symbol="rt", + description=( + "Create a clone of the task.\n\n" + "This type of task should typically be re-run instead of re-triggered." + ), + order=20, # must be greater than other orders in this file, as this is the fallback version + context=[{}], + schema={ + "type": "object", + "properties": { + "downstream": { + "type": "boolean", + "description": ( + "If true, downstream tasks from this one will be cloned as well. " + "The dependencies will be updated to work with the new task at the root." + ), + "default": False, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + "force": { + "type": "boolean", + "default": False, + "description": ( + "This task should not be re-triggered. " + "This can be overridden by passing `true` here." + ), + }, + }, + }, +) +def retrigger_action(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + task = get_task_definition(task_id) + label = task["metadata"]["name"] + + with_downstream = " " + to_run = [label] + + if not input.get("force", None) and not _should_retrigger(full_task_graph, label): + logger.info( + "Not retriggering task {}, task should not be retrigged " + "and force not specified.".format(label) + ) + sys.exit(1) + + if input.get("downstream"): + to_run = get_tasks_with_downstream(to_run, full_task_graph, label_to_taskid) + with_downstream = " (with downstream) " + + times = input.get("times", 1) + for i in range(times): + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + i, + action_tag="retrigger-task", + ) + + logger.info(f"Scheduled {label}{with_downstream}(time {i + 1}/{times})") + combine_task_graph_files(list(range(times))) + + +@register_callback_action( + title="Rerun", + name="rerun", + symbol="rr", + description=( + "Rerun a task.\n\n" + "This only works on failed or exception tasks in the original taskgraph," + " and is CoT friendly." + ), + order=300, + context=[{}], + schema={"type": "object", "properties": {}}, +) +def rerun_action(parameters, graph_config, input, task_group_id, task_id): + task = get_task_definition(task_id) + parameters = dict(parameters) + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + label = task["metadata"]["name"] + if task_id not in label_to_taskid.values(): + logger.error( + "Refusing to rerun {}: taskId {} not in decision task {} label_to_taskid!".format( + label, task_id, decision_task_id + ) + ) + + _rerun_task(task_id, label) + + +def _rerun_task(task_id, label): + state = state_task(task_id) + if state not in RERUN_STATES: + logger.warning( + "No need to rerun {}: state '{}' not in {}!".format( + label, state, RERUN_STATES + ) + ) + return + rerun_task(task_id) + logger.info(f"Reran {label}") + + +@register_callback_action( + title="Retrigger", + name="retrigger-multiple", + symbol="rt", + description=("Create a clone of the task."), + context=[], + schema={ + "type": "object", + "properties": { + "requests": { + "type": "array", + "items": { + "tasks": { + "type": "array", + "description": "An array of task labels", + "items": {"type": "string"}, + }, + "times": { + "type": "integer", + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + "additionalProperties": False, + }, + }, + "additionalProperties": False, + }, + }, +) +def retrigger_multiple(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + suffixes = [] + for i, request in enumerate(input.get("requests", [])): + times = request.get("times", 1) + rerun_tasks = [ + label + for label in request.get("tasks") + if not _should_retrigger(full_task_graph, label) + ] + retrigger_tasks = [ + label + for label in request.get("tasks") + if _should_retrigger(full_task_graph, label) + ] + + for label in rerun_tasks: + # XXX we should not re-run tasks pulled in from other pushes + # In practice, this shouldn't matter, as only completed tasks + # are pulled in from other pushes and treeherder won't pass + # those labels. + _rerun_task(label_to_taskid[label], label) + + for j in range(times): + suffix = f"{i}-{j}" + suffixes.append(suffix) + create_tasks( + graph_config, + retrigger_tasks, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + suffix, + action_tag="retrigger-multiple-task", + ) + + if suffixes: + combine_task_graph_files(suffixes) diff --git a/taskcluster/gecko_taskgraph/actions/retrigger_custom.py b/taskcluster/gecko_taskgraph/actions/retrigger_custom.py new file mode 100644 index 0000000000..f7cf44a008 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/retrigger_custom.py @@ -0,0 +1,185 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import json +import logging + +from taskgraph.util.parameterization import resolve_task_references +from taskgraph.util.taskcluster import get_task_definition + +from .registry import register_callback_action +from .util import create_task_from_def, fetch_graph_and_labels + +logger = logging.getLogger(__name__) + +# Properties available for custom retrigger of any supported test suites +basic_properties = { + "path": { + "type": "string", + "maxLength": 255, + "default": "", + "title": "Path name", + "description": "Path of test(s) to retrigger", + }, + "logLevel": { + "type": "string", + "enum": ["debug", "info", "warning", "error", "critical"], + "default": "info", + "title": "Log level", + "description": "Log level for output (INFO is normal, DEBUG gives more detail)", + }, + "environment": { + "type": "object", + "default": {"MOZ_LOG": ""}, + "title": "Extra environment variables", + "description": "Extra environment variables to use for this run", + "additionalProperties": {"type": "string"}, + }, +} + +# Additional properties available for custom retrigger of some additional test suites +extended_properties = basic_properties.copy() +extended_properties.update( + { + "runUntilFail": { + "type": "boolean", + "default": False, + "title": "Run until failure", + "description": ( + "Runs the specified set of tests repeatedly " + "until failure (up to REPEAT times)" + ), + }, + "repeat": { + "type": "integer", + "default": 0, + "minimum": 0, + "title": "Repeat test(s) N times", + "description": ( + "Run test(s) repeatedly (usually used in " + "conjunction with runUntilFail)" + ), + }, + "preferences": { + "type": "object", + "default": {"marionette.log.level": "Info"}, + "title": "Extra gecko (about:config) preferences", + "description": "Extra gecko (about:config) preferences to use for this run", + "additionalProperties": {"type": "string"}, + }, + } +) + + +@register_callback_action( + name="retrigger-custom", + title="Retrigger task with custom parameters", + symbol="rt", + description="Retriggers the specified task with custom environment and parameters", + context=[ + {"test-type": "mochitest", "worker-implementation": "docker-worker"}, + {"test-type": "reftest", "worker-implementation": "docker-worker"}, + {"test-type": "geckoview-junit", "worker-implementation": "docker-worker"}, + ], + order=10, + schema={ + "type": "object", + "properties": extended_properties, + "additionalProperties": False, + "required": ["path"], + }, +) +def extended_custom_retrigger_action( + parameters, graph_config, input, task_group_id, task_id +): + handle_custom_retrigger(parameters, graph_config, input, task_group_id, task_id) + + +@register_callback_action( + name="retrigger-custom (gtest)", + title="Retrigger gtest task with custom parameters", + symbol="rt", + description="Retriggers the specified task with custom environment and parameters", + context=[{"test-type": "gtest", "worker-implementation": "docker-worker"}], + order=10, + schema={ + "type": "object", + "properties": basic_properties, + "additionalProperties": False, + "required": ["path"], + }, +) +def basic_custom_retrigger_action_basic( + parameters, graph_config, input, task_group_id, task_id +): + handle_custom_retrigger(parameters, graph_config, input, task_group_id, task_id) + + +def handle_custom_retrigger(parameters, graph_config, input, task_group_id, task_id): + task = get_task_definition(task_id) + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + pre_task = full_task_graph.tasks[task["metadata"]["name"]] + + # fix up the task's dependencies, similar to how optimization would + # have done in the decision + dependencies = { + name: label_to_taskid[label] for name, label in pre_task.dependencies.items() + } + new_task_definition = resolve_task_references( + pre_task.label, pre_task.task, task_id, decision_task_id, dependencies + ) + new_task_definition.setdefault("dependencies", []).extend(dependencies.values()) + + # don't want to run mozharness tests, want a custom mach command instead + new_task_definition["payload"]["command"] += ["--no-run-tests"] + + custom_mach_command = [task["tags"]["test-type"]] + + # mochitests may specify a flavor + if new_task_definition["payload"]["env"].get("MOCHITEST_FLAVOR"): + custom_mach_command += [ + "--keep-open=false", + "-f", + new_task_definition["payload"]["env"]["MOCHITEST_FLAVOR"], + ] + + enable_e10s = json.loads( + new_task_definition["payload"]["env"].get("ENABLE_E10S", "true") + ) + if not enable_e10s: + custom_mach_command += ["--disable-e10s"] + + custom_mach_command += [ + "--log-tbpl=-", + "--log-tbpl-level={}".format(input.get("logLevel", "debug")), + ] + if input.get("runUntilFail"): + custom_mach_command += ["--run-until-failure"] + if input.get("repeat"): + custom_mach_command += ["--repeat", str(input.get("repeat", 30))] + + # add any custom gecko preferences + for (key, val) in input.get("preferences", {}).items(): + custom_mach_command += ["--setpref", f"{key}={val}"] + + custom_mach_command += [input["path"]] + new_task_definition["payload"]["env"]["CUSTOM_MACH_COMMAND"] = " ".join( + custom_mach_command + ) + + # update environment + new_task_definition["payload"]["env"].update(input.get("environment", {})) + + # tweak the treeherder symbol + new_task_definition["extra"]["treeherder"]["symbol"] += "-custom" + + logging.info("New task definition: %s", new_task_definition) + + create_task_from_def( + new_task_definition, parameters["level"], action_tag="retrigger-custom-task" + ) diff --git a/taskcluster/gecko_taskgraph/actions/run_missing_tests.py b/taskcluster/gecko_taskgraph/actions/run_missing_tests.py new file mode 100644 index 0000000000..92310445f3 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/run_missing_tests.py @@ -0,0 +1,62 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging + +from taskgraph.util.taskcluster import get_artifact + +from .registry import register_callback_action +from .util import create_tasks, fetch_graph_and_labels + +logger = logging.getLogger(__name__) + + +@register_callback_action( + name="run-missing-tests", + title="Run Missing Tests", + symbol="rmt", + description=( + "Run tests in the selected push that were optimized away, usually by SETA." + "\n" + "This action is for use on pushes that will be merged into another branch," + "to check that optimization hasn't hidden any failures." + ), + order=250, + context=[], # Applies to decision task +) +def run_missing_tests(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + target_tasks = get_artifact(decision_task_id, "public/target-tasks.json") + + # The idea here is to schedule all tasks of the `test` kind that were + # targetted but did not appear in the final task-graph -- those were the + # optimized tasks. + to_run = [] + already_run = 0 + for label in target_tasks: + task = full_task_graph.tasks[label] + if task.kind != "test": + continue # not a test + if label in label_to_taskid: + already_run += 1 + continue + to_run.append(label) + + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + ) + + logger.info( + "Out of {} test tasks, {} already existed and the action created {}".format( + already_run + len(to_run), already_run, len(to_run) + ) + ) diff --git a/taskcluster/gecko_taskgraph/actions/scriptworker_canary.py b/taskcluster/gecko_taskgraph/actions/scriptworker_canary.py new file mode 100644 index 0000000000..e0057da9a6 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/scriptworker_canary.py @@ -0,0 +1,45 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from taskgraph.parameters import Parameters + +from gecko_taskgraph.actions.registry import register_callback_action +from gecko_taskgraph.decision import taskgraph_decision + + +@register_callback_action( + title="Push scriptworker canaries.", + name="scriptworker-canary", + symbol="scriptworker-canary", + description="Trigger scriptworker-canary pushes for the given scriptworkers.", + schema={ + "type": "object", + "properties": { + "scriptworkers": { + "type": "array", + "description": "List of scriptworker types to run canaries for.", + "items": {"type": "string"}, + }, + }, + }, + order=1000, + permission="scriptworker-canary", + context=[], +) +def scriptworker_canary(parameters, graph_config, input, task_group_id, task_id): + scriptworkers = input["scriptworkers"] + + # make parameters read-write + parameters = dict(parameters) + + parameters["target_tasks_method"] = "scriptworker_canary" + parameters["try_task_config"] = { + "scriptworker-canary-workers": scriptworkers, + } + parameters["tasks_for"] = "action" + + # make parameters read-only + parameters = Parameters(**parameters) + + taskgraph_decision({"root": graph_config.root_dir}, parameters=parameters) diff --git a/taskcluster/gecko_taskgraph/actions/side_by_side.py b/taskcluster/gecko_taskgraph/actions/side_by_side.py new file mode 100644 index 0000000000..3523ef86f8 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/side_by_side.py @@ -0,0 +1,174 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging +import os +import sys +from functools import partial + +from taskgraph.util.taskcluster import get_artifact, get_task_definition + +from ..util.taskcluster import list_task_group_complete_tasks +from .registry import register_callback_action +from .util import create_tasks, fetch_graph_and_labels, get_decision_task_id, get_pushes + +logger = logging.getLogger(__name__) + + +def input_for_support_action(revision, base_revision, base_branch, task): + """Generate input for action to be scheduled. + + Define what label to schedule with 'label'. + If it is a test task that uses explicit manifests add that information. + """ + platform, test_name = task["metadata"]["name"].split("/opt-") + new_branch = os.environ.get("GECKO_HEAD_REPOSITORY", "/try").split("/")[-1] + input = { + "label": "perftest-linux-side-by-side", + "new_revision": revision, + "base_revision": base_revision, + "test_name": test_name, + "platform": platform, + "base_branch": base_branch, + "new_branch": new_branch, + } + + return input + + +@register_callback_action( + title="Side by side", + name="side-by-side", + symbol="gen-sxs", + description=( + "Given a performance test pageload job generate a side-by-side comparison against" + "the pageload job from the revision at the input." + ), + order=200, + context=[{"test-type": "raptor"}], + schema={ + "type": "object", + "properties": { + "revision": { + "type": "string", + "default": "", + "description": "Revision of the push against the comparison is wanted.", + }, + "project": { + "type": "string", + "default": "autoland", + "description": "Revision of the push against the comparison is wanted.", + }, + }, + "additionalProperties": False, + }, +) +def side_by_side_action(parameters, graph_config, input, task_group_id, task_id): + """ + This action does a side-by-side comparison between current revision and + the revision entered manually or the latest revision that ran the + pageload job (via support action). + + To execute this action locally follow the documentation here: + https://firefox-source-docs.mozilla.org/taskcluster/actions.html#testing-the-action-locally + """ + task = get_task_definition(task_id) + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + # TODO: find another way to detect side-by-side comparable jobs + # (potentially lookig at the visual metrics flag) + if "browsertime-tp6" not in task["metadata"]["name"]: + logger.exception( + f"Task {task['metadata']['name']} is not side-by-side comparable." + ) + sys.exit(1) + + failed = False + input_for_action = {} + + if input.get("revision"): + # If base_revision was introduced manually, use that + input_for_action = input_for_support_action( + revision=parameters["head_rev"], + base_revision=input.get("revision"), + base_branch=input.get("project"), + task=task, + ) + else: + current_push_id = int(parameters["pushlog_id"]) - 1 + # Go decrementally through pushlog_id, get push data, decision task id, + # full task graph and everything needed to find which of the past revisions + # ran the pageload job to compare against + while int(parameters["pushlog_id"]) - current_push_id < 30: + pushes = get_pushes( + project=parameters["head_repository"], + end_id=current_push_id, + depth=1, + full_response=True, + ) + try: + # Get label-to-taskid.json artifact + the tasks triggered + # by the action tasks at a later time than the decision task + current_decision_task_id = get_decision_task_id( + parameters["project"], current_push_id + ) + current_task_group_id = get_task_definition(current_decision_task_id)[ + "taskGroupId" + ] + current_label_to_taskid = get_artifact( + current_decision_task_id, "public/label-to-taskid.json" + ) + current_full_label_to_taskid = current_label_to_taskid.copy() + action_task_triggered = list_task_group_complete_tasks( + current_task_group_id + ) + current_full_label_to_taskid.update(action_task_triggered) + if task["metadata"]["name"] in current_full_label_to_taskid.keys(): + input_for_action = input_for_support_action( + revision=parameters["head_rev"], + base_revision=pushes[str(current_push_id)]["changesets"][-1], + base_branch=input.get("project", parameters["project"]), + task=task, + ) + break + except Exception: + logger.warning( + f"Could not find decision task for push {current_push_id}" + ) + # The decision task may have failed, this is common enough that we + # don't want to report an error for it. + continue + current_push_id -= 1 + if not input_for_action: + raise Exception( + "Could not find a side-by-side comparable task within a depth of 30 revisions." + ) + + def modifier(task, input): + if task.label != input["label"]: + return task + + cmd = task.task["payload"]["command"] + task.task["payload"]["command"][1][-1] = cmd[1][-1].format(**input) + + return task + + try: + create_tasks( + graph_config, + [input_for_action["label"]], + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + modifier=partial(modifier, input=input_for_action), + ) + except Exception as e: + logger.exception(f"Failed to trigger action: {e}.") + failed = True + + if failed: + sys.exit(1) diff --git a/taskcluster/gecko_taskgraph/actions/util.py b/taskcluster/gecko_taskgraph/actions/util.py new file mode 100644 index 0000000000..c211b32567 --- /dev/null +++ b/taskcluster/gecko_taskgraph/actions/util.py @@ -0,0 +1,433 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import concurrent.futures as futures +import copy +import logging +import os +import re +from functools import reduce + +import jsone +import requests +from requests.exceptions import HTTPError +from slugid import nice as slugid +from taskgraph import create +from taskgraph.optimize.base import optimize_task_graph +from taskgraph.taskgraph import TaskGraph +from taskgraph.util.taskcluster import ( + CONCURRENCY, + find_task_id, + get_artifact, + get_session, + get_task_definition, + list_tasks, + parse_time, +) + +from gecko_taskgraph.decision import read_artifact, rename_artifact, write_artifact +from gecko_taskgraph.util.taskcluster import trigger_hook +from gecko_taskgraph.util.taskgraph import find_decision_task + +logger = logging.getLogger(__name__) + +INDEX_TMPL = "gecko.v2.{}.pushlog-id.{}.decision" +PUSHLOG_TMPL = "{}/json-pushes?version=2&startID={}&endID={}" + + +def _tags_within_context(tags, context=[]): + """A context of [] means that it *only* applies to a task group""" + return any( + all(tag in tags and tags[tag] == tag_set[tag] for tag in tag_set.keys()) + for tag_set in context + ) + + +def _extract_applicable_action(actions_json, action_name, task_group_id, task_id): + """Extract action that applies to the given task or task group. + + A task (as defined by its tags) is said to match a tag-set if its + tags are a super-set of the tag-set. A tag-set is a set of key-value pairs. + + An action (as defined by its context) is said to be relevant for + a given task, if the task's tags match one of the tag-sets given + in the context property of the action. + + The order of the actions is significant. When multiple actions apply to a + task the first one takes precedence. + + For more details visit: + https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec + """ + if task_id: + tags = get_task_definition(task_id).get("tags") + + for _action in actions_json["actions"]: + if action_name != _action["name"]: + continue + + context = _action.get("context", []) + # Ensure the task is within the context of the action + if task_id and tags and _tags_within_context(tags, context): + return _action + elif context == []: + return _action + + available_actions = ", ".join(sorted({a["name"] for a in actions_json["actions"]})) + raise LookupError( + "{} action is not available for this task. Available: {}".format( + action_name, available_actions + ) + ) + + +def trigger_action(action_name, decision_task_id, task_id=None, input={}): + if not decision_task_id: + raise ValueError("No decision task. We can't find the actions artifact.") + actions_json = get_artifact(decision_task_id, "public/actions.json") + if actions_json["version"] != 1: + raise RuntimeError("Wrong version of actions.json, unable to continue") + + # These values substitute $eval in the template + context = { + "input": input, + "taskId": task_id, + "taskGroupId": decision_task_id, + } + # https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec#variables + context.update(actions_json["variables"]) + action = _extract_applicable_action( + actions_json, action_name, decision_task_id, task_id + ) + kind = action["kind"] + if create.testing: + logger.info(f"Skipped triggering action for {kind} as testing is enabled") + elif kind == "hook": + hook_payload = jsone.render(action["hookPayload"], context) + trigger_hook(action["hookGroupId"], action["hookId"], hook_payload) + else: + raise NotImplementedError(f"Unable to submit actions with {kind} kind.") + + +def get_pushes_from_params_input(parameters, input): + inclusive_tweak = 1 if input.get("inclusive") else 0 + return get_pushes( + project=parameters["head_repository"], + end_id=int(parameters["pushlog_id"]) - (1 - inclusive_tweak), + depth=input.get("depth", 9) + inclusive_tweak, + ) + + +def get_pushes(project, end_id, depth, full_response=False): + pushes = [] + while True: + start_id = max(end_id - depth, 0) + pushlog_url = PUSHLOG_TMPL.format(project, start_id, end_id) + logger.debug(pushlog_url) + r = requests.get(pushlog_url) + r.raise_for_status() + pushes = pushes + list(r.json()["pushes"].keys()) + if len(pushes) >= depth: + break + + end_id = start_id - 1 + start_id -= depth + if start_id < 0: + break + + pushes = sorted(pushes)[-depth:] + push_dict = {push: r.json()["pushes"][push] for push in pushes} + return push_dict if full_response else pushes + + +def get_decision_task_id(project, push_id): + return find_task_id(INDEX_TMPL.format(project, push_id)) + + +def get_parameters(decision_task_id): + return get_artifact(decision_task_id, "public/parameters.yml") + + +def get_tasks_with_downstream(labels, full_task_graph, label_to_taskid): + # Used to gather tasks when downstream tasks need to run as well + return full_task_graph.graph.transitive_closure( + set(labels), reverse=True + ).nodes & set(label_to_taskid.keys()) + + +def fetch_graph_and_labels(parameters, graph_config): + decision_task_id = find_decision_task(parameters, graph_config) + + # First grab the graph and labels generated during the initial decision task + full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json") + logger.info("Load taskgraph from JSON.") + _, full_task_graph = TaskGraph.from_json(full_task_graph) + label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json") + + logger.info("Fetching additional tasks from action and cron tasks.") + # fetch everything in parallel; this avoids serializing any delay in downloading + # each artifact (such as waiting for the artifact to be mirrored locally) + with futures.ThreadPoolExecutor(CONCURRENCY) as e: + fetches = [] + + # fetch any modifications made by action tasks and swap out new tasks + # for old ones + def fetch_action(task_id): + logger.info(f"fetching label-to-taskid.json for action task {task_id}") + try: + run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") + label_to_taskid.update(run_label_to_id) + except HTTPError as e: + if e.response.status_code != 404: + raise + logger.debug(f"No label-to-taskid.json found for {task_id}: {e}") + + head_rev_param = "{}head_rev".format(graph_config["project-repo-param-prefix"]) + + namespace = "{}.v2.{}.revision.{}.taskgraph.actions".format( + graph_config["trust-domain"], + parameters["project"], + parameters[head_rev_param], + ) + for task_id in list_tasks(namespace): + fetches.append(e.submit(fetch_action, task_id)) + + # Similarly for cron tasks.. + def fetch_cron(task_id): + logger.info(f"fetching label-to-taskid.json for cron task {task_id}") + try: + run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") + label_to_taskid.update(run_label_to_id) + except HTTPError as e: + if e.response.status_code != 404: + raise + logger.debug(f"No label-to-taskid.json found for {task_id}: {e}") + + namespace = "{}.v2.{}.revision.{}.cron".format( + graph_config["trust-domain"], + parameters["project"], + parameters[head_rev_param], + ) + for task_id in list_tasks(namespace): + fetches.append(e.submit(fetch_cron, task_id)) + + # now wait for each fetch to complete, raising an exception if there + # were any issues + for f in futures.as_completed(fetches): + f.result() + + return (decision_task_id, full_task_graph, label_to_taskid) + + +def create_task_from_def(task_def, level, action_tag=None): + """Create a new task from a definition rather than from a label + that is already in the full-task-graph. The task definition will + have {relative-datestamp': '..'} rendered just like in a decision task. + Use this for entirely new tasks or ones that change internals of the task. + It is useful if you want to "edit" the full_task_graph and then hand + it to this function. No dependencies will be scheduled. You must handle + this yourself. Seeing how create_tasks handles it might prove helpful.""" + task_def["schedulerId"] = f"gecko-level-{level}" + label = task_def["metadata"]["name"] + task_id = slugid() + session = get_session() + if action_tag: + task_def.setdefault("tags", {}).setdefault("action", action_tag) + create.create_task(session, task_id, label, task_def) + + +def update_parent(task, graph): + task.task.setdefault("extra", {})["parent"] = os.environ.get("TASK_ID", "") + return task + + +def update_action_tag(task, graph, action_tag): + task.task.setdefault("tags", {}).setdefault("action", action_tag) + return task + + +def update_dependencies(task, graph): + if os.environ.get("TASK_ID"): + task.task.setdefault("dependencies", []).append(os.environ["TASK_ID"]) + return task + + +def create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + params, + decision_task_id, + suffix="", + modifier=lambda t: t, + action_tag=None, +): + """Create new tasks. The task definition will have {relative-datestamp': + '..'} rendered just like in a decision task. Action callbacks should use + this function to create new tasks, + allowing easy debugging with `mach taskgraph action-callback --test`. + This builds up all required tasks to run in order to run the tasks requested. + + Optionally this function takes a `modifier` function that is passed in each + task before it is put into a new graph. It should return a valid task. Note + that this is passed _all_ tasks in the graph, not just the set in to_run. You + may want to skip modifying tasks not in your to_run list. + + If `suffix` is given, then it is used to give unique names to the resulting + artifacts. If you call this function multiple times in the same action, + pass a different suffix each time to avoid overwriting artifacts. + + If you wish to create the tasks in a new group, leave out decision_task_id. + + Returns an updated label_to_taskid containing the new tasks""" + import gecko_taskgraph.optimize # noqa: triggers registration of strategies + + if suffix != "": + suffix = f"-{suffix}" + to_run = set(to_run) + + # Copy to avoid side-effects later + full_task_graph = copy.deepcopy(full_task_graph) + label_to_taskid = label_to_taskid.copy() + + target_graph = full_task_graph.graph.transitive_closure(to_run) + target_task_graph = TaskGraph( + {l: modifier(full_task_graph[l]) for l in target_graph.nodes}, target_graph + ) + target_task_graph.for_each_task(update_parent) + if action_tag: + target_task_graph.for_each_task(update_action_tag, action_tag) + if decision_task_id and decision_task_id != os.environ.get("TASK_ID"): + target_task_graph.for_each_task(update_dependencies) + optimized_task_graph, label_to_taskid = optimize_task_graph( + target_task_graph, + to_run, + params, + to_run, + decision_task_id, + existing_tasks=label_to_taskid, + ) + write_artifact(f"task-graph{suffix}.json", optimized_task_graph.to_json()) + write_artifact(f"label-to-taskid{suffix}.json", label_to_taskid) + write_artifact(f"to-run{suffix}.json", list(to_run)) + create.create_tasks( + graph_config, + optimized_task_graph, + label_to_taskid, + params, + decision_task_id, + ) + return label_to_taskid + + +def _update_reducer(accumulator, new_value): + "similar to set or dict `update` method, but returning the modified object" + accumulator.update(new_value) + return accumulator + + +def combine_task_graph_files(suffixes): + """Combine task-graph-{suffix}.json files into a single task-graph.json file. + + Since Chain of Trust verification requires a task-graph.json file that + contains all children tasks, we can combine the various task-graph-0.json + type files into a master task-graph.json file at the end. + + Actions also look for various artifacts, so we combine those in a similar + fashion. + + In the case where there is only one suffix, we simply rename it to avoid the + additional cost of uploading two copies of the same data. + """ + + if len(suffixes) == 1: + for filename in ["task-graph", "label-to-taskid", "to-run"]: + rename_artifact(f"{filename}-{suffixes[0]}.json", f"{filename}.json") + return + + def combine(file_contents, base): + return reduce(_update_reducer, file_contents, base) + + files = [read_artifact(f"task-graph-{suffix}.json") for suffix in suffixes] + write_artifact("task-graph.json", combine(files, dict())) + + files = [read_artifact(f"label-to-taskid-{suffix}.json") for suffix in suffixes] + write_artifact("label-to-taskid.json", combine(files, dict())) + + files = [read_artifact(f"to-run-{suffix}.json") for suffix in suffixes] + write_artifact("to-run.json", list(combine(files, set()))) + + +def relativize_datestamps(task_def): + """ + Given a task definition as received from the queue, convert all datestamps + to {relative_datestamp: ..} format, with the task creation time as "now". + The result is useful for handing to ``create_task``. + """ + base = parse_time(task_def["created"]) + # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js + ts_pattern = re.compile( + r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]" + r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?" + r"(?:z|[+-]\d\d:\d\d)$", + re.I, + ) + + def recurse(value): + if isinstance(value, str): + if ts_pattern.match(value): + value = parse_time(value) + diff = value - base + return {"relative-datestamp": f"{int(diff.total_seconds())} seconds"} + if isinstance(value, list): + return [recurse(e) for e in value] + if isinstance(value, dict): + return {k: recurse(v) for k, v in value.items()} + return value + + return recurse(task_def) + + +def add_args_to_command(cmd_parts, extra_args=[]): + """ + Add custom command line args to a given command. + args: + cmd_parts: the raw command as seen by taskcluster + extra_args: array of args we want to add + """ + # Prevent modification of the caller's copy of cmd_parts + cmd_parts = copy.deepcopy(cmd_parts) + cmd_type = "default" + if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict): + # windows has single cmd part as dict: 'task-reference', with long string + cmd_parts = cmd_parts[0]["task-reference"].split(" ") + cmd_type = "dict" + elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], str): + # windows has single cmd part as a long string + cmd_parts = cmd_parts[0].split(" ") + cmd_type = "unicode" + elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list): + # osx has an single value array with an array inside + cmd_parts = cmd_parts[0] + cmd_type = "subarray" + elif len(cmd_parts) == 2 and isinstance(cmd_parts[1], list): + # osx has an double value array with an array inside each element. + # The first element is a pre-requisite command while the second + # is the actual test command. + cmd_type = "subarray2" + + if cmd_type == "subarray2": + cmd_parts[1].extend(extra_args) + else: + cmd_parts.extend(extra_args) + + if cmd_type == "dict": + cmd_parts = [{"task-reference": " ".join(cmd_parts)}] + elif cmd_type == "unicode": + cmd_parts = [" ".join(cmd_parts)] + elif cmd_type == "subarray": + cmd_parts = [cmd_parts] + return cmd_parts |