summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster_taskgraph/taskgraph/actions
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/actions')
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py16
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py64
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py42
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py61
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py352
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py301
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py282
7 files changed, 1118 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py
new file mode 100644
index 0000000000..590a957282
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py
@@ -0,0 +1,16 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+from .registry import (
+ register_callback_action,
+ render_actions_json,
+ trigger_action_callback,
+)
+
+__all__ = [
+ "register_callback_action",
+ "render_actions_json",
+ "trigger_action_callback",
+]
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py
new file mode 100644
index 0000000000..fc10668566
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py
@@ -0,0 +1,64 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+from taskgraph.actions.registry import register_callback_action
+from taskgraph.actions.util import (
+ combine_task_graph_files,
+ create_tasks,
+ fetch_graph_and_labels,
+)
+
+
@register_callback_action(
    name="add-new-jobs",
    title="Add new jobs",
    symbol="add-new",
    description="Add new jobs using task labels.",
    generic=True,
    order=100,
    context=[],
    schema={
        "type": "object",
        "properties": {
            "tasks": {
                "type": "array",
                "description": "An array of task labels",
                "items": {"type": "string"},
            },
            "times": {
                "type": "integer",
                "default": 1,
                "minimum": 1,
                "maximum": 100,
                "title": "Times",
                "description": "How many times to run each task.",
            },
        },
    },
)
def add_new_jobs_action(parameters, graph_config, input, task_group_id, task_id):
    """Schedule the task labels listed in ``input["tasks"]``.

    Every requested label must exist in the full task graph fetched for the
    decision task; the first unknown label aborts the action with an
    ``Exception`` before anything is created. The selected tasks are then
    created ``input["times"]`` times (default 1), using the iteration index
    as the suffix handed to ``create_tasks``.
    """
    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
        parameters, graph_config
    )

    # Validate while collecting so that an unknown label fails fast.
    to_run = []
    for label in input["tasks"]:
        if label not in full_task_graph.tasks:
            raise Exception(f"{label} was not found in the task-graph")
        to_run.append(label)

    times = input.get("times", 1)
    for suffix in range(times):
        create_tasks(
            graph_config,
            to_run,
            full_task_graph,
            label_to_taskid,
            parameters,
            decision_task_id,
            suffix,
        )
    combine_task_graph_files(list(range(times)))
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py
new file mode 100644
index 0000000000..03788c6538
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py
@@ -0,0 +1,42 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import logging
+
+import requests
+
+from taskgraph.util.taskcluster import cancel_task
+
+from .registry import register_callback_action
+
+logger = logging.getLogger(__name__)
+
+
@register_callback_action(
    name="cancel",
    title="Cancel Task",
    symbol="cx",
    description=("Cancel the given task"),
    generic=True,
    order=350,
    context=[{}],
)
def cancel_action(parameters, graph_config, input, task_group_id, task_id):
    """Cancel the task identified by ``task_id``.

    An HTTP 409 response is logged and ignored; any other
    ``requests.HTTPError`` propagates to the caller.
    """
    # Note that this is limited by the scopes afforded to generic actions to
    # only cancel tasks with the level-specific schedulerId.
    try:
        cancel_task(task_id, use_proxy=True)
    except requests.HTTPError as err:
        if err.response.status_code != 409:
            raise
        # A 409 response indicates that this task is past its deadline. It
        # cannot be cancelled at this time, but it's also not running
        # anymore, so we can ignore this error.
        logger.info(f'Task "{task_id}" is past its deadline and cannot be cancelled.')
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py
new file mode 100644
index 0000000000..b2636f46a3
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py
@@ -0,0 +1,61 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import concurrent.futures as futures
+import logging
+import os
+
+import requests
+
+from taskgraph.util.taskcluster import (
+ CONCURRENCY,
+ cancel_task,
+ list_task_group_incomplete_tasks,
+)
+
+from .registry import register_callback_action
+
+logger = logging.getLogger(__name__)
+
+
@register_callback_action(
    name="cancel-all",
    title="Cancel All",
    symbol="cAll",
    description=(
        "Cancel all running and pending tasks created by the decision task "
        "this action task is associated with."
    ),
    generic=True,
    order=400,
    context=[],
)
def cancel_all_action(parameters, graph_config, input, task_group_id, task_id):
    """Cancel every incomplete task of ``task_group_id`` except this one.

    The task named by the ``TASK_ID`` environment variable is excluded from
    the list. Cancellations are submitted to a thread pool of ``CONCURRENCY``
    workers, and each result is collected so that an unexpected error raised
    by a worker surfaces here.
    """

    def do_cancel_task(task_id):
        logger.info(f"Cancelling task {task_id}")
        try:
            cancel_task(task_id, use_proxy=True)
        except requests.HTTPError as err:
            if err.response.status_code != 409:
                raise
            # A 409 response indicates that this task is past its deadline. It
            # cannot be cancelled at this time, but it's also not running
            # anymore, so we can ignore this error.
            logger.info(f"Task {task_id} is past its deadline and cannot be cancelled.")

    own_task_id = os.environ.get("TASK_ID", "")
    to_cancel = []
    for candidate in list_task_group_incomplete_tasks(task_group_id):
        if candidate != own_task_id:
            to_cancel.append(candidate)

    logger.info(f"Cancelling {len(to_cancel)} tasks")
    with futures.ThreadPoolExecutor(CONCURRENCY) as pool:
        pending = [pool.submit(do_cancel_task, target) for target in to_cancel]
        for finished in futures.as_completed(pending):
            # result() re-raises whatever the worker raised.
            finished.result()
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py
new file mode 100644
index 0000000000..1e909d30c7
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py
@@ -0,0 +1,352 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import json
+from collections import namedtuple
+from types import FunctionType
+
+from mozilla_repo_urls import parse
+
+from taskgraph import create
+from taskgraph.config import load_graph_config
+from taskgraph.parameters import Parameters
+from taskgraph.util import hash, taskcluster, yaml
+from taskgraph.util.memoize import memoize
+from taskgraph.util.python_path import import_sibling_modules
+
# Module-level registries filled in by ``register_callback_action``:
# ``actions`` collects ``Action`` records, ``callbacks`` maps cb_name -> function.
actions = []
callbacks = {}

# One registered action: its menu order, callback name, whether it is generic,
# and the function that renders its ``actions.json`` entry.
Action = namedtuple("Action", "order cb_name generic action_builder")
+
+
def is_json(data):
    """Return ``True`` if ``data`` is a JSON serializable data structure.

    Args:
        data: Any Python object.

    Returns:
        bool: ``True`` when ``json.dumps(data)`` succeeds, ``False`` otherwise.
    """
    try:
        json.dumps(data)
    except (TypeError, ValueError):
        # json.dumps raises TypeError for objects it cannot encode (sets,
        # functions, arbitrary instances, ...) and ValueError for circular
        # references; both mean "not JSON compatible".
        return False
    return True
+
+
@memoize
def read_taskcluster_yml(filename):
    """Parse the ``.taskcluster.yml`` at ``filename``; memoized to save some time."""
    parsed = yaml.load_yaml(filename)
    return parsed
+
+
@memoize
def hash_taskcluster_yml(filename):
    """
    Return a short hash identifying the given .taskcluster.yml.

    The value is the first 10 characters of what ``hash.hash_path`` returns
    for the file (described upstream as the sha256 of the file's content --
    confirm against ``taskgraph.util.hash``). Administrative scripts use it
    to create a hook based on this content.
    """
    digest = hash.hash_path(filename)
    return digest[:10]
+
+
def register_callback_action(
    name,
    title,
    symbol,
    description,
    order=10000,
    context=[],
    available=lambda parameters: True,
    schema=None,
    generic=True,
    cb_name=None,
):
    """
    Register an action callback that can be triggered from supporting
    user interfaces, such as Treeherder.

    This function is to be used as a decorator for a callback that is invoked
    as ``cb(parameters, graph_config, input, task_group_id, task_id)``:

    ``parameters``:
        Decision task :class:`parameters <taskgraph.parameters.Parameters>`.
    ``graph_config``:
        The graph configuration the action is running against.
    ``input``:
        Input matching specified JSON schema, ``None`` if no ``schema``
        parameter is given to ``register_callback_action``.
    ``task_group_id``:
        The id of the task-group this was triggered for.
    ``task_id``:
        Identifier of the task the action was triggered for, ``None`` if no
        ``context`` parameter was given to ``register_callback_action``.

    Args:
        name (str):
            An identifier for this action, used by UIs to find the action.
        title (str):
            A human readable title for the action to be used as label on a button
            or text on a link for triggering the action.
        symbol (str):
            Treeherder symbol for the action callback, this is the symbol that the
            task calling your callback will be displayed as. This is usually 1-3
            letters abbreviating the action title.
        description (str):
            A human readable description of the action in **markdown**.
            This will be displayed as tooltip and in dialog window when the action
            is triggered. This is a good place to describe how to use the action.
        order (int):
            Order of the action in menus, this is relative to the ``order`` of
            other actions declared.
        context (list of dict):
            List of tag-sets specifying which tasks the action can take as input.
            If no tag-sets are specified as input the action is related to the
            entire task-group, and won't be triggered with a given task.

            Otherwise, if ``context = [{'k': 'b', 'p': 'l'}, {'k': 't'}]`` will only
            be displayed in the context menu for tasks that have
            ``task.tags.k == 'b' && task.tags.p == 'l'`` or ``task.tags.k == 't'``.
            Essentially, this allows filtering on ``task.tags``.

            If this is a function, it is given the decision parameters and must return
            a value of the form described above.
        available (function):
            An optional function that given decision parameters decides if the
            action is available. Defaults to a function that always returns ``True``.
        schema (dict):
            JSON schema specifying input accepted by the action.
            This is optional and can be left ``null`` if no input is taken.
            A callable is also accepted; it is called with
            ``graph_config=...`` when the action is rendered.
        generic (bool):
            Whether this is a generic action or has its own permissions.
        cb_name (str):
            The name under which this function should be registered, defaulting to
            `name`. This is used to generate actionPerm for non-generic hook
            actions, and thus appears in ci-configuration and various role and hook
            names. Unlike `name`, which can appear multiple times, cb_name must be
            unique among all registered callbacks.

    Returns:
        function: Decorator to be used for the callback function.

    Raises:
        AssertionError: If an argument has the wrong type, the callback name is
            not unique, or the returned decorator is applied more than once.
    """
    # Set once the returned decorator has been applied; guards against reuse.
    registered = False

    assert isinstance(title, str), "title must be a string"
    assert isinstance(description, str), "description must be a string"
    title = title.strip()
    description = description.strip()

    # ensure that context is callable
    # (the shared ``[]`` default is only ever read, never mutated, here)
    if not callable(context):
        context_value = context
        context = lambda params: context_value  # noqa

    def register_callback(cb, cb_name=cb_name):
        nonlocal registered

        assert isinstance(name, str), "name must be a string"
        assert isinstance(order, int), "order must be an integer"
        assert callable(schema) or is_json(
            schema
        ), "schema must be a JSON compatible object"
        assert isinstance(cb, FunctionType), "callback must be a function"
        # Check the type first: the membership and length tests below would
        # otherwise fail with an unrelated TypeError for non-string symbols.
        assert isinstance(symbol, str), "symbol must be a string"
        # Allow for json-e > 25 chars in the symbol.
        if "$" not in symbol:
            assert 1 <= len(symbol) <= 25, "symbol must be between 1 and 25 characters"

        assert not registered, "register_callback_action must be used as decorator"
        if not cb_name:
            cb_name = name
        assert cb_name not in callbacks, f"callback name {cb_name} is not unique"

        def action_builder(parameters, graph_config, decision_task_id):
            # Render this action's actions.json entry, or None if unavailable.
            if not available(parameters):
                return None

            actionPerm = "generic" if generic else cb_name

            # gather up the common decision-task-supplied data for this action
            repo_param = "head_repository"
            repository = {
                "url": parameters[repo_param],
                "project": parameters["project"],
                "level": parameters["level"],
            }

            revision = parameters["head_rev"]
            push = {
                "owner": "mozilla-taskcluster-maintenance@mozilla.com",
                "pushlog_id": parameters["pushlog_id"],
                "revision": revision,
            }
            branch = parameters.get("head_ref")
            if branch:
                push["branch"] = branch

            action = {
                "name": name,
                "title": title,
                "description": description,
                # target taskGroupId (the task group this decision task is creating)
                "taskGroupId": decision_task_id,
                "cb_name": cb_name,
                "symbol": symbol,
            }

            rv = {
                "name": name,
                "title": title,
                "description": description,
                "context": context(parameters),
            }
            if schema:
                rv["schema"] = (
                    schema(graph_config=graph_config) if callable(schema) else schema
                )

            trustDomain = graph_config["trust-domain"]
            level = parameters["level"]
            tcyml_hash = hash_taskcluster_yml(graph_config.taskcluster_yml)

            # the tcyml_hash is prefixed with `/` in the hookId, so users will be granted
            # hooks:trigger-hook:project-gecko/in-tree-action-3-myaction/*; if another
            # action was named `myaction/release`, then the `*` in the scope would also
            # match that action. To prevent such an accident, we prohibit `/` in hook
            # names.
            if "/" in actionPerm:
                raise Exception("`/` is not allowed in action names; use `-`")

            rv.update(
                {
                    "kind": "hook",
                    "hookGroupId": f"project-{trustDomain}",
                    "hookId": f"in-tree-action-{level}-{actionPerm}/{tcyml_hash}",
                    "hookPayload": {
                        # provide the decision-task parameters as context for triggerHook
                        "decision": {
                            "action": action,
                            "repository": repository,
                            "push": push,
                        },
                        # and pass everything else through from our own context
                        "user": {
                            "input": {"$eval": "input"},
                            "taskId": {"$eval": "taskId"},  # target taskId (or null)
                            "taskGroupId": {
                                "$eval": "taskGroupId"
                            },  # target task group
                        },
                    },
                    "extra": {
                        "actionPerm": actionPerm,
                    },
                }
            )

            return rv

        actions.append(Action(order, cb_name, generic, action_builder))

        registered = True
        callbacks[cb_name] = cb
        return cb

    return register_callback
+
+
def render_actions_json(parameters, graph_config, decision_task_id):
    """
    Render JSON object for the ``public/actions.json`` artifact.

    Registered actions are visited in ascending ``order``; each action's
    builder is called and its result is kept only when it is truthy.

    Args:
        parameters (:class:`~taskgraph.parameters.Parameters`):
            Decision task parameters.
        graph_config:
            Graph configuration, forwarded to every action builder.
        decision_task_id:
            Id of the decision task, forwarded to every action builder.

    Returns:
        dict:
            JSON object representation of the ``public/actions.json``
            artifact.
    """
    assert isinstance(parameters, Parameters), "requires instance of Parameters"

    ordered = sorted(_get_actions(graph_config), key=lambda entry: entry.order)
    rendered = []
    for entry in ordered:
        built = entry.action_builder(parameters, graph_config, decision_task_id)
        if not built:
            # The builder returns None for actions that are not available.
            continue
        assert is_json(built), "action must be a JSON compatible object"
        rendered.append(built)

    return {"version": 1, "variables": {}, "actions": rendered}
+
+
def sanity_check_task_scope(callback, parameters, graph_config):
    """
    If this action is not generic, then verify that this task has the necessary
    scope to run the action. This serves as a backstop preventing abuse by
    running non-generic actions using generic hooks. While scopes should
    prevent serious damage from such abuse, it's never a valid thing to do.

    Raises:
        ValueError: If no registered action has ``cb_name == callback``, or if
            the expected scope is not among the current task's scopes.
    """
    matching = next(
        (entry for entry in _get_actions(graph_config) if entry.cb_name == callback),
        None,
    )
    if matching is None:
        raise ValueError(f"No action with cb_name {callback}")

    if matching.generic:
        action_perm = "generic"
    else:
        action_perm = matching.cb_name

    role_prefix = parse(parameters["head_repository"]).taskcluster_role_prefix
    expected_scope = f"assume:{role_prefix}:action:{action_perm}"

    # the scope should appear literally; no need for a satisfaction check. The use of
    # get_current_scopes here calls the auth service through the Taskcluster Proxy, giving
    # the precise scopes available to this task.
    current_scopes = taskcluster.get_current_scopes()
    if expected_scope not in current_scopes:
        raise ValueError(f"Expected task scope {expected_scope} for this action")
+
+
def trigger_action_callback(
    task_group_id, task_id, input, callback, parameters, root, test=False
):
    """
    Trigger action callback with the given inputs. If `test` is true, then run
    the action callback in testing mode, without actually creating tasks.

    The graph config is loaded from ``root`` and registered before the
    callback is looked up. Outside of testing mode the task's scopes are
    sanity-checked first.

    Raises:
        Exception: If ``callback`` is not a registered callback name.
    """
    graph_config = load_graph_config(root)
    graph_config.register()

    known_callbacks = _get_callbacks(graph_config)
    cb = known_callbacks.get(callback, None)
    if not cb:
        raise Exception(
            f"Unknown callback: {callback}. "
            f"Known callbacks: {', '.join(known_callbacks)}"
        )

    if test:
        # Flip the module-level switches that put creation into testing mode.
        create.testing = True
        taskcluster.testing = True
    else:
        sanity_check_task_scope(callback, parameters, graph_config)

    cb(Parameters(**parameters), graph_config, input, task_group_id, task_id)
+
+
def _load(graph_config):
    """Import the sibling action modules and return ``(callbacks, actions)``."""
    # Load all modules from this folder, relying on the side-effects of register_
    # functions to populate the action registry.
    import_sibling_modules(exceptions=("util.py",))
    registry = (callbacks, actions)
    return registry
+
+
def _get_callbacks(graph_config):
    """Return the callback registry (first element of ``_load``'s result)."""
    loaded_callbacks, _ = _load(graph_config)
    return loaded_callbacks
+
+
def _get_actions(graph_config):
    """Return the list of registered actions (second element of ``_load``'s result)."""
    _, loaded_actions = _load(graph_config)
    return loaded_actions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py
new file mode 100644
index 0000000000..4758beb625
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py
@@ -0,0 +1,301 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import logging
+import sys
+import textwrap
+
+from slugid import nice as slugid
+
+from taskgraph.util import taskcluster
+
+from .registry import register_callback_action
+from .util import (
+ combine_task_graph_files,
+ create_task_from_def,
+ create_tasks,
+ fetch_graph_and_labels,
+ relativize_datestamps,
+)
+
+logger = logging.getLogger(__name__)
+
+RERUN_STATES = ("exception", "failed")
+
+
+def _should_retrigger(task_graph, label):
+ """
+ Return whether a given task in the taskgraph should be retriggered.
+
+ This handles the case where the task isn't there by assuming it should not be.
+ """
+ if label not in task_graph:
+ logger.info(
+ "Task {} not in full taskgraph, assuming task should not be retriggered.".format(
+ label
+ )
+ )
+ return False
+ return task_graph[label].attributes.get("retrigger", False)
+
+
@register_callback_action(
    name="retrigger",
    cb_name="retrigger-decision",
    title="Retrigger",
    symbol="rt",
    description=textwrap.dedent(
        """\
        Create a clone of the task (retriggering decision, action, and cron tasks requires
        special scopes)."""
    ),
    order=11,
    context=[
        {"kind": "decision-task"},
        {"kind": "action-callback"},
        {"kind": "cron-task"},
    ],
)
def retrigger_decision_action(parameters, graph_config, input, task_group_id, task_id):
    """Re-create the task ``task_id`` from its own definition under a new id.

    For a single task, we try to just run exactly the same task once more.
    It's quite possible that we don't have the scopes to do so (especially for
    an action), but this is best-effort.
    """
    definition = taskcluster.get_task_definition(task_id)

    # make all of the timestamps relative; they will then be turned back into
    # absolute timestamps relative to the current time.
    relative_definition = relativize_datestamps(definition)

    new_task_id = slugid()
    create_task_from_def(new_task_id, relative_definition, parameters["level"])
+
+
@register_callback_action(
    name="retrigger",
    title="Retrigger",
    symbol="rt",
    description=("Create a clone of the task."),
    generic=True,
    order=19,  # must be greater than other orders in this file, as this is the fallback version
    context=[{"retrigger": "true"}],
    schema={
        "type": "object",
        "properties": {
            "downstream": {
                "type": "boolean",
                "description": (
                    "If true, downstream tasks from this one will be cloned as well. "
                    "The dependencies will be updated to work with the new task at the root."
                ),
                "default": False,
            },
            "times": {
                "type": "integer",
                "default": 1,
                "minimum": 1,
                "maximum": 100,
                "title": "Times",
                "description": "How many times to run each task.",
            },
        },
    },
)
@register_callback_action(
    name="retrigger",
    cb_name="retrigger-disabled",
    title="Retrigger (disabled)",
    symbol="rt",
    description=(
        "Create a clone of the task.\n\n"
        "This type of task should typically be re-run instead of re-triggered."
    ),
    generic=True,
    order=20,  # must be greater than other orders in this file, as this is the fallback version
    context=[{}],
    schema={
        "type": "object",
        "properties": {
            "downstream": {
                "type": "boolean",
                "description": (
                    "If true, downstream tasks from this one will be cloned as well. "
                    "The dependencies will be updated to work with the new task at the root."
                ),
                "default": False,
            },
            "times": {
                "type": "integer",
                "default": 1,
                "minimum": 1,
                "maximum": 100,
                "title": "Times",
                "description": "How many times to run each task.",
            },
            "force": {
                "type": "boolean",
                "default": False,
                "description": (
                    "This task should not be re-triggered. "
                    "This can be overridden by passing `true` here."
                ),
            },
        },
    },
)
def retrigger_action(parameters, graph_config, input, task_group_id, task_id):
    """Clone the task ``task_id``, optionally with its downstream tasks.

    The task's label is read from its definition (``metadata.name``). Unless
    ``input["force"]`` is set, a task that ``_should_retrigger`` rejects makes
    the action exit with status 1. With ``input["downstream"]`` the reverse
    transitive closure of the label is scheduled too, restricted to labels
    present in ``label_to_taskid``. The selection is created
    ``input["times"]`` times (default 1).
    """
    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
        parameters, graph_config
    )

    definition = taskcluster.get_task_definition(task_id)
    label = definition["metadata"]["name"]

    forced = input.get("force", None)
    if not (forced or _should_retrigger(full_task_graph, label)):
        logger.info(
            f"Not retriggering task {label}, task should not be retrigged "
            "and force not specified."
        )
        sys.exit(1)

    if input.get("downstream"):
        closure = full_task_graph.graph.transitive_closure({label}, reverse=True)
        to_run = closure.nodes & set(label_to_taskid.keys())
        with_downstream = " (with downstream) "
    else:
        to_run = [label]
        with_downstream = " "

    times = input.get("times", 1)
    for attempt in range(times):
        create_tasks(
            graph_config,
            to_run,
            full_task_graph,
            label_to_taskid,
            parameters,
            decision_task_id,
            attempt,
        )

        logger.info(f"Scheduled {label}{with_downstream}(time {attempt + 1}/{times})")
    combine_task_graph_files(list(range(times)))
+
+
@register_callback_action(
    title="Rerun",
    name="rerun",
    generic=True,
    symbol="rr",
    description=(
        "Rerun a task.\n\n"
        "This only works on failed or exception tasks in the original taskgraph,"
        " and is CoT friendly."
    ),
    order=300,
    context=[{}],
    schema={"type": "object", "properties": {}},
)
def rerun_action(parameters, graph_config, input, task_group_id, task_id):
    """Rerun the task ``task_id`` if it belongs to the decision task's graph.

    The task is only rerun when its id appears among the values of the
    ``label_to_taskid`` mapping fetched for the decision task; otherwise the
    refusal is logged as an error and nothing is rerun. The actual rerun
    (including the task-state check) is delegated to ``_rerun_task``.
    """
    task = taskcluster.get_task_definition(task_id)
    parameters = dict(parameters)
    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
        parameters, graph_config
    )
    label = task["metadata"]["name"]
    if task_id not in label_to_taskid.values():
        logger.error(
            "Refusing to rerun {}: taskId {} not in decision task {} label_to_taskid!".format(
                label, task_id, decision_task_id
            )
        )
        # Actually refuse: without this return the task was rerun regardless
        # of the error that had just been logged.
        return

    _rerun_task(task_id, label)
+
+
def _rerun_task(task_id, label):
    """Rerun ``task_id`` when its current state is one of ``RERUN_STATES``.

    Tasks in any other state are left alone and a warning is logged.
    """
    state = taskcluster.state_task(task_id)
    if state in RERUN_STATES:
        taskcluster.rerun_task(task_id)
        logger.info(f"Reran {label}")
    else:
        logger.warning(
            f"No need to rerun {label}: state '{state}' not in {RERUN_STATES}!"
        )
+
+
@register_callback_action(
    title="Retrigger",
    name="retrigger-multiple",
    symbol="rt",
    generic=True,
    description=("Create a clone of the task."),
    context=[],
    schema={
        "type": "object",
        "properties": {
            "requests": {
                "type": "array",
                "items": {
                    "tasks": {
                        "type": "array",
                        "description": "An array of task labels",
                        "items": {"type": "string"},
                    },
                    "times": {
                        "type": "integer",
                        "minimum": 1,
                        "maximum": 100,
                        "title": "Times",
                        "description": "How many times to run each task.",
                    },
                    "additionalProperties": False,
                },
            },
            "additionalProperties": False,
        },
    },
)
def retrigger_multiple(parameters, graph_config, input, task_group_id, task_id):
    """Retrigger or rerun several groups of task labels in one action.

    ``input["requests"]`` is a list of ``{"tasks": [...], "times": n}``
    entries. Within each request, labels that ``_should_retrigger`` accepts
    are cloned ``times`` times (default 1); all other labels are rerun in
    place via ``_rerun_task``. Each ``create_tasks`` call uses the suffix
    ``"<request index>-<iteration>"``, and all suffixes are combined at the
    end.
    """
    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
        parameters, graph_config
    )

    suffixes = []
    for i, request in enumerate(input.get("requests", [])):
        times = request.get("times", 1)

        # Partition in a single pass so _should_retrigger (which logs for
        # labels missing from the graph) runs exactly once per label.
        rerun_tasks = []
        retrigger_tasks = []
        for label in request.get("tasks"):
            if _should_retrigger(full_task_graph, label):
                retrigger_tasks.append(label)
            else:
                rerun_tasks.append(label)

        for label in rerun_tasks:
            # XXX we should not re-run tasks pulled in from other pushes
            # In practice, this shouldn't matter, as only completed tasks
            # are pulled in from other pushes and treeherder won't pass
            # those labels.
            _rerun_task(label_to_taskid[label], label)

        for j in range(times):
            suffix = f"{i}-{j}"
            suffixes.append(suffix)
            create_tasks(
                graph_config,
                retrigger_tasks,
                full_task_graph,
                label_to_taskid,
                parameters,
                decision_task_id,
                suffix,
            )

    combine_task_graph_files(suffixes)
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py
new file mode 100644
index 0000000000..dd3248d209
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py
@@ -0,0 +1,282 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import concurrent.futures as futures
+import copy
+import logging
+import os
+import re
+from functools import reduce
+
+from requests.exceptions import HTTPError
+
+from taskgraph import create
+from taskgraph.decision import read_artifact, rename_artifact, write_artifact
+from taskgraph.optimize.base import optimize_task_graph
+from taskgraph.taskgraph import TaskGraph
+from taskgraph.util.taskcluster import (
+ CONCURRENCY,
+ get_artifact,
+ get_session,
+ list_tasks,
+ parse_time,
+)
+from taskgraph.util.taskgraph import find_decision_task
+
+logger = logging.getLogger(__name__)
+
+
def get_parameters(decision_task_id):
    """Return the ``public/parameters.yml`` artifact of ``decision_task_id``."""
    artifact_path = "public/parameters.yml"
    return get_artifact(decision_task_id, artifact_path)
+
+
def fetch_graph_and_labels(parameters, graph_config):
    """Load the decision task's full task graph and label-to-taskid mapping.

    The mapping from the decision task is then updated with the
    ``public/label-to-taskid.json`` artifact of every task listed under the
    push's ``actions`` index namespace and the revision's ``cron`` index
    namespace. Tasks without that artifact (HTTP 404) are skipped; any other
    HTTP error is re-raised.

    Returns:
        A ``(decision_task_id, full_task_graph, label_to_taskid)`` tuple.
    """
    decision_task_id = find_decision_task(parameters, graph_config)

    # First grab the graph and labels generated during the initial decision task
    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
    _, full_task_graph = TaskGraph.from_json(full_task_graph)
    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")

    def fetch_labels(kind, task_id):
        # Fetch any modifications made by an action/cron task and swap out
        # new tasks for old ones. ``kind`` only affects the log message.
        logger.info(f"fetching label-to-taskid.json for {kind} task {task_id}")
        try:
            run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
        except HTTPError as err:
            if err.response.status_code != 404:
                raise
            logger.debug(f"No label-to-taskid.json found for {task_id}: {err}")
        else:
            label_to_taskid.update(run_label_to_id)

    # (log word, index namespace template, parameters key for the last field)
    index_namespaces = (
        ("action", "{}.v2.{}.pushlog-id.{}.actions", "pushlog_id"),
        ("cron", "{}.v2.{}.revision.{}.cron", "head_rev"),
    )

    # fetch everything in parallel; this avoids serializing any delay in downloading
    # each artifact (such as waiting for the artifact to be mirrored locally)
    with futures.ThreadPoolExecutor(CONCURRENCY) as executor:
        fetches = []
        for kind, template, param_key in index_namespaces:
            namespace = template.format(
                graph_config["trust-domain"],
                parameters["project"],
                parameters[param_key],
            )
            for task_id in list_tasks(namespace):
                fetches.append(executor.submit(fetch_labels, kind, task_id))

        # now wait for each fetch to complete, raising an exception if there
        # were any issues
        for fetch in futures.as_completed(fetches):
            fetch.result()

    return (decision_task_id, full_task_graph, label_to_taskid)
+
+
def create_task_from_def(task_id, task_def, level):
    """Create a task directly from a definition instead of a graph label.

    The task definition will have ``{'relative-datestamp': '..'}`` rendered
    just like in a decision task. Use this for entirely new tasks, or for
    tasks whose internals were changed (for example after "editing" an entry
    of the full_task_graph). No dependencies are scheduled; the caller must
    handle that itself -- see ``create_tasks`` for one way to do it.

    ``task_def`` is modified in place: its ``schedulerId`` is set to
    ``gecko-level-<level>``. The task label is taken from
    ``task_def["metadata"]["name"]``.
    """
    scheduler_id = f"gecko-level-{level}"
    task_def["schedulerId"] = scheduler_id
    name = task_def["metadata"]["name"]
    create.create_task(get_session(), task_id, name, task_def)
+
+
def update_parent(task, graph):
    """Record the ``TASK_ID`` environment variable as the task's parent.

    Writes ``task.task["extra"]["parent"]`` (creating ``extra`` if needed),
    using an empty string when ``TASK_ID`` is unset. ``graph`` is unused.
    Returns the same ``task`` object.
    """
    parent_id = os.environ.get("TASK_ID", "")
    extra = task.task.setdefault("extra", {})
    extra["parent"] = parent_id
    return task
+
+
def update_dependencies(task, graph):
    """Append the ``TASK_ID`` environment variable to the task's dependencies.

    Does nothing when ``TASK_ID`` is unset or empty. ``graph`` is unused.
    Returns the same ``task`` object.
    """
    current_task_id = os.environ.get("TASK_ID")
    if not current_task_id:
        return task
    dependencies = task.task.setdefault("dependencies", [])
    dependencies.append(current_task_id)
    return task
+
+
def create_tasks(
    graph_config,
    to_run,
    full_task_graph,
    label_to_taskid,
    params,
    decision_task_id=None,
    suffix="",
    modifier=lambda t: t,
):
    """Create the tasks in ``to_run`` plus everything they transitively need.

    The task definition will have ``{'relative-datestamp': '..'}`` rendered
    just like in a decision task. Action callbacks should create new tasks
    through this function, which allows easy debugging with
    `mach taskgraph action-callback --test`.

    ``modifier`` is called on each task before it is placed in the new graph
    and must return a valid task. It receives _all_ tasks in the transitive
    closure, not only those in ``to_run``, so callers may want to leave tasks
    outside their ``to_run`` list untouched.

    A non-empty ``suffix`` is appended (after a dash) to the names of the
    artifacts written here. When calling this function several times from one
    action, pass a different suffix each time so artifacts are not
    overwritten.

    Leave out ``decision_task_id`` to create the tasks in a new group.

    Returns an updated label_to_taskid containing the new tasks.
    """
    artifact_suffix = f"-{suffix}" if suffix != "" else suffix
    requested = set(to_run)

    # Work on private copies so the caller's graph and mapping are not
    # affected by what happens below.
    graph_copy = copy.deepcopy(full_task_graph)
    task_ids = label_to_taskid.copy()

    closure = graph_copy.graph.transitive_closure(requested)
    tasks_by_label = {}
    for label in closure.nodes:
        tasks_by_label[label] = modifier(graph_copy[label])
    target_task_graph = TaskGraph(tasks_by_label, closure)

    target_task_graph.for_each_task(update_parent)
    running_task_id = os.environ.get("TASK_ID")
    if decision_task_id and decision_task_id != running_task_id:
        target_task_graph.for_each_task(update_dependencies)

    optimized_task_graph, task_ids = optimize_task_graph(
        target_task_graph,
        requested,
        params,
        requested,
        decision_task_id,
        existing_tasks=task_ids,
    )

    write_artifact(f"task-graph{artifact_suffix}.json", optimized_task_graph.to_json())
    write_artifact(f"label-to-taskid{artifact_suffix}.json", task_ids)
    write_artifact(f"to-run{artifact_suffix}.json", list(requested))
    create.create_tasks(
        graph_config,
        optimized_task_graph,
        task_ids,
        params,
        decision_task_id,
    )
    return task_ids
+
+
def _update_reducer(accumulator, new_value):
    """Call ``accumulator.update(new_value)`` and return ``accumulator``.

    Works like the set or dict ``update`` method, except that the modified
    object is returned, which makes it usable with ``functools.reduce``.
    """
    merged = accumulator
    merged.update(new_value)
    return merged
+
+
def combine_task_graph_files(suffixes):
    """Merge the per-suffix artifacts into their unsuffixed counterparts.

    Chain of Trust verification requires a task-graph.json file containing
    all children tasks, so the task-graph-{suffix}.json files are combined
    into one task-graph.json at the end. Actions look for other artifacts
    too, so label-to-taskid and to-run are combined in the same way.

    When there is exactly one suffix, the files are simply renamed, which
    avoids the cost of uploading two copies of the same data.
    """
    if len(suffixes) == 1:
        only_suffix = suffixes[0]
        for stem in ["task-graph", "label-to-taskid", "to-run"]:
            rename_artifact(f"{stem}-{only_suffix}.json", f"{stem}.json")
        return

    def merged(stem, base):
        # Read every "<stem>-<suffix>.json" in order and fold it into ``base``.
        contents = [read_artifact(f"{stem}-{suffix}.json") for suffix in suffixes]
        return reduce(_update_reducer, contents, base)

    write_artifact("task-graph.json", merged("task-graph", dict()))
    write_artifact("label-to-taskid.json", merged("label-to-taskid", dict()))
    write_artifact("to-run.json", list(merged("to-run", set())))
+
+
def relativize_datestamps(task_def):
    """
    Given a task definition as received from the queue, return a copy in
    which every timestamp string is replaced by
    ``{"relative-datestamp": "<N> seconds"}``, where N is the whole-second
    offset from the task's ``created`` time.
    The result is useful for handing to ``create_task``.
    """
    created = parse_time(task_def["created"])
    # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js
    timestamp_re = re.compile(
        r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]"
        r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?"
        r"(?:z|[+-]\d\d:\d\d)$",
        re.I,
    )

    def convert(node):
        # Containers are rebuilt recursively; anything else is passed through
        # unless it is a string that looks like a timestamp.
        if isinstance(node, dict):
            return {key: convert(child) for key, child in node.items()}
        if isinstance(node, list):
            return [convert(child) for child in node]
        if isinstance(node, str) and timestamp_re.match(node):
            offset = parse_time(node) - created
            return {"relative-datestamp": f"{int(offset.total_seconds())} seconds"}
        return node

    return convert(task_def)
+
+
def add_args_to_command(cmd_parts, extra_args=None):
    """
    Add custom command line args to a given command.

    Four command shapes are recognised and the result keeps the same shape:

    * a one-element list holding a ``{"task-reference": "<string>"}`` dict,
    * a one-element list holding a single space-separated string,
    * a one-element list holding a list of arguments,
    * anything else, treated as a flat list of arguments.

    For the flat-list and nested-list shapes the existing list object is
    extended in place.

    Args:
        cmd_parts: the raw command as seen by taskcluster
        extra_args: array of args we want to add (``None`` means no extra args)

    Returns:
        The command, in the same shape as ``cmd_parts``, with ``extra_args``
        appended.
    """
    if extra_args is None:
        extra_args = []

    cmd_type = "default"
    if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict):
        # windows has single cmd part as dict: 'task-reference', with long string
        cmd_parts = cmd_parts[0]["task-reference"].split(" ")
        cmd_type = "dict"
    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], str):
        # windows has single cmd part as a long string
        cmd_parts = cmd_parts[0].split(" ")
        cmd_type = "unicode"
    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list):
        # osx has an single value array with an array inside
        cmd_parts = cmd_parts[0]
        cmd_type = "subarray"

    cmd_parts.extend(extra_args)

    # Re-wrap the extended argument list into the shape it arrived in.
    if cmd_type == "dict":
        cmd_parts = [{"task-reference": " ".join(cmd_parts)}]
    elif cmd_type == "unicode":
        cmd_parts = [" ".join(cmd_parts)]
    elif cmd_type == "subarray":
        cmd_parts = [cmd_parts]
    return cmd_parts