summaryrefslogtreecommitdiffstats
path: root/build/upload_generated_sources.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--build/upload_generated_sources.py169
1 files changed, 169 insertions, 0 deletions
diff --git a/build/upload_generated_sources.py b/build/upload_generated_sources.py
new file mode 100644
index 0000000000..accce4beb9
--- /dev/null
+++ b/build/upload_generated_sources.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env/python
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import argparse
+import gzip
+import io
+import logging
+import os
+import sys
+import tarfile
+import time
+from contextlib import contextmanager
+from threading import Event, Thread
+
+import requests
+from mozbuild.generated_sources import (
+ get_filename_with_digest,
+ get_s3_region_and_bucket,
+)
+from requests.packages.urllib3.util.retry import Retry
+from six.moves.queue import Queue
+
+# Arbitrary, should probably measure this.
+NUM_WORKER_THREADS = 10
+log = logging.getLogger("upload-generated-sources")
+log.setLevel(logging.INFO)
+
+
+@contextmanager
+def timed():
+ """
+ Yield a function that provides the elapsed time in seconds since this
+ function was called.
+ """
+ start = time.time()
+
+ def elapsed():
+ return time.time() - start
+
+ yield elapsed
+
+
+def gzip_compress(data):
+ """
+ Apply gzip compression to `data` and return the result as a `BytesIO`.
+ """
+ b = io.BytesIO()
+ with gzip.GzipFile(fileobj=b, mode="w") as f:
+ f.write(data)
+ b.flush()
+ b.seek(0)
+ return b
+
+
+def upload_worker(queue, event, bucket, session_args):
+ """
+ Get `(name, contents)` entries from `queue` and upload `contents`
+ to S3 with gzip compression using `name` as the key, prefixed with
+ the SHA-512 digest of `contents` as a hex string. If an exception occurs,
+ set `event`.
+ """
+ try:
+ import boto3
+
+ session = boto3.session.Session(**session_args)
+ s3 = session.client("s3")
+ while True:
+ if event.is_set():
+ # Some other thread hit an exception.
+ return
+ (name, contents) = queue.get()
+ pathname = get_filename_with_digest(name, contents)
+ compressed = gzip_compress(contents)
+ extra_args = {
+ "ContentEncoding": "gzip",
+ "ContentType": "text/plain",
+ }
+ log.info(
+ 'Uploading "{}" ({} bytes)'.format(pathname, len(compressed.getvalue()))
+ )
+ with timed() as elapsed:
+ s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
+ log.info(
+ 'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
+ )
+ queue.task_done()
+ except Exception:
+ log.exception("Thread encountered exception:")
+ event.set()
+
+
+def do_work(artifact, region, bucket):
+ session_args = {"region_name": region}
+ session = requests.Session()
+ retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+ http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
+ session.mount("https://", http_adapter)
+ session.mount("http://", http_adapter)
+
+ if "TASK_ID" in os.environ:
+ level = os.environ.get("MOZ_SCM_LEVEL", "1")
+ secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format( # noqa
+ level
+ )
+ log.info(
+ 'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
+ )
+ res = session.get(secrets_url)
+ res.raise_for_status()
+ secret = res.json()
+ session_args.update(
+ aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
+ )
+ else:
+ log.info("Trying to use your AWS credentials..")
+
+ # First, fetch the artifact containing the sources.
+ log.info('Fetching generated sources artifact: "{}"'.format(artifact))
+ with timed() as elapsed:
+ res = session.get(artifact)
+ log.info(
+ "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
+ res.status_code, len(res.content), elapsed()
+ )
+ )
+ res.raise_for_status()
+ # Create a queue and worker threads for uploading.
+ q = Queue()
+ event = Event()
+ log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
+ for i in range(NUM_WORKER_THREADS):
+ t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
+ t.daemon = True
+ t.start()
+ with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
+ # Next, process each file.
+ for entry in tar:
+ if event.is_set():
+ break
+ log.info('Queueing "{}"'.format(entry.name))
+ q.put((entry.name, tar.extractfile(entry).read()))
+ # Wait until all uploads are finished.
+ # We don't use q.join() here because we want to also monitor event.
+ while q.unfinished_tasks:
+ if event.wait(0.1):
+ log.error("Worker thread encountered exception, exiting...")
+ break
+
+
+def main(argv):
+ logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
+ parser = argparse.ArgumentParser(
+ description="Upload generated source files in ARTIFACT to BUCKET in S3."
+ )
+ parser.add_argument("artifact", help="generated-sources artifact from build task")
+ args = parser.parse_args(argv)
+ region, bucket = get_s3_region_and_bucket()
+
+ with timed() as elapsed:
+ do_work(region=region, bucket=bucket, artifact=args.artifact)
+ log.info("Finished in {:.03f}s".format(elapsed()))
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv[1:]))