Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py')
-rw-r--r-- third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py | 373
1 file changed, 373 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py b/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py
new file mode 100644
index 0000000000..a830a473b3
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py
@@ -0,0 +1,373 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+import datetime
+import functools
+import logging
+import os
+
+import requests
+import taskcluster_urls as liburls
+from requests.packages.urllib3.util.retry import Retry
+
+from taskgraph.task import Task
+from taskgraph.util import yaml
+from taskgraph.util.memoize import memoize
+
+logger = logging.getLogger(__name__)
+
+# this is set to true for `mach taskgraph action-callback --test`
+testing = False
+
+# Default rootUrl to use if none is given in the environment; this should point
+# to the production Taskcluster deployment used for CI.
+PRODUCTION_TASKCLUSTER_ROOT_URL = None
+
+# the maximum number of parallel Taskcluster API calls to make
+CONCURRENCY = 50
+
+
+@memoize
+def get_root_url(use_proxy):
+ """Get the current TASKCLUSTER_ROOT_URL.
+
+ When running in a task, this must come from $TASKCLUSTER_ROOT_URL; when run
+ on the command line, a default may be provided that points to the
+ production deployment of Taskcluster. If use_proxy is set, this attempts to
+ get TASKCLUSTER_PROXY_URL instead, failing if it is not set.
+ """
+ if use_proxy:
+ try:
+ return liburls.normalize_root_url(os.environ["TASKCLUSTER_PROXY_URL"])
+ except KeyError:
+ if "TASK_ID" not in os.environ:
+ raise RuntimeError(
+ "taskcluster-proxy is not available when not executing in a task"
+ )
+ else:
+ raise RuntimeError("taskcluster-proxy is not enabled for this task")
+
+ if "TASKCLUSTER_ROOT_URL" in os.environ:
+ logger.debug(
+ "Running in Taskcluster instance {}{}".format(
+ os.environ["TASKCLUSTER_ROOT_URL"],
+ " with taskcluster-proxy"
+ if "TASKCLUSTER_PROXY_URL" in os.environ
+ else "",
+ )
+ )
+ return liburls.normalize_root_url(os.environ["TASKCLUSTER_ROOT_URL"])
+
+ if "TASK_ID" in os.environ:
+ raise RuntimeError("$TASKCLUSTER_ROOT_URL must be set when running in a task")
+
+ if PRODUCTION_TASKCLUSTER_ROOT_URL is None:
+ raise RuntimeError(
+ "Could not detect Taskcluster instance, set $TASKCLUSTER_ROOT_URL"
+ )
+
+ logger.debug("Using default TASKCLUSTER_ROOT_URL")
+ return liburls.normalize_root_url(PRODUCTION_TASKCLUSTER_ROOT_URL)
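+
+
+# Illustrative usage sketch (not part of the vendored module): when running
+# outside a task, a root URL can be supplied via the environment; the URL
+# below is a placeholder.
+#
+#   os.environ.setdefault("TASKCLUSTER_ROOT_URL", "https://tc.example.com")
+#   root_url = get_root_url(False)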
+
+
+def requests_retry_session(
+ retries,
+ backoff_factor=0.1,
+ status_forcelist=(500, 502, 503, 504),
+ concurrency=CONCURRENCY,
+ session=None,
+):
+ session = session or requests.Session()
+ retry = Retry(
+ total=retries,
+ read=retries,
+ connect=retries,
+ backoff_factor=backoff_factor,
+ status_forcelist=status_forcelist,
+ )
+
+ # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
+ # that limit. Connections are established as needed, so using a large value
+ # should not negatively impact performance.
+ http_adapter = requests.adapters.HTTPAdapter(
+ pool_connections=concurrency,
+ pool_maxsize=concurrency,
+ max_retries=retry,
+ )
+ session.mount("http://", http_adapter)
+ session.mount("https://", http_adapter)
+
+ return session
+
+
+@memoize
+def get_session():
+ return requests_retry_session(retries=5)
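+
+
+# Illustrative usage sketch (not part of the vendored module): the memoized
+# retrying session can be used directly for ad-hoc requests; the URL below is
+# a placeholder.
+#
+#   session = get_session()
+#   resp = session.get("https://tc.example.com/api/queue/v1/ping")
+#   resp.raise_for_status()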
+
+
+def _do_request(url, method=None, **kwargs):
+ if method is None:
+ method = "post" if kwargs else "get"
+
+ session = get_session()
+ if method == "get":
+ kwargs["stream"] = True
+
+ response = getattr(session, method)(url, **kwargs)
+
+ if response.status_code >= 400:
+ # Consume content before raise_for_status, so that the connection can be
+ # reused.
+ response.content
+ response.raise_for_status()
+ return response
+
+
+def _handle_artifact(path, response):
+ if path.endswith(".json"):
+ return response.json()
+ if path.endswith(".yml"):
+ return yaml.load_stream(response.text)
+ response.raw.read = functools.partial(response.raw.read, decode_content=True)
+ return response.raw
+
+
+def get_artifact_url(task_id, path, use_proxy=False):
+ artifact_tmpl = liburls.api(
+ get_root_url(False), "queue", "v1", "task/{}/artifacts/{}"
+ )
+ data = artifact_tmpl.format(task_id, path)
+ if use_proxy:
+ # Until Bug 1405889 is deployed, we can't download directly
+ # from the taskcluster-proxy. Work around by using the /bewit
+ # endpoint instead.
+ # The bewit URL is the body of a 303 redirect, which we don't
+ # want to follow (which fetches a potentially large resource).
+ response = _do_request(
+ os.environ["TASKCLUSTER_PROXY_URL"] + "/bewit",
+ data=data,
+ allow_redirects=False,
+ )
+ return response.text
+ return data
+
+
+def get_artifact(task_id, path, use_proxy=False):
+ """
+ Returns the artifact with the given path for the given task id.
+
+ If the path ends with ".json" or ".yml", the content is deserialized as,
+ respectively, json or yaml, and the corresponding python data (usually
+ dict) is returned.
+ For other types of content, a file-like object is returned.
+ """
+ response = _do_request(get_artifact_url(task_id, path, use_proxy))
+ return _handle_artifact(path, response)
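+
+
+# Illustrative usage sketch (not part of the vendored module): the task id and
+# artifact path below are hypothetical.
+#
+#   manifest = get_artifact("abc123TaskId", "public/build/target.json")
+#   # returns a dict, because the path ends in ".json"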
+
+
+def list_artifacts(task_id, use_proxy=False):
+ response = _do_request(get_artifact_url(task_id, "", use_proxy).rstrip("/"))
+ return response.json()["artifacts"]
+
+
+def get_artifact_prefix(task):
+ prefix = None
+ if isinstance(task, dict):
+ prefix = task.get("attributes", {}).get("artifact_prefix")
+ elif isinstance(task, Task):
+ prefix = task.attributes.get("artifact_prefix")
+ else:
+ raise Exception(f"Can't find artifact-prefix of non-task: {task}")
+ return prefix or "public/build"
+
+
+def get_artifact_path(task, path):
+ return f"{get_artifact_prefix(task)}/{path}"
+
+
+def get_index_url(index_path, use_proxy=False, multiple=False):
+ index_tmpl = liburls.api(get_root_url(use_proxy), "index", "v1", "task{}/{}")
+ return index_tmpl.format("s" if multiple else "", index_path)
+
+
+def find_task_id(index_path, use_proxy=False):
+ try:
+ response = _do_request(get_index_url(index_path, use_proxy))
+ except requests.exceptions.HTTPError as e:
+ if e.response.status_code == 404:
+ raise KeyError(f"index path {index_path} not found")
+ raise
+ return response.json()["taskId"]
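+
+
+# Illustrative usage sketch (not part of the vendored module): the index path
+# below is hypothetical; a missing path raises KeyError rather than HTTPError.
+#
+#   try:
+#       task_id = find_task_id("my-project.v2.latest.build")
+#   except KeyError:
+#       task_id = None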
+
+
+def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
+ full_path = index_path + "/artifacts/" + artifact_path
+ response = _do_request(get_index_url(full_path, use_proxy))
+ return _handle_artifact(full_path, response)
+
+
+def list_tasks(index_path, use_proxy=False):
+ """
+ Returns a list of task_ids where each task_id is indexed under a path
+ in the index. Results are sorted by expiration date from oldest to newest.
+ """
+ results = []
+ data = {}
+ while True:
+ response = _do_request(
+ get_index_url(index_path, use_proxy, multiple=True), json=data
+ )
+ response = response.json()
+ results += response["tasks"]
+ if response.get("continuationToken"):
+ data = {"continuationToken": response.get("continuationToken")}
+ else:
+ break
+
+    # We can sort on expires because, in the general case, all of these tasks
+    # are created with the same relative expiry, so expiration order matches
+    # creation order, from the earliest to the latest action. If more
+    # correctness is needed, consider fetching each task and sorting on the
+    # created date.
+ results.sort(key=lambda t: parse_time(t["expires"]))
+ return [t["taskId"] for t in results]
+
+
+def parse_time(timestamp):
+ """Turn a "JSON timestamp" as used in TC APIs into a datetime"""
+ return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
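+
+
+# Illustrative example (not part of the vendored module):
+#
+#   parse_time("2023-01-02T03:04:05.678Z")
+#   # -> datetime.datetime(2023, 1, 2, 3, 4, 5, 678000)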
+
+
+def get_task_url(task_id, use_proxy=False):
+ task_tmpl = liburls.api(get_root_url(use_proxy), "queue", "v1", "task/{}")
+ return task_tmpl.format(task_id)
+
+
+def get_task_definition(task_id, use_proxy=False):
+ response = _do_request(get_task_url(task_id, use_proxy))
+ return response.json()
+
+
+def cancel_task(task_id, use_proxy=False):
+ """Cancels a task given a task_id. In testing mode, just logs that it would
+ have cancelled."""
+ if testing:
+ logger.info(f"Would have cancelled {task_id}.")
+ else:
+ _do_request(get_task_url(task_id, use_proxy) + "/cancel", json={})
+
+
+def status_task(task_id, use_proxy=False):
+ """Gets the status of a task given a task_id.
+
+ In testing mode, just logs that it would have retrieved status.
+
+ Args:
+ task_id (str): A task id.
+ use_proxy (bool): Whether to use taskcluster-proxy (default: False)
+
+ Returns:
+ dict: A dictionary object as defined here:
+ https://docs.taskcluster.net/docs/reference/platform/queue/api#status
+ """
+ if testing:
+ logger.info(f"Would have gotten status for {task_id}.")
+ else:
+ resp = _do_request(get_task_url(task_id, use_proxy) + "/status")
+ status = resp.json().get("status", {})
+ return status
+
+
+def state_task(task_id, use_proxy=False):
+ """Gets the state of a task given a task_id.
+
+ In testing mode, just logs that it would have retrieved state. This is a subset of the
+ data returned by :func:`status_task`.
+
+ Args:
+ task_id (str): A task id.
+ use_proxy (bool): Whether to use taskcluster-proxy (default: False)
+
+ Returns:
+ str: The state of the task, one of
+ ``pending, running, completed, failed, exception, unknown``.
+ """
+ if testing:
+ logger.info(f"Would have gotten state for {task_id}.")
+ else:
+ status = status_task(task_id, use_proxy=use_proxy).get("state") or "unknown"
+ return status
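+
+
+# Illustrative usage sketch (not part of the vendored module): the task id
+# below is hypothetical.
+#
+#   if state_task("abc123TaskId") in ("pending", "running"):
+#       cancel_task("abc123TaskId")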
+
+
+def rerun_task(task_id):
+ """Reruns a task given a task_id. In testing mode, just logs that it would
+    have rerun."""
+ if testing:
+ logger.info(f"Would have rerun {task_id}.")
+ else:
+ _do_request(get_task_url(task_id, use_proxy=True) + "/rerun", json={})
+
+
+def get_current_scopes():
+ """Get the current scopes. This only makes sense in a task with the Taskcluster
+ proxy enabled, where it returns the actual scopes accorded to the task."""
+ auth_url = liburls.api(get_root_url(True), "auth", "v1", "scopes/current")
+ resp = _do_request(auth_url)
+ return resp.json().get("scopes", [])
+
+
+def get_purge_cache_url(provisioner_id, worker_type, use_proxy=False):
+ url_tmpl = liburls.api(
+ get_root_url(use_proxy), "purge-cache", "v1", "purge-cache/{}/{}"
+ )
+ return url_tmpl.format(provisioner_id, worker_type)
+
+
+def purge_cache(provisioner_id, worker_type, cache_name, use_proxy=False):
+ """Requests a cache purge from the purge-caches service."""
+ if testing:
+ logger.info(
+ "Would have purged {}/{}/{}.".format(
+ provisioner_id, worker_type, cache_name
+ )
+ )
+ else:
+ logger.info(f"Purging {provisioner_id}/{worker_type}/{cache_name}.")
+ purge_cache_url = get_purge_cache_url(provisioner_id, worker_type, use_proxy)
+ _do_request(purge_cache_url, json={"cacheName": cache_name})
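+
+
+# Illustrative usage sketch (not part of the vendored module): the provisioner
+# id, worker type and cache name below are hypothetical.
+#
+#   purge_cache("my-provisioner", "b-linux", "checkouts")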
+
+
+def send_email(address, subject, content, link, use_proxy=False):
+ """Sends an email using the notify service"""
+ logger.info(f"Sending email to {address}.")
+ url = liburls.api(get_root_url(use_proxy), "notify", "v1", "email")
+ _do_request(
+ url,
+ json={
+ "address": address,
+ "subject": subject,
+ "content": content,
+ "link": link,
+ },
+ )
+
+
+def list_task_group_incomplete_tasks(task_group_id):
+ """Generate the incomplete tasks in a task group"""
+ params = {}
+ while True:
+ url = liburls.api(
+ get_root_url(False),
+ "queue",
+ "v1",
+ f"task-group/{task_group_id}/list",
+ )
+ resp = _do_request(url, method="get", params=params).json()
+ for task in [t["status"] for t in resp["tasks"]]:
+ if task["state"] in ["running", "pending", "unscheduled"]:
+ yield task["taskId"]
+ if resp.get("continuationToken"):
+ params = {"continuationToken": resp.get("continuationToken")}
+ else:
+ break
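+
+
+# Illustrative usage sketch (not part of the vendored module): the task group
+# id below is hypothetical.
+#
+#   for task_id in list_task_group_incomplete_tasks("someTaskGroupId"):
+#       cancel_task(task_id)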