summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py')
-rw-r--r--third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py71
1 files changed, 48 insertions, 23 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py b/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py
index a830a473b3..b467e98a97 100644
--- a/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py
@@ -3,10 +3,12 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+import copy
import datetime
import functools
import logging
import os
+from typing import Dict, List, Union
import requests
import taskcluster_urls as liburls
@@ -53,9 +55,11 @@ def get_root_url(use_proxy):
logger.debug(
"Running in Taskcluster instance {}{}".format(
os.environ["TASKCLUSTER_ROOT_URL"],
- " with taskcluster-proxy"
- if "TASKCLUSTER_PROXY_URL" in os.environ
- else "",
+ (
+ " with taskcluster-proxy"
+ if "TASKCLUSTER_PROXY_URL" in os.environ
+ else ""
+ ),
)
)
return liburls.normalize_root_url(os.environ["TASKCLUSTER_ROOT_URL"])
@@ -136,22 +140,9 @@ def _handle_artifact(path, response):
def get_artifact_url(task_id, path, use_proxy=False):
artifact_tmpl = liburls.api(
- get_root_url(False), "queue", "v1", "task/{}/artifacts/{}"
+ get_root_url(use_proxy), "queue", "v1", "task/{}/artifacts/{}"
)
- data = artifact_tmpl.format(task_id, path)
- if use_proxy:
- # Until Bug 1405889 is deployed, we can't download directly
- # from the taskcluster-proxy. Work around by using the /bewit
- # endpoint instead.
- # The bewit URL is the body of a 303 redirect, which we don't
- # want to follow (which fetches a potentially large resource).
- response = _do_request(
- os.environ["TASKCLUSTER_PROXY_URL"] + "/bewit",
- data=data,
- allow_redirects=False,
- )
- return response.text
- return data
+ return artifact_tmpl.format(task_id, path)
def get_artifact(task_id, path, use_proxy=False):
@@ -244,6 +235,7 @@ def get_task_url(task_id, use_proxy=False):
return task_tmpl.format(task_id)
+@memoize
def get_task_definition(task_id, use_proxy=False):
response = _do_request(get_task_url(task_id, use_proxy))
return response.json()
@@ -327,11 +319,7 @@ def get_purge_cache_url(provisioner_id, worker_type, use_proxy=False):
def purge_cache(provisioner_id, worker_type, cache_name, use_proxy=False):
"""Requests a cache purge from the purge-caches service."""
if testing:
- logger.info(
- "Would have purged {}/{}/{}.".format(
- provisioner_id, worker_type, cache_name
- )
- )
+ logger.info(f"Would have purged {provisioner_id}/{worker_type}/{cache_name}.")
else:
logger.info(f"Purging {provisioner_id}/{worker_type}/{cache_name}.")
purge_cache_url = get_purge_cache_url(provisioner_id, worker_type, use_proxy)
@@ -371,3 +359,40 @@ def list_task_group_incomplete_tasks(task_group_id):
params = {"continuationToken": resp.get("continuationToken")}
else:
break
+
+
+@memoize
+def _get_deps(task_ids, use_proxy):
+ upstream_tasks = {}
+ for task_id in task_ids:
+ task_def = get_task_definition(task_id, use_proxy)
+ upstream_tasks[task_def["metadata"]["name"]] = task_id
+
+ upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy))
+
+ return upstream_tasks
+
+
+def get_ancestors(
+ task_ids: Union[List[str], str], use_proxy: bool = False
+) -> Dict[str, str]:
+ """Gets the ancestor tasks of the given task_ids as a dictionary of label -> taskid.
+
+ Args:
+ task_ids (str or [str]): A single task id or a list of task ids to find the ancestors of.
+ use_proxy (bool): See get_root_url.
+
+ Returns:
+ dict: A dict whose keys are task labels and values are task ids.
+ """
+ upstream_tasks: Dict[str, str] = {}
+
+ if isinstance(task_ids, str):
+ task_ids = [task_ids]
+
+ for task_id in task_ids:
+ task_def = get_task_definition(task_id, use_proxy)
+
+ upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy))
+
+ return copy.deepcopy(upstream_tasks)