diff options
Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/actions')
7 files changed, 1118 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py new file mode 100644 index 0000000000..590a957282 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/__init__.py @@ -0,0 +1,16 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +from .registry import ( + register_callback_action, + render_actions_json, + trigger_action_callback, +) + +__all__ = [ + "register_callback_action", + "render_actions_json", + "trigger_action_callback", +] diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py new file mode 100644 index 0000000000..fc10668566 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/add_new_jobs.py @@ -0,0 +1,64 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +from taskgraph.actions.registry import register_callback_action +from taskgraph.actions.util import ( + combine_task_graph_files, + create_tasks, + fetch_graph_and_labels, +) + + +@register_callback_action( + name="add-new-jobs", + title="Add new jobs", + generic=True, + symbol="add-new", + description="Add new jobs using task labels.", + order=100, + context=[], + schema={ + "type": "object", + "properties": { + "tasks": { + "type": "array", + "description": "An array of task labels", + "items": {"type": "string"}, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + }, + }, +) +def add_new_jobs_action(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + to_run = [] + for elem in input["tasks"]: + if elem in full_task_graph.tasks: + to_run.append(elem) + else: + raise Exception(f"{elem} was not found in the task-graph") + + times = input.get("times", 1) + for i in range(times): + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + i, + ) + combine_task_graph_files(list(range(times))) diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py new file mode 100644 index 0000000000..03788c6538 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel.py @@ -0,0 +1,42 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging + +import requests + +from taskgraph.util.taskcluster import cancel_task + +from .registry import register_callback_action + +logger = logging.getLogger(__name__) + + +@register_callback_action( + title="Cancel Task", + name="cancel", + symbol="cx", + generic=True, + description=("Cancel the given task"), + order=350, + context=[{}], +) +def cancel_action(parameters, graph_config, input, task_group_id, task_id): + # Note that this is limited by the scopes afforded to generic actions to + # only cancel tasks with the level-specific schedulerId. + try: + cancel_task(task_id, use_proxy=True) + except requests.HTTPError as e: + if e.response.status_code == 409: + # A 409 response indicates that this task is past its deadline. It + # cannot be cancelled at this time, but it's also not running + # anymore, so we can ignore this error. + logger.info( + 'Task "{}" is past its deadline and cannot be cancelled.'.format( + task_id + ) + ) + return + raise diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py new file mode 100644 index 0000000000..b2636f46a3 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/cancel_all.py @@ -0,0 +1,61 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import concurrent.futures as futures +import logging +import os + +import requests + +from taskgraph.util.taskcluster import ( + CONCURRENCY, + cancel_task, + list_task_group_incomplete_tasks, +) + +from .registry import register_callback_action + +logger = logging.getLogger(__name__) + + +@register_callback_action( + title="Cancel All", + name="cancel-all", + generic=True, + symbol="cAll", + description=( + "Cancel all running and pending tasks created by the decision task " + "this action task is associated with." + ), + order=400, + context=[], +) +def cancel_all_action(parameters, graph_config, input, task_group_id, task_id): + def do_cancel_task(task_id): + logger.info(f"Cancelling task {task_id}") + try: + cancel_task(task_id, use_proxy=True) + except requests.HTTPError as e: + if e.response.status_code == 409: + # A 409 response indicates that this task is past its deadline. It + # cannot be cancelled at this time, but it's also not running + # anymore, so we can ignore this error. + logger.info( + "Task {} is past its deadline and cannot be cancelled.".format( + task_id + ) + ) + return + raise + + own_task_id = os.environ.get("TASK_ID", "") + to_cancel = [ + t for t in list_task_group_incomplete_tasks(task_group_id) if t != own_task_id + ] + logger.info(f"Cancelling {len(to_cancel)} tasks") + with futures.ThreadPoolExecutor(CONCURRENCY) as e: + cancel_futs = [e.submit(do_cancel_task, t) for t in to_cancel] + for f in futures.as_completed(cancel_futs): + f.result() diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py new file mode 100644 index 0000000000..1e909d30c7 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/registry.py @@ -0,0 +1,352 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import json +from collections import namedtuple +from types import FunctionType + +from mozilla_repo_urls import parse + +from taskgraph import create +from taskgraph.config import load_graph_config +from taskgraph.parameters import Parameters +from taskgraph.util import hash, taskcluster, yaml +from taskgraph.util.memoize import memoize +from taskgraph.util.python_path import import_sibling_modules + +actions = [] +callbacks = {} + +Action = namedtuple("Action", ["order", "cb_name", "generic", "action_builder"]) + + +def is_json(data): + """Return ``True``, if ``data`` is a JSON serializable data structure.""" + try: + json.dumps(data) + except ValueError: + return False + return True + + +@memoize +def read_taskcluster_yml(filename): + """Load and parse .taskcluster.yml, memoized to save some time""" + return yaml.load_yaml(filename) + + +@memoize +def hash_taskcluster_yml(filename): + """ + Generate a hash of the given .taskcluster.yml. This is the first 10 digits + of the sha256 of the file's content, and is used by administrative scripts + to create a hook based on this content. + """ + return hash.hash_path(filename)[:10] + + +def register_callback_action( + name, + title, + symbol, + description, + order=10000, + context=[], + available=lambda parameters: True, + schema=None, + generic=True, + cb_name=None, +): + """ + Register an action callback that can be triggered from supporting + user interfaces, such as Treeherder. + + This function is to be used as a decorator for a callback that takes + parameters as follows: + + ``parameters``: + Decision task :class:`parameters <taskgraph.parameters.Parameters>`. + ``input``: + Input matching specified JSON schema, ``None`` if no ``schema`` + parameter is given to ``register_callback_action``. + ``task_group_id``: + The id of the task-group this was triggered for. + ``task_id`` and `task``: + task identifier and task definition for task the action was triggered + for, ``None`` if no ``context`` parameters was given to + ``register_callback_action``. + + Args: + name (str): + An identifier for this action, used by UIs to find the action. + title (str): + A human readable title for the action to be used as label on a button + or text on a link for triggering the action. + symbol (str): + Treeherder symbol for the action callback, this is the symbol that the + task calling your callback will be displayed as. This is usually 1-3 + letters abbreviating the action title. + description (str): + A human readable description of the action in **markdown**. + This will be display as tooltip and in dialog window when the action + is triggered. This is a good place to describe how to use the action. + order (int): + Order of the action in menus, this is relative to the ``order`` of + other actions declared. + context (list of dict): + List of tag-sets specifying which tasks the action is can take as input. + If no tag-sets is specified as input the action is related to the + entire task-group, and won't be triggered with a given task. + + Otherwise, if ``context = [{'k': 'b', 'p': 'l'}, {'k': 't'}]`` will only + be displayed in the context menu for tasks that has + ``task.tags.k == 'b' && task.tags.p = 'l'`` or ``task.tags.k = 't'``. + Essentially, this allows filtering on ``task.tags``. + + If this is a function, it is given the decision parameters and must return + a value of the form described above. + available (function): + An optional function that given decision parameters decides if the + action is available. Defaults to a function that always returns ``True``. + schema (dict): + JSON schema specifying input accepted by the action. + This is optional and can be left ``null`` if no input is taken. + generic (bool) + Whether this is a generic action or has its own permissions. + cb_name (str): + The name under which this function should be registered, defaulting to + `name`. This is used to generation actionPerm for non-generic hook + actions, and thus appears in ci-configuration and various role and hook + names. Unlike `name`, which can appear multiple times, cb_name must be + unique among all registered callbacks. + + Returns: + function: Decorator to be used for the callback function. + """ + mem = {"registered": False} # workaround nonlocal missing in 2.x + + assert isinstance(title, str), "title must be a string" + assert isinstance(description, str), "description must be a string" + title = title.strip() + description = description.strip() + + # ensure that context is callable + if not callable(context): + context_value = context + context = lambda params: context_value # noqa + + def register_callback(cb, cb_name=cb_name): + assert isinstance(name, str), "name must be a string" + assert isinstance(order, int), "order must be an integer" + assert callable(schema) or is_json( + schema + ), "schema must be a JSON compatible object" + assert isinstance(cb, FunctionType), "callback must be a function" + # Allow for json-e > 25 chars in the symbol. + if "$" not in symbol: + assert 1 <= len(symbol) <= 25, "symbol must be between 1 and 25 characters" + assert isinstance(symbol, str), "symbol must be a string" + + assert not mem[ + "registered" + ], "register_callback_action must be used as decorator" + if not cb_name: + cb_name = name + assert cb_name not in callbacks, "callback name {} is not unique".format( + cb_name + ) + + def action_builder(parameters, graph_config, decision_task_id): + if not available(parameters): + return None + + actionPerm = "generic" if generic else cb_name + + # gather up the common decision-task-supplied data for this action + repo_param = "head_repository" + repository = { + "url": parameters[repo_param], + "project": parameters["project"], + "level": parameters["level"], + } + + revision = parameters["head_rev"] + push = { + "owner": "mozilla-taskcluster-maintenance@mozilla.com", + "pushlog_id": parameters["pushlog_id"], + "revision": revision, + } + branch = parameters.get("head_ref") + if branch: + push["branch"] = branch + + action = { + "name": name, + "title": title, + "description": description, + # target taskGroupId (the task group this decision task is creating) + "taskGroupId": decision_task_id, + "cb_name": cb_name, + "symbol": symbol, + } + + rv = { + "name": name, + "title": title, + "description": description, + "context": context(parameters), + } + if schema: + rv["schema"] = ( + schema(graph_config=graph_config) if callable(schema) else schema + ) + + trustDomain = graph_config["trust-domain"] + level = parameters["level"] + tcyml_hash = hash_taskcluster_yml(graph_config.taskcluster_yml) + + # the tcyml_hash is prefixed with `/` in the hookId, so users will be granted + # hooks:trigger-hook:project-gecko/in-tree-action-3-myaction/*; if another + # action was named `myaction/release`, then the `*` in the scope would also + # match that action. To prevent such an accident, we prohibit `/` in hook + # names. + if "/" in actionPerm: + raise Exception("`/` is not allowed in action names; use `-`") + + rv.update( + { + "kind": "hook", + "hookGroupId": f"project-{trustDomain}", + "hookId": "in-tree-action-{}-{}/{}".format( + level, actionPerm, tcyml_hash + ), + "hookPayload": { + # provide the decision-task parameters as context for triggerHook + "decision": { + "action": action, + "repository": repository, + "push": push, + }, + # and pass everything else through from our own context + "user": { + "input": {"$eval": "input"}, + "taskId": {"$eval": "taskId"}, # target taskId (or null) + "taskGroupId": { + "$eval": "taskGroupId" + }, # target task group + }, + }, + "extra": { + "actionPerm": actionPerm, + }, + } + ) + + return rv + + actions.append(Action(order, cb_name, generic, action_builder)) + + mem["registered"] = True + callbacks[cb_name] = cb + return cb + + return register_callback + + +def render_actions_json(parameters, graph_config, decision_task_id): + """ + Render JSON object for the ``public/actions.json`` artifact. + + Args: + parameters (:class:`~taskgraph.parameters.Parameters`): + Decision task parameters. + + Returns: + dict: + JSON object representation of the ``public/actions.json`` + artifact. + """ + assert isinstance(parameters, Parameters), "requires instance of Parameters" + actions = [] + for action in sorted(_get_actions(graph_config), key=lambda action: action.order): + action = action.action_builder(parameters, graph_config, decision_task_id) + if action: + assert is_json(action), "action must be a JSON compatible object" + actions.append(action) + return { + "version": 1, + "variables": {}, + "actions": actions, + } + + +def sanity_check_task_scope(callback, parameters, graph_config): + """ + If this action is not generic, then verify that this task has the necessary + scope to run the action. This serves as a backstop preventing abuse by + running non-generic actions using generic hooks. While scopes should + prevent serious damage from such abuse, it's never a valid thing to do. + """ + for action in _get_actions(graph_config): + if action.cb_name == callback: + break + else: + raise ValueError(f"No action with cb_name {callback}") + + actionPerm = "generic" if action.generic else action.cb_name + + repo_param = "head_repository" + raw_url = parameters[repo_param] + parsed_url = parse(raw_url) + expected_scope = f"assume:{parsed_url.taskcluster_role_prefix}:action:{actionPerm}" + + # the scope should appear literally; no need for a satisfaction check. The use of + # get_current_scopes here calls the auth service through the Taskcluster Proxy, giving + # the precise scopes available to this task. + if expected_scope not in taskcluster.get_current_scopes(): + raise ValueError(f"Expected task scope {expected_scope} for this action") + + +def trigger_action_callback( + task_group_id, task_id, input, callback, parameters, root, test=False +): + """ + Trigger action callback with the given inputs. If `test` is true, then run + the action callback in testing mode, without actually creating tasks. + """ + graph_config = load_graph_config(root) + graph_config.register() + callbacks = _get_callbacks(graph_config) + cb = callbacks.get(callback, None) + if not cb: + raise Exception( + "Unknown callback: {}. Known callbacks: {}".format( + callback, ", ".join(callbacks) + ) + ) + + if test: + create.testing = True + taskcluster.testing = True + + if not test: + sanity_check_task_scope(callback, parameters, graph_config) + + cb(Parameters(**parameters), graph_config, input, task_group_id, task_id) + + +def _load(graph_config): + # Load all modules from this folder, relying on the side-effects of register_ + # functions to populate the action registry. + import_sibling_modules(exceptions=("util.py",)) + return callbacks, actions + + +def _get_callbacks(graph_config): + return _load(graph_config)[0] + + +def _get_actions(graph_config): + return _load(graph_config)[1] diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py new file mode 100644 index 0000000000..4758beb625 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/retrigger.py @@ -0,0 +1,301 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import logging +import sys +import textwrap + +from slugid import nice as slugid + +from taskgraph.util import taskcluster + +from .registry import register_callback_action +from .util import ( + combine_task_graph_files, + create_task_from_def, + create_tasks, + fetch_graph_and_labels, + relativize_datestamps, +) + +logger = logging.getLogger(__name__) + +RERUN_STATES = ("exception", "failed") + + +def _should_retrigger(task_graph, label): + """ + Return whether a given task in the taskgraph should be retriggered. + + This handles the case where the task isn't there by assuming it should not be. + """ + if label not in task_graph: + logger.info( + "Task {} not in full taskgraph, assuming task should not be retriggered.".format( + label + ) + ) + return False + return task_graph[label].attributes.get("retrigger", False) + + +@register_callback_action( + title="Retrigger", + name="retrigger", + symbol="rt", + cb_name="retrigger-decision", + description=textwrap.dedent( + """\ + Create a clone of the task (retriggering decision, action, and cron tasks requires + special scopes).""" + ), + order=11, + context=[ + {"kind": "decision-task"}, + {"kind": "action-callback"}, + {"kind": "cron-task"}, + ], +) +def retrigger_decision_action(parameters, graph_config, input, task_group_id, task_id): + """For a single task, we try to just run exactly the same task once more. + It's quite possible that we don't have the scopes to do so (especially for + an action), but this is best-effort.""" + + # make all of the timestamps relative; they will then be turned back into + # absolute timestamps relative to the current time. + task = taskcluster.get_task_definition(task_id) + task = relativize_datestamps(task) + create_task_from_def(slugid(), task, parameters["level"]) + + +@register_callback_action( + title="Retrigger", + name="retrigger", + symbol="rt", + generic=True, + description=("Create a clone of the task."), + order=19, # must be greater than other orders in this file, as this is the fallback version + context=[{"retrigger": "true"}], + schema={ + "type": "object", + "properties": { + "downstream": { + "type": "boolean", + "description": ( + "If true, downstream tasks from this one will be cloned as well. " + "The dependencies will be updated to work with the new task at the root." + ), + "default": False, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + }, + }, +) +@register_callback_action( + title="Retrigger (disabled)", + name="retrigger", + cb_name="retrigger-disabled", + symbol="rt", + generic=True, + description=( + "Create a clone of the task.\n\n" + "This type of task should typically be re-run instead of re-triggered." + ), + order=20, # must be greater than other orders in this file, as this is the fallback version + context=[{}], + schema={ + "type": "object", + "properties": { + "downstream": { + "type": "boolean", + "description": ( + "If true, downstream tasks from this one will be cloned as well. " + "The dependencies will be updated to work with the new task at the root." + ), + "default": False, + }, + "times": { + "type": "integer", + "default": 1, + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + "force": { + "type": "boolean", + "default": False, + "description": ( + "This task should not be re-triggered. " + "This can be overridden by passing `true` here." + ), + }, + }, + }, +) +def retrigger_action(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + task = taskcluster.get_task_definition(task_id) + label = task["metadata"]["name"] + + with_downstream = " " + to_run = [label] + + if not input.get("force", None) and not _should_retrigger(full_task_graph, label): + logger.info( + "Not retriggering task {}, task should not be retrigged " + "and force not specified.".format(label) + ) + sys.exit(1) + + if input.get("downstream"): + to_run = full_task_graph.graph.transitive_closure( + set(to_run), reverse=True + ).nodes + to_run = to_run & set(label_to_taskid.keys()) + with_downstream = " (with downstream) " + + times = input.get("times", 1) + for i in range(times): + create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + i, + ) + + logger.info(f"Scheduled {label}{with_downstream}(time {i + 1}/{times})") + combine_task_graph_files(list(range(times))) + + +@register_callback_action( + title="Rerun", + name="rerun", + generic=True, + symbol="rr", + description=( + "Rerun a task.\n\n" + "This only works on failed or exception tasks in the original taskgraph," + " and is CoT friendly." + ), + order=300, + context=[{}], + schema={"type": "object", "properties": {}}, +) +def rerun_action(parameters, graph_config, input, task_group_id, task_id): + task = taskcluster.get_task_definition(task_id) + parameters = dict(parameters) + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + label = task["metadata"]["name"] + if task_id not in label_to_taskid.values(): + logger.error( + "Refusing to rerun {}: taskId {} not in decision task {} label_to_taskid!".format( + label, task_id, decision_task_id + ) + ) + + _rerun_task(task_id, label) + + +def _rerun_task(task_id, label): + state = taskcluster.state_task(task_id) + if state not in RERUN_STATES: + logger.warning( + "No need to rerun {}: state '{}' not in {}!".format( + label, state, RERUN_STATES + ) + ) + return + taskcluster.rerun_task(task_id) + logger.info(f"Reran {label}") + + +@register_callback_action( + title="Retrigger", + name="retrigger-multiple", + symbol="rt", + generic=True, + description=("Create a clone of the task."), + context=[], + schema={ + "type": "object", + "properties": { + "requests": { + "type": "array", + "items": { + "tasks": { + "type": "array", + "description": "An array of task labels", + "items": {"type": "string"}, + }, + "times": { + "type": "integer", + "minimum": 1, + "maximum": 100, + "title": "Times", + "description": "How many times to run each task.", + }, + "additionalProperties": False, + }, + }, + "additionalProperties": False, + }, + }, +) +def retrigger_multiple(parameters, graph_config, input, task_group_id, task_id): + decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels( + parameters, graph_config + ) + + suffixes = [] + for i, request in enumerate(input.get("requests", [])): + times = request.get("times", 1) + rerun_tasks = [ + label + for label in request.get("tasks") + if not _should_retrigger(full_task_graph, label) + ] + retrigger_tasks = [ + label + for label in request.get("tasks") + if _should_retrigger(full_task_graph, label) + ] + + for label in rerun_tasks: + # XXX we should not re-run tasks pulled in from other pushes + # In practice, this shouldn't matter, as only completed tasks + # are pulled in from other pushes and treeherder won't pass + # those labels. + _rerun_task(label_to_taskid[label], label) + + for j in range(times): + suffix = f"{i}-{j}" + suffixes.append(suffix) + create_tasks( + graph_config, + retrigger_tasks, + full_task_graph, + label_to_taskid, + parameters, + decision_task_id, + suffix, + ) + + combine_task_graph_files(suffixes) diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py b/third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py new file mode 100644 index 0000000000..dd3248d209 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/actions/util.py @@ -0,0 +1,282 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import concurrent.futures as futures +import copy +import logging +import os +import re +from functools import reduce + +from requests.exceptions import HTTPError + +from taskgraph import create +from taskgraph.decision import read_artifact, rename_artifact, write_artifact +from taskgraph.optimize.base import optimize_task_graph +from taskgraph.taskgraph import TaskGraph +from taskgraph.util.taskcluster import ( + CONCURRENCY, + get_artifact, + get_session, + list_tasks, + parse_time, +) +from taskgraph.util.taskgraph import find_decision_task + +logger = logging.getLogger(__name__) + + +def get_parameters(decision_task_id): + return get_artifact(decision_task_id, "public/parameters.yml") + + +def fetch_graph_and_labels(parameters, graph_config): + decision_task_id = find_decision_task(parameters, graph_config) + + # First grab the graph and labels generated during the initial decision task + full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json") + _, full_task_graph = TaskGraph.from_json(full_task_graph) + label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json") + + # fetch everything in parallel; this avoids serializing any delay in downloading + # each artifact (such as waiting for the artifact to be mirrored locally) + with futures.ThreadPoolExecutor(CONCURRENCY) as e: + fetches = [] + + # fetch any modifications made by action tasks and swap out new tasks + # for old ones + def fetch_action(task_id): + logger.info(f"fetching label-to-taskid.json for action task {task_id}") + try: + run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") + label_to_taskid.update(run_label_to_id) + except HTTPError as e: + if e.response.status_code != 404: + raise + logger.debug(f"No label-to-taskid.json found for {task_id}: {e}") + + namespace = "{}.v2.{}.pushlog-id.{}.actions".format( + graph_config["trust-domain"], + parameters["project"], + parameters["pushlog_id"], + ) + for task_id in list_tasks(namespace): + fetches.append(e.submit(fetch_action, task_id)) + + # Similarly for cron tasks.. + def fetch_cron(task_id): + logger.info(f"fetching label-to-taskid.json for cron task {task_id}") + try: + run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") + label_to_taskid.update(run_label_to_id) + except HTTPError as e: + if e.response.status_code != 404: + raise + logger.debug(f"No label-to-taskid.json found for {task_id}: {e}") + + namespace = "{}.v2.{}.revision.{}.cron".format( + graph_config["trust-domain"], parameters["project"], parameters["head_rev"] + ) + for task_id in list_tasks(namespace): + fetches.append(e.submit(fetch_cron, task_id)) + + # now wait for each fetch to complete, raising an exception if there + # were any issues + for f in futures.as_completed(fetches): + f.result() + + return (decision_task_id, full_task_graph, label_to_taskid) + + +def create_task_from_def(task_id, task_def, level): + """Create a new task from a definition rather than from a label + that is already in the full-task-graph. The task definition will + have {relative-datestamp': '..'} rendered just like in a decision task. + Use this for entirely new tasks or ones that change internals of the task. + It is useful if you want to "edit" the full_task_graph and then hand + it to this function. No dependencies will be scheduled. You must handle + this yourself. Seeing how create_tasks handles it might prove helpful.""" + task_def["schedulerId"] = f"gecko-level-{level}" + label = task_def["metadata"]["name"] + session = get_session() + create.create_task(session, task_id, label, task_def) + + +def update_parent(task, graph): + task.task.setdefault("extra", {})["parent"] = os.environ.get("TASK_ID", "") + return task + + +def update_dependencies(task, graph): + if os.environ.get("TASK_ID"): + task.task.setdefault("dependencies", []).append(os.environ["TASK_ID"]) + return task + + +def create_tasks( + graph_config, + to_run, + full_task_graph, + label_to_taskid, + params, + decision_task_id=None, + suffix="", + modifier=lambda t: t, +): + """Create new tasks. The task definition will have {relative-datestamp': + '..'} rendered just like in a decision task. Action callbacks should use + this function to create new tasks, + allowing easy debugging with `mach taskgraph action-callback --test`. + This builds up all required tasks to run in order to run the tasks requested. + + Optionally this function takes a `modifier` function that is passed in each + task before it is put into a new graph. It should return a valid task. Note + that this is passed _all_ tasks in the graph, not just the set in to_run. You + may want to skip modifying tasks not in your to_run list. + + If `suffix` is given, then it is used to give unique names to the resulting + artifacts. If you call this function multiple times in the same action, + pass a different suffix each time to avoid overwriting artifacts. + + If you wish to create the tasks in a new group, leave out decision_task_id. + + Returns an updated label_to_taskid containing the new tasks""" + if suffix != "": + suffix = f"-{suffix}" + to_run = set(to_run) + + # Copy to avoid side-effects later + full_task_graph = copy.deepcopy(full_task_graph) + label_to_taskid = label_to_taskid.copy() + + target_graph = full_task_graph.graph.transitive_closure(to_run) + target_task_graph = TaskGraph( + {l: modifier(full_task_graph[l]) for l in target_graph.nodes}, target_graph + ) + target_task_graph.for_each_task(update_parent) + if decision_task_id and decision_task_id != os.environ.get("TASK_ID"): + target_task_graph.for_each_task(update_dependencies) + optimized_task_graph, label_to_taskid = optimize_task_graph( + target_task_graph, + to_run, + params, + to_run, + decision_task_id, + existing_tasks=label_to_taskid, + ) + write_artifact(f"task-graph{suffix}.json", optimized_task_graph.to_json()) + write_artifact(f"label-to-taskid{suffix}.json", label_to_taskid) + write_artifact(f"to-run{suffix}.json", list(to_run)) + create.create_tasks( + graph_config, + optimized_task_graph, + label_to_taskid, + params, + decision_task_id, + ) + return label_to_taskid + + +def _update_reducer(accumulator, new_value): + "similar to set or dict `update` method, but returning the modified object" + accumulator.update(new_value) + return accumulator + + +def combine_task_graph_files(suffixes): + """Combine task-graph-{suffix}.json files into a single task-graph.json file. + + Since Chain of Trust verification requires a task-graph.json file that + contains all children tasks, we can combine the various task-graph-0.json + type files into a master task-graph.json file at the end. + + Actions also look for various artifacts, so we combine those in a similar + fashion. + + In the case where there is only one suffix, we simply rename it to avoid the + additional cost of uploading two copies of the same data. + """ + + if len(suffixes) == 1: + for filename in ["task-graph", "label-to-taskid", "to-run"]: + rename_artifact(f"{filename}-{suffixes[0]}.json", f"{filename}.json") + return + + def combine(file_contents, base): + return reduce(_update_reducer, file_contents, base) + + files = [read_artifact(f"task-graph-{suffix}.json") for suffix in suffixes] + write_artifact("task-graph.json", combine(files, dict())) + + files = [read_artifact(f"label-to-taskid-{suffix}.json") for suffix in suffixes] + write_artifact("label-to-taskid.json", combine(files, dict())) + + files = [read_artifact(f"to-run-{suffix}.json") for suffix in suffixes] + write_artifact("to-run.json", list(combine(files, set()))) + + +def relativize_datestamps(task_def): + """ + Given a task definition as received from the queue, convert all datestamps + to {relative_datestamp: ..} format, with the task creation time as "now". + The result is useful for handing to ``create_task``. + """ + base = parse_time(task_def["created"]) + # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js + ts_pattern = re.compile( + r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]" + r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?" + r"(?:z|[+-]\d\d:\d\d)$", + re.I, + ) + + def recurse(value): + if isinstance(value, str): + if ts_pattern.match(value): + value = parse_time(value) + diff = value - base + return {"relative-datestamp": f"{int(diff.total_seconds())} seconds"} + if isinstance(value, list): + return [recurse(e) for e in value] + if isinstance(value, dict): + return {k: recurse(v) for k, v in value.items()} + return value + + return recurse(task_def) + + +def add_args_to_command(cmd_parts, extra_args=[]): + """ + Add custom command line args to a given command. + + Args: + cmd_parts: the raw command as seen by taskcluster + extra_args: array of args we want to add + """ + cmd_type = "default" + if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict): + # windows has single cmd part as dict: 'task-reference', with long string + cmd_parts = cmd_parts[0]["task-reference"].split(" ") + cmd_type = "dict" + elif len(cmd_parts) == 1 and ( + isinstance(cmd_parts[0], str) or isinstance(cmd_parts[0], str) + ): + # windows has single cmd part as a long string + cmd_parts = cmd_parts[0].split(" ") + cmd_type = "unicode" + elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list): + # osx has an single value array with an array inside + cmd_parts = cmd_parts[0] + cmd_type = "subarray" + + cmd_parts.extend(extra_args) + + if cmd_type == "dict": + cmd_parts = [{"task-reference": " ".join(cmd_parts)}] + elif cmd_type == "unicode": + cmd_parts = [" ".join(cmd_parts)] + elif cmd_type == "subarray": + cmd_parts = [cmd_parts] + return cmd_parts |