Diffstat (limited to 'build/upload_generated_sources.py')
-rw-r--r--  build/upload_generated_sources.py | 169
1 file changed, 169 insertions(+), 0 deletions(-)
diff --git a/build/upload_generated_sources.py b/build/upload_generated_sources.py
new file mode 100644
index 0000000000..accce4beb9
--- /dev/null
+++ b/build/upload_generated_sources.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import argparse
+import gzip
+import io
+import logging
+import os
+import sys
+import tarfile
+import time
+from contextlib import contextmanager
+from threading import Event, Thread
+
+import requests
+from mozbuild.generated_sources import (
+    get_filename_with_digest,
+    get_s3_region_and_bucket,
+)
+from requests.packages.urllib3.util.retry import Retry
+from six.moves.queue import Queue
+
+# Arbitrary, should probably measure this.
+NUM_WORKER_THREADS = 10
+log = logging.getLogger("upload-generated-sources")
+log.setLevel(logging.INFO)
+
+
+@contextmanager
+def timed():
+    """
+    Yield a function that provides the elapsed time in seconds since this
+    function was called.
+    """
+    start = time.time()
+
+    def elapsed():
+        return time.time() - start
+
+    yield elapsed
+
+
+def gzip_compress(data):
+    """
+    Apply gzip compression to `data` and return the result as a `BytesIO`.
+    """
+    b = io.BytesIO()
+    with gzip.GzipFile(fileobj=b, mode="w") as f:
+        f.write(data)
+    b.flush()
+    b.seek(0)
+    return b
+
+
+def upload_worker(queue, event, bucket, session_args):
+    """
+    Get `(name, contents)` entries from `queue` and upload `contents`
+    to S3 with gzip compression using `name` as the key, prefixed with
+    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
+    set `event`.
+    """
+    try:
+        import boto3
+
+        session = boto3.session.Session(**session_args)
+        s3 = session.client("s3")
+        while True:
+            if event.is_set():
+                # Some other thread hit an exception.
+                return
+            (name, contents) = queue.get()
+            pathname = get_filename_with_digest(name, contents)
+            compressed = gzip_compress(contents)
+            extra_args = {
+                "ContentEncoding": "gzip",
+                "ContentType": "text/plain",
+            }
+            log.info(
+                'Uploading "{}" ({} bytes)'.format(pathname, len(compressed.getvalue()))
+            )
+            with timed() as elapsed:
+                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
+                log.info(
+                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
+                )
+            queue.task_done()
+    except Exception:
+        log.exception("Thread encountered exception:")
+        event.set()
+
+
+def do_work(artifact, region, bucket):
+    session_args = {"region_name": region}
+    session = requests.Session()
+    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
+    session.mount("https://", http_adapter)
+    session.mount("http://", http_adapter)
+
+    if "TASK_ID" in os.environ:
+        level = os.environ.get("MOZ_SCM_LEVEL", "1")
+        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
+            level
+        )
+        log.info(
+            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
+        )
+        res = session.get(secrets_url)
+        res.raise_for_status()
+        secret = res.json()
+        session_args.update(
+            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
+        )
+    else:
+        log.info("Trying to use your AWS credentials...")
+
+    # First, fetch the artifact containing the sources.
+    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
+    with timed() as elapsed:
+        res = session.get(artifact)
+        log.info(
+            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
+            )
+        )
+    res.raise_for_status()
+    # Create a queue and worker threads for uploading.
+    q = Queue()
+    event = Event()
+    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
+    for i in range(NUM_WORKER_THREADS):
+        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
+        t.daemon = True
+        t.start()
+    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
+        # Next, process each file.
+        for entry in tar:
+            if event.is_set():
+                break
+            log.info('Queueing "{}"'.format(entry.name))
+            q.put((entry.name, tar.extractfile(entry).read()))
+    # Wait until all uploads are finished.
+    # We don't use q.join() here because we want to also monitor event.
+    while q.unfinished_tasks:
+        if event.wait(0.1):
+            log.error("Worker thread encountered exception, exiting...")
+            break
+
+
+def main(argv):
+    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
+    parser = argparse.ArgumentParser(
+        description="Upload generated source files in ARTIFACT to BUCKET in S3."
+    )
+    parser.add_argument("artifact", help="generated-sources artifact from build task")
+    args = parser.parse_args(argv)
+    region, bucket = get_s3_region_and_bucket()
+
+    with timed() as elapsed:
+        do_work(region=region, bucket=bucket, artifact=args.artifact)
+        log.info("Finished in {:.03f}s".format(elapsed()))
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
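
The upload path in do_work() and upload_worker() is a producer/consumer setup: the main thread feeds (name, contents) tuples into a Queue, daemon worker threads drain it, and a shared Event propagates the first failure so everything can bail out early. The main thread polls q.unfinished_tasks instead of calling q.join(), because q.join() blocks unconditionally and would never observe the event. The following is a minimal, self-contained Python 3 sketch of that same pattern, not part of the patch; the upload() stand-in and the item names are hypothetical, present only so it runs without boto3 or network access.

import logging
import time
from queue import Queue
from threading import Event, Thread

logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
log = logging.getLogger("worker-sketch")

NUM_WORKER_THREADS = 4  # arbitrary, like the script's 10


def upload(name, contents):
    # Hypothetical stand-in for s3.upload_fileobj(); simulates I/O latency.
    time.sleep(0.01)


def worker(queue, event):
    try:
        while True:
            if event.is_set():
                # Some other thread hit an exception; stop quietly.
                return
            name, contents = queue.get()
            upload(name, contents)
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        event.set()


def main():
    q = Queue()
    event = Event()
    for _ in range(NUM_WORKER_THREADS):
        t = Thread(target=worker, args=(q, event))
        t.daemon = True  # workers must not keep the process alive
        t.start()
    for i in range(100):
        q.put(("file-{}.cpp".format(i), b"contents"))
    # Poll unfinished_tasks rather than q.join(): q.join() blocks
    # unconditionally, so a failure signalled via `event` could never
    # interrupt the wait.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())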
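One detail in gzip_compress() is easy to miss: the b.seek(0) rewinds the BytesIO after the GzipFile is closed, because upload_fileobj() reads from the buffer's current position; without the rewind, every object uploaded would be empty. A short round-trip check, again only a sketch with a made-up payload, illustrates the behavior:

import gzip
import io


def gzip_compress(data):
    # Same approach as the script: write through GzipFile into a BytesIO.
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    b.flush()
    b.seek(0)  # rewind so a subsequent read starts at byte 0
    return b


data = b"// generated code\n" * 1000  # made-up sample payload
compressed = gzip_compress(data)
# Decompressing what a reader would see from the buffer must yield the
# original bytes, matching the ContentEncoding: gzip contract on S3.
assert gzip.decompress(compressed.read()) == data
print("{} bytes -> {} bytes".format(len(data), len(compressed.getvalue())))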