Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/run-task')
-rwxr-xr-x  third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content       899
-rwxr-xr-x  third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc                  33
-rw-r--r--  third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py    860
-rwxr-xr-x  third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task            1348
4 files changed, 3140 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content
new file mode 100755
index 0000000000..0af923d01d
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/fetch-content
@@ -0,0 +1,899 @@
+#!/usr/bin/python3 -u
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import argparse
+import bz2
+import concurrent.futures
+import contextlib
+import datetime
+import gzip
+import hashlib
+import json
+import lzma
+import multiprocessing
+import os
+import pathlib
+import random
+import re
+import stat
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import urllib.parse
+import urllib.request
+import zipfile
+
+try:
+ import zstandard
+except ImportError:
+ zstandard = None
+
+try:
+ import certifi
+except ImportError:
+ certifi = None
+
+
+CONCURRENCY = multiprocessing.cpu_count()
+
+
+def log(msg):
+ print(msg, file=sys.stderr)
+ sys.stderr.flush()
+
+
+class IntegrityError(Exception):
+ """Represents an integrity error when downloading a URL."""
+
+
+def ZstdCompressor(*args, **kwargs):
+ if not zstandard:
+ raise ValueError("zstandard Python package not available")
+ return zstandard.ZstdCompressor(*args, **kwargs)
+
+
+def ZstdDecompressor(*args, **kwargs):
+ if not zstandard:
+ raise ValueError("zstandard Python package not available")
+ return zstandard.ZstdDecompressor(*args, **kwargs)
+
+
+@contextlib.contextmanager
+def rename_after_close(fname, *args, **kwargs):
+ """
+ Context manager that opens a temporary file to use as a writer,
+ and closes the file on context exit, renaming it to the expected
+ file name in case of success, or removing it in case of failure.
+
+ Takes the same options as open(), but must be used as a context
+ manager.
+ """
+ path = pathlib.Path(fname)
+ tmp = path.with_name("%s.tmp" % path.name)
+ try:
+ with tmp.open(*args, **kwargs) as fh:
+ yield fh
+ except Exception:
+ tmp.unlink()
+ raise
+ else:
+ tmp.rename(fname)
+
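+# Illustrative usage sketch (the filename is hypothetical): write to the
+# temporary file inside the context; the final name only appears if the block
+# exits without raising, otherwise the partial .tmp file is removed.
+#
+#     with rename_after_close("out.bin", "wb") as fh:
+#         fh.write(b"payload")
+#     # out.bin now exists; on failure only out.bin.tmp would have existed
+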
+
+# The following is copied from
+# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85
+def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1):
+ """
+ A generator function that sleeps between retries, handles exponential
+ backoff and jitter. The action you are retrying is meant to run after
+ retrier yields.
+
+ At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter).
+ Afterwards sleeptime is multiplied by sleepscale for the next iteration.
+
+ Args:
+ attempts (int): maximum number of times to try; defaults to 5
+        sleeptime (float): how many seconds to sleep between tries; defaults to
+            10s
+        max_sleeptime (float): the longest we'll sleep, in seconds; defaults to
+            300s (five minutes)
+        sleepscale (float): how much to multiply the sleep time by each
+            iteration; defaults to 1.5
+        jitter (int): random jitter to introduce to the sleep time each
+            iteration; the amount is chosen at random between
+            [-jitter, +jitter]; defaults to 1
+
+ Yields:
+ None, a maximum of `attempts` number of times
+
+ Example:
+ >>> n = 0
+ >>> for _ in retrier(sleeptime=0, jitter=0):
+ ... if n == 3:
+ ... # We did the thing!
+ ... break
+ ... n += 1
+ >>> n
+ 3
+
+ >>> n = 0
+ >>> for _ in retrier(sleeptime=0, jitter=0):
+ ... if n == 6:
+ ... # We did the thing!
+ ... break
+ ... n += 1
+ ... else:
+ ... print("max tries hit")
+ max tries hit
+ """
+ jitter = jitter or 0 # py35 barfs on the next line if jitter is None
+ if jitter > sleeptime:
+ # To prevent negative sleep times
+ raise Exception(
+ "jitter ({}) must be less than sleep time ({})".format(jitter, sleeptime)
+ )
+
+ sleeptime_real = sleeptime
+ for _ in range(attempts):
+ log("attempt %i/%i" % (_ + 1, attempts))
+
+ yield sleeptime_real
+
+ if jitter:
+ sleeptime_real = sleeptime + random.randint(-jitter, jitter)
+ # our jitter should scale along with the sleeptime
+ jitter = int(jitter * sleepscale)
+ else:
+ sleeptime_real = sleeptime
+
+ sleeptime *= sleepscale
+
+ if sleeptime_real > max_sleeptime:
+ sleeptime_real = max_sleeptime
+
+ # Don't need to sleep the last time
+ if _ < attempts - 1:
+ log(
+ "sleeping for %.2fs (attempt %i/%i)" % (sleeptime_real, _ + 1, attempts)
+ )
+ time.sleep(sleeptime_real)
+
+
+def stream_download(url, sha256=None, size=None, headers=None):
+ """Download a URL to a generator, optionally with content verification.
+
+ If ``sha256`` or ``size`` are defined, the downloaded URL will be
+ validated against those requirements and ``IntegrityError`` will be
+ raised if expectations do not match.
+
+    Because verification cannot occur until the file is completely downloaded,
+    consumers should not do anything meaningful with the data while content
+    verification is in use. To securely handle retrieved content, stream it
+    to a file or memory and only operate on it after the generator is
+    exhausted without raising.
+ """
+ log("Downloading %s" % url)
+ headers = headers or []
+
+ h = hashlib.sha256()
+ length = 0
+
+ t0 = time.time()
+ req_headers = {}
+ for header in headers:
+ key, val = header.split(":")
+ req_headers[key.strip()] = val.strip()
+
+ req = urllib.request.Request(url, None, req_headers)
+ with urllib.request.urlopen(
+ req, timeout=60, cafile=certifi.where()
+ ) if certifi else urllib.request.urlopen(req, timeout=60) as fh:
+ if not url.endswith(".gz") and fh.info().get("Content-Encoding") == "gzip":
+ fh = gzip.GzipFile(fileobj=fh)
+
+ while True:
+ chunk = fh.read(65536)
+ if not chunk:
+ break
+
+ h.update(chunk)
+ length += len(chunk)
+
+ yield chunk
+
+ duration = time.time() - t0
+ digest = h.hexdigest()
+
+ log(
+ "%s resolved to %d bytes with sha256 %s in %.3fs"
+ % (url, length, digest, duration)
+ )
+
+ if size:
+ if size == length:
+ log("Verified size of %s" % url)
+ else:
+ raise IntegrityError(
+ "size mismatch on %s: wanted %d; got %d" % (url, size, length)
+ )
+
+ if sha256:
+ if digest == sha256:
+ log("Verified sha256 integrity of %s" % url)
+ else:
+ raise IntegrityError(
+ "sha256 mismatch on %s: wanted %s; got %s" % (url, sha256, digest)
+ )
+
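+# Illustrative sketch (url, expected_sha256 and expected_size are hypothetical):
+# buffer the chunks and treat the data as trusted only once the generator is
+# exhausted without raising IntegrityError.
+#
+#     chunks = []
+#     for chunk in stream_download(url, sha256=expected_sha256, size=expected_size):
+#         chunks.append(chunk)
+#     data = b"".join(chunks)  # safe to use from this point on
+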
+
+def download_to_path(url, path, sha256=None, size=None, headers=None):
+ """Download a URL to a filesystem path, possibly with verification."""
+
+ # We download to a temporary file and rename at the end so there's
+ # no chance of the final file being partially written or containing
+ # bad data.
+ try:
+ path.unlink()
+ except FileNotFoundError:
+ pass
+
+ for _ in retrier(attempts=5, sleeptime=60):
+ try:
+ log("Downloading %s to %s" % (url, path))
+
+ with rename_after_close(path, "wb") as fh:
+ for chunk in stream_download(
+ url, sha256=sha256, size=size, headers=headers
+ ):
+ fh.write(chunk)
+
+ return
+ except IntegrityError:
+ raise
+ except Exception as e:
+ log("Download failed: {}".format(e))
+ continue
+
+ raise Exception("Download failed, no more retries!")
+
+
+def download_to_memory(url, sha256=None, size=None):
+ """Download a URL to memory, possibly with verification."""
+
+ data = b""
+ for _ in retrier(attempts=5, sleeptime=60):
+ try:
+ log("Downloading %s" % (url))
+
+ for chunk in stream_download(url, sha256=sha256, size=size):
+ data += chunk
+
+ return data
+ except IntegrityError:
+ raise
+ except Exception as e:
+ log("Download failed: {}".format(e))
+ continue
+
+ raise Exception("Download failed, no more retries!")
+
+
+def gpg_verify_path(path: pathlib.Path, public_key_data: bytes, signature_data: bytes):
+ """Verify that a filesystem path verifies using GPG.
+
+ Takes a Path defining a file to verify. ``public_key_data`` contains
+ bytes with GPG public key data. ``signature_data`` contains a signed
+ GPG document to use with ``gpg --verify``.
+ """
+ log("Validating GPG signature of %s" % path)
+ log("GPG key data:\n%s" % public_key_data.decode("ascii"))
+
+ with tempfile.TemporaryDirectory() as td:
+ try:
+ # --batch since we're running unattended.
+ gpg_args = ["gpg", "--homedir", td, "--batch"]
+
+ log("Importing GPG key...")
+ subprocess.run(gpg_args + ["--import"], input=public_key_data, check=True)
+
+ log("Verifying GPG signature...")
+ subprocess.run(
+ gpg_args + ["--verify", "-", "%s" % path],
+ input=signature_data,
+ check=True,
+ )
+
+ log("GPG signature verified!")
+ finally:
+ # There is a race between the agent self-terminating and
+ # shutil.rmtree() from the temporary directory cleanup that can
+ # lead to exceptions. Kill the agent before cleanup to prevent this.
+ env = dict(os.environ)
+ env["GNUPGHOME"] = td
+ subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=env)
+
+
+def open_tar_stream(path: pathlib.Path):
+ """"""
+ if path.suffix == ".bz2":
+ return bz2.open(str(path), "rb")
+    elif path.suffix in (".gz", ".tgz"):
+ return gzip.open(str(path), "rb")
+ elif path.suffix == ".xz":
+ return lzma.open(str(path), "rb")
+ elif path.suffix == ".zst":
+ dctx = ZstdDecompressor()
+ return dctx.stream_reader(path.open("rb"))
+ elif path.suffix == ".tar":
+ return path.open("rb")
+ else:
+ raise ValueError("unknown archive format for tar file: %s" % path)
+
+
+def archive_type(path: pathlib.Path):
+ """Attempt to identify a path as an extractable archive."""
+ if path.suffixes[-2:-1] == [".tar"] or path.suffixes[-1:] == [".tgz"]:
+ return "tar"
+ elif path.suffix == ".zip":
+ return "zip"
+ else:
+ return None
+
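+# For example (hypothetical filenames), archive_type() maps "foo.tar.zst" and
+# "foo.tgz" to "tar", "foo.zip" to "zip", and anything else (e.g. "foo.exe")
+# to None.
+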
+
+def extract_archive(path, dest_dir, typ):
+ """Extract an archive to a destination directory."""
+
+ # Resolve paths to absolute variants.
+ path = path.resolve()
+ dest_dir = dest_dir.resolve()
+
+ log("Extracting %s to %s" % (path, dest_dir))
+ t0 = time.time()
+
+ # We pipe input to the decompressor program so that we can apply
+ # custom decompressors that the program may not know about.
+ if typ == "tar":
+ ifh = open_tar_stream(path)
+        # On Windows, the tar program doesn't support things like symbolic
+        # links, even though Windows itself does. The tarfile module in
+        # Python does support them, so use that instead. But since tarfile is
+        # significantly slower than the tar program on Linux, only use it on
+        # Windows (where tarfile is not much slower anyway, presumably
+        # because of Windows' notoriously bad I/O).
+ if sys.platform == "win32":
+ tar = tarfile.open(fileobj=ifh, mode="r|")
+ tar.extractall(str(dest_dir))
+ args = []
+ else:
+ args = ["tar", "xf", "-"]
+ pipe_stdin = True
+ elif typ == "zip":
+ # unzip from stdin has wonky behavior. We don't use a pipe for it.
+ ifh = open(os.devnull, "rb")
+ args = ["unzip", "-q", "-o", str(path)]
+ pipe_stdin = False
+ else:
+ raise ValueError("unknown archive format: %s" % path)
+
+ if args:
+ with ifh, subprocess.Popen(
+ args, cwd=str(dest_dir), bufsize=0, stdin=subprocess.PIPE
+ ) as p:
+ while True:
+ if not pipe_stdin:
+ break
+
+ chunk = ifh.read(131072)
+ if not chunk:
+ break
+
+ p.stdin.write(chunk)
+
+ if p.returncode:
+ raise Exception("%r exited %d" % (args, p.returncode))
+
+ log("%s extracted in %.3fs" % (path, time.time() - t0))
+
+
+def repack_archive(
+ orig: pathlib.Path, dest: pathlib.Path, strip_components=0, prefix=""
+):
+ assert orig != dest
+ log("Repacking as %s" % dest)
+ orig_typ = archive_type(orig)
+ typ = archive_type(dest)
+ if not orig_typ:
+ raise Exception("Archive type not supported for %s" % orig.name)
+ if not typ:
+ raise Exception("Archive type not supported for %s" % dest.name)
+
+ if dest.suffixes[-2:] != [".tar", ".zst"]:
+ raise Exception("Only producing .tar.zst archives is supported.")
+
+ if strip_components or prefix:
+
+ def filter(name):
+ if strip_components:
+ stripped = "/".join(name.split("/")[strip_components:])
+ if not stripped:
+ raise Exception(
+ "Stripping %d components would remove files" % strip_components
+ )
+ name = stripped
+ return prefix + name
+
+ else:
+ filter = None
+
+ with rename_after_close(dest, "wb") as fh:
+ ctx = ZstdCompressor()
+ if orig_typ == "zip":
+ assert typ == "tar"
+ zip = zipfile.ZipFile(orig)
+ # Convert the zip stream to a tar on the fly.
+ with ctx.stream_writer(fh) as compressor, tarfile.open(
+ fileobj=compressor, mode="w:"
+ ) as tar:
+ for zipinfo in zip.infolist():
+ if zipinfo.is_dir():
+ continue
+ tarinfo = tarfile.TarInfo()
+ filename = zipinfo.filename
+ tarinfo.name = filter(filename) if filter else filename
+ tarinfo.size = zipinfo.file_size
+                    # Zip files don't record the timezone they were created
+                    # in, which makes reliably converting their dates to a
+                    # timestamp awkward. We care about reproducibility rather
+                    # than accuracy here, so we pick UTC.
+ time = datetime.datetime(
+ *zipinfo.date_time, tzinfo=datetime.timezone.utc
+ )
+ tarinfo.mtime = time.timestamp()
+ # 0 is MS-DOS, 3 is UNIX. Only in the latter case do we
+ # get anything useful for the tar file mode.
+ if zipinfo.create_system == 3:
+ mode = zipinfo.external_attr >> 16
+ else:
+ mode = 0o0644
+ tarinfo.mode = stat.S_IMODE(mode)
+ if stat.S_ISLNK(mode):
+ tarinfo.type = tarfile.SYMTYPE
+ tarinfo.linkname = zip.read(filename).decode()
+ tar.addfile(tarinfo, zip.open(filename))
+ elif stat.S_ISREG(mode) or stat.S_IFMT(mode) == 0:
+ tar.addfile(tarinfo, zip.open(filename))
+ else:
+ raise Exception("Unsupported file mode %o" % stat.S_IFMT(mode))
+
+ elif orig_typ == "tar":
+ if typ == "zip":
+ raise Exception("Repacking a tar to zip is not supported")
+ assert typ == "tar"
+
+ ifh = open_tar_stream(orig)
+ if filter:
+ # To apply the filter, we need to open the tar stream and
+ # tweak it.
+ origtar = tarfile.open(fileobj=ifh, mode="r|")
+ with ctx.stream_writer(fh) as compressor, tarfile.open(
+ fileobj=compressor,
+ mode="w:",
+ format=origtar.format,
+ ) as tar:
+ for tarinfo in origtar:
+ if tarinfo.isdir():
+ continue
+ tarinfo.name = filter(tarinfo.name)
+ if "path" in tarinfo.pax_headers:
+ tarinfo.pax_headers["path"] = filter(
+ tarinfo.pax_headers["path"]
+ )
+ if tarinfo.isfile():
+ tar.addfile(tarinfo, origtar.extractfile(tarinfo))
+ else:
+ tar.addfile(tarinfo)
+ else:
+ # We only change compression here. The tar stream is unchanged.
+ ctx.copy_stream(ifh, fh)
+
+
+def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None):
+ """Fetch a URL and extract it to a destination path.
+
+ If the downloaded URL is an archive, it is extracted automatically
+ and the archive is deleted. Otherwise the file remains in place in
+ the destination directory.
+ """
+
+ basename = urllib.parse.urlparse(url).path.split("/")[-1]
+ dest_path = dest_dir / basename
+
+ download_to_path(url, dest_path, sha256=sha256, size=size)
+
+ if not extract:
+ return
+
+ typ = archive_type(dest_path)
+ if typ:
+ extract_archive(dest_path, dest_dir, typ)
+ log("Removing %s" % dest_path)
+ dest_path.unlink()
+
+
+def fetch_urls(downloads):
+ """Fetch URLs pairs to a pathlib.Path."""
+ with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e:
+ fs = []
+
+ for download in downloads:
+ fs.append(e.submit(fetch_and_extract, *download))
+
+ for f in fs:
+ f.result()
+
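+# Illustrative sketch (the URL is hypothetical): each entry in ``downloads`` is
+# an argument tuple for fetch_and_extract(), i.e. (url, dest_dir[, extract,
+# sha256, size]).
+#
+#     fetch_urls([
+#         ("https://example.com/foo.tar.zst", pathlib.Path("fetches"), True, None),
+#     ])
+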
+
+def _git_checkout_github_archive(
+ dest_path: pathlib.Path, repo: str, commit: str, prefix: str
+):
+ "Use github archive generator to speed up github git repo cloning"
+ repo = repo.rstrip("/")
+ github_url = "{repo}/archive/{commit}.tar.gz".format(**locals())
+
+ with tempfile.TemporaryDirectory() as td:
+ temp_dir = pathlib.Path(td)
+ dl_dest = temp_dir / "archive.tar.gz"
+ download_to_path(github_url, dl_dest)
+ repack_archive(dl_dest, dest_path, strip_components=1, prefix=prefix + "/")
+
+
+def _github_submodule_required(repo: str, commit: str):
+ "Use github API to check if submodules are used"
+ url = "{repo}/blob/{commit}/.gitmodules".format(**locals())
+ try:
+ status_code = urllib.request.urlopen(url).getcode()
+ return status_code == 200
+ except:
+ return False
+
+
+def git_checkout_archive(
+ dest_path: pathlib.Path,
+ repo: str,
+ commit: str,
+ prefix=None,
+ ssh_key=None,
+ include_dot_git=False,
+):
+ """Produce an archive of the files comprising a Git checkout."""
+ dest_path.parent.mkdir(parents=True, exist_ok=True)
+
+ if not prefix:
+ prefix = repo.rstrip("/").rsplit("/", 1)[-1]
+
+ if dest_path.suffixes[-2:] != [".tar", ".zst"]:
+ raise Exception("Only producing .tar.zst archives is supported.")
+
+ if repo.startswith("https://github.com/"):
+ if not include_dot_git and not _github_submodule_required(repo, commit):
+ log("Using github archive service to speedup archive creation")
+ # Always log sha1 info, either from commit or resolved from repo.
+ if re.match(r"^[a-fA-F0-9]{40}$", commit):
+ revision = commit
+ else:
+ ref_output = subprocess.check_output(["git", "ls-remote", repo,
+ 'refs/heads/' + commit])
+ revision, _ = ref_output.decode().split(maxsplit=1)
+ log("Fetching revision {}".format(revision))
+ return _git_checkout_github_archive(dest_path, repo, commit, prefix)
+
+ with tempfile.TemporaryDirectory() as td:
+ temp_dir = pathlib.Path(td)
+
+ git_dir = temp_dir / prefix
+
+ # This could be faster with a shallow clone. However, Git requires a ref
+ # to initiate a clone. Since the commit-ish may not refer to a ref, we
+ # simply perform a full clone followed by a checkout.
+ print("cloning %s to %s" % (repo, git_dir))
+
+ env = os.environ.copy()
+ keypath = ""
+ if ssh_key:
+ taskcluster_secret_url = api(
+ os.environ.get("TASKCLUSTER_PROXY_URL"),
+ "secrets",
+ "v1",
+ "secret/{keypath}".format(keypath=ssh_key),
+ )
+ taskcluster_secret = b"".join(stream_download(taskcluster_secret_url))
+ taskcluster_secret = json.loads(taskcluster_secret)
+ sshkey = taskcluster_secret["secret"]["ssh_privkey"]
+
+ keypath = temp_dir.joinpath("ssh-key")
+ keypath.write_text(sshkey)
+ keypath.chmod(0o600)
+
+ env = {
+ "GIT_SSH_COMMAND": "ssh -o 'StrictHostKeyChecking no' -i {keypath}".format(
+ keypath=keypath
+ )
+ }
+
+ subprocess.run(["git", "clone", "-n", repo, str(git_dir)], check=True, env=env)
+
+ # Always use a detached head so that git prints out what it checked out.
+ subprocess.run(
+ ["git", "checkout", "--detach", commit], cwd=str(git_dir), check=True
+ )
+
+ # When including the .git, we want --depth 1, but a direct clone would not
+ # necessarily be able to give us the right commit.
+ if include_dot_git:
+ initial_clone = git_dir.with_name(git_dir.name + ".orig")
+ git_dir.rename(initial_clone)
+ subprocess.run(
+ [
+ "git",
+ "clone",
+ "file://" + str(initial_clone),
+ str(git_dir),
+ "--depth",
+ "1",
+ ],
+ check=True,
+ )
+ subprocess.run(
+ ["git", "remote", "set-url", "origin", repo],
+ cwd=str(git_dir),
+ check=True,
+ )
+
+ # --depth 1 can induce more work on the server side, so only use it for
+ # submodule initialization when we want to keep the .git directory.
+ depth = ["--depth", "1"] if include_dot_git else []
+ subprocess.run(
+ ["git", "submodule", "update", "--init"] + depth,
+ cwd=str(git_dir),
+ check=True,
+ )
+
+ if keypath:
+ os.remove(keypath)
+
+ print("creating archive %s of commit %s" % (dest_path, commit))
+ exclude_dot_git = [] if include_dot_git else ["--exclude=.git"]
+ proc = subprocess.Popen(
+ [
+ "tar",
+ "cf",
+ "-",
+ ]
+ + exclude_dot_git
+ + [
+ "-C",
+ str(temp_dir),
+ prefix,
+ ],
+ stdout=subprocess.PIPE,
+ )
+
+ with rename_after_close(dest_path, "wb") as out:
+ ctx = ZstdCompressor()
+ ctx.copy_stream(proc.stdout, out)
+
+ proc.wait()
+
+
+def command_git_checkout_archive(args):
+ dest = pathlib.Path(args.dest)
+
+ try:
+ git_checkout_archive(
+ dest,
+ args.repo,
+ args.commit,
+ prefix=args.path_prefix,
+ ssh_key=args.ssh_key_secret,
+ include_dot_git=args.include_dot_git,
+ )
+ except Exception:
+ try:
+ dest.unlink()
+ except FileNotFoundError:
+ pass
+
+ raise
+
+
+def command_static_url(args):
+ gpg_sig_url = args.gpg_sig_url
+ gpg_env_key = args.gpg_key_env
+
+ if bool(gpg_sig_url) != bool(gpg_env_key):
+ print("--gpg-sig-url and --gpg-key-env must both be defined")
+ return 1
+
+ if gpg_sig_url:
+ gpg_signature = b"".join(stream_download(gpg_sig_url))
+ gpg_key = os.environb[gpg_env_key.encode("ascii")]
+
+ dest = pathlib.Path(args.dest)
+ dest.parent.mkdir(parents=True, exist_ok=True)
+
+ basename = urllib.parse.urlparse(args.url).path.split("/")[-1]
+ if basename.endswith("".join(dest.suffixes)):
+ dl_dest = dest
+ else:
+ dl_dest = dest.parent / basename
+
+ try:
+ download_to_path(
+ args.url, dl_dest, sha256=args.sha256, size=args.size, headers=args.headers
+ )
+
+ if gpg_sig_url:
+ gpg_verify_path(dl_dest, gpg_key, gpg_signature)
+
+ if dl_dest != dest or args.strip_components or args.add_prefix:
+ repack_archive(dl_dest, dest, args.strip_components, args.add_prefix)
+ except Exception:
+ try:
+ dl_dest.unlink()
+ except FileNotFoundError:
+ pass
+
+ raise
+
+ if dl_dest != dest:
+ log("Removing %s" % dl_dest)
+ dl_dest.unlink()
+
+
+def api(root_url, service, version, path):
+ # taskcluster-lib-urls is not available when this script runs, so
+ # simulate its behavior:
+ return "{root_url}/api/{service}/{version}/{path}".format(
+ root_url=root_url, service=service, version=version, path=path
+ )
+
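+# For example (hypothetical root URL):
+#   api("https://tc.example.com", "queue", "v1", "task/abc123/artifacts/public/build.zip")
+# returns
+#   "https://tc.example.com/api/queue/v1/task/abc123/artifacts/public/build.zip"
+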
+
+def get_hash(fetch, root_url):
+ path = "task/{task}/artifacts/{artifact}".format(
+ task=fetch["task"], artifact="public/chain-of-trust.json"
+ )
+ url = api(root_url, "queue", "v1", path)
+ cot = json.loads(download_to_memory(url))
+ return cot["artifacts"][fetch["artifact"]]["sha256"]
+
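+# Illustrative shape of the chain-of-trust.json artifact, limited to the
+# fields read above (artifact name and digest are hypothetical):
+#
+#     {"artifacts": {"public/build/target.tar.zst": {"sha256": "<hex digest>"}}}
+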
+
+def command_task_artifacts(args):
+ start = time.monotonic()
+ fetches = json.loads(os.environ["MOZ_FETCHES"])
+ downloads = []
+ for fetch in fetches:
+ extdir = pathlib.Path(args.dest)
+ if "dest" in fetch:
+ # Note: normpath doesn't like pathlib.Path in python 3.5
+ extdir = pathlib.Path(os.path.normpath(str(extdir.joinpath(fetch["dest"]))))
+ extdir.mkdir(parents=True, exist_ok=True)
+ root_url = os.environ["TASKCLUSTER_ROOT_URL"]
+ sha256 = None
+ if fetch.get("verify-hash"):
+ sha256 = get_hash(fetch, root_url)
+ if fetch["artifact"].startswith("public/"):
+ path = "task/{task}/artifacts/{artifact}".format(
+ task=fetch["task"], artifact=fetch["artifact"]
+ )
+ url = api(root_url, "queue", "v1", path)
+ else:
+ url = ("{proxy_url}/api/queue/v1/task/{task}/artifacts/{artifact}").format(
+ proxy_url=os.environ["TASKCLUSTER_PROXY_URL"],
+ task=fetch["task"],
+ artifact=fetch["artifact"],
+ )
+ downloads.append((url, extdir, fetch["extract"], sha256))
+
+ fetch_urls(downloads)
+ end = time.monotonic()
+
+ perfherder_data = {
+ "framework": {"name": "build_metrics"},
+ "suites": [
+ {
+ "name": "fetch_content",
+ "value": end - start,
+ "lowerIsBetter": True,
+ "shouldAlert": False,
+ "subtests": [],
+ }
+ ],
+ }
+ print("PERFHERDER_DATA: {}".format(json.dumps(perfherder_data)), file=sys.stderr)
+
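+# Illustrative MOZ_FETCHES payload (task ID and artifact are hypothetical),
+# using the keys read above ("task", "artifact", "extract" and the optional
+# "dest" and "verify-hash"):
+#
+#     [{"task": "abc123XYZ", "artifact": "public/build/target.tar.zst",
+#       "extract": true, "dest": "toolchain"}]
+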
+
+def main():
+ parser = argparse.ArgumentParser()
+ subparsers = parser.add_subparsers(title="sub commands")
+
+ git_checkout = subparsers.add_parser(
+ "git-checkout-archive",
+ help="Obtain an archive of files from a Git repository checkout",
+ )
+ git_checkout.set_defaults(func=command_git_checkout_archive)
+ git_checkout.add_argument(
+ "--path-prefix", help="Prefix for paths in produced archive"
+ )
+ git_checkout.add_argument("repo", help="URL to Git repository to be cloned")
+ git_checkout.add_argument("commit", help="Git commit to check out")
+ git_checkout.add_argument("dest", help="Destination path of archive")
+ git_checkout.add_argument(
+ "--ssh-key-secret", help="The scope path of the ssh key to used for checkout"
+ )
+ git_checkout.add_argument(
+ "--include-dot-git", action="store_true", help="Include the .git directory"
+ )
+
+ url = subparsers.add_parser("static-url", help="Download a static URL")
+ url.set_defaults(func=command_static_url)
+ url.add_argument("--sha256", required=True, help="SHA-256 of downloaded content")
+ url.add_argument(
+ "--size", required=True, type=int, help="Size of downloaded content, in bytes"
+ )
+ url.add_argument(
+ "--gpg-sig-url",
+ help="URL containing signed GPG document validating " "URL to fetch",
+ )
+ url.add_argument(
+ "--gpg-key-env", help="Environment variable containing GPG key to validate"
+ )
+ url.add_argument(
+ "--strip-components",
+ type=int,
+ default=0,
+ help="Number of leading components to strip from file "
+ "names in the downloaded archive",
+ )
+ url.add_argument(
+ "--add-prefix",
+ default="",
+ help="Prefix to add to file names in the downloaded " "archive",
+ )
+ url.add_argument(
+ "-H",
+ "--header",
+ default=[],
+ action="append",
+ dest="headers",
+ help="Header to send as part of the request, can be passed " "multiple times",
+ )
+ url.add_argument("url", help="URL to fetch")
+ url.add_argument("dest", help="Destination path")
+
+ artifacts = subparsers.add_parser("task-artifacts", help="Fetch task artifacts")
+ artifacts.set_defaults(func=command_task_artifacts)
+ artifacts.add_argument(
+ "-d",
+ "--dest",
+ default=os.environ.get("MOZ_FETCHES_DIR"),
+ help="Destination directory which will contain all "
+ "artifacts (defaults to $MOZ_FETCHES_DIR)",
+ )
+
+ args = parser.parse_args()
+
+ if not args.dest:
+ parser.error(
+ "no destination directory specified, either pass in --dest "
+ "or set $MOZ_FETCHES_DIR"
+ )
+
+ return args.func(args)
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc
new file mode 100755
index 0000000000..f6a2f6643c
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/hgrc
@@ -0,0 +1,33 @@
+# By default the progress bar starts after 3s and updates every 0.1s. We
+# change this so it shows and updates every 1.0s.
+# We also tell progress to assume a TTY is present so updates are printed
+# even if there is no known TTY.
+[progress]
+delay = 1.0
+refresh = 1.0
+assume-tty = true
+
+[extensions]
+share =
+sparse =
+robustcheckout = /usr/local/mercurial/robustcheckout.py
+
+[hostsecurity]
+# When running a modern Python, Mercurial will default to TLS 1.1+.
+# When running on a legacy Python, Mercurial will default to TLS 1.0+.
+# There is no good reason we shouldn't be running a modern Python
+# capable of speaking TLS 1.2. And the only Mercurial servers we care
+# about should be running TLS 1.2. So make TLS 1.2 the minimum.
+minimumprotocol = tls1.2
+
+# Settings to make 1-click loaners more useful.
+[extensions]
+histedit =
+rebase =
+
+[diff]
+git = 1
+showfunc = 1
+
+[pager]
+pager = LESS=FRSXQ less
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py
new file mode 100644
index 0000000000..b5d2230211
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/robustcheckout.py
@@ -0,0 +1,860 @@
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+"""Robustly perform a checkout.
+
+This extension provides the ``hg robustcheckout`` command for
+ensuring a working directory is updated to the specified revision
+from a source repo using best practices to ensure optimal clone
+times and storage efficiency.
+"""
+
+from __future__ import absolute_import
+
+import contextlib
+import json
+import os
+import random
+import re
+import socket
+import ssl
+import time
+
+from mercurial.i18n import _
+from mercurial.node import hex, nullid
+from mercurial import (
+ commands,
+ configitems,
+ error,
+ exchange,
+ extensions,
+ hg,
+ match as matchmod,
+ pycompat,
+ registrar,
+ scmutil,
+ urllibcompat,
+ util,
+ vfs,
+)
+
+# Causes worker to purge caches on process exit and for task to retry.
+EXIT_PURGE_CACHE = 72
+
+testedwith = (
+ b"4.5 4.6 4.7 4.8 4.9 5.0 5.1 5.2 5.3 5.4 5.5 5.6 5.7 5.8 5.9 6.0 6.1 6.2 6.3 6.4"
+)
+minimumhgversion = b"4.5"
+
+cmdtable = {}
+command = registrar.command(cmdtable)
+
+configtable = {}
+configitem = registrar.configitem(configtable)
+
+configitem(b"robustcheckout", b"retryjittermin", default=configitems.dynamicdefault)
+configitem(b"robustcheckout", b"retryjittermax", default=configitems.dynamicdefault)
+
+
+def getsparse():
+ from mercurial import sparse
+
+ return sparse
+
+
+def peerlookup(remote, v):
+ with remote.commandexecutor() as e:
+ return e.callcommand(b"lookup", {b"key": v}).result()
+
+
+@command(
+ b"robustcheckout",
+ [
+ (b"", b"upstream", b"", b"URL of upstream repo to clone from"),
+ (b"r", b"revision", b"", b"Revision to check out"),
+ (b"b", b"branch", b"", b"Branch to check out"),
+ (b"", b"purge", False, b"Whether to purge the working directory"),
+ (b"", b"sharebase", b"", b"Directory where shared repos should be placed"),
+ (
+ b"",
+ b"networkattempts",
+ 3,
+ b"Maximum number of attempts for network " b"operations",
+ ),
+ (b"", b"sparseprofile", b"", b"Sparse checkout profile to use (path in repo)"),
+ (
+ b"U",
+ b"noupdate",
+ False,
+ b"the clone will include an empty working directory\n"
+ b"(only a repository)",
+ ),
+ ],
+ b"[OPTION]... URL DEST",
+ norepo=True,
+)
+def robustcheckout(
+ ui,
+ url,
+ dest,
+ upstream=None,
+ revision=None,
+ branch=None,
+ purge=False,
+ sharebase=None,
+ networkattempts=None,
+ sparseprofile=None,
+ noupdate=False,
+):
+ """Ensure a working copy has the specified revision checked out.
+
+ Repository data is automatically pooled into the common directory
+ specified by ``--sharebase``, which is a required argument. It is required
+ because pooling storage prevents excessive cloning, which makes operations
+ complete faster.
+
+ One of ``--revision`` or ``--branch`` must be specified. ``--revision``
+ is preferred, as it is deterministic and there is no ambiguity as to which
+ revision will actually be checked out.
+
+ If ``--upstream`` is used, the repo at that URL is used to perform the
+ initial clone instead of cloning from the repo where the desired revision
+ is located.
+
+    ``--purge`` controls whether to remove untracked and ignored files from
+ the working directory. If used, the end state of the working directory
+ should only contain files explicitly under version control for the requested
+ revision.
+
+ ``--sparseprofile`` can be used to specify a sparse checkout profile to use.
+ The sparse checkout profile corresponds to a file in the revision to be
+ checked out. If a previous sparse profile or config is present, it will be
+ replaced by this sparse profile. We choose not to "widen" the sparse config
+ so operations are as deterministic as possible. If an existing checkout
+ is present and it isn't using a sparse checkout, we error. This is to
+ prevent accidentally enabling sparse on a repository that may have
+ clients that aren't sparse aware. Sparse checkout support requires Mercurial
+ 4.3 or newer and the ``sparse`` extension must be enabled.
+ """
+ if not revision and not branch:
+ raise error.Abort(b"must specify one of --revision or --branch")
+
+ if revision and branch:
+ raise error.Abort(b"cannot specify both --revision and --branch")
+
+ # Require revision to look like a SHA-1.
+ if revision:
+ if (
+ len(revision) < 12
+ or len(revision) > 40
+ or not re.match(b"^[a-f0-9]+$", revision)
+ ):
+ raise error.Abort(
+ b"--revision must be a SHA-1 fragment 12-40 " b"characters long"
+ )
+
+ sharebase = sharebase or ui.config(b"share", b"pool")
+ if not sharebase:
+ raise error.Abort(
+ b"share base directory not defined; refusing to operate",
+ hint=b"define share.pool config option or pass --sharebase",
+ )
+
+ # Sparse profile support was added in Mercurial 4.3, where it was highly
+ # experimental. Because of the fragility of it, we only support sparse
+ # profiles on 4.3. When 4.4 is released, we'll need to opt in to sparse
+ # support. We /could/ silently fall back to non-sparse when not supported.
+ # However, given that sparse has performance implications, we want to fail
+ # fast if we can't satisfy the desired checkout request.
+ if sparseprofile:
+ try:
+ extensions.find(b"sparse")
+ except KeyError:
+ raise error.Abort(
+ b"sparse extension must be enabled to use " b"--sparseprofile"
+ )
+
+ ui.warn(b"(using Mercurial %s)\n" % util.version())
+
+ # worker.backgroundclose only makes things faster if running anti-virus,
+ # which our automation doesn't. Disable it.
+ ui.setconfig(b"worker", b"backgroundclose", False)
+ # Don't wait forever if the connection hangs
+ ui.setconfig(b"http", b"timeout", 600)
+
+ # By default the progress bar starts after 3s and updates every 0.1s. We
+ # change this so it shows and updates every 1.0s.
+ # We also tell progress to assume a TTY is present so updates are printed
+ # even if there is no known TTY.
+ # We make the config change here instead of in a config file because
+ # otherwise we're at the whim of whatever configs are used in automation.
+ ui.setconfig(b"progress", b"delay", 1.0)
+ ui.setconfig(b"progress", b"refresh", 1.0)
+ ui.setconfig(b"progress", b"assume-tty", True)
+
+ sharebase = os.path.realpath(sharebase)
+
+ optimes = []
+ behaviors = set()
+ start = time.time()
+
+ try:
+ return _docheckout(
+ ui,
+ url,
+ dest,
+ upstream,
+ revision,
+ branch,
+ purge,
+ sharebase,
+ optimes,
+ behaviors,
+ networkattempts,
+ sparse_profile=sparseprofile,
+ noupdate=noupdate,
+ )
+ finally:
+ overall = time.time() - start
+
+ # We store the overall time multiple ways in order to help differentiate
+ # the various "flavors" of operations.
+
+ # ``overall`` is always the total operation time.
+ optimes.append(("overall", overall))
+
+ def record_op(name):
+ # If special behaviors due to "corrupt" storage occur, we vary the
+ # name to convey that.
+ if "remove-store" in behaviors:
+ name += "_rmstore"
+ if "remove-wdir" in behaviors:
+ name += "_rmwdir"
+
+ optimes.append((name, overall))
+
+ # We break out overall operations primarily by their network interaction
+ # We have variants within for working directory operations.
+ if "clone" in behaviors and "create-store" in behaviors:
+ record_op("overall_clone")
+
+ if "sparse-update" in behaviors:
+ record_op("overall_clone_sparsecheckout")
+ else:
+ record_op("overall_clone_fullcheckout")
+
+ elif "pull" in behaviors or "clone" in behaviors:
+ record_op("overall_pull")
+
+ if "sparse-update" in behaviors:
+ record_op("overall_pull_sparsecheckout")
+ else:
+ record_op("overall_pull_fullcheckout")
+
+ if "empty-wdir" in behaviors:
+ record_op("overall_pull_emptywdir")
+ else:
+ record_op("overall_pull_populatedwdir")
+
+ else:
+ record_op("overall_nopull")
+
+ if "sparse-update" in behaviors:
+ record_op("overall_nopull_sparsecheckout")
+ else:
+ record_op("overall_nopull_fullcheckout")
+
+ if "empty-wdir" in behaviors:
+ record_op("overall_nopull_emptywdir")
+ else:
+ record_op("overall_nopull_populatedwdir")
+
+ server_url = urllibcompat.urlreq.urlparse(url).netloc
+
+ if "TASKCLUSTER_INSTANCE_TYPE" in os.environ:
+ perfherder = {
+ "framework": {
+ "name": "vcs",
+ },
+ "suites": [],
+ }
+ for op, duration in optimes:
+ perfherder["suites"].append(
+ {
+ "name": op,
+ "value": duration,
+ "lowerIsBetter": True,
+ "shouldAlert": False,
+ "serverUrl": server_url.decode("utf-8"),
+ "hgVersion": util.version().decode("utf-8"),
+ "extraOptions": [os.environ["TASKCLUSTER_INSTANCE_TYPE"]],
+ "subtests": [],
+ }
+ )
+ ui.write(
+ b"PERFHERDER_DATA: %s\n"
+ % pycompat.bytestr(json.dumps(perfherder, sort_keys=True))
+ )
+
+
+def _docheckout(
+ ui,
+ url,
+ dest,
+ upstream,
+ revision,
+ branch,
+ purge,
+ sharebase,
+ optimes,
+ behaviors,
+ networkattemptlimit,
+ networkattempts=None,
+ sparse_profile=None,
+ noupdate=False,
+):
+ if not networkattempts:
+ networkattempts = [1]
+
+ def callself():
+ return _docheckout(
+ ui,
+ url,
+ dest,
+ upstream,
+ revision,
+ branch,
+ purge,
+ sharebase,
+ optimes,
+ behaviors,
+ networkattemptlimit,
+ networkattempts=networkattempts,
+ sparse_profile=sparse_profile,
+ noupdate=noupdate,
+ )
+
+ @contextlib.contextmanager
+ def timeit(op, behavior):
+ behaviors.add(behavior)
+ errored = False
+ try:
+ start = time.time()
+ yield
+ except Exception:
+ errored = True
+ raise
+ finally:
+ elapsed = time.time() - start
+
+ if errored:
+ op += "_errored"
+
+ optimes.append((op, elapsed))
+
+ ui.write(b"ensuring %s@%s is available at %s\n" % (url, revision or branch, dest))
+
+ # We assume that we're the only process on the machine touching the
+ # repository paths that we were told to use. This means our recovery
+ # scenario when things aren't "right" is to just nuke things and start
+ # from scratch. This is easier to implement than verifying the state
+ # of the data and attempting recovery. And in some scenarios (such as
+ # potential repo corruption), it is probably faster, since verifying
+ # repos can take a while.
+
+ destvfs = vfs.vfs(dest, audit=False, realpath=True)
+
+ def deletesharedstore(path=None):
+ storepath = path or destvfs.read(b".hg/sharedpath").strip()
+ if storepath.endswith(b".hg"):
+ storepath = os.path.dirname(storepath)
+
+ storevfs = vfs.vfs(storepath, audit=False)
+ storevfs.rmtree(forcibly=True)
+
+ if destvfs.exists() and not destvfs.exists(b".hg"):
+ raise error.Abort(b"destination exists but no .hg directory")
+
+ # Refuse to enable sparse checkouts on existing checkouts. The reasoning
+ # here is that another consumer of this repo may not be sparse aware. If we
+ # enabled sparse, we would lock them out.
+ if destvfs.exists() and sparse_profile and not destvfs.exists(b".hg/sparse"):
+ raise error.Abort(
+ b"cannot enable sparse profile on existing " b"non-sparse checkout",
+ hint=b"use a separate working directory to use sparse",
+ )
+
+ # And the other direction for symmetry.
+ if not sparse_profile and destvfs.exists(b".hg/sparse"):
+ raise error.Abort(
+ b"cannot use non-sparse checkout on existing sparse " b"checkout",
+ hint=b"use a separate working directory to use sparse",
+ )
+
+    # Require checkouts to be tied to shared storage, for efficiency.
+ if destvfs.exists(b".hg") and not destvfs.exists(b".hg/sharedpath"):
+ ui.warn(b"(destination is not shared; deleting)\n")
+ with timeit("remove_unshared_dest", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+
+ # Verify the shared path exists and is using modern pooled storage.
+ if destvfs.exists(b".hg/sharedpath"):
+ storepath = destvfs.read(b".hg/sharedpath").strip()
+
+ ui.write(b"(existing repository shared store: %s)\n" % storepath)
+
+ if not os.path.exists(storepath):
+ ui.warn(b"(shared store does not exist; deleting destination)\n")
+ with timeit("removed_missing_shared_store", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+ elif not re.search(b"[a-f0-9]{40}/\.hg$", storepath.replace(b"\\", b"/")):
+ ui.warn(
+ b"(shared store does not belong to pooled storage; "
+ b"deleting destination to improve efficiency)\n"
+ )
+ with timeit("remove_unpooled_store", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+
+ if destvfs.isfileorlink(b".hg/wlock"):
+ ui.warn(
+ b"(dest has an active working directory lock; assuming it is "
+ b"left over from a previous process and that the destination "
+ b"is corrupt; deleting it just to be sure)\n"
+ )
+ with timeit("remove_locked_wdir", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+
+ def handlerepoerror(e):
+ if pycompat.bytestr(e) == _(b"abandoned transaction found"):
+ ui.warn(b"(abandoned transaction found; trying to recover)\n")
+ repo = hg.repository(ui, dest)
+ if not repo.recover():
+ ui.warn(b"(could not recover repo state; " b"deleting shared store)\n")
+ with timeit("remove_unrecovered_shared_store", "remove-store"):
+ deletesharedstore()
+
+ ui.warn(b"(attempting checkout from beginning)\n")
+ return callself()
+
+ raise
+
+ # At this point we either have an existing working directory using
+ # shared, pooled storage or we have nothing.
+
+ def handlenetworkfailure():
+ if networkattempts[0] >= networkattemptlimit:
+ raise error.Abort(
+ b"reached maximum number of network attempts; " b"giving up\n"
+ )
+
+ ui.warn(
+ b"(retrying after network failure on attempt %d of %d)\n"
+ % (networkattempts[0], networkattemptlimit)
+ )
+
+ # Do a backoff on retries to mitigate the thundering herd
+    # problem. This is an exponential backoff with a multiplier
+ # plus random jitter thrown in for good measure.
+ # With the default settings, backoffs will be:
+ # 1) 2.5 - 6.5
+ # 2) 5.5 - 9.5
+ # 3) 11.5 - 15.5
+ backoff = (2 ** networkattempts[0] - 1) * 1.5
+ jittermin = ui.configint(b"robustcheckout", b"retryjittermin", 1000)
+ jittermax = ui.configint(b"robustcheckout", b"retryjittermax", 5000)
+ backoff += float(random.randint(jittermin, jittermax)) / 1000.0
+ ui.warn(b"(waiting %.2fs before retry)\n" % backoff)
+ time.sleep(backoff)
+
+ networkattempts[0] += 1
+
+ def handlepullerror(e):
+ """Handle an exception raised during a pull.
+
+ Returns True if caller should call ``callself()`` to retry.
+ """
+ if isinstance(e, error.Abort):
+ if e.args[0] == _(b"repository is unrelated"):
+ ui.warn(b"(repository is unrelated; deleting)\n")
+ destvfs.rmtree(forcibly=True)
+ return True
+ elif e.args[0].startswith(_(b"stream ended unexpectedly")):
+ ui.warn(b"%s\n" % e.args[0])
+ # Will raise if failure limit reached.
+ handlenetworkfailure()
+ return True
+ # TODO test this branch
+ elif isinstance(e, error.ResponseError):
+ if e.args[0].startswith(_(b"unexpected response from remote server:")):
+ ui.warn(b"(unexpected response from remote server; retrying)\n")
+ destvfs.rmtree(forcibly=True)
+ # Will raise if failure limit reached.
+ handlenetworkfailure()
+ return True
+ elif isinstance(e, ssl.SSLError):
+ # Assume all SSL errors are due to the network, as Mercurial
+ # should convert non-transport errors like cert validation failures
+ # to error.Abort.
+ ui.warn(b"ssl error: %s\n" % pycompat.bytestr(str(e)))
+ handlenetworkfailure()
+ return True
+ elif isinstance(e, urllibcompat.urlerr.httperror) and e.code >= 500:
+ ui.warn(b"http error: %s\n" % pycompat.bytestr(str(e.reason)))
+ handlenetworkfailure()
+ return True
+ elif isinstance(e, urllibcompat.urlerr.urlerror):
+ if isinstance(e.reason, socket.error):
+ ui.warn(b"socket error: %s\n" % pycompat.bytestr(str(e.reason)))
+ handlenetworkfailure()
+ return True
+ else:
+ ui.warn(
+ b"unhandled URLError; reason type: %s; value: %s\n"
+ % (
+ pycompat.bytestr(e.reason.__class__.__name__),
+ pycompat.bytestr(str(e.reason)),
+ )
+ )
+ elif isinstance(e, socket.timeout):
+ ui.warn(b"socket timeout\n")
+ handlenetworkfailure()
+ return True
+ else:
+ ui.warn(
+ b"unhandled exception during network operation; type: %s; "
+ b"value: %s\n"
+ % (pycompat.bytestr(e.__class__.__name__), pycompat.bytestr(str(e)))
+ )
+
+ return False
+
+ # Perform sanity checking of store. We may or may not know the path to the
+    # local store. It depends on whether we have an existing destvfs pointing
+    # to a share. To ensure we always find a local store, perform the same logic
+ # that Mercurial's pooled storage does to resolve the local store path.
+ cloneurl = upstream or url
+
+ try:
+ clonepeer = hg.peer(ui, {}, cloneurl)
+ rootnode = peerlookup(clonepeer, b"0")
+ except error.RepoLookupError:
+ raise error.Abort(b"unable to resolve root revision from clone " b"source")
+ except (
+ error.Abort,
+ ssl.SSLError,
+ urllibcompat.urlerr.urlerror,
+ socket.timeout,
+ ) as e:
+ if handlepullerror(e):
+ return callself()
+ raise
+
+ if rootnode == nullid:
+ raise error.Abort(b"source repo appears to be empty")
+
+ storepath = os.path.join(sharebase, hex(rootnode))
+ storevfs = vfs.vfs(storepath, audit=False)
+
+ if storevfs.isfileorlink(b".hg/store/lock"):
+ ui.warn(
+ b"(shared store has an active lock; assuming it is left "
+ b"over from a previous process and that the store is "
+ b"corrupt; deleting store and destination just to be "
+ b"sure)\n"
+ )
+ if destvfs.exists():
+ with timeit("remove_dest_active_lock", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+
+ with timeit("remove_shared_store_active_lock", "remove-store"):
+ storevfs.rmtree(forcibly=True)
+
+ if storevfs.exists() and not storevfs.exists(b".hg/requires"):
+ ui.warn(
+ b"(shared store missing requires file; this is a really "
+ b"odd failure; deleting store and destination)\n"
+ )
+ if destvfs.exists():
+ with timeit("remove_dest_no_requires", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+
+ with timeit("remove_shared_store_no_requires", "remove-store"):
+ storevfs.rmtree(forcibly=True)
+
+ if storevfs.exists(b".hg/requires"):
+ requires = set(storevfs.read(b".hg/requires").splitlines())
+ # "share-safe" (enabled by default as of hg 6.1) moved most
+ # requirements to a new file, so we need to look there as well to avoid
+ # deleting and re-cloning each time
+ if b"share-safe" in requires:
+ requires |= set(storevfs.read(b".hg/store/requires").splitlines())
+ # FUTURE when we require generaldelta, this is where we can check
+ # for that.
+ required = {b"dotencode", b"fncache"}
+
+ missing = required - requires
+ if missing:
+ ui.warn(
+ b"(shared store missing requirements: %s; deleting "
+ b"store and destination to ensure optimal behavior)\n"
+ % b", ".join(sorted(missing))
+ )
+ if destvfs.exists():
+ with timeit("remove_dest_missing_requires", "remove-wdir"):
+ destvfs.rmtree(forcibly=True)
+
+ with timeit("remove_shared_store_missing_requires", "remove-store"):
+ storevfs.rmtree(forcibly=True)
+
+ created = False
+
+ if not destvfs.exists():
+ # Ensure parent directories of destination exist.
+ # Mercurial 3.8 removed ensuredirs and made makedirs race safe.
+ if util.safehasattr(util, "ensuredirs"):
+ makedirs = util.ensuredirs
+ else:
+ makedirs = util.makedirs
+
+ makedirs(os.path.dirname(destvfs.base), notindexed=True)
+ makedirs(sharebase, notindexed=True)
+
+ if upstream:
+ ui.write(b"(cloning from upstream repo %s)\n" % upstream)
+
+ if not storevfs.exists():
+ behaviors.add(b"create-store")
+
+ try:
+ with timeit("clone", "clone"):
+ shareopts = {b"pool": sharebase, b"mode": b"identity"}
+ res = hg.clone(
+ ui,
+ {},
+ clonepeer,
+ dest=dest,
+ update=False,
+ shareopts=shareopts,
+ stream=True,
+ )
+ except (
+ error.Abort,
+ ssl.SSLError,
+ urllibcompat.urlerr.urlerror,
+ socket.timeout,
+ ) as e:
+ if handlepullerror(e):
+ return callself()
+ raise
+ except error.RepoError as e:
+ return handlerepoerror(e)
+ except error.RevlogError as e:
+ ui.warn(b"(repo corruption: %s; deleting shared store)\n" % e)
+ with timeit("remove_shared_store_revlogerror", "remote-store"):
+ deletesharedstore()
+ return callself()
+
+ # TODO retry here.
+ if res is None:
+ raise error.Abort(b"clone failed")
+
+ # Verify it is using shared pool storage.
+ if not destvfs.exists(b".hg/sharedpath"):
+ raise error.Abort(b"clone did not create a shared repo")
+
+ created = True
+
+ # The destination .hg directory should exist. Now make sure we have the
+ # wanted revision.
+
+ repo = hg.repository(ui, dest)
+
+ # We only pull if we are using symbolic names or the requested revision
+ # doesn't exist.
+ havewantedrev = False
+
+ if revision:
+ try:
+ ctx = scmutil.revsingle(repo, revision)
+ except error.RepoLookupError:
+ ctx = None
+
+ if ctx:
+ if not ctx.hex().startswith(revision):
+ raise error.Abort(
+ b"--revision argument is ambiguous",
+ hint=b"must be the first 12+ characters of a " b"SHA-1 fragment",
+ )
+
+ checkoutrevision = ctx.hex()
+ havewantedrev = True
+
+ if not havewantedrev:
+ ui.write(b"(pulling to obtain %s)\n" % (revision or branch,))
+
+ remote = None
+ try:
+ remote = hg.peer(repo, {}, url)
+ pullrevs = [peerlookup(remote, revision or branch)]
+ checkoutrevision = hex(pullrevs[0])
+ if branch:
+ ui.warn(
+ b"(remote resolved %s to %s; "
+ b"result is not deterministic)\n" % (branch, checkoutrevision)
+ )
+
+ if checkoutrevision in repo:
+ ui.warn(b"(revision already present locally; not pulling)\n")
+ else:
+ with timeit("pull", "pull"):
+ pullop = exchange.pull(repo, remote, heads=pullrevs)
+ if not pullop.rheads:
+ raise error.Abort(b"unable to pull requested revision")
+ except (
+ error.Abort,
+ ssl.SSLError,
+ urllibcompat.urlerr.urlerror,
+ socket.timeout,
+ ) as e:
+ if handlepullerror(e):
+ return callself()
+ raise
+ except error.RepoError as e:
+ return handlerepoerror(e)
+ except error.RevlogError as e:
+ ui.warn(b"(repo corruption: %s; deleting shared store)\n" % e)
+ deletesharedstore()
+ return callself()
+ finally:
+ if remote:
+ remote.close()
+
+ # Now we should have the wanted revision in the store. Perform
+ # working directory manipulation.
+
+ # Avoid any working directory manipulations if `-U`/`--noupdate` was passed
+ if noupdate:
+ ui.write(b"(skipping update since `-U` was passed)\n")
+ return None
+
+ # Purge if requested. We purge before update because this way we're
+ # guaranteed to not have conflicts on `hg update`.
+ if purge and not created:
+ ui.write(b"(purging working directory)\n")
+ purge = getattr(commands, "purge", None)
+ if not purge:
+ purge = extensions.find(b"purge").purge
+
+ # Mercurial 4.3 doesn't purge files outside the sparse checkout.
+ # See https://bz.mercurial-scm.org/show_bug.cgi?id=5626. Force
+ # purging by monkeypatching the sparse matcher.
+ try:
+ old_sparse_fn = getattr(repo.dirstate, "_sparsematchfn", None)
+ if old_sparse_fn is not None:
+ repo.dirstate._sparsematchfn = lambda: matchmod.always()
+
+ with timeit("purge", "purge"):
+ if purge(
+ ui,
+ repo,
+ all=True,
+ abort_on_err=True,
+ # The function expects all arguments to be
+ # defined.
+ **{"print": None, "print0": None, "dirs": None, "files": None}
+ ):
+ raise error.Abort(b"error purging")
+ finally:
+ if old_sparse_fn is not None:
+ repo.dirstate._sparsematchfn = old_sparse_fn
+
+ # Update the working directory.
+
+ if repo[b"."].node() == nullid:
+ behaviors.add("empty-wdir")
+ else:
+ behaviors.add("populated-wdir")
+
+ if sparse_profile:
+ sparsemod = getsparse()
+
+ # By default, Mercurial will ignore unknown sparse profiles. This could
+ # lead to a full checkout. Be more strict.
+ try:
+ repo.filectx(sparse_profile, changeid=checkoutrevision).data()
+ except error.ManifestLookupError:
+ raise error.Abort(
+ b"sparse profile %s does not exist at revision "
+ b"%s" % (sparse_profile, checkoutrevision)
+ )
+
+ old_config = sparsemod.parseconfig(
+ repo.ui, repo.vfs.tryread(b"sparse"), b"sparse"
+ )
+
+ old_includes, old_excludes, old_profiles = old_config
+
+ if old_profiles == {sparse_profile} and not old_includes and not old_excludes:
+ ui.write(
+ b"(sparse profile %s already set; no need to update "
+ b"sparse config)\n" % sparse_profile
+ )
+ else:
+ if old_includes or old_excludes or old_profiles:
+ ui.write(
+ b"(replacing existing sparse config with profile "
+ b"%s)\n" % sparse_profile
+ )
+ else:
+ ui.write(b"(setting sparse config to profile %s)\n" % sparse_profile)
+
+ # If doing an incremental update, this will perform two updates:
+ # one to change the sparse profile and another to update to the new
+ # revision. This is not desired. But there's not a good API in
+ # Mercurial to do this as one operation.
+ # TRACKING hg64 - Mercurial 6.4 and later require call to
+ # dirstate.changing_parents(repo)
+ def parentchange(repo):
+ if util.safehasattr(repo.dirstate, "changing_parents"):
+ return repo.dirstate.changing_parents(repo)
+ return repo.dirstate.parentchange()
+
+ with repo.wlock(), parentchange(repo), timeit(
+ "sparse_update_config", "sparse-update-config"
+ ):
+ # pylint --py3k: W1636
+ fcounts = list(
+ map(
+ len,
+ sparsemod._updateconfigandrefreshwdir(
+ repo, [], [], [sparse_profile], force=True
+ ),
+ )
+ )
+
+ repo.ui.status(
+ b"%d files added, %d files dropped, "
+ b"%d files conflicting\n" % tuple(fcounts)
+ )
+
+ ui.write(b"(sparse refresh complete)\n")
+
+ op = "update_sparse" if sparse_profile else "update"
+ behavior = "update-sparse" if sparse_profile else "update"
+
+ with timeit(op, behavior):
+ if commands.update(ui, repo, rev=checkoutrevision, clean=True):
+ raise error.Abort(b"error updating")
+
+ ui.write(b"updated to %s\n" % checkoutrevision)
+
+ return None
+
+
+def extsetup(ui):
+ # Ensure required extensions are loaded.
+ for ext in (b"purge", b"share"):
+ try:
+ extensions.find(ext)
+ except KeyError:
+ extensions.load(ui, ext, None)
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
new file mode 100755
index 0000000000..267b5283ea
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
@@ -0,0 +1,1348 @@
+#!/usr/bin/python3 -u
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""Run a task after performing common actions.
+
+This script is meant to be the "driver" for TaskCluster based tasks.
+It receives some common arguments to control the run-time environment.
+
+It performs actions as requested from the arguments. Then it executes
+the requested process and prints its output, prefixing it with the
+current time to improve log usefulness.
+"""
+
+import sys
+
+if sys.version_info[0:2] < (3, 5):
+ print("run-task requires Python 3.5+")
+ sys.exit(1)
+
+import argparse
+import datetime
+import errno
+import io
+import json
+import os
+import platform
+import re
+import shutil
+import signal
+import socket
+import stat
+import subprocess
+import time
+import urllib.error
+import urllib.request
+from pathlib import Path
+from threading import Thread
+from typing import Optional
+
+SECRET_BASEURL_TPL = "http://taskcluster/secrets/v1/secret/{}"
+
+GITHUB_SSH_FINGERPRINT = (
+ b"github.com ssh-ed25519 "
+ b"AAAAC3NzaC1lZDI1NTE5AAAAIOMqqnkVzrm0SdG6UOoqKLsabgH5C9okWi0dh2l9GKJl\n"
+ b"github.com ecdsa-sha2-nistp256 "
+ b"AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEmKSENjQEezOmxkZMy7opKgwFB"
+ b"9nkt5YRrYMjNuG5N87uRgg6CLrbo5wAdT/y6v0mKV0U2w0WZ2YB/++Tpockg=\n"
+ b"github.com ssh-rsa "
+ b"AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY"
+ b"4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDP"
+ b"gVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyR"
+ b"kQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWO"
+ b"WRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZ"
+ b"yaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+"
+ b"2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=\n"
+)
+
+
+CACHE_UID_GID_MISMATCH = """
+There is a UID/GID mismatch on the cache. This likely means:
+
+a) different tasks are running as a different user/group
+b) different Docker images have different UID/GID for the same user/group
+
+Our cache policy is that the UID/GID for ALL tasks must be consistent
+for the lifetime of the cache. This eliminates permissions problems due
+to file/directory user/group ownership.
+
+To make this error go away, ensure that all Docker images use
+a consistent UID/GID and that all tasks using this cache are running as
+the same user/group.
+"""
+
+
+NON_EMPTY_VOLUME = """
+error: volume %s is not empty
+
+Our Docker image policy requires volumes to be empty.
+
+The volume was likely populated as part of building the Docker image.
+Change the Dockerfile and anything run from it to not create files in
+any VOLUME.
+
+A lesser possibility is that you stumbled upon a TaskCluster platform bug
+where it fails to use new volumes for tasks.
+"""
+
+
+FETCH_CONTENT_NOT_FOUND = """
+error: fetch-content script not found
+
+The script at `taskcluster/scripts/misc/fetch-content` could not be
+detected in the current environment.
+"""
+
+# The exit code to use when caches should be purged and the task retried.
+# This is EX_OSFILE (from sysexits.h):
+# Some system file does not exist, cannot be opened, or has some
+# sort of error (e.g., syntax error).
+EXIT_PURGE_CACHE = 72
+
+
+IS_MACOSX = sys.platform == "darwin"
+IS_POSIX = os.name == "posix"
+IS_WINDOWS = os.name == "nt"
+
+# Both Mercurial and Git use sha1 as revision identifiers. Luckily, both define
+# the same value as the null revision.
+#
+# https://github.com/git/git/blob/dc04167d378fb29d30e1647ff6ff51dd182bc9a3/t/oid-info/hash-info#L7
+# https://www.mercurial-scm.org/repo/hg-stable/file/82efc31bd152/mercurial/node.py#l30
+NULL_REVISION = "0000000000000000000000000000000000000000"
+
+
+def print_line(prefix, m):
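+    # Illustrative output (timestamp hypothetical): print_line(b"setup", b"hi\n")
+    # emits something like "[setup 2024-01-01T12:00:00.000Z] hi" on stdout,
+    # with the UTC timestamp truncated to milliseconds.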
+ now = datetime.datetime.utcnow().isoformat().encode("utf-8")
+ # slice microseconds to 3 decimals.
+ now = now[:-3] if now[-7:-6] == b"." else now
+ sys.stdout.buffer.write(b"[%s %sZ] %s" % (prefix, now, m))
+ sys.stdout.buffer.flush()
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+ """
+ It's possible to see spurious errors on Windows due to various things
+    keeping a handle to the directory open (explorer, virus scanners, etc.),
+    so we try a few times if it fails with a known error.
+ retry_delay is multiplied by the number of failed attempts to increase
+ the likelihood of success in subsequent attempts.
+ """
+ retry_count = 0
+ while True:
+ try:
+ func(*args)
+ except OSError as e:
+ # Error codes are defined in:
+ # https://docs.python.org/3/library/errno.html#module-errno
+ if e.errno not in (errno.EACCES, errno.ENOTEMPTY, errno.ENOENT):
+ raise
+
+ if retry_count == retry_max:
+ raise
+
+ retry_count += 1
+
+ print(
+ '%s() failed for "%s". Reason: %s (%s). Retrying...'
+ % (func.__name__, args, e.strerror, e.errno)
+ )
+ time.sleep(retry_count * retry_delay)
+ else:
+ # If no exception has been thrown it should be done
+ break
+
+
+def remove(path):
+ """Removes the specified file, link, or directory tree.
+
+ This is a replacement for shutil.rmtree that works better under
+ windows. It does the following things:
+
+ - check path access for the current user before trying to remove
+ - retry operations on some known errors due to various things keeping
+ a handle on file paths - like explorer, virus scanners, etc. The
+ known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+      retry up to 5 times with a delay of (failed_attempts * 0.5) seconds
+ between each attempt.
+
+    Note that no error will be raised if the given path does not exist.
+
+ :param path: path to be removed
+ """
+
+ def _update_permissions(path):
+ """Sets specified pemissions depending on filetype"""
+ if os.path.islink(path):
+ # Path is a symlink which we don't have to modify
+ # because it should already have all the needed permissions
+ return
+
+ stats = os.stat(path)
+
+ if os.path.isfile(path):
+ mode = stats.st_mode | stat.S_IWUSR
+ elif os.path.isdir(path):
+ mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+ else:
+ # Not supported type
+ return
+
+ _call_windows_retry(os.chmod, (path, mode))
+
+ if not os.path.lexists(path):
+ print_line(b"remove", b"WARNING: %s does not exists!\n" % path.encode("utf-8"))
+ return
+
+ """
+    On Windows, add '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access
+    files or directories that exceed the MAX_PATH (260) limitation or that end
+    with a period.
+ """
+ if (
+ sys.platform in ("win32", "cygwin")
+ and len(path) >= 3
+ and path[1] == ":"
+ and path[2] == "\\"
+ ):
+ path = "\\\\?\\%s" % path
+
+ if os.path.isfile(path) or os.path.islink(path):
+ # Verify the file or link is read/write for the current user
+ _update_permissions(path)
+ _call_windows_retry(os.remove, (path,))
+
+ elif os.path.isdir(path):
+ # Verify the directory is read/write/execute for the current user
+ _update_permissions(path)
+
+ # We're ensuring that every nested item has writable permission.
+ for root, dirs, files in os.walk(path):
+ for entry in dirs + files:
+ _update_permissions(os.path.join(root, entry))
+ _call_windows_retry(shutil.rmtree, (path,))
+
+
+def run_required_command(prefix, args, *, extra_env=None, cwd=None):
+ res = run_command(prefix, args, extra_env=extra_env, cwd=cwd)
+ if res:
+ sys.exit(res)
+
+
+def retry_required_command(prefix, args, *, extra_env=None, cwd=None, retries=2):
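+    # Like run_required_command(), but retries failed commands with a simple
+    # exponential backoff: with the default retries=2 it sleeps 2s, then 4s,
+    # between attempts before exiting with the last command's status.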
+ backoff = 1
+ while True:
+ res = run_command(prefix, args, extra_env=extra_env, cwd=cwd)
+ if not res:
+ return
+ if not retries:
+ sys.exit(res)
+ retries -= 1
+ backoff *= 2
+ time.sleep(backoff)
+
+
+def run_command(prefix, args, *, extra_env=None, cwd=None):
+ """Runs a process and prefixes its output with the time.
+
+ Returns the process exit code.
+ """
+ print_line(prefix, b"executing %r\n" % args)
+
+ env = dict(os.environ)
+ env.update(extra_env or {})
+
+ # Note: TaskCluster's stdin is a TTY. This attribute is lost
+ # when we pass sys.stdin to the invoked process. If we cared
+ # to preserve stdin as a TTY, we could make this work. But until
+ # someone needs it, don't bother.
+
+ # We want stdout to be bytes on Python 3. That means we can't use
+ # universal_newlines=True (because it implies text mode). But
+ # p.stdout.readline() won't work for bytes text streams. So, on Python 3,
+ # we manually install a latin1 stream wrapper. This allows us to readline()
+ # and preserves bytes, without losing any data.
+
+ p = subprocess.Popen(
+ args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ stdin=sys.stdin.fileno(),
+ cwd=cwd,
+ env=env,
+ )
+
+ stdout = io.TextIOWrapper(p.stdout, encoding="latin1")
+
+ while True:
+ data = stdout.readline().encode("latin1")
+
+ if data == b"":
+ break
+
+ print_line(prefix, data)
+
+ return p.wait()
+
+
+def get_posix_user_group(user, group):
+ import grp
+ import pwd
+
+ try:
+ user_record = pwd.getpwnam(user)
+ except KeyError:
+ print("could not find user %s; specify a valid user with --user" % user)
+ sys.exit(1)
+
+ try:
+ group_record = grp.getgrnam(group)
+ except KeyError:
+ print("could not find group %s; specify a valid group with --group" % group)
+ sys.exit(1)
+
+ # Most tasks use worker:worker. We require they have a specific numeric ID
+ # because otherwise it is too easy for files written to caches to have
+ # mismatched numeric IDs, which results in permissions errors.
+ if user_record.pw_name == "worker" and user_record.pw_uid != 1000:
+ print("user `worker` must have uid=1000; got %d" % user_record.pw_uid)
+ sys.exit(1)
+
+ if group_record.gr_name == "worker" and group_record.gr_gid != 1000:
+ print("group `worker` must have gid=1000; got %d" % group_record.gr_gid)
+ sys.exit(1)
+
+ # Find all groups to which this user is a member.
+ gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem]
+
+ return user_record, group_record, gids
+
+
+def write_audit_entry(path, msg):
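+    # Each audit line looks roughly like (timestamp and task id illustrative):
+    # "[2024-01-01T12:00:00.000000Z TASK_ID] created; requirements: ..."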
+ now = datetime.datetime.utcnow().isoformat().encode("utf-8")
+ with open(path, "ab") as fh:
+ fh.write(b"[%sZ %s] %s\n" % (now, os.environb.get(b"TASK_ID", b"UNKNOWN"), msg))
+
+
+WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR
+
+
+def set_dir_permissions(path, uid, gid):
+ st = os.lstat(path)
+
+ if st.st_uid != uid or st.st_gid != gid:
+ os.chown(path, uid, gid)
+
+ # Also make sure dirs are writable in case we need to delete
+ # them.
+ if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE:
+ os.chmod(path, st.st_mode | WANTED_DIR_MODE)
+
+
+def chown_recursive(path, user, group, uid, gid):
+ print_line(
+ b"chown",
+ b"recursively changing ownership of %s to %s:%s\n"
+ % (path.encode("utf-8"), user.encode("utf-8"), group.encode("utf-8")),
+ )
+
+ set_dir_permissions(path, uid, gid)
+
+ for root, dirs, files in os.walk(path):
+ for d in dirs:
+ set_dir_permissions(os.path.join(root, d), uid, gid)
+
+ for f in files:
+            # The file may be a symlink that points to nowhere, in which case
+            # os.chown() would fail because it attempts to follow the
+ # symlink. We only care about directory entries, not what
+ # they point to. So setting the owner of the symlink should
+ # be sufficient.
+ os.lchown(os.path.join(root, f), uid, gid)
+
+
+def configure_cache_posix(cache, user, group, untrusted_caches, running_as_root):
+ """Configure a cache path on POSIX platforms.
+
+ For each cache, we write out a special file denoting attributes and
+ capabilities of run-task and the task being executed. These attributes
+ are used by subsequent run-task invocations to validate that use of
+ the cache is acceptable.
+
+ We /could/ blow away the cache data on requirements mismatch.
+ While this would be convenient, this could result in "competing" tasks
+ effectively undoing the other's work. This would slow down task
+ execution in aggregate. Without monitoring for this, people may not notice
+ the problem and tasks would be slower than they could be. We follow the
+ principle of "fail fast" to ensure optimal task execution.
+
+ We also write an audit log of who used the caches. This log is printed
+ during failures to help aid debugging.
+ """
+
+ our_requirements = {
+        # Include a version string that we can bump whenever we want to
+        # trigger fresh caches. The actual value is not relevant and doesn't need
+ # to follow any explicit order. Since taskgraph bakes this file's
+ # hash into cache names, any change to this file/version is sufficient
+ # to force the use of a new cache.
+ b"version=1",
+ # Include the UID and GID the task will run as to ensure that tasks
+ # with different UID and GID don't share the same cache.
+ b"uid=%d" % user.pw_uid,
+ b"gid=%d" % group.gr_gid,
+ }
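+    # For a task running as uid/gid 1000 (illustrative values), the resulting
+    # .cacherequires file contains the sorted entries:
+    #
+    #   gid=1000
+    #   uid=1000
+    #   version=1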
+
+ requires_path = os.path.join(cache, ".cacherequires")
+ audit_path = os.path.join(cache, ".cachelog")
+
+ # The cache is empty. Configure it.
+ if not os.listdir(cache):
+ print_line(
+ b"cache",
+ b"cache %s is empty; writing requirements: "
+ b"%s\n" % (cache.encode("utf-8"), b" ".join(sorted(our_requirements))),
+ )
+
+ # We write a requirements file so future invocations know what the
+ # requirements are.
+ with open(requires_path, "wb") as fh:
+ fh.write(b"\n".join(sorted(our_requirements)))
+
+ # And make it read-only as a precaution against deletion.
+ os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+
+ write_audit_entry(
+ audit_path,
+ b"created; requirements: %s" % b", ".join(sorted(our_requirements)),
+ )
+
+ set_dir_permissions(cache, user.pw_uid, group.gr_gid)
+ return
+
+ # The cache has content and we have a requirements file. Validate
+ # requirements alignment.
+ if os.path.exists(requires_path):
+ with open(requires_path, "rb") as fh:
+ wanted_requirements = set(fh.read().splitlines())
+
+ print_line(
+ b"cache",
+ b"cache %s exists; requirements: %s\n"
+ % (cache.encode("utf-8"), b" ".join(sorted(wanted_requirements))),
+ )
+
+ missing = wanted_requirements - our_requirements
+
+ # Allow requirements mismatch for uid/gid if and only if caches
+ # are untrusted. This allows cache behavior on Try to be
+ # reasonable. Otherwise, random tasks could "poison" cache
+ # usability by introducing uid/gid mismatches. For untrusted
+ # environments like Try, this is a perfectly reasonable thing to
+ # allow.
+ if (
+ missing
+ and untrusted_caches
+ and running_as_root
+ and all(s.startswith((b"uid=", b"gid=")) for s in missing)
+ ):
+ print_line(
+ b"cache",
+ b"cache %s uid/gid mismatch; this is acceptable "
+ b"because caches for this task are untrusted; "
+ b"changing ownership to facilitate cache use\n" % cache.encode("utf-8"),
+ )
+ chown_recursive(
+ cache, user.pw_name, group.gr_name, user.pw_uid, group.gr_gid
+ )
+
+ # And write out the updated reality.
+ with open(requires_path, "wb") as fh:
+ fh.write(b"\n".join(sorted(our_requirements)))
+
+ write_audit_entry(
+ audit_path,
+ b"chown; requirements: %s" % b", ".join(sorted(our_requirements)),
+ )
+
+ elif missing:
+ print(
+ "error: requirements for populated cache %s differ from "
+ "this task" % cache
+ )
+ print(
+ "cache requirements: %s"
+ % " ".join(sorted(s.decode("utf-8") for s in wanted_requirements))
+ )
+ print(
+ "our requirements: %s"
+ % " ".join(sorted(s.decode("utf-8") for s in our_requirements))
+ )
+ if any(s.startswith((b"uid=", b"gid=")) for s in missing):
+ print(CACHE_UID_GID_MISMATCH)
+
+ write_audit_entry(
+ audit_path,
+ b"requirements mismatch; wanted: %s"
+ % b", ".join(sorted(our_requirements)),
+ )
+
+ print("")
+ print("audit log:")
+ with open(audit_path, "r") as fh:
+ print(fh.read())
+
+ return True
+ else:
+ write_audit_entry(audit_path, b"used")
+
+ # We don't need to adjust permissions here because the cache is
+ # associated with a uid/gid and the first task should have set
+ # a proper owner/group.
+
+ return
+
+ # The cache has content and no requirements file. This shouldn't
+ # happen because run-task should be the first thing that touches a
+ # cache.
+ print(
+ "error: cache %s is not empty and is missing a "
+ ".cacherequires file; the cache names for this task are "
+ "likely mis-configured or TASKCLUSTER_CACHES is not set "
+ "properly" % cache
+ )
+
+ write_audit_entry(audit_path, b"missing .cacherequires")
+ return True
+
+
+def configure_volume_posix(volume, user, group, running_as_root):
+ # The only time we should see files in the volume is if the Docker
+ # image build put files there.
+ #
+ # For the sake of simplicity, our policy is that volumes should be
+ # empty. This also has the advantage that an empty volume looks
+ # a lot like an empty cache. Tasks can rely on caches being
+ # swapped in and out on any volume without any noticeable change
+ # of behavior.
+ volume_files = os.listdir(volume)
+ if volume_files:
+ print(NON_EMPTY_VOLUME % volume)
+ print("entries in root directory: %s" % " ".join(sorted(volume_files)))
+ sys.exit(1)
+
+ # The volume is almost certainly owned by root:root. Chown it so it
+ # is writable.
+
+ if running_as_root:
+ print_line(
+ b"volume",
+ b"changing ownership of volume %s "
+ b"to %d:%d\n" % (volume.encode("utf-8"), user.pw_uid, group.gr_gid),
+ )
+ set_dir_permissions(volume, user.pw_uid, group.gr_gid)
+
+
+def _clean_git_checkout(destination_path):
+ # Delete untracked files (i.e. build products)
+ print_line(b"vcs", b"cleaning git checkout...\n")
+ args = [
+ "git",
+ "clean",
+        # -n makes this a dry run: the paths reported by git are removed
+        # below via remove(), which copes better with Windows file locking.
+        # Two `-f`s cause subdirectories with `.git`
+        # directories to be cleaned as well.
+        "-nxdff",
+ ]
+ print_line(b"vcs", b"executing %r\n" % args)
+ p = subprocess.Popen(
+ args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ stdin=sys.stdin.fileno(),
+ cwd=destination_path,
+ env=os.environ,
+ )
+ stdout = io.TextIOWrapper(p.stdout, encoding="latin1")
+ ret = p.wait()
+ if ret:
+ sys.exit(ret)
+ data = stdout.read()
+ prefix = "Would remove "
+ filenames = [
+ os.path.join(destination_path, line[len(prefix) :])
+ for line in data.splitlines()
+ ]
+ print_line(b"vcs", b"removing %r\n" % filenames)
+ for filename in filenames:
+ remove(filename)
+ print_line(b"vcs", b"successfully cleaned git checkout!\n")
+
+
+def git_checkout(
+ destination_path: str,
+ head_repo: str,
+ base_repo: Optional[str],
+ base_ref: Optional[str],
+ base_rev: Optional[str],
+ ref: Optional[str],
+ commit: Optional[str],
+ ssh_key_file: Optional[Path],
+ ssh_known_hosts_file: Optional[Path],
+):
+ env = {
+ # abort if transfer speed is lower than 1kB/s for 1 minute
+ "GIT_HTTP_LOW_SPEED_LIMIT": "1024",
+ "GIT_HTTP_LOW_SPEED_TIME": "60",
+ "PYTHONUNBUFFERED": "1",
+ }
+
+ if ssh_key_file and ssh_known_hosts_file:
+ if not ssh_key_file.exists():
+ raise RuntimeError("Can't find specified ssh_key file.")
+ if not ssh_known_hosts_file.exists():
+ raise RuntimeError("Can't find specified known_hosts file.")
+ env["GIT_SSH_COMMAND"] = " ".join(
+ [
+ "ssh",
+ "-oIdentityFile={}".format(ssh_key_file.as_posix()),
+ "-oStrictHostKeyChecking=yes",
+ "-oUserKnownHostsFile={}".format(ssh_known_hosts_file.as_posix()),
+ ]
+ )
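+        # The resulting command looks roughly like (paths are illustrative):
+        #   ssh -oIdentityFile=/path/to/key -oStrictHostKeyChecking=yes
+        #       -oUserKnownHostsFile=/path/to/known_hosts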
+ elif ssh_key_file or ssh_known_hosts_file:
+ raise RuntimeError(
+ "Must specify both ssh_key_file and ssh_known_hosts_file, if either are specified",
+ )
+
+ if not os.path.exists(destination_path):
+ # Repository doesn't already exist, needs to be cloned
+ args = [
+ "git",
+ "clone",
+ base_repo if base_repo else head_repo,
+ destination_path,
+ ]
+
+ retry_required_command(b"vcs", args, extra_env=env)
+
+ if base_ref:
+ args = ["git", "fetch", "origin", base_ref]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ # Create local branch so that taskgraph is able to compute differences
+ # between the head branch and the base one, if needed
+ args = ["git", "checkout", base_ref]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ # When commits are force-pushed (like on a testing branch), base_rev doesn't
+ # exist on base_ref. Fetching it allows taskgraph to compute differences
+ # between the previous state before the force-push and the current state.
+ #
+ # Unlike base_ref just above, there is no need to checkout the revision:
+ # it's immediately available after the fetch.
+ if base_rev and base_rev != NULL_REVISION:
+ args = ["git", "fetch", "origin", base_rev]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+    # If a ref was provided, it might be a tag, so we need to make sure we fetch
+ # those. This is explicitly only done when base and head repo match,
+ # because it is the only scenario where tags could be present. (PRs, for
+ # example, always include an explicit rev.) Failure to do this could result
+ # in not having a tag, or worse: having an outdated version of one.
+ # `--force` is needed to be able to update an existing tag.
+ if ref and base_repo == head_repo:
+ args = [
+ "git",
+ "fetch",
+ "--tags",
+ "--force",
+ base_repo,
+ ref,
+ ]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ # If a ref isn't provided, we fetch all refs from head_repo, which may be slow
+ args = [
+ "git",
+ "fetch",
+ "--no-tags",
+ head_repo,
+ ref if ref else "+refs/heads/*:refs/remotes/work/*",
+ ]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ args = [
+ "git",
+ "checkout",
+ "-f",
+ ]
+
+ if ref:
+ args.extend(["-B", ref])
+
+    # `git fetch` sets the `FETCH_HEAD` reference to the last commit of the desired branch
+ args.append(commit if commit else "FETCH_HEAD")
+
+ run_required_command(b"vcs", args, cwd=destination_path)
+
+ if os.path.exists(os.path.join(destination_path, ".gitmodules")):
+ args = [
+ "git",
+ "submodule",
+ "init",
+ ]
+
+ run_required_command(b"vcs", args, cwd=destination_path)
+
+ args = [
+ "git",
+ "submodule",
+ "update",
+ "--force", # Overrides any potential local changes
+ ]
+
+ run_required_command(b"vcs", args, cwd=destination_path)
+
+ _clean_git_checkout(destination_path)
+
+ args = ["git", "rev-parse", "--verify", "HEAD"]
+
+ commit_hash = subprocess.check_output(
+ args, cwd=destination_path, universal_newlines=True
+ ).strip()
+ assert re.match("^[a-f0-9]{40}$", commit_hash)
+
+ if head_repo.startswith("https://github.com"):
+ if head_repo.endswith("/"):
+ head_repo = head_repo[:-1]
+
+ tinderbox_link = "{}/commit/{}".format(head_repo, commit_hash)
+ repo_name = head_repo.split("/")[-1]
+ else:
+ tinderbox_link = head_repo
+ repo_name = head_repo
+
+ msg = (
+ "TinderboxPrint:<a href='{link}' "
+ "title='Built from {name} commit {commit_hash}'>"
+ "{commit_hash}</a>\n".format(
+ commit_hash=commit_hash, link=tinderbox_link, name=repo_name
+ )
+ )
+
+ print_line(b"vcs", msg.encode("utf-8"))
+
+ return commit_hash
+
+
+def fetch_ssh_secret(secret_name):
+ """Retrieves the private ssh key, and returns it as a StringIO object"""
+ secret_url = SECRET_BASEURL_TPL.format(secret_name)
+ try:
+ print_line(
+ b"vcs",
+ b"fetching secret %s from %s\n"
+ % (secret_name.encode("utf-8"), secret_url.encode("utf-8")),
+ )
+ res = urllib.request.urlopen(secret_url, timeout=10)
+ secret = res.read()
+ try:
+ secret = json.loads(secret.decode("utf-8"))
+ except ValueError:
+ print_line(b"vcs", b"invalid JSON in secret")
+ sys.exit(1)
+ except (urllib.error.URLError, socket.timeout):
+ print_line(b"vcs", b"Unable to retrieve ssh secret. aborting...")
+ sys.exit(1)
+
+ return secret["secret"]["ssh_privkey"]
+
+
+def hg_checkout(
+ destination_path: str,
+ head_repo: str,
+ base_repo: Optional[str],
+ store_path: str,
+ sparse_profile: Optional[str],
+ branch: Optional[str],
+ revision: Optional[str],
+):
+ if IS_MACOSX:
+ hg_bin = "/tools/python27-mercurial/bin/hg"
+ elif IS_POSIX:
+ hg_bin = "hg"
+ elif IS_WINDOWS:
+ # This is where OCC installs it in the AMIs.
+ hg_bin = r"C:\Program Files\Mercurial\hg.exe"
+ if not os.path.exists(hg_bin):
+ print("could not find Mercurial executable: %s" % hg_bin)
+ sys.exit(1)
+ else:
+ raise RuntimeError("Must be running on mac, posix or windows")
+
+ args = [
+ hg_bin,
+ "robustcheckout",
+ "--sharebase",
+ store_path,
+ "--purge",
+ ]
+
+ if base_repo:
+ args.extend(["--upstream", base_repo])
+ if sparse_profile:
+ args.extend(["--sparseprofile", sparse_profile])
+
+    # Specify the method to check out a revision. This defaults to revisions as
+ # SHA-1 strings, but also supports symbolic revisions like `tip` via the
+ # branch flag.
+ args.extend(
+ [
+ "--branch" if branch else "--revision",
+ branch or revision,
+ head_repo,
+ destination_path,
+ ]
+ )
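+    # The resulting command looks roughly like (placeholders illustrative):
+    #   hg robustcheckout --sharebase <store> --purge \
+    #     --revision <sha1> <head_repo> <destination>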
+
+ run_required_command(b"vcs", args, extra_env={"PYTHONUNBUFFERED": "1"})
+
+ # Update the current revision hash and ensure that it is well formed.
+ revision = subprocess.check_output(
+ [hg_bin, "log", "--rev", ".", "--template", "{node}"],
+ cwd=destination_path,
+ # Triggers text mode on Python 3.
+ universal_newlines=True,
+ )
+
+ assert re.match("^[a-f0-9]{40}$", revision)
+
+ msg = (
+ "TinderboxPrint:<a href={head_repo}/rev/{revision} "
+ "title='Built from {repo_name} revision {revision}'>"
+ "{revision}</a>\n".format(
+ revision=revision, head_repo=head_repo, repo_name=head_repo.split("/")[-1]
+ )
+ )
+
+ print_line(b"vcs", msg.encode("utf-8"))
+
+ return revision
+
+
+def fetch_artifacts():
+ print_line(b"fetches", b"fetching artifacts\n")
+
+ fetch_content = shutil.which("fetch-content")
+
+ if not fetch_content or not os.path.isfile(fetch_content):
+ fetch_content = os.path.join(os.path.dirname(__file__), "fetch-content")
+
+ if not os.path.isfile(fetch_content):
+ print(FETCH_CONTENT_NOT_FOUND)
+ sys.exit(1)
+
+ cmd = [sys.executable, "-u", fetch_content, "task-artifacts"]
+ print_line(b"fetches", b"executing %r\n" % cmd)
+ subprocess.run(cmd, check=True, env=os.environ)
+ print_line(b"fetches", b"finished fetching artifacts\n")
+
+
+def add_vcs_arguments(parser, project, name):
+ """Adds arguments to ArgumentParser to control VCS options for a project."""
+
+ parser.add_argument(
+ "--%s-checkout" % project,
+ help="Directory where %s checkout should be created" % name,
+ )
+ parser.add_argument(
+ "--%s-sparse-profile" % project,
+ help="Path to sparse profile for %s checkout" % name,
+ )
+
+
+def collect_vcs_options(args, project, name):
+ checkout = getattr(args, "%s_checkout" % project)
+ sparse_profile = getattr(args, "%s_sparse_profile" % project)
+
+ env_prefix = project.upper()
+
+ repo_type = os.environ.get("%s_REPOSITORY_TYPE" % env_prefix)
+ base_repo = os.environ.get("%s_BASE_REPOSITORY" % env_prefix)
+ base_ref = os.environ.get("%s_BASE_REF" % env_prefix)
+ base_rev = os.environ.get("%s_BASE_REV" % env_prefix)
+ head_repo = os.environ.get("%s_HEAD_REPOSITORY" % env_prefix)
+ revision = os.environ.get("%s_HEAD_REV" % env_prefix)
+ ref = os.environ.get("%s_HEAD_REF" % env_prefix)
+ pip_requirements = os.environ.get("%s_PIP_REQUIREMENTS" % env_prefix)
+ private_key_secret = os.environ.get("%s_SSH_SECRET_NAME" % env_prefix)
+
+ store_path = os.environ.get("HG_STORE_PATH")
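+
+    # For a project named "myrepo" (hypothetical), the environment variables
+    # consulted above are MYREPO_REPOSITORY_TYPE, MYREPO_BASE_REPOSITORY,
+    # MYREPO_HEAD_REPOSITORY, MYREPO_HEAD_REV, MYREPO_HEAD_REF, and so on.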
+
+ # Expand ~ in some paths.
+ if checkout:
+ checkout = os.path.abspath(os.path.expanduser(checkout))
+ if store_path:
+ store_path = os.path.abspath(os.path.expanduser(store_path))
+
+ if pip_requirements:
+ pip_requirements = os.path.join(checkout, pip_requirements)
+
+ # Some callers set the base repository to mozilla-central for historical
+ # reasons. Switch to mozilla-unified because robustcheckout works best
+ # with it.
+ if base_repo == "https://hg.mozilla.org/mozilla-central":
+ base_repo = "https://hg.mozilla.org/mozilla-unified"
+
+ return {
+ "store-path": store_path,
+ "project": project,
+ "name": name,
+ "env-prefix": env_prefix,
+ "checkout": checkout,
+ "sparse-profile": sparse_profile,
+ "base-repo": base_repo,
+ "base-ref": base_ref,
+ "base-rev": base_rev,
+ "head-repo": head_repo,
+ "revision": revision,
+ "ref": ref,
+ "repo-type": repo_type,
+ "ssh-secret-name": private_key_secret,
+ "pip-requirements": pip_requirements,
+ }
+
+
+def vcs_checkout_from_args(options):
+ if not options["checkout"]:
+ if options["ref"] and not options["revision"]:
+ print("task should be defined in terms of non-symbolic revision")
+ sys.exit(1)
+ return
+
+ revision = options["revision"]
+ ref = options["ref"]
+ ssh_key_file = None
+ ssh_known_hosts_file = None
+ ssh_dir = None
+
+ try:
+ if options.get("ssh-secret-name"):
+ ssh_dir = Path("~/.ssh-run-task").expanduser()
+ os.makedirs(ssh_dir, 0o700)
+ ssh_key_file = ssh_dir.joinpath("private_ssh_key")
+ ssh_key = fetch_ssh_secret(options["ssh-secret-name"])
+            # We don't use write_text here, to avoid \n -> \r\n on Windows
+ ssh_key_file.write_bytes(ssh_key.encode("ascii"))
+ ssh_key_file.chmod(0o600)
+ # TODO: We should pull this from a secret, so it can be updated on old trees
+ ssh_known_hosts_file = ssh_dir.joinpath("known_hosts")
+ ssh_known_hosts_file.write_bytes(GITHUB_SSH_FINGERPRINT)
+
+ if options["repo-type"] == "git":
+ if not revision and not ref:
+ raise RuntimeError(
+ "Git requires that either a ref, a revision, or both are provided"
+ )
+
+ if not ref:
+ print("Providing a ref will improve the performance of this checkout")
+
+ revision = git_checkout(
+ options["checkout"],
+ options["head-repo"],
+ options["base-repo"],
+ options["base-ref"],
+ options["base-rev"],
+ ref,
+ revision,
+ ssh_key_file,
+ ssh_known_hosts_file,
+ )
+ elif options["repo-type"] == "hg":
+ if not revision and not ref:
+ raise RuntimeError(
+ "Hg requires that at least one of a ref or revision " "is provided"
+ )
+
+ revision = hg_checkout(
+ options["checkout"],
+ options["head-repo"],
+ options["base-repo"],
+ options["store-path"],
+ options["sparse-profile"],
+ ref,
+ revision,
+ )
+ else:
+ raise RuntimeError('Type of VCS must be either "git" or "hg"')
+ finally:
+ if ssh_dir:
+ shutil.rmtree(ssh_dir, ignore_errors=True)
+
+ os.environ["%s_HEAD_REV" % options["env-prefix"]] = revision
+
+
+def install_pip_requirements(repositories):
+ """Install pip requirements files from specified repositories, if necessary."""
+ requirements = [
+ r["pip-requirements"] for r in repositories if r["pip-requirements"]
+ ]
+ if not requirements:
+ return
+
+ cmd = [sys.executable, "-mpip", "install"]
+ if os.environ.get("PIP_DISABLE_REQUIRE_HASHES") != "1":
+ cmd.append("--require-hashes")
+
+ for path in requirements:
+ cmd.extend(["-r", path])
+
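+    # The final command looks roughly like (path illustrative):
+    #   python -mpip install --require-hashes -r <checkout>/requirements.txt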
+ run_required_command(b"pip-install", cmd)
+
+
+def maybe_run_resource_monitoring():
+ """Run the resource monitor if available.
+
+ Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160
+ and https://bugzil.la/1648051
+
+ """
+ if "MOZ_FETCHES" not in os.environ:
+ return
+ if "RESOURCE_MONITOR_OUTPUT" not in os.environ:
+ return
+
+ prefix = b"resource_monitor"
+
+ executable = "{}/resource-monitor/resource-monitor{}".format(
+ os.environ.get("MOZ_FETCHES_DIR"), ".exe" if IS_WINDOWS else ""
+ )
+
+ if not os.path.exists(executable) or not os.access(executable, os.X_OK):
+ print_line(prefix, b"%s not executable\n" % executable.encode("utf-8"))
+ return
+ args = [
+ executable,
+ "-process",
+ str(os.getpid()),
+ "-output",
+ os.environ["RESOURCE_MONITOR_OUTPUT"],
+ ]
+ print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode("utf-8"))
+ # Avoid environment variables the payload doesn't need.
+ del os.environ["RESOURCE_MONITOR_OUTPUT"]
+
+ # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too.
+ process = subprocess.Popen(
+ args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0,
+ cwd=os.getcwd(),
+ )
+
+ def capture_output():
+ fh = io.TextIOWrapper(process.stdout, encoding="latin1")
+ while True:
+ data = fh.readline().encode("latin1")
+ if data == b"":
+ break
+ print_line(prefix, data)
+
+ monitor_process = Thread(target=capture_output)
+ monitor_process.start()
+ return process
+
+
+def _display_python_version():
+ print_line(
+ b"setup", b"Python version: %s\n" % platform.python_version().encode("utf-8")
+ )
+
+
+def main(args):
+ os.environ["TASK_WORKDIR"] = os.getcwd()
+ print_line(
+ b"setup",
+ b"run-task started in %s\n" % os.environ["TASK_WORKDIR"].encode("utf-8"),
+ )
+ print_line(
+ b"setup",
+ b"Invoked by command: %s\n" % " ".join(args).encode("utf-8"),
+ )
+ _display_python_version()
+ running_as_root = IS_POSIX and os.getuid() == 0
+
+    # Arguments up to '--' are ours; the arguments after it are for the main
+    # task to be executed.
+ try:
+ i = args.index("--")
+ our_args = args[0:i]
+ task_args = args[i + 1 :]
+ except ValueError:
+ our_args = args
+ task_args = []
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--user", default="worker", help="user to run as")
+ parser.add_argument("--group", default="worker", help="group to run as")
+ parser.add_argument("--task-cwd", help="directory to run the provided command in")
+
+ repositories = os.environ.get("REPOSITORIES")
+ if repositories:
+ repositories = json.loads(repositories)
+ else:
+ repositories = {"vcs": "repository"}
+
+ for repository, name in repositories.items():
+ add_vcs_arguments(parser, repository, name)
+
+ parser.add_argument(
+ "--fetch-hgfingerprint", action="store_true", help=argparse.SUPPRESS
+ )
+
+ args = parser.parse_args(our_args)
+
+ repositories = [
+ collect_vcs_options(args, repository, name)
+ for (repository, name) in repositories.items()
+ ]
+ # Sort repositories so that parent checkout paths come before children
+ repositories.sort(key=lambda repo: Path(repo["checkout"] or "/").parts)
+
+ uid = gid = gids = user = group = None
+ if IS_POSIX and running_as_root:
+ user, group, gids = get_posix_user_group(args.user, args.group)
+ uid = user.pw_uid
+ gid = group.gr_gid
+
+ if running_as_root and os.path.exists("/dev/kvm"):
+ # Ensure kvm permissions for worker, required for Android x86
+ st = os.stat("/dev/kvm")
+ os.chmod("/dev/kvm", st.st_mode | 0o666)
+
+ # Validate caches.
+ #
+ # Taskgraph should pass in a list of paths that are caches via an
+ # environment variable (which we don't want to pass down to child
+ # processes).
+
+ if "TASKCLUSTER_CACHES" in os.environ:
+ caches = os.environ["TASKCLUSTER_CACHES"].split(";")
+ del os.environ["TASKCLUSTER_CACHES"]
+ else:
+ caches = []
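+
+    # TASKCLUSTER_CACHES is a ";"-delimited list of absolute paths, e.g.
+    # (illustrative) "/builds/worker/checkouts;/builds/worker/tooltool-cache".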
+
+ if "TASKCLUSTER_UNTRUSTED_CACHES" in os.environ:
+ untrusted_caches = True
+ del os.environ["TASKCLUSTER_UNTRUSTED_CACHES"]
+ else:
+ untrusted_caches = False
+
+ for cache in caches:
+ if not os.path.isdir(cache):
+ print(
+ "error: cache %s is not a directory; this should never "
+ "happen" % cache
+ )
+ return 1
+
+ purge = configure_cache_posix(
+ cache, user, group, untrusted_caches, running_as_root
+ )
+
+ if purge:
+ return EXIT_PURGE_CACHE
+
+ if "TASKCLUSTER_VOLUMES" in os.environ:
+ volumes = os.environ["TASKCLUSTER_VOLUMES"].split(";")
+ del os.environ["TASKCLUSTER_VOLUMES"]
+ else:
+ volumes = []
+
+ if volumes and not IS_POSIX:
+ print("assertion failed: volumes not expected on Windows")
+ return 1
+
+ # Sanitize volumes.
+ for volume in volumes:
+ # If a volume is a cache, it was dealt with above.
+ if volume in caches:
+ print_line(b"volume", b"volume %s is a cache\n" % volume.encode("utf-8"))
+ continue
+
+ configure_volume_posix(volume, user, group, running_as_root)
+
+ all_caches_and_volumes = set(map(os.path.normpath, caches))
+ all_caches_and_volumes |= set(map(os.path.normpath, volumes))
+
+ def path_in_cache_or_volume(path):
+ path = os.path.normpath(path)
+
+ while path:
+ if path in all_caches_and_volumes:
+ return True
+
+ path, child = os.path.split(path)
+ if not child:
+ break
+
+ return False
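+
+    # e.g. with "/builds/worker/checkouts" registered as a cache (illustrative
+    # path), path_in_cache_or_volume("/builds/worker/checkouts/src") walks up
+    # the path components and returns True.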
+
+ def prepare_checkout_dir(checkout):
+ if not checkout:
+ return
+
+ # The checkout path becomes the working directory. Since there are
+ # special cache files in the cache's root directory and working
+ # directory purging could blow them away, disallow this scenario.
+ if os.path.exists(os.path.join(checkout, ".cacherequires")):
+ print("error: cannot perform vcs checkout into cache root: %s" % checkout)
+ sys.exit(1)
+
+ # TODO given the performance implications, consider making this a fatal
+ # error.
+ if not path_in_cache_or_volume(checkout):
+ print_line(
+ b"vcs",
+ b"WARNING: vcs checkout path (%s) not in cache "
+ b"or volume; performance will likely suffer\n"
+ % checkout.encode("utf-8"),
+ )
+
+ # Ensure the directory for the source checkout exists.
+ try:
+ os.makedirs(os.path.dirname(checkout))
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ # And that it is owned by the appropriate user/group.
+ if running_as_root:
+ os.chown(os.path.dirname(checkout), uid, gid)
+
+ def prepare_hg_store_path():
+ # And ensure the shared store path exists and has proper permissions.
+ if "HG_STORE_PATH" not in os.environ:
+ print("error: HG_STORE_PATH environment variable not set")
+ sys.exit(1)
+
+ store_path = os.environ["HG_STORE_PATH"]
+
+ if not path_in_cache_or_volume(store_path):
+ print_line(
+ b"vcs",
+ b"WARNING: HG_STORE_PATH (%s) not in cache or "
+ b"volume; performance will likely suffer\n"
+ % store_path.encode("utf-8"),
+ )
+
+ try:
+ os.makedirs(store_path)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ if running_as_root:
+ os.chown(store_path, uid, gid)
+
+ repository_paths = [
+ Path(repo["checkout"]) for repo in repositories if repo["checkout"]
+ ]
+ for repo in repositories:
+ if not repo["checkout"]:
+ continue
+ parents = Path(repo["checkout"]).parents
+ if any((path in repository_paths) for path in parents):
+            # Skip creating any checkouts that are inside other checkouts
+ continue
+ prepare_checkout_dir(repo["checkout"])
+
+ if any(repo["checkout"] and repo["repo-type"] == "hg" for repo in repositories):
+ prepare_hg_store_path()
+
+ if IS_POSIX and running_as_root:
+ # Drop permissions to requested user.
+ # This code is modeled after what `sudo` was observed to do in a Docker
+ # container. We do not bother calling setrlimit() because containers have
+ # their own limits.
+ print_line(
+ b"setup",
+ b"running as %s:%s\n"
+ % (args.user.encode("utf-8"), args.group.encode("utf-8")),
+ )
+
+ os.setgroups(gids)
+ os.umask(0o22)
+ os.setresgid(gid, gid, gid)
+ os.setresuid(uid, uid, uid)
+
+ for repo in repositories:
+ vcs_checkout_from_args(repo)
+
+ resource_process = None
+
+ try:
+ for k in ["MOZ_FETCHES_DIR", "UPLOAD_DIR"] + [
+ "{}_PATH".format(repository["project"].upper())
+ for repository in repositories
+ ]:
+ if k in os.environ:
+ os.environ[k] = os.path.abspath(os.environ[k])
+ print_line(
+ b"setup",
+ b"%s is %s\n" % (k.encode("utf-8"), os.environ[k].encode("utf-8")),
+ )
+
+ if "MOZ_FETCHES" in os.environ:
+ fetch_artifacts()
+
+ # Install Python requirements after fetches in case tasks want to use
+ # fetches to grab dependencies.
+ install_pip_requirements(repositories)
+
+ resource_process = maybe_run_resource_monitoring()
+
+ return run_command(b"task", task_args, cwd=args.task_cwd)
+ finally:
+ if resource_process:
+ print_line(b"resource_monitor", b"terminating\n")
+ if IS_WINDOWS:
+ # .terminate() on Windows is not a graceful shutdown, due to
+ # differences in signals. CTRL_BREAK_EVENT will work provided
+ # the subprocess is in a different process group, so this script
+ # isn't also killed.
+ os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT)
+ else:
+ resource_process.terminate()
+ resource_process.wait()
+ fetches_dir = os.environ.get("MOZ_FETCHES_DIR")
+ if fetches_dir and os.path.isdir(fetches_dir):
+ print_line(b"fetches", b"removing %s\n" % fetches_dir.encode("utf-8"))
+ remove(fetches_dir)
+ print_line(b"fetches", b"finished\n")
+
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv[1:]))