diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py b/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py index a830a473b3..b467e98a97 100644 --- a/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py +++ b/third_party/python/taskcluster_taskgraph/taskgraph/util/taskcluster.py @@ -3,10 +3,12 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. +import copy import datetime import functools import logging import os +from typing import Dict, List, Union import requests import taskcluster_urls as liburls @@ -53,9 +55,11 @@ def get_root_url(use_proxy): logger.debug( "Running in Taskcluster instance {}{}".format( os.environ["TASKCLUSTER_ROOT_URL"], - " with taskcluster-proxy" - if "TASKCLUSTER_PROXY_URL" in os.environ - else "", + ( + " with taskcluster-proxy" + if "TASKCLUSTER_PROXY_URL" in os.environ + else "" + ), ) ) return liburls.normalize_root_url(os.environ["TASKCLUSTER_ROOT_URL"]) @@ -136,22 +140,9 @@ def _handle_artifact(path, response): def get_artifact_url(task_id, path, use_proxy=False): artifact_tmpl = liburls.api( - get_root_url(False), "queue", "v1", "task/{}/artifacts/{}" + get_root_url(use_proxy), "queue", "v1", "task/{}/artifacts/{}" ) - data = artifact_tmpl.format(task_id, path) - if use_proxy: - # Until Bug 1405889 is deployed, we can't download directly - # from the taskcluster-proxy. Work around by using the /bewit - # endpoint instead. - # The bewit URL is the body of a 303 redirect, which we don't - # want to follow (which fetches a potentially large resource). - response = _do_request( - os.environ["TASKCLUSTER_PROXY_URL"] + "/bewit", - data=data, - allow_redirects=False, - ) - return response.text - return data + return artifact_tmpl.format(task_id, path) def get_artifact(task_id, path, use_proxy=False): @@ -244,6 +235,7 @@ def get_task_url(task_id, use_proxy=False): return task_tmpl.format(task_id) +@memoize def get_task_definition(task_id, use_proxy=False): response = _do_request(get_task_url(task_id, use_proxy)) return response.json() @@ -327,11 +319,7 @@ def get_purge_cache_url(provisioner_id, worker_type, use_proxy=False): def purge_cache(provisioner_id, worker_type, cache_name, use_proxy=False): """Requests a cache purge from the purge-caches service.""" if testing: - logger.info( - "Would have purged {}/{}/{}.".format( - provisioner_id, worker_type, cache_name - ) - ) + logger.info(f"Would have purged {provisioner_id}/{worker_type}/{cache_name}.") else: logger.info(f"Purging {provisioner_id}/{worker_type}/{cache_name}.") purge_cache_url = get_purge_cache_url(provisioner_id, worker_type, use_proxy) @@ -371,3 +359,40 @@ def list_task_group_incomplete_tasks(task_group_id): params = {"continuationToken": resp.get("continuationToken")} else: break + + +@memoize +def _get_deps(task_ids, use_proxy): + upstream_tasks = {} + for task_id in task_ids: + task_def = get_task_definition(task_id, use_proxy) + upstream_tasks[task_def["metadata"]["name"]] = task_id + + upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy)) + + return upstream_tasks + + +def get_ancestors( + task_ids: Union[List[str], str], use_proxy: bool = False +) -> Dict[str, str]: + """Gets the ancestor tasks of the given task_ids as a dictionary of label -> taskid. + + Args: + task_ids (str or [str]): A single task id or a list of task ids to find the ancestors of. + use_proxy (bool): See get_root_url. + + Returns: + dict: A dict whose keys are task labels and values are task ids. + """ + upstream_tasks: Dict[str, str] = {} + + if isinstance(task_ids, str): + task_ids = [task_ids] + + for task_id in task_ids: + task_def = get_task_definition(task_id, use_proxy) + + upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy)) + + return copy.deepcopy(upstream_tasks) |