diff options
Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/run-task')
4 files changed, 3065 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content new file mode 100755 index 0000000000..42dc5e2b28 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content @@ -0,0 +1,899 @@ +#!/usr/bin/python3 -u +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import argparse +import bz2 +import concurrent.futures +import contextlib +import datetime +import gzip +import hashlib +import json +import lzma +import multiprocessing +import os +import pathlib +import random +import re +import stat +import subprocess +import sys +import tarfile +import tempfile +import time +import urllib.parse +import urllib.request +import zipfile + +try: + import zstandard +except ImportError: + zstandard = None + +try: + import certifi +except ImportError: + certifi = None + + +CONCURRENCY = multiprocessing.cpu_count() + + +def log(msg): + print(msg, file=sys.stderr) + sys.stderr.flush() + + +class IntegrityError(Exception): + """Represents an integrity error when downloading a URL.""" + + +def ZstdCompressor(*args, **kwargs): + if not zstandard: + raise ValueError("zstandard Python package not available") + return zstandard.ZstdCompressor(*args, **kwargs) + + +def ZstdDecompressor(*args, **kwargs): + if not zstandard: + raise ValueError("zstandard Python package not available") + return zstandard.ZstdDecompressor(*args, **kwargs) + + +@contextlib.contextmanager +def rename_after_close(fname, *args, **kwargs): + """ + Context manager that opens a temporary file to use as a writer, + and closes the file on context exit, renaming it to the expected + file name in case of success, or removing it in case of failure. + + Takes the same options as open(), but must be used as a context + manager. + """ + path = pathlib.Path(fname) + tmp = path.with_name("%s.tmp" % path.name) + try: + with tmp.open(*args, **kwargs) as fh: + yield fh + except Exception: + tmp.unlink() + raise + else: + tmp.rename(fname) + + +# The following is copied from +# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85 +def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1): + """ + A generator function that sleeps between retries, handles exponential + backoff and jitter. The action you are retrying is meant to run after + retrier yields. + + At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter). + Afterwards sleeptime is multiplied by sleepscale for the next iteration. + + Args: + attempts (int): maximum number of times to try; defaults to 5 + sleeptime (float): how many seconds to sleep between tries; defaults to + 60s (one minute) + max_sleeptime (float): the longest we'll sleep, in seconds; defaults to + 300s (five minutes) + sleepscale (float): how much to multiply the sleep time by each + iteration; defaults to 1.5 + jitter (int): random jitter to introduce to sleep time each iteration. + the amount is chosen at random between [-jitter, +jitter] + defaults to 1 + + Yields: + None, a maximum of `attempts` number of times + + Example: + >>> n = 0 + >>> for _ in retrier(sleeptime=0, jitter=0): + ... if n == 3: + ... # We did the thing! + ... break + ... n += 1 + >>> n + 3 + + >>> n = 0 + >>> for _ in retrier(sleeptime=0, jitter=0): + ... if n == 6: + ... # We did the thing! + ... break + ... n += 1 + ... else: + ... print("max tries hit") + max tries hit + """ + jitter = jitter or 0 # py35 barfs on the next line if jitter is None + if jitter > sleeptime: + # To prevent negative sleep times + raise Exception( + "jitter ({}) must be less than sleep time ({})".format(jitter, sleeptime) + ) + + sleeptime_real = sleeptime + for _ in range(attempts): + log("attempt %i/%i" % (_ + 1, attempts)) + + yield sleeptime_real + + if jitter: + sleeptime_real = sleeptime + random.randint(-jitter, jitter) + # our jitter should scale along with the sleeptime + jitter = int(jitter * sleepscale) + else: + sleeptime_real = sleeptime + + sleeptime *= sleepscale + + if sleeptime_real > max_sleeptime: + sleeptime_real = max_sleeptime + + # Don't need to sleep the last time + if _ < attempts - 1: + log( + "sleeping for %.2fs (attempt %i/%i)" % (sleeptime_real, _ + 1, attempts) + ) + time.sleep(sleeptime_real) + + +def stream_download(url, sha256=None, size=None, headers=None): + """Download a URL to a generator, optionally with content verification. + + If ``sha256`` or ``size`` are defined, the downloaded URL will be + validated against those requirements and ``IntegrityError`` will be + raised if expectations do not match. + + Because verification cannot occur until the file is completely downloaded + it is recommended for consumers to not do anything meaningful with the + data if content verification is being used. To securely handle retrieved + content, it should be streamed to a file or memory and only operated + on after the generator is exhausted without raising. + """ + log("Downloading %s" % url) + headers = headers or [] + + h = hashlib.sha256() + length = 0 + + t0 = time.time() + req_headers = {} + for header in headers: + key, val = header.split(":") + req_headers[key.strip()] = val.strip() + + req = urllib.request.Request(url, None, req_headers) + with urllib.request.urlopen( + req, cafile=certifi.where() + ) if certifi else urllib.request.urlopen(req) as fh: + if not url.endswith(".gz") and fh.info().get("Content-Encoding") == "gzip": + fh = gzip.GzipFile(fileobj=fh) + + while True: + chunk = fh.read(65536) + if not chunk: + break + + h.update(chunk) + length += len(chunk) + + yield chunk + + duration = time.time() - t0 + digest = h.hexdigest() + + log( + "%s resolved to %d bytes with sha256 %s in %.3fs" + % (url, length, digest, duration) + ) + + if size: + if size == length: + log("Verified size of %s" % url) + else: + raise IntegrityError( + "size mismatch on %s: wanted %d; got %d" % (url, size, length) + ) + + if sha256: + if digest == sha256: + log("Verified sha256 integrity of %s" % url) + else: + raise IntegrityError( + "sha256 mismatch on %s: wanted %s; got %s" % (url, sha256, digest) + ) + + +def download_to_path(url, path, sha256=None, size=None, headers=None): + """Download a URL to a filesystem path, possibly with verification.""" + + # We download to a temporary file and rename at the end so there's + # no chance of the final file being partially written or containing + # bad data. + try: + path.unlink() + except FileNotFoundError: + pass + + for _ in retrier(attempts=5, sleeptime=60): + try: + log("Downloading %s to %s" % (url, path)) + + with rename_after_close(path, "wb") as fh: + for chunk in stream_download( + url, sha256=sha256, size=size, headers=headers + ): + fh.write(chunk) + + return + except IntegrityError: + raise + except Exception as e: + log("Download failed: {}".format(e)) + continue + + raise Exception("Download failed, no more retries!") + + +def download_to_memory(url, sha256=None, size=None): + """Download a URL to memory, possibly with verification.""" + + data = b"" + for _ in retrier(attempts=5, sleeptime=60): + try: + log("Downloading %s" % (url)) + + for chunk in stream_download(url, sha256=sha256, size=size): + data += chunk + + return data + except IntegrityError: + raise + except Exception as e: + log("Download failed: {}".format(e)) + continue + + raise Exception("Download failed, no more retries!") + + +def gpg_verify_path(path: pathlib.Path, public_key_data: bytes, signature_data: bytes): + """Verify that a filesystem path verifies using GPG. + + Takes a Path defining a file to verify. ``public_key_data`` contains + bytes with GPG public key data. ``signature_data`` contains a signed + GPG document to use with ``gpg --verify``. + """ + log("Validating GPG signature of %s" % path) + log("GPG key data:\n%s" % public_key_data.decode("ascii")) + + with tempfile.TemporaryDirectory() as td: + try: + # --batch since we're running unattended. + gpg_args = ["gpg", "--homedir", td, "--batch"] + + log("Importing GPG key...") + subprocess.run(gpg_args + ["--import"], input=public_key_data, check=True) + + log("Verifying GPG signature...") + subprocess.run( + gpg_args + ["--verify", "-", "%s" % path], + input=signature_data, + check=True, + ) + + log("GPG signature verified!") + finally: + # There is a race between the agent self-terminating and + # shutil.rmtree() from the temporary directory cleanup that can + # lead to exceptions. Kill the agent before cleanup to prevent this. + env = dict(os.environ) + env["GNUPGHOME"] = td + subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=env) + + +def open_tar_stream(path: pathlib.Path): + """""" + if path.suffix == ".bz2": + return bz2.open(str(path), "rb") + elif path.suffix == ".gz": + return gzip.open(str(path), "rb") + elif path.suffix == ".xz": + return lzma.open(str(path), "rb") + elif path.suffix == ".zst": + dctx = ZstdDecompressor() + return dctx.stream_reader(path.open("rb")) + elif path.suffix == ".tar": + return path.open("rb") + else: + raise ValueError("unknown archive format for tar file: %s" % path) + + +def archive_type(path: pathlib.Path): + """Attempt to identify a path as an extractable archive.""" + if path.suffixes[-2:-1] == [".tar"]: + return "tar" + elif path.suffix == ".zip": + return "zip" + else: + return None + + +def extract_archive(path, dest_dir, typ): + """Extract an archive to a destination directory.""" + + # Resolve paths to absolute variants. + path = path.resolve() + dest_dir = dest_dir.resolve() + + log("Extracting %s to %s" % (path, dest_dir)) + t0 = time.time() + + # We pipe input to the decompressor program so that we can apply + # custom decompressors that the program may not know about. + if typ == "tar": + ifh = open_tar_stream(path) + # On Windows, the tar program doesn't support things like symbolic + # links, while Windows actually support them. The tarfile module in + # python does. So use that. But since it's significantly slower than + # the tar program on Linux, only use tarfile on Windows (tarfile is + # also not much slower on Windows, presumably because of the + # notoriously bad I/O). + if sys.platform == "win32": + tar = tarfile.open(fileobj=ifh, mode="r|") + tar.extractall(str(dest_dir)) + args = [] + else: + args = ["tar", "xf", "-"] + pipe_stdin = True + elif typ == "zip": + # unzip from stdin has wonky behavior. We don't use a pipe for it. + ifh = open(os.devnull, "rb") + args = ["unzip", "-o", str(path)] + pipe_stdin = False + else: + raise ValueError("unknown archive format: %s" % path) + + if args: + with ifh, subprocess.Popen( + args, cwd=str(dest_dir), bufsize=0, stdin=subprocess.PIPE + ) as p: + while True: + if not pipe_stdin: + break + + chunk = ifh.read(131072) + if not chunk: + break + + p.stdin.write(chunk) + + if p.returncode: + raise Exception("%r exited %d" % (args, p.returncode)) + + log("%s extracted in %.3fs" % (path, time.time() - t0)) + + +def repack_archive( + orig: pathlib.Path, dest: pathlib.Path, strip_components=0, prefix="" +): + assert orig != dest + log("Repacking as %s" % dest) + orig_typ = archive_type(orig) + typ = archive_type(dest) + if not orig_typ: + raise Exception("Archive type not supported for %s" % orig.name) + if not typ: + raise Exception("Archive type not supported for %s" % dest.name) + + if dest.suffixes[-2:] != [".tar", ".zst"]: + raise Exception("Only producing .tar.zst archives is supported.") + + if strip_components or prefix: + + def filter(name): + if strip_components: + stripped = "/".join(name.split("/")[strip_components:]) + if not stripped: + raise Exception( + "Stripping %d components would remove files" % strip_components + ) + name = stripped + return prefix + name + + else: + filter = None + + with rename_after_close(dest, "wb") as fh: + ctx = ZstdCompressor() + if orig_typ == "zip": + assert typ == "tar" + zip = zipfile.ZipFile(orig) + # Convert the zip stream to a tar on the fly. + with ctx.stream_writer(fh) as compressor, tarfile.open( + fileobj=compressor, mode="w:" + ) as tar: + for zipinfo in zip.infolist(): + if zipinfo.is_dir(): + continue + tarinfo = tarfile.TarInfo() + filename = zipinfo.filename + tarinfo.name = filter(filename) if filter else filename + tarinfo.size = zipinfo.file_size + # Zip files don't have any knowledge of the timezone + # they were created in. Which is not really convenient to + # reliably convert to a timestamp. But we don't really + # care about accuracy, but rather about reproducibility, + # so we pick UTC. + time = datetime.datetime( + *zipinfo.date_time, tzinfo=datetime.timezone.utc + ) + tarinfo.mtime = time.timestamp() + # 0 is MS-DOS, 3 is UNIX. Only in the latter case do we + # get anything useful for the tar file mode. + if zipinfo.create_system == 3: + mode = zipinfo.external_attr >> 16 + else: + mode = 0o0644 + tarinfo.mode = stat.S_IMODE(mode) + if stat.S_ISLNK(mode): + tarinfo.type = tarfile.SYMTYPE + tarinfo.linkname = zip.read(filename).decode() + tar.addfile(tarinfo, zip.open(filename)) + elif stat.S_ISREG(mode) or stat.S_IFMT(mode) == 0: + tar.addfile(tarinfo, zip.open(filename)) + else: + raise Exception("Unsupported file mode %o" % stat.S_IFMT(mode)) + + elif orig_typ == "tar": + if typ == "zip": + raise Exception("Repacking a tar to zip is not supported") + assert typ == "tar" + + ifh = open_tar_stream(orig) + if filter: + # To apply the filter, we need to open the tar stream and + # tweak it. + origtar = tarfile.open(fileobj=ifh, mode="r|") + with ctx.stream_writer(fh) as compressor, tarfile.open( + fileobj=compressor, + mode="w:", + format=origtar.format, + ) as tar: + for tarinfo in origtar: + if tarinfo.isdir(): + continue + tarinfo.name = filter(tarinfo.name) + if "path" in tarinfo.pax_headers: + tarinfo.pax_headers["path"] = filter( + tarinfo.pax_headers["path"] + ) + if tarinfo.isfile(): + tar.addfile(tarinfo, origtar.extractfile(tarinfo)) + else: + tar.addfile(tarinfo) + else: + # We only change compression here. The tar stream is unchanged. + ctx.copy_stream(ifh, fh) + + +def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None): + """Fetch a URL and extract it to a destination path. + + If the downloaded URL is an archive, it is extracted automatically + and the archive is deleted. Otherwise the file remains in place in + the destination directory. + """ + + basename = urllib.parse.urlparse(url).path.split("/")[-1] + dest_path = dest_dir / basename + + download_to_path(url, dest_path, sha256=sha256, size=size) + + if not extract: + return + + typ = archive_type(dest_path) + if typ: + extract_archive(dest_path, dest_dir, typ) + log("Removing %s" % dest_path) + dest_path.unlink() + + +def fetch_urls(downloads): + """Fetch URLs pairs to a pathlib.Path.""" + with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e: + fs = [] + + for download in downloads: + fs.append(e.submit(fetch_and_extract, *download)) + + for f in fs: + f.result() + + +def _git_checkout_github_archive( + dest_path: pathlib.Path, repo: str, commit: str, prefix: str +): + "Use github archive generator to speed up github git repo cloning" + repo = repo.rstrip("/") + github_url = "{repo}/archive/{commit}.tar.gz".format(**locals()) + + with tempfile.TemporaryDirectory() as td: + temp_dir = pathlib.Path(td) + dl_dest = temp_dir / "archive.tar.gz" + download_to_path(github_url, dl_dest) + repack_archive(dl_dest, dest_path, strip_components=1, prefix=prefix + "/") + + +def _github_submodule_required(repo: str, commit: str): + "Use github API to check if submodules are used" + url = "{repo}/blob/{commit}/.gitmodules".format(**locals()) + try: + status_code = urllib.request.urlopen(url).getcode() + return status_code == 200 + except: + return False + + +def git_checkout_archive( + dest_path: pathlib.Path, + repo: str, + commit: str, + prefix=None, + ssh_key=None, + include_dot_git=False, +): + """Produce an archive of the files comprising a Git checkout.""" + dest_path.parent.mkdir(parents=True, exist_ok=True) + + if not prefix: + prefix = repo.rstrip("/").rsplit("/", 1)[-1] + + if dest_path.suffixes[-2:] != [".tar", ".zst"]: + raise Exception("Only producing .tar.zst archives is supported.") + + if repo.startswith("https://github.com/"): + if not include_dot_git and not _github_submodule_required(repo, commit): + log("Using github archive service to speedup archive creation") + # Always log sha1 info, either from commit or resolved from repo. + if re.match(r"^[a-fA-F0-9]{40}$", commit): + revision = commit + else: + ref_output = subprocess.check_output(["git", "ls-remote", repo, + 'refs/heads/' + commit]) + revision, _ = ref_output.decode().split(maxsplit=1) + log("Fetching revision {}".format(revision)) + return _git_checkout_github_archive(dest_path, repo, commit, prefix) + + with tempfile.TemporaryDirectory() as td: + temp_dir = pathlib.Path(td) + + git_dir = temp_dir / prefix + + # This could be faster with a shallow clone. However, Git requires a ref + # to initiate a clone. Since the commit-ish may not refer to a ref, we + # simply perform a full clone followed by a checkout. + print("cloning %s to %s" % (repo, git_dir)) + + env = os.environ.copy() + keypath = "" + if ssh_key: + taskcluster_secret_url = api( + os.environ.get("TASKCLUSTER_PROXY_URL"), + "secrets", + "v1", + "secret/{keypath}".format(keypath=ssh_key), + ) + taskcluster_secret = b"".join(stream_download(taskcluster_secret_url)) + taskcluster_secret = json.loads(taskcluster_secret) + sshkey = taskcluster_secret["secret"]["ssh_privkey"] + + keypath = temp_dir.joinpath("ssh-key") + keypath.write_text(sshkey) + keypath.chmod(0o600) + + env = { + "GIT_SSH_COMMAND": "ssh -o 'StrictHostKeyChecking no' -i {keypath}".format( + keypath=keypath + ) + } + + subprocess.run(["git", "clone", "-n", repo, str(git_dir)], check=True, env=env) + + # Always use a detached head so that git prints out what it checked out. + subprocess.run( + ["git", "checkout", "--detach", commit], cwd=str(git_dir), check=True + ) + + # When including the .git, we want --depth 1, but a direct clone would not + # necessarily be able to give us the right commit. + if include_dot_git: + initial_clone = git_dir.with_name(git_dir.name + ".orig") + git_dir.rename(initial_clone) + subprocess.run( + [ + "git", + "clone", + "file://" + str(initial_clone), + str(git_dir), + "--depth", + "1", + ], + check=True, + ) + subprocess.run( + ["git", "remote", "set-url", "origin", repo], + cwd=str(git_dir), + check=True, + ) + + # --depth 1 can induce more work on the server side, so only use it for + # submodule initialization when we want to keep the .git directory. + depth = ["--depth", "1"] if include_dot_git else [] + subprocess.run( + ["git", "submodule", "update", "--init"] + depth, + cwd=str(git_dir), + check=True, + ) + + if keypath: + os.remove(keypath) + + print("creating archive %s of commit %s" % (dest_path, commit)) + exclude_dot_git = [] if include_dot_git else ["--exclude=.git"] + proc = subprocess.Popen( + [ + "tar", + "cf", + "-", + ] + + exclude_dot_git + + [ + "-C", + str(temp_dir), + prefix, + ], + stdout=subprocess.PIPE, + ) + + with rename_after_close(dest_path, "wb") as out: + ctx = ZstdCompressor() + ctx.copy_stream(proc.stdout, out) + + proc.wait() + + +def command_git_checkout_archive(args): + dest = pathlib.Path(args.dest) + + try: + git_checkout_archive( + dest, + args.repo, + args.commit, + prefix=args.path_prefix, + ssh_key=args.ssh_key_secret, + include_dot_git=args.include_dot_git, + ) + except Exception: + try: + dest.unlink() + except FileNotFoundError: + pass + + raise + + +def command_static_url(args): + gpg_sig_url = args.gpg_sig_url + gpg_env_key = args.gpg_key_env + + if bool(gpg_sig_url) != bool(gpg_env_key): + print("--gpg-sig-url and --gpg-key-env must both be defined") + return 1 + + if gpg_sig_url: + gpg_signature = b"".join(stream_download(gpg_sig_url)) + gpg_key = os.environb[gpg_env_key.encode("ascii")] + + dest = pathlib.Path(args.dest) + dest.parent.mkdir(parents=True, exist_ok=True) + + basename = urllib.parse.urlparse(args.url).path.split("/")[-1] + if basename.endswith("".join(dest.suffixes)): + dl_dest = dest + else: + dl_dest = dest.parent / basename + + try: + download_to_path( + args.url, dl_dest, sha256=args.sha256, size=args.size, headers=args.headers + ) + + if gpg_sig_url: + gpg_verify_path(dl_dest, gpg_key, gpg_signature) + + if dl_dest != dest or args.strip_components or args.add_prefix: + repack_archive(dl_dest, dest, args.strip_components, args.add_prefix) + except Exception: + try: + dl_dest.unlink() + except FileNotFoundError: + pass + + raise + + if dl_dest != dest: + log("Removing %s" % dl_dest) + dl_dest.unlink() + + +def api(root_url, service, version, path): + # taskcluster-lib-urls is not available when this script runs, so + # simulate its behavior: + return "{root_url}/api/{service}/{version}/{path}".format( + root_url=root_url, service=service, version=version, path=path + ) + + +def get_hash(fetch, root_url): + path = "task/{task}/artifacts/{artifact}".format( + task=fetch["task"], artifact="public/chain-of-trust.json" + ) + url = api(root_url, "queue", "v1", path) + cot = json.loads(download_to_memory(url)) + return cot["artifacts"][fetch["artifact"]]["sha256"] + + +def command_task_artifacts(args): + start = time.monotonic() + fetches = json.loads(os.environ["MOZ_FETCHES"]) + downloads = [] + for fetch in fetches: + extdir = pathlib.Path(args.dest) + if "dest" in fetch: + # Note: normpath doesn't like pathlib.Path in python 3.5 + extdir = pathlib.Path(os.path.normpath(str(extdir.joinpath(fetch["dest"])))) + extdir.mkdir(parents=True, exist_ok=True) + root_url = os.environ["TASKCLUSTER_ROOT_URL"] + sha256 = None + if fetch.get("verify-hash"): + sha256 = get_hash(fetch, root_url) + if fetch["artifact"].startswith("public/"): + path = "task/{task}/artifacts/{artifact}".format( + task=fetch["task"], artifact=fetch["artifact"] + ) + url = api(root_url, "queue", "v1", path) + else: + url = ("{proxy_url}/api/queue/v1/task/{task}/artifacts/{artifact}").format( + proxy_url=os.environ["TASKCLUSTER_PROXY_URL"], + task=fetch["task"], + artifact=fetch["artifact"], + ) + downloads.append((url, extdir, fetch["extract"], sha256)) + + fetch_urls(downloads) + end = time.monotonic() + + perfherder_data = { + "framework": {"name": "build_metrics"}, + "suites": [ + { + "name": "fetch_content", + "value": end - start, + "lowerIsBetter": True, + "shouldAlert": False, + "subtests": [], + } + ], + } + print("PERFHERDER_DATA: {}".format(json.dumps(perfherder_data)), file=sys.stderr) + + +def main(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(title="sub commands") + + git_checkout = subparsers.add_parser( + "git-checkout-archive", + help="Obtain an archive of files from a Git repository checkout", + ) + git_checkout.set_defaults(func=command_git_checkout_archive) + git_checkout.add_argument( + "--path-prefix", help="Prefix for paths in produced archive" + ) + git_checkout.add_argument("repo", help="URL to Git repository to be cloned") + git_checkout.add_argument("commit", help="Git commit to check out") + git_checkout.add_argument("dest", help="Destination path of archive") + git_checkout.add_argument( + "--ssh-key-secret", help="The scope path of the ssh key to used for checkout" + ) + git_checkout.add_argument( + "--include-dot-git", action="store_true", help="Include the .git directory" + ) + + url = subparsers.add_parser("static-url", help="Download a static URL") + url.set_defaults(func=command_static_url) + url.add_argument("--sha256", required=True, help="SHA-256 of downloaded content") + url.add_argument( + "--size", required=True, type=int, help="Size of downloaded content, in bytes" + ) + url.add_argument( + "--gpg-sig-url", + help="URL containing signed GPG document validating " "URL to fetch", + ) + url.add_argument( + "--gpg-key-env", help="Environment variable containing GPG key to validate" + ) + url.add_argument( + "--strip-components", + type=int, + default=0, + help="Number of leading components to strip from file " + "names in the downloaded archive", + ) + url.add_argument( + "--add-prefix", + default="", + help="Prefix to add to file names in the downloaded " "archive", + ) + url.add_argument( + "-H", + "--header", + default=[], + action="append", + dest="headers", + help="Header to send as part of the request, can be passed " "multiple times", + ) + url.add_argument("url", help="URL to fetch") + url.add_argument("dest", help="Destination path") + + artifacts = subparsers.add_parser("task-artifacts", help="Fetch task artifacts") + artifacts.set_defaults(func=command_task_artifacts) + artifacts.add_argument( + "-d", + "--dest", + default=os.environ.get("MOZ_FETCHES_DIR"), + help="Destination directory which will contain all " + "artifacts (defaults to $MOZ_FETCHES_DIR)", + ) + + args = parser.parse_args() + + if not args.dest: + parser.error( + "no destination directory specified, either pass in --dest " + "or set $MOZ_FETCHES_DIR" + ) + + return args.func(args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc new file mode 100755 index 0000000000..f6a2f6643c --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc @@ -0,0 +1,33 @@ +# By default the progress bar starts after 3s and updates every 0.1s. We +# change this so it shows and updates every 1.0s. +# We also tell progress to assume a TTY is present so updates are printed +# even if there is no known TTY. +[progress] +delay = 1.0 +refresh = 1.0 +assume-tty = true + +[extensions] +share = +sparse = +robustcheckout = /usr/local/mercurial/robustcheckout.py + +[hostsecurity] +# When running a modern Python, Mercurial will default to TLS 1.1+. +# When running on a legacy Python, Mercurial will default to TLS 1.0+. +# There is no good reason we shouldn't be running a modern Python +# capable of speaking TLS 1.2. And the only Mercurial servers we care +# about should be running TLS 1.2. So make TLS 1.2 the minimum. +minimumprotocol = tls1.2 + +# Settings to make 1-click loaners more useful. +[extensions] +histedit = +rebase = + +[diff] +git = 1 +showfunc = 1 + +[pager] +pager = LESS=FRSXQ less diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py new file mode 100644 index 0000000000..7e12d07d50 --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py @@ -0,0 +1,826 @@ +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +"""Robustly perform a checkout. + +This extension provides the ``hg robustcheckout`` command for +ensuring a working directory is updated to the specified revision +from a source repo using best practices to ensure optimal clone +times and storage efficiency. +""" + +from __future__ import absolute_import + +import contextlib +import json +import os +import random +import re +import socket +import ssl +import time + +from mercurial.i18n import _ +from mercurial.node import hex, nullid +from mercurial import ( + commands, + configitems, + error, + exchange, + extensions, + hg, + match as matchmod, + pycompat, + registrar, + scmutil, + urllibcompat, + util, + vfs, +) + +# Causes worker to purge caches on process exit and for task to retry. +EXIT_PURGE_CACHE = 72 + +testedwith = b"4.5 4.6 4.7 4.8 4.9 5.0 5.1 5.2 5.3 5.4 5.5 5.6 5.7 5.8 5.9" +minimumhgversion = b"4.5" + +cmdtable = {} +command = registrar.command(cmdtable) + +configtable = {} +configitem = registrar.configitem(configtable) + +configitem(b"robustcheckout", b"retryjittermin", default=configitems.dynamicdefault) +configitem(b"robustcheckout", b"retryjittermax", default=configitems.dynamicdefault) + + +def getsparse(): + from mercurial import sparse + + return sparse + + +def peerlookup(remote, v): + with remote.commandexecutor() as e: + return e.callcommand(b"lookup", {b"key": v}).result() + + +@command( + b"robustcheckout", + [ + (b"", b"upstream", b"", b"URL of upstream repo to clone from"), + (b"r", b"revision", b"", b"Revision to check out"), + (b"b", b"branch", b"", b"Branch to check out"), + (b"", b"purge", False, b"Whether to purge the working directory"), + (b"", b"sharebase", b"", b"Directory where shared repos should be placed"), + ( + b"", + b"networkattempts", + 3, + b"Maximum number of attempts for network " b"operations", + ), + (b"", b"sparseprofile", b"", b"Sparse checkout profile to use (path in repo)"), + ( + b"U", + b"noupdate", + False, + b"the clone will include an empty working directory\n" + b"(only a repository)", + ), + ], + b"[OPTION]... URL DEST", + norepo=True, +) +def robustcheckout( + ui, + url, + dest, + upstream=None, + revision=None, + branch=None, + purge=False, + sharebase=None, + networkattempts=None, + sparseprofile=None, + noupdate=False, +): + """Ensure a working copy has the specified revision checked out. + + Repository data is automatically pooled into the common directory + specified by ``--sharebase``, which is a required argument. It is required + because pooling storage prevents excessive cloning, which makes operations + complete faster. + + One of ``--revision`` or ``--branch`` must be specified. ``--revision`` + is preferred, as it is deterministic and there is no ambiguity as to which + revision will actually be checked out. + + If ``--upstream`` is used, the repo at that URL is used to perform the + initial clone instead of cloning from the repo where the desired revision + is located. + + ``--purge`` controls whether to removed untracked and ignored files from + the working directory. If used, the end state of the working directory + should only contain files explicitly under version control for the requested + revision. + + ``--sparseprofile`` can be used to specify a sparse checkout profile to use. + The sparse checkout profile corresponds to a file in the revision to be + checked out. If a previous sparse profile or config is present, it will be + replaced by this sparse profile. We choose not to "widen" the sparse config + so operations are as deterministic as possible. If an existing checkout + is present and it isn't using a sparse checkout, we error. This is to + prevent accidentally enabling sparse on a repository that may have + clients that aren't sparse aware. Sparse checkout support requires Mercurial + 4.3 or newer and the ``sparse`` extension must be enabled. + """ + if not revision and not branch: + raise error.Abort(b"must specify one of --revision or --branch") + + if revision and branch: + raise error.Abort(b"cannot specify both --revision and --branch") + + # Require revision to look like a SHA-1. + if revision: + if ( + len(revision) < 12 + or len(revision) > 40 + or not re.match(b"^[a-f0-9]+$", revision) + ): + raise error.Abort( + b"--revision must be a SHA-1 fragment 12-40 " b"characters long" + ) + + sharebase = sharebase or ui.config(b"share", b"pool") + if not sharebase: + raise error.Abort( + b"share base directory not defined; refusing to operate", + hint=b"define share.pool config option or pass --sharebase", + ) + + # Sparse profile support was added in Mercurial 4.3, where it was highly + # experimental. Because of the fragility of it, we only support sparse + # profiles on 4.3. When 4.4 is released, we'll need to opt in to sparse + # support. We /could/ silently fall back to non-sparse when not supported. + # However, given that sparse has performance implications, we want to fail + # fast if we can't satisfy the desired checkout request. + if sparseprofile: + try: + extensions.find(b"sparse") + except KeyError: + raise error.Abort( + b"sparse extension must be enabled to use " b"--sparseprofile" + ) + + ui.warn(b"(using Mercurial %s)\n" % util.version()) + + # worker.backgroundclose only makes things faster if running anti-virus, + # which our automation doesn't. Disable it. + ui.setconfig(b"worker", b"backgroundclose", False) + + # By default the progress bar starts after 3s and updates every 0.1s. We + # change this so it shows and updates every 1.0s. + # We also tell progress to assume a TTY is present so updates are printed + # even if there is no known TTY. + # We make the config change here instead of in a config file because + # otherwise we're at the whim of whatever configs are used in automation. + ui.setconfig(b"progress", b"delay", 1.0) + ui.setconfig(b"progress", b"refresh", 1.0) + ui.setconfig(b"progress", b"assume-tty", True) + + sharebase = os.path.realpath(sharebase) + + optimes = [] + behaviors = set() + start = time.time() + + try: + return _docheckout( + ui, + url, + dest, + upstream, + revision, + branch, + purge, + sharebase, + optimes, + behaviors, + networkattempts, + sparse_profile=sparseprofile, + noupdate=noupdate, + ) + finally: + overall = time.time() - start + + # We store the overall time multiple ways in order to help differentiate + # the various "flavors" of operations. + + # ``overall`` is always the total operation time. + optimes.append(("overall", overall)) + + def record_op(name): + # If special behaviors due to "corrupt" storage occur, we vary the + # name to convey that. + if "remove-store" in behaviors: + name += "_rmstore" + if "remove-wdir" in behaviors: + name += "_rmwdir" + + optimes.append((name, overall)) + + # We break out overall operations primarily by their network interaction + # We have variants within for working directory operations. + if "clone" in behaviors and "create-store" in behaviors: + record_op("overall_clone") + + if "sparse-update" in behaviors: + record_op("overall_clone_sparsecheckout") + else: + record_op("overall_clone_fullcheckout") + + elif "pull" in behaviors or "clone" in behaviors: + record_op("overall_pull") + + if "sparse-update" in behaviors: + record_op("overall_pull_sparsecheckout") + else: + record_op("overall_pull_fullcheckout") + + if "empty-wdir" in behaviors: + record_op("overall_pull_emptywdir") + else: + record_op("overall_pull_populatedwdir") + + else: + record_op("overall_nopull") + + if "sparse-update" in behaviors: + record_op("overall_nopull_sparsecheckout") + else: + record_op("overall_nopull_fullcheckout") + + if "empty-wdir" in behaviors: + record_op("overall_nopull_emptywdir") + else: + record_op("overall_nopull_populatedwdir") + + server_url = urllibcompat.urlreq.urlparse(url).netloc + + if "TASKCLUSTER_INSTANCE_TYPE" in os.environ: + perfherder = { + "framework": { + "name": "vcs", + }, + "suites": [], + } + for op, duration in optimes: + perfherder["suites"].append( + { + "name": op, + "value": duration, + "lowerIsBetter": True, + "shouldAlert": False, + "serverUrl": server_url.decode("utf-8"), + "hgVersion": util.version().decode("utf-8"), + "extraOptions": [os.environ["TASKCLUSTER_INSTANCE_TYPE"]], + "subtests": [], + } + ) + ui.write( + b"PERFHERDER_DATA: %s\n" + % pycompat.bytestr(json.dumps(perfherder, sort_keys=True)) + ) + + +def _docheckout( + ui, + url, + dest, + upstream, + revision, + branch, + purge, + sharebase, + optimes, + behaviors, + networkattemptlimit, + networkattempts=None, + sparse_profile=None, + noupdate=False, +): + if not networkattempts: + networkattempts = [1] + + def callself(): + return _docheckout( + ui, + url, + dest, + upstream, + revision, + branch, + purge, + sharebase, + optimes, + behaviors, + networkattemptlimit, + networkattempts=networkattempts, + sparse_profile=sparse_profile, + noupdate=noupdate, + ) + + @contextlib.contextmanager + def timeit(op, behavior): + behaviors.add(behavior) + errored = False + try: + start = time.time() + yield + except Exception: + errored = True + raise + finally: + elapsed = time.time() - start + + if errored: + op += "_errored" + + optimes.append((op, elapsed)) + + ui.write(b"ensuring %s@%s is available at %s\n" % (url, revision or branch, dest)) + + # We assume that we're the only process on the machine touching the + # repository paths that we were told to use. This means our recovery + # scenario when things aren't "right" is to just nuke things and start + # from scratch. This is easier to implement than verifying the state + # of the data and attempting recovery. And in some scenarios (such as + # potential repo corruption), it is probably faster, since verifying + # repos can take a while. + + destvfs = vfs.vfs(dest, audit=False, realpath=True) + + def deletesharedstore(path=None): + storepath = path or destvfs.read(b".hg/sharedpath").strip() + if storepath.endswith(b".hg"): + storepath = os.path.dirname(storepath) + + storevfs = vfs.vfs(storepath, audit=False) + storevfs.rmtree(forcibly=True) + + if destvfs.exists() and not destvfs.exists(b".hg"): + raise error.Abort(b"destination exists but no .hg directory") + + # Refuse to enable sparse checkouts on existing checkouts. The reasoning + # here is that another consumer of this repo may not be sparse aware. If we + # enabled sparse, we would lock them out. + if destvfs.exists() and sparse_profile and not destvfs.exists(b".hg/sparse"): + raise error.Abort( + b"cannot enable sparse profile on existing " b"non-sparse checkout", + hint=b"use a separate working directory to use sparse", + ) + + # And the other direction for symmetry. + if not sparse_profile and destvfs.exists(b".hg/sparse"): + raise error.Abort( + b"cannot use non-sparse checkout on existing sparse " b"checkout", + hint=b"use a separate working directory to use sparse", + ) + + # Require checkouts to be tied to shared storage because efficiency. + if destvfs.exists(b".hg") and not destvfs.exists(b".hg/sharedpath"): + ui.warn(b"(destination is not shared; deleting)\n") + with timeit("remove_unshared_dest", "remove-wdir"): + destvfs.rmtree(forcibly=True) + + # Verify the shared path exists and is using modern pooled storage. + if destvfs.exists(b".hg/sharedpath"): + storepath = destvfs.read(b".hg/sharedpath").strip() + + ui.write(b"(existing repository shared store: %s)\n" % storepath) + + if not os.path.exists(storepath): + ui.warn(b"(shared store does not exist; deleting destination)\n") + with timeit("removed_missing_shared_store", "remove-wdir"): + destvfs.rmtree(forcibly=True) + elif not re.search(b"[a-f0-9]{40}/\.hg$", storepath.replace(b"\\", b"/")): + ui.warn( + b"(shared store does not belong to pooled storage; " + b"deleting destination to improve efficiency)\n" + ) + with timeit("remove_unpooled_store", "remove-wdir"): + destvfs.rmtree(forcibly=True) + + if destvfs.isfileorlink(b".hg/wlock"): + ui.warn( + b"(dest has an active working directory lock; assuming it is " + b"left over from a previous process and that the destination " + b"is corrupt; deleting it just to be sure)\n" + ) + with timeit("remove_locked_wdir", "remove-wdir"): + destvfs.rmtree(forcibly=True) + + def handlerepoerror(e): + if pycompat.bytestr(e) == _(b"abandoned transaction found"): + ui.warn(b"(abandoned transaction found; trying to recover)\n") + repo = hg.repository(ui, dest) + if not repo.recover(): + ui.warn(b"(could not recover repo state; " b"deleting shared store)\n") + with timeit("remove_unrecovered_shared_store", "remove-store"): + deletesharedstore() + + ui.warn(b"(attempting checkout from beginning)\n") + return callself() + + raise + + # At this point we either have an existing working directory using + # shared, pooled storage or we have nothing. + + def handlenetworkfailure(): + if networkattempts[0] >= networkattemptlimit: + raise error.Abort( + b"reached maximum number of network attempts; " b"giving up\n" + ) + + ui.warn( + b"(retrying after network failure on attempt %d of %d)\n" + % (networkattempts[0], networkattemptlimit) + ) + + # Do a backoff on retries to mitigate the thundering herd + # problem. This is an exponential backoff with a multipler + # plus random jitter thrown in for good measure. + # With the default settings, backoffs will be: + # 1) 2.5 - 6.5 + # 2) 5.5 - 9.5 + # 3) 11.5 - 15.5 + backoff = (2 ** networkattempts[0] - 1) * 1.5 + jittermin = ui.configint(b"robustcheckout", b"retryjittermin", 1000) + jittermax = ui.configint(b"robustcheckout", b"retryjittermax", 5000) + backoff += float(random.randint(jittermin, jittermax)) / 1000.0 + ui.warn(b"(waiting %.2fs before retry)\n" % backoff) + time.sleep(backoff) + + networkattempts[0] += 1 + + def handlepullerror(e): + """Handle an exception raised during a pull. + + Returns True if caller should call ``callself()`` to retry. + """ + if isinstance(e, error.Abort): + if e.args[0] == _(b"repository is unrelated"): + ui.warn(b"(repository is unrelated; deleting)\n") + destvfs.rmtree(forcibly=True) + return True + elif e.args[0].startswith(_(b"stream ended unexpectedly")): + ui.warn(b"%s\n" % e.args[0]) + # Will raise if failure limit reached. + handlenetworkfailure() + return True + # TODO test this branch + elif isinstance(e, error.ResponseError): + if e.args[0].startswith(_(b"unexpected response from remote server:")): + ui.warn(b"(unexpected response from remote server; retrying)\n") + destvfs.rmtree(forcibly=True) + # Will raise if failure limit reached. + handlenetworkfailure() + return True + elif isinstance(e, ssl.SSLError): + # Assume all SSL errors are due to the network, as Mercurial + # should convert non-transport errors like cert validation failures + # to error.Abort. + ui.warn(b"ssl error: %s\n" % pycompat.bytestr(str(e))) + handlenetworkfailure() + return True + elif isinstance(e, urllibcompat.urlerr.urlerror): + if isinstance(e.reason, socket.error): + ui.warn(b"socket error: %s\n" % pycompat.bytestr(str(e.reason))) + handlenetworkfailure() + return True + else: + ui.warn( + b"unhandled URLError; reason type: %s; value: %s\n" + % ( + pycompat.bytestr(e.reason.__class__.__name__), + pycompat.bytestr(str(e.reason)), + ) + ) + else: + ui.warn( + b"unhandled exception during network operation; type: %s; " + b"value: %s\n" + % (pycompat.bytestr(e.__class__.__name__), pycompat.bytestr(str(e))) + ) + + return False + + # Perform sanity checking of store. We may or may not know the path to the + # local store. It depends if we have an existing destvfs pointing to a + # share. To ensure we always find a local store, perform the same logic + # that Mercurial's pooled storage does to resolve the local store path. + cloneurl = upstream or url + + try: + clonepeer = hg.peer(ui, {}, cloneurl) + rootnode = peerlookup(clonepeer, b"0") + except error.RepoLookupError: + raise error.Abort(b"unable to resolve root revision from clone " b"source") + except (error.Abort, ssl.SSLError, urllibcompat.urlerr.urlerror) as e: + if handlepullerror(e): + return callself() + raise + + if rootnode == nullid: + raise error.Abort(b"source repo appears to be empty") + + storepath = os.path.join(sharebase, hex(rootnode)) + storevfs = vfs.vfs(storepath, audit=False) + + if storevfs.isfileorlink(b".hg/store/lock"): + ui.warn( + b"(shared store has an active lock; assuming it is left " + b"over from a previous process and that the store is " + b"corrupt; deleting store and destination just to be " + b"sure)\n" + ) + if destvfs.exists(): + with timeit("remove_dest_active_lock", "remove-wdir"): + destvfs.rmtree(forcibly=True) + + with timeit("remove_shared_store_active_lock", "remove-store"): + storevfs.rmtree(forcibly=True) + + if storevfs.exists() and not storevfs.exists(b".hg/requires"): + ui.warn( + b"(shared store missing requires file; this is a really " + b"odd failure; deleting store and destination)\n" + ) + if destvfs.exists(): + with timeit("remove_dest_no_requires", "remove-wdir"): + destvfs.rmtree(forcibly=True) + + with timeit("remove_shared_store_no_requires", "remove-store"): + storevfs.rmtree(forcibly=True) + + if storevfs.exists(b".hg/requires"): + requires = set(storevfs.read(b".hg/requires").splitlines()) + # "share-safe" (enabled by default as of hg 6.1) moved most + # requirements to a new file, so we need to look there as well to avoid + # deleting and re-cloning each time + if b"share-safe" in requires: + requires |= set(storevfs.read(b".hg/store/requires").splitlines()) + # FUTURE when we require generaldelta, this is where we can check + # for that. + required = {b"dotencode", b"fncache"} + + missing = required - requires + if missing: + ui.warn( + b"(shared store missing requirements: %s; deleting " + b"store and destination to ensure optimal behavior)\n" + % b", ".join(sorted(missing)) + ) + if destvfs.exists(): + with timeit("remove_dest_missing_requires", "remove-wdir"): + destvfs.rmtree(forcibly=True) + + with timeit("remove_shared_store_missing_requires", "remove-store"): + storevfs.rmtree(forcibly=True) + + created = False + + if not destvfs.exists(): + # Ensure parent directories of destination exist. + # Mercurial 3.8 removed ensuredirs and made makedirs race safe. + if util.safehasattr(util, "ensuredirs"): + makedirs = util.ensuredirs + else: + makedirs = util.makedirs + + makedirs(os.path.dirname(destvfs.base), notindexed=True) + makedirs(sharebase, notindexed=True) + + if upstream: + ui.write(b"(cloning from upstream repo %s)\n" % upstream) + + if not storevfs.exists(): + behaviors.add(b"create-store") + + try: + with timeit("clone", "clone"): + shareopts = {b"pool": sharebase, b"mode": b"identity"} + res = hg.clone( + ui, + {}, + clonepeer, + dest=dest, + update=False, + shareopts=shareopts, + stream=True, + ) + except (error.Abort, ssl.SSLError, urllibcompat.urlerr.urlerror) as e: + if handlepullerror(e): + return callself() + raise + except error.RepoError as e: + return handlerepoerror(e) + except error.RevlogError as e: + ui.warn(b"(repo corruption: %s; deleting shared store)\n" % e) + with timeit("remove_shared_store_revlogerror", "remote-store"): + deletesharedstore() + return callself() + + # TODO retry here. + if res is None: + raise error.Abort(b"clone failed") + + # Verify it is using shared pool storage. + if not destvfs.exists(b".hg/sharedpath"): + raise error.Abort(b"clone did not create a shared repo") + + created = True + + # The destination .hg directory should exist. Now make sure we have the + # wanted revision. + + repo = hg.repository(ui, dest) + + # We only pull if we are using symbolic names or the requested revision + # doesn't exist. + havewantedrev = False + + if revision: + try: + ctx = scmutil.revsingle(repo, revision) + except error.RepoLookupError: + ctx = None + + if ctx: + if not ctx.hex().startswith(revision): + raise error.Abort( + b"--revision argument is ambiguous", + hint=b"must be the first 12+ characters of a " b"SHA-1 fragment", + ) + + checkoutrevision = ctx.hex() + havewantedrev = True + + if not havewantedrev: + ui.write(b"(pulling to obtain %s)\n" % (revision or branch,)) + + remote = None + try: + remote = hg.peer(repo, {}, url) + pullrevs = [peerlookup(remote, revision or branch)] + checkoutrevision = hex(pullrevs[0]) + if branch: + ui.warn( + b"(remote resolved %s to %s; " + b"result is not deterministic)\n" % (branch, checkoutrevision) + ) + + if checkoutrevision in repo: + ui.warn(b"(revision already present locally; not pulling)\n") + else: + with timeit("pull", "pull"): + pullop = exchange.pull(repo, remote, heads=pullrevs) + if not pullop.rheads: + raise error.Abort(b"unable to pull requested revision") + except (error.Abort, ssl.SSLError, urllibcompat.urlerr.urlerror) as e: + if handlepullerror(e): + return callself() + raise + except error.RepoError as e: + return handlerepoerror(e) + except error.RevlogError as e: + ui.warn(b"(repo corruption: %s; deleting shared store)\n" % e) + deletesharedstore() + return callself() + finally: + if remote: + remote.close() + + # Now we should have the wanted revision in the store. Perform + # working directory manipulation. + + # Avoid any working directory manipulations if `-U`/`--noupdate` was passed + if noupdate: + ui.write(b"(skipping update since `-U` was passed)\n") + return None + + # Purge if requested. We purge before update because this way we're + # guaranteed to not have conflicts on `hg update`. + if purge and not created: + ui.write(b"(purging working directory)\n") + purge = getattr(commands, "purge", None) + if not purge: + purge = extensions.find(b"purge").purge + + # Mercurial 4.3 doesn't purge files outside the sparse checkout. + # See https://bz.mercurial-scm.org/show_bug.cgi?id=5626. Force + # purging by monkeypatching the sparse matcher. + try: + old_sparse_fn = getattr(repo.dirstate, "_sparsematchfn", None) + if old_sparse_fn is not None: + repo.dirstate._sparsematchfn = lambda: matchmod.always() + + with timeit("purge", "purge"): + if purge( + ui, + repo, + all=True, + abort_on_err=True, + # The function expects all arguments to be + # defined. + **{"print": None, "print0": None, "dirs": None, "files": None} + ): + raise error.Abort(b"error purging") + finally: + if old_sparse_fn is not None: + repo.dirstate._sparsematchfn = old_sparse_fn + + # Update the working directory. + + if repo[b"."].node() == nullid: + behaviors.add("empty-wdir") + else: + behaviors.add("populated-wdir") + + if sparse_profile: + sparsemod = getsparse() + + # By default, Mercurial will ignore unknown sparse profiles. This could + # lead to a full checkout. Be more strict. + try: + repo.filectx(sparse_profile, changeid=checkoutrevision).data() + except error.ManifestLookupError: + raise error.Abort( + b"sparse profile %s does not exist at revision " + b"%s" % (sparse_profile, checkoutrevision) + ) + + old_config = sparsemod.parseconfig( + repo.ui, repo.vfs.tryread(b"sparse"), b"sparse" + ) + + old_includes, old_excludes, old_profiles = old_config + + if old_profiles == {sparse_profile} and not old_includes and not old_excludes: + ui.write( + b"(sparse profile %s already set; no need to update " + b"sparse config)\n" % sparse_profile + ) + else: + if old_includes or old_excludes or old_profiles: + ui.write( + b"(replacing existing sparse config with profile " + b"%s)\n" % sparse_profile + ) + else: + ui.write(b"(setting sparse config to profile %s)\n" % sparse_profile) + + # If doing an incremental update, this will perform two updates: + # one to change the sparse profile and another to update to the new + # revision. This is not desired. But there's not a good API in + # Mercurial to do this as one operation. + with repo.wlock(), repo.dirstate.parentchange(), timeit( + "sparse_update_config", "sparse-update-config" + ): + # pylint --py3k: W1636 + fcounts = list( + map( + len, + sparsemod._updateconfigandrefreshwdir( + repo, [], [], [sparse_profile], force=True + ), + ) + ) + + repo.ui.status( + b"%d files added, %d files dropped, " + b"%d files conflicting\n" % tuple(fcounts) + ) + + ui.write(b"(sparse refresh complete)\n") + + op = "update_sparse" if sparse_profile else "update" + behavior = "update-sparse" if sparse_profile else "update" + + with timeit(op, behavior): + if commands.update(ui, repo, rev=checkoutrevision, clean=True): + raise error.Abort(b"error updating") + + ui.write(b"updated to %s\n" % checkoutrevision) + + return None + + +def extsetup(ui): + # Ensure required extensions are loaded. + for ext in (b"purge", b"share"): + try: + extensions.find(ext) + except KeyError: + extensions.load(ui, ext, None) diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task new file mode 100755 index 0000000000..f1e281f5cd --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task @@ -0,0 +1,1307 @@ +#!/usr/bin/python3 -u +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +"""Run a task after performing common actions. + +This script is meant to be the "driver" for TaskCluster based tasks. +It receives some common arguments to control the run-time environment. + +It performs actions as requested from the arguments. Then it executes +the requested process and prints its output, prefixing it with the +current time to improve log usefulness. +""" + +import sys +from typing import Optional + +if sys.version_info[0:2] < (3, 5): + print("run-task requires Python 3.5+") + sys.exit(1) + + +import argparse +import datetime +import errno +import io +import json +import os +from pathlib import Path +import re +import shutil +import signal +import socket +import stat +import subprocess +import time + +import urllib.error +import urllib.request + +from threading import Thread + +SECRET_BASEURL_TPL = "http://taskcluster/secrets/v1/secret/{}" + +GITHUB_SSH_FINGERPRINT = ( + b"github.com ssh-rsa " + b"AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkcc" + b"Krpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFz" + b"LQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaS" + b"jB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3sku" + b"a2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ==\n" +) + + +CACHE_UID_GID_MISMATCH = """ +There is a UID/GID mismatch on the cache. This likely means: + +a) different tasks are running as a different user/group +b) different Docker images have different UID/GID for the same user/group + +Our cache policy is that the UID/GID for ALL tasks must be consistent +for the lifetime of the cache. This eliminates permissions problems due +to file/directory user/group ownership. + +To make this error go away, ensure that all Docker images are use +a consistent UID/GID and that all tasks using this cache are running as +the same user/group. +""" + + +NON_EMPTY_VOLUME = """ +error: volume %s is not empty + +Our Docker image policy requires volumes to be empty. + +The volume was likely populated as part of building the Docker image. +Change the Dockerfile and anything run from it to not create files in +any VOLUME. + +A lesser possibility is that you stumbled upon a TaskCluster platform bug +where it fails to use new volumes for tasks. +""" + + +FETCH_CONTENT_NOT_FOUND = """ +error: fetch-content script not found + +The script at `taskcluster/scripts/misc/fetch-content` could not be +detected in the current environment. +""" + +# The exit code to use when caches should be purged and the task retried. +# This is EX_OSFILE (from sysexits.h): +# Some system file does not exist, cannot be opened, or has some +# sort of error (e.g., syntax error). +EXIT_PURGE_CACHE = 72 + + +IS_MACOSX = sys.platform == "darwin" +IS_POSIX = os.name == "posix" +IS_WINDOWS = os.name == "nt" + +# Both mercurial and git use sha1 as revision idenfiers. Luckily, both define +# the same value as the null revision. +# +# https://github.com/git/git/blob/dc04167d378fb29d30e1647ff6ff51dd182bc9a3/t/oid-info/hash-info#L7 +# https://www.mercurial-scm.org/repo/hg-stable/file/82efc31bd152/mercurial/node.py#l30 +NULL_REVISION = "0000000000000000000000000000000000000000" + + +def print_line(prefix, m): + now = datetime.datetime.utcnow().isoformat().encode("utf-8") + # slice microseconds to 3 decimals. + now = now[:-3] if now[-7:-6] == b"." else now + sys.stdout.buffer.write(b"[%s %sZ] %s" % (prefix, now, m)) + sys.stdout.buffer.flush() + + +def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5): + """ + It's possible to see spurious errors on Windows due to various things + keeping a handle to the directory open (explorer, virus scanners, etc) + So we try a few times if it fails with a known error. + retry_delay is multiplied by the number of failed attempts to increase + the likelihood of success in subsequent attempts. + """ + retry_count = 0 + while True: + try: + func(*args) + except OSError as e: + # Error codes are defined in: + # https://docs.python.org/3/library/errno.html#module-errno + if e.errno not in (errno.EACCES, errno.ENOTEMPTY, errno.ENOENT): + raise + + if retry_count == retry_max: + raise + + retry_count += 1 + + print( + '%s() failed for "%s". Reason: %s (%s). Retrying...' + % (func.__name__, args, e.strerror, e.errno) + ) + time.sleep(retry_count * retry_delay) + else: + # If no exception has been thrown it should be done + break + + +def remove(path): + """Removes the specified file, link, or directory tree. + + This is a replacement for shutil.rmtree that works better under + windows. It does the following things: + + - check path access for the current user before trying to remove + - retry operations on some known errors due to various things keeping + a handle on file paths - like explorer, virus scanners, etc. The + known errors are errno.EACCES and errno.ENOTEMPTY, and it will + retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds + between each attempt. + + Note that no error will be raised if the given path does not exists. + + :param path: path to be removed + """ + + def _update_permissions(path): + """Sets specified pemissions depending on filetype""" + if os.path.islink(path): + # Path is a symlink which we don't have to modify + # because it should already have all the needed permissions + return + + stats = os.stat(path) + + if os.path.isfile(path): + mode = stats.st_mode | stat.S_IWUSR + elif os.path.isdir(path): + mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR + else: + # Not supported type + return + + _call_windows_retry(os.chmod, (path, mode)) + + if not os.path.lexists(path): + print_line(b"remove", b"WARNING: %s does not exists!\n" % path.encode("utf-8")) + return + + """ + On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access + files or directories that exceed MAX_PATH(260) limitation or that ends + with a period. + """ + if ( + sys.platform in ("win32", "cygwin") + and len(path) >= 3 + and path[1] == ":" + and path[2] == "\\" + ): + path = "\\\\?\\%s" % path + + if os.path.isfile(path) or os.path.islink(path): + # Verify the file or link is read/write for the current user + _update_permissions(path) + _call_windows_retry(os.remove, (path,)) + + elif os.path.isdir(path): + # Verify the directory is read/write/execute for the current user + _update_permissions(path) + + # We're ensuring that every nested item has writable permission. + for root, dirs, files in os.walk(path): + for entry in dirs + files: + _update_permissions(os.path.join(root, entry)) + _call_windows_retry(shutil.rmtree, (path,)) + + +def run_required_command(prefix, args, *, extra_env=None, cwd=None): + res = run_command(prefix, args, extra_env=extra_env, cwd=cwd) + if res: + sys.exit(res) + + +def retry_required_command(prefix, args, *, extra_env=None, cwd=None, retries=2): + backoff = 1 + while True: + res = run_command(prefix, args, extra_env=extra_env, cwd=cwd) + if not res: + return + if not retries: + sys.exit(res) + retries -= 1 + backoff *= 2 + time.sleep(backoff) + + +def run_command(prefix, args, *, extra_env=None, cwd=None): + """Runs a process and prefixes its output with the time. + + Returns the process exit code. + """ + print_line(prefix, b"executing %r\n" % args) + + env = dict(os.environ) + env.update(extra_env or {}) + + # Note: TaskCluster's stdin is a TTY. This attribute is lost + # when we pass sys.stdin to the invoked process. If we cared + # to preserve stdin as a TTY, we could make this work. But until + # someone needs it, don't bother. + + # We want stdout to be bytes on Python 3. That means we can't use + # universal_newlines=True (because it implies text mode). But + # p.stdout.readline() won't work for bytes text streams. So, on Python 3, + # we manually install a latin1 stream wrapper. This allows us to readline() + # and preserves bytes, without losing any data. + + p = subprocess.Popen( + args, + # Disable buffering because we want to receive output + # as it is generated so timestamps in logs are + # accurate. + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=sys.stdin.fileno(), + cwd=cwd, + env=env, + ) + + stdout = io.TextIOWrapper(p.stdout, encoding="latin1") + + while True: + data = stdout.readline().encode("latin1") + + if data == b"": + break + + print_line(prefix, data) + + return p.wait() + + +def get_posix_user_group(user, group): + import grp + import pwd + + try: + user_record = pwd.getpwnam(user) + except KeyError: + print("could not find user %s; specify a valid user with --user" % user) + sys.exit(1) + + try: + group_record = grp.getgrnam(group) + except KeyError: + print("could not find group %s; specify a valid group with --group" % group) + sys.exit(1) + + # Most tasks use worker:worker. We require they have a specific numeric ID + # because otherwise it is too easy for files written to caches to have + # mismatched numeric IDs, which results in permissions errors. + if user_record.pw_name == "worker" and user_record.pw_uid != 1000: + print("user `worker` must have uid=1000; got %d" % user_record.pw_uid) + sys.exit(1) + + if group_record.gr_name == "worker" and group_record.gr_gid != 1000: + print("group `worker` must have gid=1000; got %d" % group_record.gr_gid) + sys.exit(1) + + # Find all groups to which this user is a member. + gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem] + + return user_record, group_record, gids + + +def write_audit_entry(path, msg): + now = datetime.datetime.utcnow().isoformat().encode("utf-8") + with open(path, "ab") as fh: + fh.write(b"[%sZ %s] %s\n" % (now, os.environb.get(b"TASK_ID", b"UNKNOWN"), msg)) + + +WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR + + +def set_dir_permissions(path, uid, gid): + st = os.lstat(path) + + if st.st_uid != uid or st.st_gid != gid: + os.chown(path, uid, gid) + + # Also make sure dirs are writable in case we need to delete + # them. + if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE: + os.chmod(path, st.st_mode | WANTED_DIR_MODE) + + +def chown_recursive(path, user, group, uid, gid): + print_line( + b"chown", + b"recursively changing ownership of %s to %s:%s\n" + % (path.encode("utf-8"), user.encode("utf-8"), group.encode("utf-8")), + ) + + set_dir_permissions(path, uid, gid) + + for root, dirs, files in os.walk(path): + for d in dirs: + set_dir_permissions(os.path.join(root, d), uid, gid) + + for f in files: + # File may be a symlink that points to nowhere. In which case + # os.chown() would fail because it attempts to follow the + # symlink. We only care about directory entries, not what + # they point to. So setting the owner of the symlink should + # be sufficient. + os.lchown(os.path.join(root, f), uid, gid) + + +def configure_cache_posix(cache, user, group, untrusted_caches, running_as_root): + """Configure a cache path on POSIX platforms. + + For each cache, we write out a special file denoting attributes and + capabilities of run-task and the task being executed. These attributes + are used by subsequent run-task invocations to validate that use of + the cache is acceptable. + + We /could/ blow away the cache data on requirements mismatch. + While this would be convenient, this could result in "competing" tasks + effectively undoing the other's work. This would slow down task + execution in aggregate. Without monitoring for this, people may not notice + the problem and tasks would be slower than they could be. We follow the + principle of "fail fast" to ensure optimal task execution. + + We also write an audit log of who used the caches. This log is printed + during failures to help aid debugging. + """ + + our_requirements = { + # Include a version string that we can bump whenever to trigger + # fresh caches. The actual value is not relevant and doesn't need + # to follow any explicit order. Since taskgraph bakes this file's + # hash into cache names, any change to this file/version is sufficient + # to force the use of a new cache. + b"version=1", + # Include the UID and GID the task will run as to ensure that tasks + # with different UID and GID don't share the same cache. + b"uid=%d" % user.pw_uid, + b"gid=%d" % group.gr_gid, + } + + requires_path = os.path.join(cache, ".cacherequires") + audit_path = os.path.join(cache, ".cachelog") + + # The cache is empty. Configure it. + if not os.listdir(cache): + print_line( + b"cache", + b"cache %s is empty; writing requirements: " + b"%s\n" % (cache.encode("utf-8"), b" ".join(sorted(our_requirements))), + ) + + # We write a requirements file so future invocations know what the + # requirements are. + with open(requires_path, "wb") as fh: + fh.write(b"\n".join(sorted(our_requirements))) + + # And make it read-only as a precaution against deletion. + os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + + write_audit_entry( + audit_path, + b"created; requirements: %s" % b", ".join(sorted(our_requirements)), + ) + + set_dir_permissions(cache, user.pw_uid, group.gr_gid) + return + + # The cache has content and we have a requirements file. Validate + # requirements alignment. + if os.path.exists(requires_path): + with open(requires_path, "rb") as fh: + wanted_requirements = set(fh.read().splitlines()) + + print_line( + b"cache", + b"cache %s exists; requirements: %s\n" + % (cache.encode("utf-8"), b" ".join(sorted(wanted_requirements))), + ) + + missing = wanted_requirements - our_requirements + + # Allow requirements mismatch for uid/gid if and only if caches + # are untrusted. This allows cache behavior on Try to be + # reasonable. Otherwise, random tasks could "poison" cache + # usability by introducing uid/gid mismatches. For untrusted + # environments like Try, this is a perfectly reasonable thing to + # allow. + if ( + missing + and untrusted_caches + and running_as_root + and all(s.startswith((b"uid=", b"gid=")) for s in missing) + ): + print_line( + b"cache", + b"cache %s uid/gid mismatch; this is acceptable " + b"because caches for this task are untrusted; " + b"changing ownership to facilitate cache use\n" % cache.encode("utf-8"), + ) + chown_recursive( + cache, user.pw_name, group.gr_name, user.pw_uid, group.gr_gid + ) + + # And write out the updated reality. + with open(requires_path, "wb") as fh: + fh.write(b"\n".join(sorted(our_requirements))) + + write_audit_entry( + audit_path, + b"chown; requirements: %s" % b", ".join(sorted(our_requirements)), + ) + + elif missing: + print( + "error: requirements for populated cache %s differ from " + "this task" % cache + ) + print( + "cache requirements: %s" + % " ".join(sorted(s.decode("utf-8") for s in wanted_requirements)) + ) + print( + "our requirements: %s" + % " ".join(sorted(s.decode("utf-8") for s in our_requirements)) + ) + if any(s.startswith((b"uid=", b"gid=")) for s in missing): + print(CACHE_UID_GID_MISMATCH) + + write_audit_entry( + audit_path, + b"requirements mismatch; wanted: %s" + % b", ".join(sorted(our_requirements)), + ) + + print("") + print("audit log:") + with open(audit_path, "r") as fh: + print(fh.read()) + + return True + else: + write_audit_entry(audit_path, b"used") + + # We don't need to adjust permissions here because the cache is + # associated with a uid/gid and the first task should have set + # a proper owner/group. + + return + + # The cache has content and no requirements file. This shouldn't + # happen because run-task should be the first thing that touches a + # cache. + print( + "error: cache %s is not empty and is missing a " + ".cacherequires file; the cache names for this task are " + "likely mis-configured or TASKCLUSTER_CACHES is not set " + "properly" % cache + ) + + write_audit_entry(audit_path, b"missing .cacherequires") + return True + + +def configure_volume_posix(volume, user, group, running_as_root): + # The only time we should see files in the volume is if the Docker + # image build put files there. + # + # For the sake of simplicity, our policy is that volumes should be + # empty. This also has the advantage that an empty volume looks + # a lot like an empty cache. Tasks can rely on caches being + # swapped in and out on any volume without any noticeable change + # of behavior. + volume_files = os.listdir(volume) + if volume_files: + print(NON_EMPTY_VOLUME % volume) + print("entries in root directory: %s" % " ".join(sorted(volume_files))) + sys.exit(1) + + # The volume is almost certainly owned by root:root. Chown it so it + # is writable. + + if running_as_root: + print_line( + b"volume", + b"changing ownership of volume %s " + b"to %d:%d\n" % (volume.encode("utf-8"), user.pw_uid, group.gr_gid), + ) + set_dir_permissions(volume, user.pw_uid, group.gr_gid) + + +def _clean_git_checkout(destination_path): + # Delete untracked files (i.e. build products) + print_line(b"vcs", b"cleaning git checkout...\n") + args = [ + "git", + "clean", + # Two -f`s causes subdirectories with `.git` + # directories to be cleaned as well. + "-nxdff", + ] + print_line(b"vcs", b"executing %r\n" % args) + p = subprocess.Popen( + args, + # Disable buffering because we want to receive output + # as it is generated so timestamps in logs are + # accurate. + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=sys.stdin.fileno(), + cwd=destination_path, + env=os.environ, + ) + stdout = io.TextIOWrapper(p.stdout, encoding="latin1") + ret = p.wait() + if ret: + sys.exit(ret) + data = stdout.read() + prefix = "Would remove " + filenames = [ + os.path.join(destination_path, line[len(prefix) :]) + for line in data.splitlines() + ] + print_line(b"vcs", b"removing %r\n" % filenames) + for filename in filenames: + remove(filename) + print_line(b"vcs", b"successfully cleaned git checkout!\n") + + +def git_checkout( + destination_path: str, + head_repo: str, + base_repo: Optional[str], + base_ref: Optional[str], + base_rev: Optional[str], + ref: Optional[str], + commit: Optional[str], + ssh_key_file: Optional[Path], + ssh_known_hosts_file: Optional[Path], +): + env = {"PYTHONUNBUFFERED": "1"} + + if ssh_key_file and ssh_known_hosts_file: + if not ssh_key_file.exists(): + raise RuntimeError("Can't find specified ssh_key file.") + if not ssh_known_hosts_file.exists(): + raise RuntimeError("Can't find specified known_hosts file.") + env["GIT_SSH_COMMAND"] = " ".join( + [ + "ssh", + "-oIdentityFile={}".format(ssh_key_file.as_posix()), + "-oStrictHostKeyChecking=yes", + "-oUserKnownHostsFile={}".format(ssh_known_hosts_file.as_posix()), + ] + ) + elif ssh_key_file or ssh_known_hosts_file: + raise RuntimeError( + "Must specify both ssh_key_file and ssh_known_hosts_file, if either are specified", + ) + + if not os.path.exists(destination_path): + # Repository doesn't already exist, needs to be cloned + args = [ + "git", + "clone", + base_repo if base_repo else head_repo, + destination_path, + ] + + retry_required_command(b"vcs", args, extra_env=env) + + if base_ref: + args = ["git", "fetch", "origin", base_ref] + + retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env) + + # Create local branch so that taskgraph is able to compute differences + # between the head branch and the base one, if needed + args = ["git", "checkout", base_ref] + + retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env) + + # When commits are force-pushed (like on a testing branch), base_rev doesn't + # exist on base_ref. Fetching it allows taskgraph to compute differences + # between the previous state before the force-push and the current state. + # + # Unlike base_ref just above, there is no need to checkout the revision: + # it's immediately avaiable after the fetch. + if base_rev and base_rev != NULL_REVISION: + args = ["git", "fetch", "origin", base_rev] + + retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env) + + # If a ref isn't provided, we fetch all refs from head_repo, which may be slow + args = [ + "git", + "fetch", + "--no-tags", + head_repo, + ref if ref else "+refs/heads/*:refs/remotes/work/*", + ] + + retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env) + + args = [ + "git", + "checkout", + "-f", + ] + + if ref: + args.extend(["-B", ref]) + args.append(commit if commit else ref) + + run_required_command(b"vcs", args, cwd=destination_path) + + if os.path.exists(os.path.join(destination_path, ".gitmodules")): + args = [ + "git", + "submodule", + "init", + ] + + run_required_command(b"vcs", args, cwd=destination_path) + + args = [ + "git", + "submodule", + "update", + ] + + run_required_command(b"vcs", args, cwd=destination_path) + + _clean_git_checkout(destination_path) + + args = ["git", "rev-parse", "--verify", "HEAD"] + + commit_hash = subprocess.check_output( + args, cwd=destination_path, universal_newlines=True + ).strip() + assert re.match("^[a-f0-9]{40}$", commit_hash) + + if head_repo.startswith("https://github.com"): + if head_repo.endswith("/"): + head_repo = head_repo[:-1] + + tinderbox_link = "{}/commit/{}".format(head_repo, commit_hash) + repo_name = head_repo.split("/")[-1] + else: + tinderbox_link = head_repo + repo_name = head_repo + + msg = ( + "TinderboxPrint:<a href='{link}' " + "title='Built from {name} commit {commit_hash}'>" + "{commit_hash}</a>\n".format( + commit_hash=commit_hash, link=tinderbox_link, name=repo_name + ) + ) + + print_line(b"vcs", msg.encode("utf-8")) + + return commit_hash + + +def fetch_ssh_secret(secret_name): + """Retrieves the private ssh key, and returns it as a StringIO object""" + secret_url = SECRET_BASEURL_TPL.format(secret_name) + try: + print_line( + b"vcs", + b"fetching secret %s from %s\n" + % (secret_name.encode("utf-8"), secret_url.encode("utf-8")), + ) + res = urllib.request.urlopen(secret_url, timeout=10) + secret = res.read() + try: + secret = json.loads(secret.decode("utf-8")) + except ValueError: + print_line(b"vcs", b"invalid JSON in secret") + sys.exit(1) + except (urllib.error.URLError, socket.timeout): + print_line(b"vcs", b"Unable to retrieve ssh secret. aborting...") + sys.exit(1) + + return secret["secret"]["ssh_privkey"] + + +def hg_checkout( + destination_path: str, + head_repo: str, + base_repo: Optional[str], + store_path: str, + sparse_profile: Optional[str], + branch: Optional[str], + revision: Optional[str], +): + if IS_MACOSX: + hg_bin = "/tools/python27-mercurial/bin/hg" + elif IS_POSIX: + hg_bin = "hg" + elif IS_WINDOWS: + # This is where OCC installs it in the AMIs. + hg_bin = r"C:\Program Files\Mercurial\hg.exe" + if not os.path.exists(hg_bin): + print("could not find Mercurial executable: %s" % hg_bin) + sys.exit(1) + else: + raise RuntimeError("Must be running on mac, posix or windows") + + args = [ + hg_bin, + "robustcheckout", + "--sharebase", + store_path, + "--purge", + ] + + if base_repo: + args.extend(["--upstream", base_repo]) + if sparse_profile: + args.extend(["--sparseprofile", sparse_profile]) + + # Specify method to checkout a revision. This defaults to revisions as + # SHA-1 strings, but also supports symbolic revisions like `tip` via the + # branch flag. + args.extend( + [ + "--branch" if branch else "--revision", + branch or revision, + head_repo, + destination_path, + ] + ) + + run_required_command(b"vcs", args, extra_env={"PYTHONUNBUFFERED": "1"}) + + # Update the current revision hash and ensure that it is well formed. + revision = subprocess.check_output( + [hg_bin, "log", "--rev", ".", "--template", "{node}"], + cwd=destination_path, + # Triggers text mode on Python 3. + universal_newlines=True, + ) + + assert re.match("^[a-f0-9]{40}$", revision) + + msg = ( + "TinderboxPrint:<a href={head_repo}/rev/{revision} " + "title='Built from {repo_name} revision {revision}'>" + "{revision}</a>\n".format( + revision=revision, head_repo=head_repo, repo_name=head_repo.split("/")[-1] + ) + ) + + print_line(b"vcs", msg.encode("utf-8")) + + return revision + + +def fetch_artifacts(): + print_line(b"fetches", b"fetching artifacts\n") + + fetch_content = shutil.which("fetch-content") + + if not fetch_content or not os.path.isfile(fetch_content): + fetch_content = os.path.join(os.path.dirname(__file__), "fetch-content") + + if not os.path.isfile(fetch_content): + print(FETCH_CONTENT_NOT_FOUND) + sys.exit(1) + + cmd = [sys.executable, "-u", fetch_content, "task-artifacts"] + print_line(b"fetches", b"executing %r\n" % cmd) + subprocess.run(cmd, check=True, env=os.environ) + print_line(b"fetches", b"finished fetching artifacts\n") + + +def add_vcs_arguments(parser, project, name): + """Adds arguments to ArgumentParser to control VCS options for a project.""" + + parser.add_argument( + "--%s-checkout" % project, + help="Directory where %s checkout should be created" % name, + ) + parser.add_argument( + "--%s-sparse-profile" % project, + help="Path to sparse profile for %s checkout" % name, + ) + + +def collect_vcs_options(args, project, name): + checkout = getattr(args, "%s_checkout" % project) + sparse_profile = getattr(args, "%s_sparse_profile" % project) + + env_prefix = project.upper() + + repo_type = os.environ.get("%s_REPOSITORY_TYPE" % env_prefix) + base_repo = os.environ.get("%s_BASE_REPOSITORY" % env_prefix) + base_ref = os.environ.get("%s_BASE_REF" % env_prefix) + base_rev = os.environ.get("%s_BASE_REV" % env_prefix) + head_repo = os.environ.get("%s_HEAD_REPOSITORY" % env_prefix) + revision = os.environ.get("%s_HEAD_REV" % env_prefix) + ref = os.environ.get("%s_HEAD_REF" % env_prefix) + pip_requirements = os.environ.get("%s_PIP_REQUIREMENTS" % env_prefix) + private_key_secret = os.environ.get("%s_SSH_SECRET_NAME" % env_prefix) + + store_path = os.environ.get("HG_STORE_PATH") + + # Expand ~ in some paths. + if checkout: + checkout = os.path.abspath(os.path.expanduser(checkout)) + if store_path: + store_path = os.path.abspath(os.path.expanduser(store_path)) + + if pip_requirements: + pip_requirements = os.path.join(checkout, pip_requirements) + + # Some callers set the base repository to mozilla-central for historical + # reasons. Switch to mozilla-unified because robustcheckout works best + # with it. + if base_repo == "https://hg.mozilla.org/mozilla-central": + base_repo = "https://hg.mozilla.org/mozilla-unified" + + return { + "store-path": store_path, + "project": project, + "name": name, + "env-prefix": env_prefix, + "checkout": checkout, + "sparse-profile": sparse_profile, + "base-repo": base_repo, + "base-ref": base_ref, + "base-rev": base_rev, + "head-repo": head_repo, + "revision": revision, + "ref": ref, + "repo-type": repo_type, + "ssh-secret-name": private_key_secret, + "pip-requirements": pip_requirements, + } + + +def vcs_checkout_from_args(options): + + if not options["checkout"]: + if options["ref"] and not options["revision"]: + print("task should be defined in terms of non-symbolic revision") + sys.exit(1) + return + + revision = options["revision"] + ref = options["ref"] + ssh_key_file = None + ssh_known_hosts_file = None + ssh_dir = None + + try: + if options.get("ssh-secret-name"): + ssh_dir = Path("~/.ssh-run-task").expanduser() + os.makedirs(ssh_dir, 0o700) + ssh_key_file = ssh_dir.joinpath("private_ssh_key") + ssh_key = fetch_ssh_secret(options["ssh-secret-name"]) + # We don't use write_text here, to avoid \n -> \r\n on windows + ssh_key_file.write_bytes(ssh_key.encode("ascii")) + ssh_key_file.chmod(0o600) + # TODO: We should pull this from a secret, so it can be updated on old trees + ssh_known_hosts_file = ssh_dir.joinpath("known_hosts") + ssh_known_hosts_file.write_bytes(GITHUB_SSH_FINGERPRINT) + + if options["repo-type"] == "git": + if not revision and not ref: + raise RuntimeError( + "Git requires that either a ref, a revision, or both are provided" + ) + + if not ref: + print("Providing a ref will improve the performance of this checkout") + + revision = git_checkout( + options["checkout"], + options["head-repo"], + options["base-repo"], + options["base-ref"], + options["base-rev"], + ref, + revision, + ssh_key_file, + ssh_known_hosts_file, + ) + elif options["repo-type"] == "hg": + if not revision and not ref: + raise RuntimeError( + "Hg requires that at least one of a ref or revision " "is provided" + ) + + revision = hg_checkout( + options["checkout"], + options["head-repo"], + options["base-repo"], + options["store-path"], + options["sparse-profile"], + ref, + revision, + ) + else: + raise RuntimeError('Type of VCS must be either "git" or "hg"') + finally: + if ssh_dir: + shutil.rmtree(ssh_dir, ignore_errors=True) + pass + + os.environ["%s_HEAD_REV" % options["env-prefix"]] = revision + + +def install_pip_requirements(repositories): + """Install pip requirements files from specified repositories, if necessary.""" + requirements = [ + r["pip-requirements"] for r in repositories if r["pip-requirements"] + ] + if not requirements: + return + + cmd = [sys.executable, "-mpip", "install"] + if os.environ.get("PIP_DISABLE_REQUIRE_HASHES") != "1": + cmd.append("--require-hashes") + + for path in requirements: + cmd.extend(["-r", path]) + + run_required_command(b"pip-install", cmd) + + +def maybe_run_resource_monitoring(): + """Run the resource monitor if available. + + Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160 + and https://bugzil.la/1648051 + + """ + if "MOZ_FETCHES" not in os.environ: + return + if "RESOURCE_MONITOR_OUTPUT" not in os.environ: + return + + prefix = b"resource_monitor" + + executable = "{}/resource-monitor/resource-monitor{}".format( + os.environ.get("MOZ_FETCHES_DIR"), ".exe" if IS_WINDOWS else "" + ) + + if not os.path.exists(executable) or not os.access(executable, os.X_OK): + print_line(prefix, b"%s not executable\n" % executable.encode("utf-8")) + return + args = [ + executable, + "-process", + str(os.getpid()), + "-output", + os.environ["RESOURCE_MONITOR_OUTPUT"], + ] + print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode("utf-8")) + # Avoid environment variables the payload doesn't need. + del os.environ["RESOURCE_MONITOR_OUTPUT"] + + # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too. + process = subprocess.Popen( + args, + # Disable buffering because we want to receive output + # as it is generated so timestamps in logs are + # accurate. + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0, + cwd=os.getcwd(), + ) + + def capture_output(): + fh = io.TextIOWrapper(process.stdout, encoding="latin1") + while True: + data = fh.readline().encode("latin1") + if data == b"": + break + print_line(prefix, data) + + monitor_process = Thread(target=capture_output) + monitor_process.start() + return process + + +def main(args): + os.environ["TASK_WORKDIR"] = os.getcwd() + print_line( + b"setup", + b"run-task started in %s\n" % os.environ["TASK_WORKDIR"].encode("utf-8"), + ) + running_as_root = IS_POSIX and os.getuid() == 0 + + # Arguments up to '--' are ours. After are for the main task + # to be executed. + try: + i = args.index("--") + our_args = args[0:i] + task_args = args[i + 1 :] + except ValueError: + our_args = args + task_args = [] + + parser = argparse.ArgumentParser() + parser.add_argument("--user", default="worker", help="user to run as") + parser.add_argument("--group", default="worker", help="group to run as") + parser.add_argument("--task-cwd", help="directory to run the provided command in") + + repositories = os.environ.get("REPOSITORIES") + if repositories: + repositories = json.loads(repositories) + else: + repositories = {"vcs": "repository"} + + for repository, name in repositories.items(): + add_vcs_arguments(parser, repository, name) + + parser.add_argument( + "--fetch-hgfingerprint", action="store_true", help=argparse.SUPPRESS + ) + + args = parser.parse_args(our_args) + + repositories = [ + collect_vcs_options(args, repository, name) + for (repository, name) in repositories.items() + ] + # Sort repositories so that parent checkout paths come before children + repositories.sort(key=lambda repo: Path(repo["checkout"] or "/").parts) + + uid = gid = gids = None + if IS_POSIX and running_as_root: + user, group, gids = get_posix_user_group(args.user, args.group) + uid = user.pw_uid + gid = group.gr_gid + + if running_as_root and os.path.exists("/dev/kvm"): + # Ensure kvm permissions for worker, required for Android x86 + st = os.stat("/dev/kvm") + os.chmod("/dev/kvm", st.st_mode | 0o666) + + # Validate caches. + # + # Taskgraph should pass in a list of paths that are caches via an + # environment variable (which we don't want to pass down to child + # processes). + + if "TASKCLUSTER_CACHES" in os.environ: + caches = os.environ["TASKCLUSTER_CACHES"].split(";") + del os.environ["TASKCLUSTER_CACHES"] + else: + caches = [] + + if "TASKCLUSTER_UNTRUSTED_CACHES" in os.environ: + untrusted_caches = True + del os.environ["TASKCLUSTER_UNTRUSTED_CACHES"] + else: + untrusted_caches = False + + for cache in caches: + if not os.path.isdir(cache): + print( + "error: cache %s is not a directory; this should never " + "happen" % cache + ) + return 1 + + purge = configure_cache_posix( + cache, user, group, untrusted_caches, running_as_root + ) + + if purge: + return EXIT_PURGE_CACHE + + if "TASKCLUSTER_VOLUMES" in os.environ: + volumes = os.environ["TASKCLUSTER_VOLUMES"].split(";") + del os.environ["TASKCLUSTER_VOLUMES"] + else: + volumes = [] + + if volumes and not IS_POSIX: + print("assertion failed: volumes not expected on Windows") + return 1 + + # Sanitize volumes. + for volume in volumes: + # If a volume is a cache, it was dealt with above. + if volume in caches: + print_line(b"volume", b"volume %s is a cache\n" % volume.encode("utf-8")) + continue + + configure_volume_posix(volume, user, group, running_as_root) + + all_caches_and_volumes = set(map(os.path.normpath, caches)) + all_caches_and_volumes |= set(map(os.path.normpath, volumes)) + + def path_in_cache_or_volume(path): + path = os.path.normpath(path) + + while path: + if path in all_caches_and_volumes: + return True + + path, child = os.path.split(path) + if not child: + break + + return False + + def prepare_checkout_dir(checkout): + if not checkout: + return + + # The checkout path becomes the working directory. Since there are + # special cache files in the cache's root directory and working + # directory purging could blow them away, disallow this scenario. + if os.path.exists(os.path.join(checkout, ".cacherequires")): + print("error: cannot perform vcs checkout into cache root: %s" % checkout) + sys.exit(1) + + # TODO given the performance implications, consider making this a fatal + # error. + if not path_in_cache_or_volume(checkout): + print_line( + b"vcs", + b"WARNING: vcs checkout path (%s) not in cache " + b"or volume; performance will likely suffer\n" + % checkout.encode("utf-8"), + ) + + # Ensure the directory for the source checkout exists. + try: + os.makedirs(os.path.dirname(checkout)) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + # And that it is owned by the appropriate user/group. + if running_as_root: + os.chown(os.path.dirname(checkout), uid, gid) + + def prepare_hg_store_path(): + # And ensure the shared store path exists and has proper permissions. + if "HG_STORE_PATH" not in os.environ: + print("error: HG_STORE_PATH environment variable not set") + sys.exit(1) + + store_path = os.environ["HG_STORE_PATH"] + + if not path_in_cache_or_volume(store_path): + print_line( + b"vcs", + b"WARNING: HG_STORE_PATH (%s) not in cache or " + b"volume; performance will likely suffer\n" + % store_path.encode("utf-8"), + ) + + try: + os.makedirs(store_path) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + if running_as_root: + os.chown(store_path, uid, gid) + + repository_paths = [ + Path(repo["checkout"]) for repo in repositories if repo["checkout"] + ] + for repo in repositories: + if not repo["checkout"]: + continue + parents = Path(repo["checkout"]).parents + if any((path in repository_paths) for path in parents): + # Skip creating any checkouts that are inside other checokuts + continue + prepare_checkout_dir(repo["checkout"]) + + if any(repo["checkout"] and repo["repo-type"] == "hg" for repo in repositories): + prepare_hg_store_path() + + if IS_POSIX and running_as_root: + # Drop permissions to requested user. + # This code is modeled after what `sudo` was observed to do in a Docker + # container. We do not bother calling setrlimit() because containers have + # their own limits. + print_line( + b"setup", + b"running as %s:%s\n" + % (args.user.encode("utf-8"), args.group.encode("utf-8")), + ) + + os.setgroups(gids) + os.umask(0o22) + os.setresgid(gid, gid, gid) + os.setresuid(uid, uid, uid) + + for repo in repositories: + vcs_checkout_from_args(repo) + + resource_process = None + + try: + for k in ["MOZ_FETCHES_DIR", "UPLOAD_DIR"] + [ + "{}_PATH".format(repository["project"].upper()) + for repository in repositories + ]: + if k in os.environ: + os.environ[k] = os.path.abspath(os.environ[k]) + print_line( + b"setup", + b"%s is %s\n" % (k.encode("utf-8"), os.environ[k].encode("utf-8")), + ) + + if "MOZ_FETCHES" in os.environ: + fetch_artifacts() + + # Install Python requirements after fetches in case tasks want to use + # fetches to grab dependencies. + install_pip_requirements(repositories) + + resource_process = maybe_run_resource_monitoring() + + return run_command(b"task", task_args, cwd=args.task_cwd) + finally: + if resource_process: + print_line(b"resource_monitor", b"terminating\n") + if IS_WINDOWS: + # .terminate() on Windows is not a graceful shutdown, due to + # differences in signals. CTRL_BREAK_EVENT will work provided + # the subprocess is in a different process group, so this script + # isn't also killed. + os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT) + else: + resource_process.terminate() + resource_process.wait() + fetches_dir = os.environ.get("MOZ_FETCHES_DIR") + if fetches_dir and os.path.isdir(fetches_dir): + print_line(b"fetches", b"removing %s\n" % fetches_dir.encode("utf-8")) + remove(fetches_dir) + print_line(b"fetches", b"finished\n") + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) |