summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster/taskcluster/helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/taskcluster/taskcluster/helper.py')
-rw-r--r--third_party/python/taskcluster/taskcluster/helper.py185
1 files changed, 185 insertions, 0 deletions
diff --git a/third_party/python/taskcluster/taskcluster/helper.py b/third_party/python/taskcluster/taskcluster/helper.py
new file mode 100644
index 0000000000..7fec5d5acc
--- /dev/null
+++ b/third_party/python/taskcluster/taskcluster/helper.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import os
+
+import datetime
+import logging
+import requests
+from taskcluster.generated import _client_importer
+from taskcluster.generated.aio import _client_importer as _async_client_importer
+from taskcluster.utils import stringDate
+import urllib.parse
+
+logger = logging.getLogger(__name__)
+
+
+class TaskclusterConfig(object):
+ """
+ Local configuration used to access Taskcluster service and objects
+ """
+
+ def __init__(self, url=None):
+ self.options = None
+ self.secrets = None
+ self.default_url = url if url is not None else os.environ.get("TASKCLUSTER_ROOT_URL")
+ assert self.default_url is not None, "You must specify a Taskcluster deployment url"
+
+ def auth(self, client_id=None, access_token=None, max_retries=12):
+ """
+ Build Taskcluster credentials options
+ Supports, by order of preference:
+ * directly provided credentials
+ * credentials from environment variables
+ * taskclusterProxy
+ * no authentication
+ """
+ self.options = {"maxRetries": max_retries}
+
+ if client_id is None and access_token is None:
+ # Credentials preference: Use env. variables
+ client_id = os.environ.get("TASKCLUSTER_CLIENT_ID")
+ access_token = os.environ.get("TASKCLUSTER_ACCESS_TOKEN")
+ logger.info("Using taskcluster credentials from environment")
+ else:
+ logger.info("Using taskcluster credentials from cli")
+
+ if client_id is not None and access_token is not None:
+ # Use provided credentials
+ self.options["credentials"] = {
+ "clientId": client_id,
+ "accessToken": access_token,
+ }
+ self.options["rootUrl"] = self.default_url
+
+ elif "TASK_ID" in os.environ:
+ # Use Taskcluster Proxy when running in a task
+ logger.info("Taskcluster Proxy enabled")
+ self.options["rootUrl"] = os.environ.get("TASKCLUSTER_PROXY_URL", "http://taskcluster")
+
+ else:
+ logger.info("No Taskcluster authentication.")
+ self.options["rootUrl"] = self.default_url
+
+ def get_service(self, service_name, use_async=False):
+ """
+ Build a Taskcluster service instance using current authentication
+ """
+ if self.options is None:
+ self.auth()
+
+ client_importer = _async_client_importer if use_async else _client_importer
+ service = getattr(client_importer, service_name.capitalize(), None)
+ assert service is not None, "Invalid Taskcluster service {}".format(
+ service_name
+ )
+ return service(self.options)
+
+ def load_secrets(
+ self, secret_name, prefixes=[], required=[], existing={}, local_secrets=None
+ ):
+ """Shortcut to use load_secrets helper with current authentication"""
+ self.secrets = load_secrets(
+ self.get_service('secrets'),
+ secret_name,
+ prefixes,
+ required,
+ existing,
+ local_secrets,
+ )
+ return self.secrets
+
+ def upload_artifact(self, artifact_path, content, content_type, ttl):
+ """Shortcut to use upload_artifact helper with current authentication"""
+ path = upload_artifact(
+ self.get_service('queue'),
+ artifact_path,
+ content,
+ content_type,
+ ttl,
+ )
+
+ return urllib.parse.urljoin(self.default_url, path)
+
+
+def load_secrets(
+ secrets_service, secret_name, prefixes=[], required=[], existing={}, local_secrets=None
+):
+ """
+ Fetch a specific set of secrets by name and verify that the required
+ secrets exist.
+ Also supports providing local secrets to avoid using remote Taskcluster service
+ for local development (or contributor onboarding)
+ A user can specify prefixes to limit the part of secrets used (useful when a secret
+ is shared amongst several services)
+ """
+ secrets = {}
+ if existing:
+ secrets.update(existing)
+
+ if isinstance(local_secrets, dict):
+ # Use local secrets file to avoid using Taskcluster secrets
+ logger.info("Using provided local secrets")
+ all_secrets = local_secrets
+ else:
+ # Use Taskcluster secret service
+ assert secret_name is not None, "Missing Taskcluster secret secret_name"
+ all_secrets = secrets_service.get(secret_name).get("secret", dict())
+ logger.info("Loaded Taskcluster secret {}".format(secret_name))
+
+ if prefixes:
+ # Use secrets behind supported prefixes
+ for prefix in prefixes:
+ secrets.update(all_secrets.get(prefix, dict()))
+
+ else:
+ # Use all secrets available
+ secrets.update(all_secrets)
+
+ # Check required secrets
+ for required_secret in required:
+ if required_secret not in secrets:
+ raise Exception("Missing value {} in secrets.".format(required_secret))
+
+ return secrets
+
+
+def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
+ """
+ DEPRECATED. Do not use.
+ """
+ task_id = os.environ.get("TASK_ID")
+ run_id = os.environ.get("RUN_ID")
+ proxy = os.environ.get("TASKCLUSTER_PROXY_URL")
+ assert task_id and run_id and proxy, "Can only run in Taskcluster tasks with proxy"
+ assert isinstance(content, str)
+ assert isinstance(ttl, datetime.timedelta)
+
+ # Create S3 artifact on Taskcluster
+ resp = queue_service.createArtifact(
+ task_id,
+ run_id,
+ artifact_path,
+ {
+ "storageType": "s3",
+ "expires": stringDate(datetime.datetime.utcnow() + ttl),
+ "contentType": content_type,
+ },
+ )
+ assert resp["storageType"] == "s3", "Not an s3 storage"
+ assert "putUrl" in resp, "Missing putUrl"
+ assert "contentType" in resp, "Missing contentType"
+
+ # Push the artifact on storage service
+ headers = {"Content-Type": resp["contentType"]}
+ push = requests.put(url=resp["putUrl"], headers=headers, data=content)
+ push.raise_for_status()
+
+ # Build the absolute url
+ return "/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{path}".format(
+ task_id=task_id,
+ run_id=run_id,
+ path=artifact_path,
+ )