summaryrefslogtreecommitdiffstats
path: root/taskcluster/gecko_taskgraph/util/taskcluster.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--taskcluster/gecko_taskgraph/util/taskcluster.py128
1 files changed, 128 insertions, 0 deletions
diff --git a/taskcluster/gecko_taskgraph/util/taskcluster.py b/taskcluster/gecko_taskgraph/util/taskcluster.py
new file mode 100644
index 0000000000..cddb01fd37
--- /dev/null
+++ b/taskcluster/gecko_taskgraph/util/taskcluster.py
@@ -0,0 +1,128 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import logging
+import os
+
+import taskcluster_urls as liburls
+from taskcluster import Hooks
+from taskgraph.util import taskcluster as tc_util
+from taskgraph.util.taskcluster import (
+ _do_request,
+ get_index_url,
+ get_root_url,
+ get_task_definition,
+ get_task_url,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def insert_index(index_path, task_id, data=None, use_proxy=False):
+ index_url = get_index_url(index_path, use_proxy=use_proxy)
+
+ # Find task expiry.
+ expires = get_task_definition(task_id, use_proxy=use_proxy)["expires"]
+
+ response = _do_request(
+ index_url,
+ method="put",
+ json={
+ "taskId": task_id,
+ "rank": 0,
+ "data": data or {},
+ "expires": expires,
+ },
+ )
+ return response
+
+
+def status_task(task_id, use_proxy=False):
+ """Gets the status of a task given a task_id.
+
+ In testing mode, just logs that it would have retrieved status.
+
+ Args:
+ task_id (str): A task id.
+ use_proxy (bool): Whether to use taskcluster-proxy (default: False)
+
+ Returns:
+ dict: A dictionary object as defined here:
+ https://docs.taskcluster.net/docs/reference/platform/queue/api#status
+ """
+ if tc_util.testing:
+ logger.info(f"Would have gotten status for {task_id}.")
+ else:
+ resp = _do_request(get_task_url(task_id, use_proxy) + "/status")
+ status = resp.json().get("status", {})
+ return status
+
+
+def state_task(task_id, use_proxy=False):
+ """Gets the state of a task given a task_id.
+
+ In testing mode, just logs that it would have retrieved state. This is a subset of the
+ data returned by :func:`status_task`.
+
+ Args:
+ task_id (str): A task id.
+ use_proxy (bool): Whether to use taskcluster-proxy (default: False)
+
+ Returns:
+ str: The state of the task, one of
+ ``pending, running, completed, failed, exception, unknown``.
+ """
+ if tc_util.testing:
+ logger.info(f"Would have gotten state for {task_id}.")
+ else:
+ status = status_task(task_id, use_proxy=use_proxy).get("state") or "unknown"
+ return status
+
+
+def trigger_hook(hook_group_id, hook_id, hook_payload):
+ hooks = Hooks({"rootUrl": get_root_url(True)})
+ response = hooks.triggerHook(hook_group_id, hook_id, hook_payload)
+
+ logger.info(
+ "Task seen here: {}/tasks/{}".format(
+ get_root_url(os.environ.get("TASKCLUSTER_PROXY_URL")),
+ response["status"]["taskId"],
+ )
+ )
+
+
+def list_task_group_tasks(task_group_id):
+ """Generate the tasks in a task group"""
+ params = {}
+ while True:
+ url = liburls.api(
+ get_root_url(False),
+ "queue",
+ "v1",
+ f"task-group/{task_group_id}/list",
+ )
+ resp = _do_request(url, method="get", params=params).json()
+ yield from resp["tasks"]
+ if resp.get("continuationToken"):
+ params = {"continuationToken": resp.get("continuationToken")}
+ else:
+ break
+
+
+def list_task_group_incomplete_task_ids(task_group_id):
+ states = ("running", "pending", "unscheduled")
+ for task in [t["status"] for t in list_task_group_tasks(task_group_id)]:
+ if task["state"] in states:
+ yield task["taskId"]
+
+
+def list_task_group_complete_tasks(task_group_id):
+ tasks = {}
+ for task in list_task_group_tasks(task_group_id):
+ if task.get("status", {}).get("state", "") == "completed":
+ tasks[task.get("task", {}).get("metadata", {}).get("name", "")] = task.get(
+ "status", {}
+ ).get("taskId", "")
+ return tasks