Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task')
-rwxr-xr-x | third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task | 1307
1 file changed, 1307 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
new file mode 100755
index 0000000000..f1e281f5cd
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
@@ -0,0 +1,1307 @@
+#!/usr/bin/python3 -u
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""Run a task after performing common actions.
+
+This script is meant to be the "driver" for TaskCluster based tasks.
+It receives some common arguments to control the run-time environment.
+
+It performs actions as requested from the arguments. Then it executes
+the requested process and prints its output, prefixing it with the
+current time to improve log usefulness.
+"""
+
+import sys
+from typing import Optional
+
+if sys.version_info[0:2] < (3, 5):
+    print("run-task requires Python 3.5+")
+    sys.exit(1)
+
+
+import argparse
+import datetime
+import errno
+import io
+import json
+import os
+from pathlib import Path
+import re
+import shutil
+import signal
+import socket
+import stat
+import subprocess
+import time
+
+import urllib.error
+import urllib.request
+
+from threading import Thread
+
+SECRET_BASEURL_TPL = "http://taskcluster/secrets/v1/secret/{}"
+
+GITHUB_SSH_FINGERPRINT = (
+    b"github.com ssh-rsa "
+    b"AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkcc"
+    b"Krpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFz"
+    b"LQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaS"
+    b"jB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3sku"
+    b"a2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ==\n"
+)
+
+
+CACHE_UID_GID_MISMATCH = """
+There is a UID/GID mismatch on the cache. This likely means:
+
+a) different tasks are running as a different user/group
+b) different Docker images have different UID/GID for the same user/group
+
+Our cache policy is that the UID/GID for ALL tasks must be consistent
+for the lifetime of the cache. This eliminates permissions problems due
+to file/directory user/group ownership.
+
+To make this error go away, ensure that all Docker images use
+a consistent UID/GID and that all tasks using this cache are running as
+the same user/group.
+"""
+
+
+NON_EMPTY_VOLUME = """
+error: volume %s is not empty
+
+Our Docker image policy requires volumes to be empty.
+
+The volume was likely populated as part of building the Docker image.
+Change the Dockerfile and anything run from it to not create files in
+any VOLUME.
+
+A lesser possibility is that you stumbled upon a TaskCluster platform bug
+where it fails to use new volumes for tasks.
+"""
+
+
+FETCH_CONTENT_NOT_FOUND = """
+error: fetch-content script not found
+
+The script at `taskcluster/scripts/misc/fetch-content` could not be
+detected in the current environment.
+"""
+
+# The exit code to use when caches should be purged and the task retried.
+# This is EX_OSFILE (from sysexits.h):
+#     Some system file does not exist, cannot be opened, or has some
+#     sort of error (e.g., syntax error).
+EXIT_PURGE_CACHE = 72
+
+
+IS_MACOSX = sys.platform == "darwin"
+IS_POSIX = os.name == "posix"
+IS_WINDOWS = os.name == "nt"
+
+# Both mercurial and git use sha1 as revision identifiers. Luckily, both define
+# the same value as the null revision.
+#
+# https://github.com/git/git/blob/dc04167d378fb29d30e1647ff6ff51dd182bc9a3/t/oid-info/hash-info#L7
+# https://www.mercurial-scm.org/repo/hg-stable/file/82efc31bd152/mercurial/node.py#l30
+NULL_REVISION = "0000000000000000000000000000000000000000"
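+# (Descriptive note, added for clarity: git_checkout() below treats a base
+# revision equal to NULL_REVISION as "no base revision" and skips fetching it.)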
+
+
+def print_line(prefix, m):
+    now = datetime.datetime.utcnow().isoformat().encode("utf-8")
+    # slice microseconds to 3 decimals.
+    now = now[:-3] if now[-7:-6] == b"." else now
+    sys.stdout.buffer.write(b"[%s %sZ] %s" % (prefix, now, m))
+    sys.stdout.buffer.flush()
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+    """
+    It's possible to see spurious errors on Windows due to various things
+    keeping a handle to the directory open (explorer, virus scanners, etc.),
+    so we try a few times if it fails with a known error.
+    retry_delay is multiplied by the number of failed attempts to increase
+    the likelihood of success in subsequent attempts.
+    """
+    retry_count = 0
+    while True:
+        try:
+            func(*args)
+        except OSError as e:
+            # Error codes are defined in:
+            # https://docs.python.org/3/library/errno.html#module-errno
+            if e.errno not in (errno.EACCES, errno.ENOTEMPTY, errno.ENOENT):
+                raise
+
+            if retry_count == retry_max:
+                raise
+
+            retry_count += 1
+
+            print(
+                '%s() failed for "%s". Reason: %s (%s). Retrying...'
+                % (func.__name__, args, e.strerror, e.errno)
+            )
+            time.sleep(retry_count * retry_delay)
+        else:
+            # If no exception has been thrown it should be done
+            break
+
+
+def remove(path):
+    """Removes the specified file, link, or directory tree.
+
+    This is a replacement for shutil.rmtree that works better under
+    windows. It does the following things:
+
+     - check path access for the current user before trying to remove
+     - retry operations on some known errors due to various things keeping
+       a handle on file paths - like explorer, virus scanners, etc. The
+       known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+       retry up to 5 times with a delay of (failed_attempts * 0.5) seconds
+       between each attempt.
+
+    Note that no error will be raised if the given path does not exist.
+
+    :param path: path to be removed
+    """
+
+    def _update_permissions(path):
+        """Sets specified permissions depending on filetype"""
+        if os.path.islink(path):
+            # Path is a symlink which we don't have to modify
+            # because it should already have all the needed permissions
+            return
+
+        stats = os.stat(path)
+
+        if os.path.isfile(path):
+            mode = stats.st_mode | stat.S_IWUSR
+        elif os.path.isdir(path):
+            mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+        else:
+            # Not supported type
+            return
+
+        _call_windows_retry(os.chmod, (path, mode))
+
+    if not os.path.lexists(path):
+        print_line(
+            b"remove", b"WARNING: %s does not exist!\n" % path.encode("utf-8")
+        )
+        return
+
+    """
+    On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access
+    files or directories that exceed MAX_PATH(260) limitation or that end
+    with a period.
+    """
+    if (
+        sys.platform in ("win32", "cygwin")
+        and len(path) >= 3
+        and path[1] == ":"
+        and path[2] == "\\"
+    ):
+        path = "\\\\?\\%s" % path
+
+    if os.path.isfile(path) or os.path.islink(path):
+        # Verify the file or link is read/write for the current user
+        _update_permissions(path)
+        _call_windows_retry(os.remove, (path,))
+
+    elif os.path.isdir(path):
+        # Verify the directory is read/write/execute for the current user
+        _update_permissions(path)
+
+        # We're ensuring that every nested item has writable permission.
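+        # (Without the write/execute bits set here, the shutil.rmtree() call
+        # below could fail with EACCES on read-only entries.)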
+        for root, dirs, files in os.walk(path):
+            for entry in dirs + files:
+                _update_permissions(os.path.join(root, entry))
+        _call_windows_retry(shutil.rmtree, (path,))
+
+
+def run_required_command(prefix, args, *, extra_env=None, cwd=None):
+    res = run_command(prefix, args, extra_env=extra_env, cwd=cwd)
+    if res:
+        sys.exit(res)
+
+
+def retry_required_command(prefix, args, *, extra_env=None, cwd=None, retries=2):
+    backoff = 1
+    while True:
+        res = run_command(prefix, args, extra_env=extra_env, cwd=cwd)
+        if not res:
+            return
+        if not retries:
+            sys.exit(res)
+        retries -= 1
+        backoff *= 2
+        time.sleep(backoff)
+
+
+def run_command(prefix, args, *, extra_env=None, cwd=None):
+    """Runs a process and prefixes its output with the time.
+
+    Returns the process exit code.
+    """
+    print_line(prefix, b"executing %r\n" % args)
+
+    env = dict(os.environ)
+    env.update(extra_env or {})
+
+    # Note: TaskCluster's stdin is a TTY. This attribute is lost
+    # when we pass sys.stdin to the invoked process. If we cared
+    # to preserve stdin as a TTY, we could make this work. But until
+    # someone needs it, don't bother.
+
+    # We want stdout to be bytes on Python 3. That means we can't use
+    # universal_newlines=True (because it implies text mode). But
+    # p.stdout.readline() won't work for bytes text streams. So, on Python 3,
+    # we manually install a latin1 stream wrapper. This allows us to readline()
+    # and preserve bytes, without losing any data.
+
+    p = subprocess.Popen(
+        args,
+        # Disable buffering because we want to receive output
+        # as it is generated so timestamps in logs are
+        # accurate.
+        bufsize=0,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        stdin=sys.stdin.fileno(),
+        cwd=cwd,
+        env=env,
+    )
+
+    stdout = io.TextIOWrapper(p.stdout, encoding="latin1")
+
+    while True:
+        data = stdout.readline().encode("latin1")
+
+        if data == b"":
+            break
+
+        print_line(prefix, data)
+
+    return p.wait()
+
+
+def get_posix_user_group(user, group):
+    import grp
+    import pwd
+
+    try:
+        user_record = pwd.getpwnam(user)
+    except KeyError:
+        print("could not find user %s; specify a valid user with --user" % user)
+        sys.exit(1)
+
+    try:
+        group_record = grp.getgrnam(group)
+    except KeyError:
+        print("could not find group %s; specify a valid group with --group" % group)
+        sys.exit(1)
+
+    # Most tasks use worker:worker. We require they have a specific numeric ID
+    # because otherwise it is too easy for files written to caches to have
+    # mismatched numeric IDs, which results in permissions errors.
+    if user_record.pw_name == "worker" and user_record.pw_uid != 1000:
+        print("user `worker` must have uid=1000; got %d" % user_record.pw_uid)
+        sys.exit(1)
+
+    if group_record.gr_name == "worker" and group_record.gr_gid != 1000:
+        print("group `worker` must have gid=1000; got %d" % group_record.gr_gid)
+        sys.exit(1)
+
+    # Find all groups to which this user is a member.
+    gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem]
+
+    return user_record, group_record, gids
+
+
+def write_audit_entry(path, msg):
+    now = datetime.datetime.utcnow().isoformat().encode("utf-8")
+    with open(path, "ab") as fh:
+        fh.write(
+            b"[%sZ %s] %s\n" % (now, os.environb.get(b"TASK_ID", b"UNKNOWN"), msg)
+        )
+
+
+WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR
+
+
+def set_dir_permissions(path, uid, gid):
+    st = os.lstat(path)
+
+    if st.st_uid != uid or st.st_gid != gid:
+        os.chown(path, uid, gid)
+
+    # Also make sure dirs are writable in case we need to delete
+    # them.
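+    # (WANTED_DIR_MODE is u+rwx, i.e. 0o700; OR-ing it in below preserves any
+    # other mode bits the directory already has.)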
+    if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE:
+        os.chmod(path, st.st_mode | WANTED_DIR_MODE)
+
+
+def chown_recursive(path, user, group, uid, gid):
+    print_line(
+        b"chown",
+        b"recursively changing ownership of %s to %s:%s\n"
+        % (path.encode("utf-8"), user.encode("utf-8"), group.encode("utf-8")),
+    )
+
+    set_dir_permissions(path, uid, gid)
+
+    for root, dirs, files in os.walk(path):
+        for d in dirs:
+            set_dir_permissions(os.path.join(root, d), uid, gid)
+
+        for f in files:
+            # File may be a symlink that points to nowhere. In which case
+            # os.chown() would fail because it attempts to follow the
+            # symlink. We only care about directory entries, not what
+            # they point to. So setting the owner of the symlink should
+            # be sufficient.
+            os.lchown(os.path.join(root, f), uid, gid)
+
+
+def configure_cache_posix(cache, user, group, untrusted_caches, running_as_root):
+    """Configure a cache path on POSIX platforms.
+
+    For each cache, we write out a special file denoting attributes and
+    capabilities of run-task and the task being executed. These attributes
+    are used by subsequent run-task invocations to validate that use of
+    the cache is acceptable.
+
+    We /could/ blow away the cache data on requirements mismatch.
+    While this would be convenient, this could result in "competing" tasks
+    effectively undoing the other's work. This would slow down task
+    execution in aggregate. Without monitoring for this, people may not notice
+    the problem and tasks would be slower than they could be. We follow the
+    principle of "fail fast" to ensure optimal task execution.
+
+    We also write an audit log of who used the caches. This log is printed
+    during failures to help aid debugging.
+    """
+
+    our_requirements = {
+        # Include a version string that we can bump whenever to trigger
+        # fresh caches. The actual value is not relevant and doesn't need
+        # to follow any explicit order. Since taskgraph bakes this file's
+        # hash into cache names, any change to this file/version is sufficient
+        # to force the use of a new cache.
+        b"version=1",
+        # Include the UID and GID the task will run as to ensure that tasks
+        # with different UID and GID don't share the same cache.
+        b"uid=%d" % user.pw_uid,
+        b"gid=%d" % group.gr_gid,
+    }
+
+    requires_path = os.path.join(cache, ".cacherequires")
+    audit_path = os.path.join(cache, ".cachelog")
+
+    # The cache is empty. Configure it.
+    if not os.listdir(cache):
+        print_line(
+            b"cache",
+            b"cache %s is empty; writing requirements: "
+            b"%s\n" % (cache.encode("utf-8"), b" ".join(sorted(our_requirements))),
+        )
+
+        # We write a requirements file so future invocations know what the
+        # requirements are.
+        with open(requires_path, "wb") as fh:
+            fh.write(b"\n".join(sorted(our_requirements)))
+
+        # And make it read-only as a precaution against deletion.
+        os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+
+        write_audit_entry(
+            audit_path,
+            b"created; requirements: %s" % b", ".join(sorted(our_requirements)),
+        )
+
+        set_dir_permissions(cache, user.pw_uid, group.gr_gid)
+        return
+
+    # The cache has content and we have a requirements file. Validate
+    # requirements alignment.
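+    # (.cacherequires holds one requirement per line, e.g. "gid=1000",
+    # "uid=1000", "version=1", matching what was written above.)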
+    if os.path.exists(requires_path):
+        with open(requires_path, "rb") as fh:
+            wanted_requirements = set(fh.read().splitlines())
+
+        print_line(
+            b"cache",
+            b"cache %s exists; requirements: %s\n"
+            % (cache.encode("utf-8"), b" ".join(sorted(wanted_requirements))),
+        )
+
+        missing = wanted_requirements - our_requirements
+
+        # Allow requirements mismatch for uid/gid if and only if caches
+        # are untrusted. This allows cache behavior on Try to be
+        # reasonable. Otherwise, random tasks could "poison" cache
+        # usability by introducing uid/gid mismatches. For untrusted
+        # environments like Try, this is a perfectly reasonable thing to
+        # allow.
+        if (
+            missing
+            and untrusted_caches
+            and running_as_root
+            and all(s.startswith((b"uid=", b"gid=")) for s in missing)
+        ):
+            print_line(
+                b"cache",
+                b"cache %s uid/gid mismatch; this is acceptable "
+                b"because caches for this task are untrusted; "
+                b"changing ownership to facilitate cache use\n"
+                % cache.encode("utf-8"),
+            )
+            chown_recursive(
+                cache, user.pw_name, group.gr_name, user.pw_uid, group.gr_gid
+            )
+
+            # And write out the updated reality.
+            with open(requires_path, "wb") as fh:
+                fh.write(b"\n".join(sorted(our_requirements)))
+
+            write_audit_entry(
+                audit_path,
+                b"chown; requirements: %s" % b", ".join(sorted(our_requirements)),
+            )
+
+        elif missing:
+            print(
+                "error: requirements for populated cache %s differ from "
+                "this task" % cache
+            )
+            print(
+                "cache requirements: %s"
+                % " ".join(sorted(s.decode("utf-8") for s in wanted_requirements))
+            )
+            print(
+                "our requirements: %s"
+                % " ".join(sorted(s.decode("utf-8") for s in our_requirements))
+            )
+            if any(s.startswith((b"uid=", b"gid=")) for s in missing):
+                print(CACHE_UID_GID_MISMATCH)
+
+            write_audit_entry(
+                audit_path,
+                b"requirements mismatch; wanted: %s"
+                % b", ".join(sorted(our_requirements)),
+            )
+
+            print("")
+            print("audit log:")
+            with open(audit_path, "r") as fh:
+                print(fh.read())
+
+            return True
+        else:
+            write_audit_entry(audit_path, b"used")
+
+        # We don't need to adjust permissions here because the cache is
+        # associated with a uid/gid and the first task should have set
+        # a proper owner/group.
+
+        return
+
+    # The cache has content and no requirements file. This shouldn't
+    # happen because run-task should be the first thing that touches a
+    # cache.
+    print(
+        "error: cache %s is not empty and is missing a "
+        ".cacherequires file; the cache names for this task are "
+        "likely mis-configured or TASKCLUSTER_CACHES is not set "
+        "properly" % cache
+    )
+
+    write_audit_entry(audit_path, b"missing .cacherequires")
+    return True
+
+
+def configure_volume_posix(volume, user, group, running_as_root):
+    # The only time we should see files in the volume is if the Docker
+    # image build put files there.
+    #
+    # For the sake of simplicity, our policy is that volumes should be
+    # empty. This also has the advantage that an empty volume looks
+    # a lot like an empty cache. Tasks can rely on caches being
+    # swapped in and out on any volume without any noticeable change
+    # of behavior.
+    volume_files = os.listdir(volume)
+    if volume_files:
+        print(NON_EMPTY_VOLUME % volume)
+        print("entries in root directory: %s" % " ".join(sorted(volume_files)))
+        sys.exit(1)
+
+    # The volume is almost certainly owned by root:root. Chown it so it
+    # is writable.
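+    # (Skipped when not running as root: os.chown() would raise EPERM.)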
+
+    if running_as_root:
+        print_line(
+            b"volume",
+            b"changing ownership of volume %s "
+            b"to %d:%d\n" % (volume.encode("utf-8"), user.pw_uid, group.gr_gid),
+        )
+        set_dir_permissions(volume, user.pw_uid, group.gr_gid)
+
+
+def _clean_git_checkout(destination_path):
+    # Delete untracked files (i.e. build products)
+    print_line(b"vcs", b"cleaning git checkout...\n")
+    args = [
+        "git",
+        "clean",
+        # Two -f's cause subdirectories with `.git`
+        # directories to be cleaned as well.
+        "-nxdff",
+    ]
+    print_line(b"vcs", b"executing %r\n" % args)
+    p = subprocess.Popen(
+        args,
+        # Disable buffering because we want to receive output
+        # as it is generated so timestamps in logs are
+        # accurate.
+        bufsize=0,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        stdin=sys.stdin.fileno(),
+        cwd=destination_path,
+        env=os.environ,
+    )
+    stdout = io.TextIOWrapper(p.stdout, encoding="latin1")
+    ret = p.wait()
+    if ret:
+        sys.exit(ret)
+    data = stdout.read()
+    prefix = "Would remove "
+    filenames = [
+        os.path.join(destination_path, line[len(prefix) :])
+        for line in data.splitlines()
+    ]
+    print_line(b"vcs", b"removing %r\n" % filenames)
+    for filename in filenames:
+        remove(filename)
+    print_line(b"vcs", b"successfully cleaned git checkout!\n")
+
+
+def git_checkout(
+    destination_path: str,
+    head_repo: str,
+    base_repo: Optional[str],
+    base_ref: Optional[str],
+    base_rev: Optional[str],
+    ref: Optional[str],
+    commit: Optional[str],
+    ssh_key_file: Optional[Path],
+    ssh_known_hosts_file: Optional[Path],
+):
+    env = {"PYTHONUNBUFFERED": "1"}
+
+    if ssh_key_file and ssh_known_hosts_file:
+        if not ssh_key_file.exists():
+            raise RuntimeError("Can't find specified ssh_key file.")
+        if not ssh_known_hosts_file.exists():
+            raise RuntimeError("Can't find specified known_hosts file.")
+        env["GIT_SSH_COMMAND"] = " ".join(
+            [
+                "ssh",
+                "-oIdentityFile={}".format(ssh_key_file.as_posix()),
+                "-oStrictHostKeyChecking=yes",
+                "-oUserKnownHostsFile={}".format(ssh_known_hosts_file.as_posix()),
+            ]
+        )
+    elif ssh_key_file or ssh_known_hosts_file:
+        raise RuntimeError(
+            "Must specify both ssh_key_file and ssh_known_hosts_file, if either are specified",
+        )
+
+    if not os.path.exists(destination_path):
+        # Repository doesn't already exist, needs to be cloned
+        args = [
+            "git",
+            "clone",
+            base_repo if base_repo else head_repo,
+            destination_path,
+        ]
+
+        retry_required_command(b"vcs", args, extra_env=env)
+
+    if base_ref:
+        args = ["git", "fetch", "origin", base_ref]
+
+        retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+        # Create local branch so that taskgraph is able to compute differences
+        # between the head branch and the base one, if needed
+        args = ["git", "checkout", base_ref]
+
+        retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+    # When commits are force-pushed (like on a testing branch), base_rev doesn't
+    # exist on base_ref. Fetching it allows taskgraph to compute differences
+    # between the previous state before the force-push and the current state.
+    #
+    # Unlike base_ref just above, there is no need to check out the revision:
+    # it's immediately available after the fetch.
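+    # (A base_rev equal to NULL_REVISION means there is no base revision,
+    # e.g. a brand-new branch, so there is nothing to fetch.)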
+    if base_rev and base_rev != NULL_REVISION:
+        args = ["git", "fetch", "origin", base_rev]
+
+        retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+    # If a ref isn't provided, we fetch all refs from head_repo, which may be slow
+    args = [
+        "git",
+        "fetch",
+        "--no-tags",
+        head_repo,
+        ref if ref else "+refs/heads/*:refs/remotes/work/*",
+    ]
+
+    retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+    args = [
+        "git",
+        "checkout",
+        "-f",
+    ]
+
+    if ref:
+        args.extend(["-B", ref])
+    args.append(commit if commit else ref)
+
+    run_required_command(b"vcs", args, cwd=destination_path)
+
+    if os.path.exists(os.path.join(destination_path, ".gitmodules")):
+        args = [
+            "git",
+            "submodule",
+            "init",
+        ]
+
+        run_required_command(b"vcs", args, cwd=destination_path)
+
+        args = [
+            "git",
+            "submodule",
+            "update",
+        ]
+
+        run_required_command(b"vcs", args, cwd=destination_path)
+
+    _clean_git_checkout(destination_path)
+
+    args = ["git", "rev-parse", "--verify", "HEAD"]
+
+    commit_hash = subprocess.check_output(
+        args, cwd=destination_path, universal_newlines=True
+    ).strip()
+    assert re.match("^[a-f0-9]{40}$", commit_hash)
+
+    if head_repo.startswith("https://github.com"):
+        if head_repo.endswith("/"):
+            head_repo = head_repo[:-1]
+
+        tinderbox_link = "{}/commit/{}".format(head_repo, commit_hash)
+        repo_name = head_repo.split("/")[-1]
+    else:
+        tinderbox_link = head_repo
+        repo_name = head_repo
+
+    msg = (
+        "TinderboxPrint:<a href='{link}' "
+        "title='Built from {name} commit {commit_hash}'>"
+        "{commit_hash}</a>\n".format(
+            commit_hash=commit_hash, link=tinderbox_link, name=repo_name
+        )
+    )
+
+    print_line(b"vcs", msg.encode("utf-8"))
+
+    return commit_hash
+
+
+def fetch_ssh_secret(secret_name):
+    """Retrieves the private ssh key, and returns it as a string"""
+    secret_url = SECRET_BASEURL_TPL.format(secret_name)
+    try:
+        print_line(
+            b"vcs",
+            b"fetching secret %s from %s\n"
+            % (secret_name.encode("utf-8"), secret_url.encode("utf-8")),
+        )
+        res = urllib.request.urlopen(secret_url, timeout=10)
+        secret = res.read()
+        try:
+            secret = json.loads(secret.decode("utf-8"))
+        except ValueError:
+            print_line(b"vcs", b"invalid JSON in secret")
+            sys.exit(1)
+    except (urllib.error.URLError, socket.timeout):
+        print_line(b"vcs", b"Unable to retrieve ssh secret. aborting...")
+        sys.exit(1)
+
+    return secret["secret"]["ssh_privkey"]
+
+
+def hg_checkout(
+    destination_path: str,
+    head_repo: str,
+    base_repo: Optional[str],
+    store_path: str,
+    sparse_profile: Optional[str],
+    branch: Optional[str],
+    revision: Optional[str],
+):
+    if IS_MACOSX:
+        hg_bin = "/tools/python27-mercurial/bin/hg"
+    elif IS_POSIX:
+        hg_bin = "hg"
+    elif IS_WINDOWS:
+        # This is where OCC installs it in the AMIs.
+        hg_bin = r"C:\Program Files\Mercurial\hg.exe"
+        if not os.path.exists(hg_bin):
+            print("could not find Mercurial executable: %s" % hg_bin)
+            sys.exit(1)
+    else:
+        raise RuntimeError("Must be running on mac, posix or windows")
+
+    args = [
+        hg_bin,
+        "robustcheckout",
+        "--sharebase",
+        store_path,
+        "--purge",
+    ]
+
+    if base_repo:
+        args.extend(["--upstream", base_repo])
+    if sparse_profile:
+        args.extend(["--sparseprofile", sparse_profile])
+
+    # Specify method to checkout a revision. This defaults to revisions as
+    # SHA-1 strings, but also supports symbolic revisions like `tip` via the
+    # branch flag.
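+    # (Illustrative resulting command:
+    #   hg robustcheckout --sharebase <store_path> --purge
+    #     --revision <sha> <head_repo> <destination_path>)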
+    args.extend(
+        [
+            "--branch" if branch else "--revision",
+            branch or revision,
+            head_repo,
+            destination_path,
+        ]
+    )
+
+    run_required_command(b"vcs", args, extra_env={"PYTHONUNBUFFERED": "1"})
+
+    # Update the current revision hash and ensure that it is well formed.
+    revision = subprocess.check_output(
+        [hg_bin, "log", "--rev", ".", "--template", "{node}"],
+        cwd=destination_path,
+        # Triggers text mode on Python 3.
+        universal_newlines=True,
+    )
+
+    assert re.match("^[a-f0-9]{40}$", revision)
+
+    msg = (
+        "TinderboxPrint:<a href={head_repo}/rev/{revision} "
+        "title='Built from {repo_name} revision {revision}'>"
+        "{revision}</a>\n".format(
+            revision=revision, head_repo=head_repo, repo_name=head_repo.split("/")[-1]
+        )
+    )
+
+    print_line(b"vcs", msg.encode("utf-8"))
+
+    return revision
+
+
+def fetch_artifacts():
+    print_line(b"fetches", b"fetching artifacts\n")
+
+    fetch_content = shutil.which("fetch-content")
+
+    if not fetch_content or not os.path.isfile(fetch_content):
+        fetch_content = os.path.join(os.path.dirname(__file__), "fetch-content")
+
+    if not os.path.isfile(fetch_content):
+        print(FETCH_CONTENT_NOT_FOUND)
+        sys.exit(1)
+
+    cmd = [sys.executable, "-u", fetch_content, "task-artifacts"]
+    print_line(b"fetches", b"executing %r\n" % cmd)
+    subprocess.run(cmd, check=True, env=os.environ)
+    print_line(b"fetches", b"finished fetching artifacts\n")
+
+
+def add_vcs_arguments(parser, project, name):
+    """Adds arguments to ArgumentParser to control VCS options for a project."""
+
+    parser.add_argument(
+        "--%s-checkout" % project,
+        help="Directory where %s checkout should be created" % name,
+    )
+    parser.add_argument(
+        "--%s-sparse-profile" % project,
+        help="Path to sparse profile for %s checkout" % name,
+    )
+
+
+def collect_vcs_options(args, project, name):
+    checkout = getattr(args, "%s_checkout" % project)
+    sparse_profile = getattr(args, "%s_sparse_profile" % project)
+
+    env_prefix = project.upper()
+
+    repo_type = os.environ.get("%s_REPOSITORY_TYPE" % env_prefix)
+    base_repo = os.environ.get("%s_BASE_REPOSITORY" % env_prefix)
+    base_ref = os.environ.get("%s_BASE_REF" % env_prefix)
+    base_rev = os.environ.get("%s_BASE_REV" % env_prefix)
+    head_repo = os.environ.get("%s_HEAD_REPOSITORY" % env_prefix)
+    revision = os.environ.get("%s_HEAD_REV" % env_prefix)
+    ref = os.environ.get("%s_HEAD_REF" % env_prefix)
+    pip_requirements = os.environ.get("%s_PIP_REQUIREMENTS" % env_prefix)
+    private_key_secret = os.environ.get("%s_SSH_SECRET_NAME" % env_prefix)
+
+    store_path = os.environ.get("HG_STORE_PATH")
+
+    # Expand ~ in some paths.
+    if checkout:
+        checkout = os.path.abspath(os.path.expanduser(checkout))
+    if store_path:
+        store_path = os.path.abspath(os.path.expanduser(store_path))
+
+    if pip_requirements:
+        pip_requirements = os.path.join(checkout, pip_requirements)
+
+    # Some callers set the base repository to mozilla-central for historical
+    # reasons. Switch to mozilla-unified because robustcheckout works best
+    # with it.
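+    # (mozilla-unified aggregates the heads of the main Firefox repositories,
+    # so a single shared robustcheckout store can serve any of them.)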
+    if base_repo == "https://hg.mozilla.org/mozilla-central":
+        base_repo = "https://hg.mozilla.org/mozilla-unified"
+
+    return {
+        "store-path": store_path,
+        "project": project,
+        "name": name,
+        "env-prefix": env_prefix,
+        "checkout": checkout,
+        "sparse-profile": sparse_profile,
+        "base-repo": base_repo,
+        "base-ref": base_ref,
+        "base-rev": base_rev,
+        "head-repo": head_repo,
+        "revision": revision,
+        "ref": ref,
+        "repo-type": repo_type,
+        "ssh-secret-name": private_key_secret,
+        "pip-requirements": pip_requirements,
+    }
+
+
+def vcs_checkout_from_args(options):
+    if not options["checkout"]:
+        if options["ref"] and not options["revision"]:
+            print("task should be defined in terms of non-symbolic revision")
+            sys.exit(1)
+        return
+
+    revision = options["revision"]
+    ref = options["ref"]
+    ssh_key_file = None
+    ssh_known_hosts_file = None
+    ssh_dir = None
+
+    try:
+        if options.get("ssh-secret-name"):
+            ssh_dir = Path("~/.ssh-run-task").expanduser()
+            os.makedirs(ssh_dir, 0o700)
+            ssh_key_file = ssh_dir.joinpath("private_ssh_key")
+            ssh_key = fetch_ssh_secret(options["ssh-secret-name"])
+            # We don't use write_text here, to avoid \n -> \r\n on windows
+            ssh_key_file.write_bytes(ssh_key.encode("ascii"))
+            ssh_key_file.chmod(0o600)
+            # TODO: We should pull this from a secret, so it can be updated on old trees
+            ssh_known_hosts_file = ssh_dir.joinpath("known_hosts")
+            ssh_known_hosts_file.write_bytes(GITHUB_SSH_FINGERPRINT)
+
+        if options["repo-type"] == "git":
+            if not revision and not ref:
+                raise RuntimeError(
+                    "Git requires that either a ref, a revision, or both are provided"
+                )
+
+            if not ref:
+                print("Providing a ref will improve the performance of this checkout")
+
+            revision = git_checkout(
+                options["checkout"],
+                options["head-repo"],
+                options["base-repo"],
+                options["base-ref"],
+                options["base-rev"],
+                ref,
+                revision,
+                ssh_key_file,
+                ssh_known_hosts_file,
+            )
+        elif options["repo-type"] == "hg":
+            if not revision and not ref:
+                raise RuntimeError(
+                    "Hg requires that at least one of a ref or revision is provided"
+                )
+
+            revision = hg_checkout(
+                options["checkout"],
+                options["head-repo"],
+                options["base-repo"],
+                options["store-path"],
+                options["sparse-profile"],
+                ref,
+                revision,
+            )
+        else:
+            raise RuntimeError('Type of VCS must be either "git" or "hg"')
+    finally:
+        if ssh_dir:
+            shutil.rmtree(ssh_dir, ignore_errors=True)
+
+    os.environ["%s_HEAD_REV" % options["env-prefix"]] = revision
+
+
+def install_pip_requirements(repositories):
+    """Install pip requirements files from specified repositories, if necessary."""
+    requirements = [
+        r["pip-requirements"] for r in repositories if r["pip-requirements"]
+    ]
+    if not requirements:
+        return
+
+    cmd = [sys.executable, "-mpip", "install"]
+    if os.environ.get("PIP_DISABLE_REQUIRE_HASHES") != "1":
+        cmd.append("--require-hashes")
+
+    for path in requirements:
+        cmd.extend(["-r", path])
+
+    run_required_command(b"pip-install", cmd)
+
+
+def maybe_run_resource_monitoring():
+    """Run the resource monitor if available.
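+
+    The monitor binary is expected under $MOZ_FETCHES_DIR/resource-monitor/
+    and its measurements are written to the path named by
+    $RESOURCE_MONITOR_OUTPUT (see the code below).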
+
+    Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160
+    and https://bugzil.la/1648051
+
+    """
+    if "MOZ_FETCHES" not in os.environ:
+        return
+    if "RESOURCE_MONITOR_OUTPUT" not in os.environ:
+        return
+
+    prefix = b"resource_monitor"
+
+    executable = "{}/resource-monitor/resource-monitor{}".format(
+        os.environ.get("MOZ_FETCHES_DIR"), ".exe" if IS_WINDOWS else ""
+    )
+
+    if not os.path.exists(executable) or not os.access(executable, os.X_OK):
+        print_line(prefix, b"%s not executable\n" % executable.encode("utf-8"))
+        return
+    args = [
+        executable,
+        "-process",
+        str(os.getpid()),
+        "-output",
+        os.environ["RESOURCE_MONITOR_OUTPUT"],
+    ]
+    print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode("utf-8"))
+    # Avoid environment variables the payload doesn't need.
+    del os.environ["RESOURCE_MONITOR_OUTPUT"]
+
+    # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too.
+    process = subprocess.Popen(
+        args,
+        # Disable buffering because we want to receive output
+        # as it is generated so timestamps in logs are
+        # accurate.
+        bufsize=0,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0,
+        cwd=os.getcwd(),
+    )
+
+    def capture_output():
+        fh = io.TextIOWrapper(process.stdout, encoding="latin1")
+        while True:
+            data = fh.readline().encode("latin1")
+            if data == b"":
+                break
+            print_line(prefix, data)
+
+    monitor_process = Thread(target=capture_output)
+    monitor_process.start()
+    return process
+
+
+def main(args):
+    os.environ["TASK_WORKDIR"] = os.getcwd()
+    print_line(
+        b"setup",
+        b"run-task started in %s\n" % os.environ["TASK_WORKDIR"].encode("utf-8"),
+    )
+    running_as_root = IS_POSIX and os.getuid() == 0
+
+    # Arguments up to '--' are ours. Arguments after it are for the main
+    # task to be executed.
+    try:
+        i = args.index("--")
+        our_args = args[0:i]
+        task_args = args[i + 1 :]
+    except ValueError:
+        our_args = args
+        task_args = []
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--user", default="worker", help="user to run as")
+    parser.add_argument("--group", default="worker", help="group to run as")
+    parser.add_argument("--task-cwd", help="directory to run the provided command in")
+
+    repositories = os.environ.get("REPOSITORIES")
+    if repositories:
+        repositories = json.loads(repositories)
+    else:
+        repositories = {"vcs": "repository"}
+
+    for repository, name in repositories.items():
+        add_vcs_arguments(parser, repository, name)
+
+    parser.add_argument(
+        "--fetch-hgfingerprint", action="store_true", help=argparse.SUPPRESS
+    )
+
+    args = parser.parse_args(our_args)
+
+    repositories = [
+        collect_vcs_options(args, repository, name)
+        for (repository, name) in repositories.items()
+    ]
+    # Sort repositories so that parent checkout paths come before children
+    repositories.sort(key=lambda repo: Path(repo["checkout"] or "/").parts)
+
+    uid = gid = gids = None
+    if IS_POSIX and running_as_root:
+        user, group, gids = get_posix_user_group(args.user, args.group)
+        uid = user.pw_uid
+        gid = group.gr_gid
+
+    if running_as_root and os.path.exists("/dev/kvm"):
+        # Ensure kvm permissions for worker, required for Android x86
+        st = os.stat("/dev/kvm")
+        os.chmod("/dev/kvm", st.st_mode | 0o666)
+
+    # Validate caches.
+    #
+    # Taskgraph should pass in a list of paths that are caches via an
+    # environment variable (which we don't want to pass down to child
+    # processes).
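+    # (TASKCLUSTER_CACHES is a ';'-delimited list of paths; it is removed
+    # from os.environ below so the task payload never sees it.)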
+
+    if "TASKCLUSTER_CACHES" in os.environ:
+        caches = os.environ["TASKCLUSTER_CACHES"].split(";")
+        del os.environ["TASKCLUSTER_CACHES"]
+    else:
+        caches = []
+
+    if "TASKCLUSTER_UNTRUSTED_CACHES" in os.environ:
+        untrusted_caches = True
+        del os.environ["TASKCLUSTER_UNTRUSTED_CACHES"]
+    else:
+        untrusted_caches = False
+
+    for cache in caches:
+        if not os.path.isdir(cache):
+            print(
+                "error: cache %s is not a directory; this should never "
+                "happen" % cache
+            )
+            return 1
+
+        purge = configure_cache_posix(
+            cache, user, group, untrusted_caches, running_as_root
+        )
+
+        if purge:
+            return EXIT_PURGE_CACHE
+
+    if "TASKCLUSTER_VOLUMES" in os.environ:
+        volumes = os.environ["TASKCLUSTER_VOLUMES"].split(";")
+        del os.environ["TASKCLUSTER_VOLUMES"]
+    else:
+        volumes = []
+
+    if volumes and not IS_POSIX:
+        print("assertion failed: volumes not expected on Windows")
+        return 1
+
+    # Sanitize volumes.
+    for volume in volumes:
+        # If a volume is a cache, it was dealt with above.
+        if volume in caches:
+            print_line(b"volume", b"volume %s is a cache\n" % volume.encode("utf-8"))
+            continue
+
+        configure_volume_posix(volume, user, group, running_as_root)
+
+    all_caches_and_volumes = set(map(os.path.normpath, caches))
+    all_caches_and_volumes |= set(map(os.path.normpath, volumes))
+
+    def path_in_cache_or_volume(path):
+        path = os.path.normpath(path)
+
+        while path:
+            if path in all_caches_and_volumes:
+                return True
+
+            path, child = os.path.split(path)
+            if not child:
+                break
+
+        return False
+
+    def prepare_checkout_dir(checkout):
+        if not checkout:
+            return
+
+        # The checkout path becomes the working directory. Since there are
+        # special cache files in the cache's root directory and working
+        # directory purging could blow them away, disallow this scenario.
+        if os.path.exists(os.path.join(checkout, ".cacherequires")):
+            print("error: cannot perform vcs checkout into cache root: %s" % checkout)
+            sys.exit(1)
+
+        # TODO given the performance implications, consider making this a fatal
+        # error.
+        if not path_in_cache_or_volume(checkout):
+            print_line(
+                b"vcs",
+                b"WARNING: vcs checkout path (%s) not in cache "
+                b"or volume; performance will likely suffer\n"
+                % checkout.encode("utf-8"),
+            )
+
+        # Ensure the directory for the source checkout exists.
+        try:
+            os.makedirs(os.path.dirname(checkout))
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
+
+        # And that it is owned by the appropriate user/group.
+        if running_as_root:
+            os.chown(os.path.dirname(checkout), uid, gid)
+
+    def prepare_hg_store_path():
+        # And ensure the shared store path exists and has proper permissions.
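+        # (HG_STORE_PATH is the shared robustcheckout store that hg_checkout()
+        # passes to `hg robustcheckout --sharebase`.)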
+        if "HG_STORE_PATH" not in os.environ:
+            print("error: HG_STORE_PATH environment variable not set")
+            sys.exit(1)
+
+        store_path = os.environ["HG_STORE_PATH"]
+
+        if not path_in_cache_or_volume(store_path):
+            print_line(
+                b"vcs",
+                b"WARNING: HG_STORE_PATH (%s) not in cache or "
+                b"volume; performance will likely suffer\n"
+                % store_path.encode("utf-8"),
+            )
+
+        try:
+            os.makedirs(store_path)
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
+
+        if running_as_root:
+            os.chown(store_path, uid, gid)
+
+    repository_paths = [
+        Path(repo["checkout"]) for repo in repositories if repo["checkout"]
+    ]
+    for repo in repositories:
+        if not repo["checkout"]:
+            continue
+        parents = Path(repo["checkout"]).parents
+        if any((path in repository_paths) for path in parents):
+            # Skip creating any checkouts that are inside other checkouts
+            continue
+        prepare_checkout_dir(repo["checkout"])
+
+    if any(repo["checkout"] and repo["repo-type"] == "hg" for repo in repositories):
+        prepare_hg_store_path()
+
+    if IS_POSIX and running_as_root:
+        # Drop permissions to requested user.
+        # This code is modeled after what `sudo` was observed to do in a Docker
+        # container. We do not bother calling setrlimit() because containers have
+        # their own limits.
+        print_line(
+            b"setup",
+            b"running as %s:%s\n"
+            % (args.user.encode("utf-8"), args.group.encode("utf-8")),
+        )
+
+        os.setgroups(gids)
+        os.umask(0o22)
+        os.setresgid(gid, gid, gid)
+        os.setresuid(uid, uid, uid)
+
+    for repo in repositories:
+        vcs_checkout_from_args(repo)
+
+    resource_process = None
+
+    try:
+        for k in ["MOZ_FETCHES_DIR", "UPLOAD_DIR"] + [
+            "{}_PATH".format(repository["project"].upper())
+            for repository in repositories
+        ]:
+            if k in os.environ:
+                os.environ[k] = os.path.abspath(os.environ[k])
+                print_line(
+                    b"setup",
+                    b"%s is %s\n" % (k.encode("utf-8"), os.environ[k].encode("utf-8")),
+                )
+
+        if "MOZ_FETCHES" in os.environ:
+            fetch_artifacts()
+
+        # Install Python requirements after fetches in case tasks want to use
+        # fetches to grab dependencies.
+        install_pip_requirements(repositories)
+
+        resource_process = maybe_run_resource_monitoring()
+
+        return run_command(b"task", task_args, cwd=args.task_cwd)
+    finally:
+        if resource_process:
+            print_line(b"resource_monitor", b"terminating\n")
+            if IS_WINDOWS:
+                # .terminate() on Windows is not a graceful shutdown, due to
+                # differences in signals. CTRL_BREAK_EVENT will work provided
+                # the subprocess is in a different process group, so this script
+                # isn't also killed.
+                os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT)
+            else:
+                resource_process.terminate()
+            resource_process.wait()
+        fetches_dir = os.environ.get("MOZ_FETCHES_DIR")
+        if fetches_dir and os.path.isdir(fetches_dir):
+            print_line(b"fetches", b"removing %s\n" % fetches_dir.encode("utf-8"))
+            remove(fetches_dir)
+            print_line(b"fetches", b"finished\n")
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
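+
+# (Typical invocation, illustrative rather than taken from this diff:
+# arguments before `--` configure run-task itself; everything after `--`
+# is the task command, e.g.
+#   run-task --user=worker --group=worker --vcs-checkout=/builds/worker/vcs \
+#     -- bash -c 'your-build-command')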