summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task')
-rwxr-xr-xthird_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task1307
1 files changed, 1307 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
new file mode 100755
index 0000000000..f1e281f5cd
--- /dev/null
+++ b/third_party/python/taskcluster_taskgraph/taskgraph/run-task/run-task
@@ -0,0 +1,1307 @@
+#!/usr/bin/python3 -u
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""Run a task after performing common actions.
+
+This script is meant to be the "driver" for TaskCluster based tasks.
+It receives some common arguments to control the run-time environment.
+
+It performs actions as requested from the arguments. Then it executes
+the requested process and prints its output, prefixing it with the
+current time to improve log usefulness.
+"""
+
+import sys
+from typing import Optional
+
+if sys.version_info[0:2] < (3, 5):
+ print("run-task requires Python 3.5+")
+ sys.exit(1)
+
+
+import argparse
+import datetime
+import errno
+import io
+import json
+import os
+from pathlib import Path
+import re
+import shutil
+import signal
+import socket
+import stat
+import subprocess
+import time
+
+import urllib.error
+import urllib.request
+
+from threading import Thread
+
+SECRET_BASEURL_TPL = "http://taskcluster/secrets/v1/secret/{}"
+
+GITHUB_SSH_FINGERPRINT = (
+ b"github.com ssh-rsa "
+ b"AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkcc"
+ b"Krpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFz"
+ b"LQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaS"
+ b"jB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3sku"
+ b"a2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ==\n"
+)
+
+
+CACHE_UID_GID_MISMATCH = """
+There is a UID/GID mismatch on the cache. This likely means:
+
+a) different tasks are running as a different user/group
+b) different Docker images have different UID/GID for the same user/group
+
+Our cache policy is that the UID/GID for ALL tasks must be consistent
+for the lifetime of the cache. This eliminates permissions problems due
+to file/directory user/group ownership.
+
+To make this error go away, ensure that all Docker images are use
+a consistent UID/GID and that all tasks using this cache are running as
+the same user/group.
+"""
+
+
+NON_EMPTY_VOLUME = """
+error: volume %s is not empty
+
+Our Docker image policy requires volumes to be empty.
+
+The volume was likely populated as part of building the Docker image.
+Change the Dockerfile and anything run from it to not create files in
+any VOLUME.
+
+A lesser possibility is that you stumbled upon a TaskCluster platform bug
+where it fails to use new volumes for tasks.
+"""
+
+
+FETCH_CONTENT_NOT_FOUND = """
+error: fetch-content script not found
+
+The script at `taskcluster/scripts/misc/fetch-content` could not be
+detected in the current environment.
+"""
+
+# The exit code to use when caches should be purged and the task retried.
+# This is EX_OSFILE (from sysexits.h):
+# Some system file does not exist, cannot be opened, or has some
+# sort of error (e.g., syntax error).
+EXIT_PURGE_CACHE = 72
+
+
+IS_MACOSX = sys.platform == "darwin"
+IS_POSIX = os.name == "posix"
+IS_WINDOWS = os.name == "nt"
+
+# Both mercurial and git use sha1 as revision idenfiers. Luckily, both define
+# the same value as the null revision.
+#
+# https://github.com/git/git/blob/dc04167d378fb29d30e1647ff6ff51dd182bc9a3/t/oid-info/hash-info#L7
+# https://www.mercurial-scm.org/repo/hg-stable/file/82efc31bd152/mercurial/node.py#l30
+NULL_REVISION = "0000000000000000000000000000000000000000"
+
+
+def print_line(prefix, m):
+ now = datetime.datetime.utcnow().isoformat().encode("utf-8")
+ # slice microseconds to 3 decimals.
+ now = now[:-3] if now[-7:-6] == b"." else now
+ sys.stdout.buffer.write(b"[%s %sZ] %s" % (prefix, now, m))
+ sys.stdout.buffer.flush()
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+ """
+ It's possible to see spurious errors on Windows due to various things
+ keeping a handle to the directory open (explorer, virus scanners, etc)
+ So we try a few times if it fails with a known error.
+ retry_delay is multiplied by the number of failed attempts to increase
+ the likelihood of success in subsequent attempts.
+ """
+ retry_count = 0
+ while True:
+ try:
+ func(*args)
+ except OSError as e:
+ # Error codes are defined in:
+ # https://docs.python.org/3/library/errno.html#module-errno
+ if e.errno not in (errno.EACCES, errno.ENOTEMPTY, errno.ENOENT):
+ raise
+
+ if retry_count == retry_max:
+ raise
+
+ retry_count += 1
+
+ print(
+ '%s() failed for "%s". Reason: %s (%s). Retrying...'
+ % (func.__name__, args, e.strerror, e.errno)
+ )
+ time.sleep(retry_count * retry_delay)
+ else:
+ # If no exception has been thrown it should be done
+ break
+
+
+def remove(path):
+ """Removes the specified file, link, or directory tree.
+
+ This is a replacement for shutil.rmtree that works better under
+ windows. It does the following things:
+
+ - check path access for the current user before trying to remove
+ - retry operations on some known errors due to various things keeping
+ a handle on file paths - like explorer, virus scanners, etc. The
+ known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+ retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds
+ between each attempt.
+
+ Note that no error will be raised if the given path does not exists.
+
+ :param path: path to be removed
+ """
+
+ def _update_permissions(path):
+ """Sets specified pemissions depending on filetype"""
+ if os.path.islink(path):
+ # Path is a symlink which we don't have to modify
+ # because it should already have all the needed permissions
+ return
+
+ stats = os.stat(path)
+
+ if os.path.isfile(path):
+ mode = stats.st_mode | stat.S_IWUSR
+ elif os.path.isdir(path):
+ mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+ else:
+ # Not supported type
+ return
+
+ _call_windows_retry(os.chmod, (path, mode))
+
+ if not os.path.lexists(path):
+ print_line(b"remove", b"WARNING: %s does not exists!\n" % path.encode("utf-8"))
+ return
+
+ """
+ On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access
+ files or directories that exceed MAX_PATH(260) limitation or that ends
+ with a period.
+ """
+ if (
+ sys.platform in ("win32", "cygwin")
+ and len(path) >= 3
+ and path[1] == ":"
+ and path[2] == "\\"
+ ):
+ path = "\\\\?\\%s" % path
+
+ if os.path.isfile(path) or os.path.islink(path):
+ # Verify the file or link is read/write for the current user
+ _update_permissions(path)
+ _call_windows_retry(os.remove, (path,))
+
+ elif os.path.isdir(path):
+ # Verify the directory is read/write/execute for the current user
+ _update_permissions(path)
+
+ # We're ensuring that every nested item has writable permission.
+ for root, dirs, files in os.walk(path):
+ for entry in dirs + files:
+ _update_permissions(os.path.join(root, entry))
+ _call_windows_retry(shutil.rmtree, (path,))
+
+
+def run_required_command(prefix, args, *, extra_env=None, cwd=None):
+ res = run_command(prefix, args, extra_env=extra_env, cwd=cwd)
+ if res:
+ sys.exit(res)
+
+
+def retry_required_command(prefix, args, *, extra_env=None, cwd=None, retries=2):
+ backoff = 1
+ while True:
+ res = run_command(prefix, args, extra_env=extra_env, cwd=cwd)
+ if not res:
+ return
+ if not retries:
+ sys.exit(res)
+ retries -= 1
+ backoff *= 2
+ time.sleep(backoff)
+
+
+def run_command(prefix, args, *, extra_env=None, cwd=None):
+ """Runs a process and prefixes its output with the time.
+
+ Returns the process exit code.
+ """
+ print_line(prefix, b"executing %r\n" % args)
+
+ env = dict(os.environ)
+ env.update(extra_env or {})
+
+ # Note: TaskCluster's stdin is a TTY. This attribute is lost
+ # when we pass sys.stdin to the invoked process. If we cared
+ # to preserve stdin as a TTY, we could make this work. But until
+ # someone needs it, don't bother.
+
+ # We want stdout to be bytes on Python 3. That means we can't use
+ # universal_newlines=True (because it implies text mode). But
+ # p.stdout.readline() won't work for bytes text streams. So, on Python 3,
+ # we manually install a latin1 stream wrapper. This allows us to readline()
+ # and preserves bytes, without losing any data.
+
+ p = subprocess.Popen(
+ args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ stdin=sys.stdin.fileno(),
+ cwd=cwd,
+ env=env,
+ )
+
+ stdout = io.TextIOWrapper(p.stdout, encoding="latin1")
+
+ while True:
+ data = stdout.readline().encode("latin1")
+
+ if data == b"":
+ break
+
+ print_line(prefix, data)
+
+ return p.wait()
+
+
+def get_posix_user_group(user, group):
+ import grp
+ import pwd
+
+ try:
+ user_record = pwd.getpwnam(user)
+ except KeyError:
+ print("could not find user %s; specify a valid user with --user" % user)
+ sys.exit(1)
+
+ try:
+ group_record = grp.getgrnam(group)
+ except KeyError:
+ print("could not find group %s; specify a valid group with --group" % group)
+ sys.exit(1)
+
+ # Most tasks use worker:worker. We require they have a specific numeric ID
+ # because otherwise it is too easy for files written to caches to have
+ # mismatched numeric IDs, which results in permissions errors.
+ if user_record.pw_name == "worker" and user_record.pw_uid != 1000:
+ print("user `worker` must have uid=1000; got %d" % user_record.pw_uid)
+ sys.exit(1)
+
+ if group_record.gr_name == "worker" and group_record.gr_gid != 1000:
+ print("group `worker` must have gid=1000; got %d" % group_record.gr_gid)
+ sys.exit(1)
+
+ # Find all groups to which this user is a member.
+ gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem]
+
+ return user_record, group_record, gids
+
+
+def write_audit_entry(path, msg):
+ now = datetime.datetime.utcnow().isoformat().encode("utf-8")
+ with open(path, "ab") as fh:
+ fh.write(b"[%sZ %s] %s\n" % (now, os.environb.get(b"TASK_ID", b"UNKNOWN"), msg))
+
+
+WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR
+
+
+def set_dir_permissions(path, uid, gid):
+ st = os.lstat(path)
+
+ if st.st_uid != uid or st.st_gid != gid:
+ os.chown(path, uid, gid)
+
+ # Also make sure dirs are writable in case we need to delete
+ # them.
+ if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE:
+ os.chmod(path, st.st_mode | WANTED_DIR_MODE)
+
+
+def chown_recursive(path, user, group, uid, gid):
+ print_line(
+ b"chown",
+ b"recursively changing ownership of %s to %s:%s\n"
+ % (path.encode("utf-8"), user.encode("utf-8"), group.encode("utf-8")),
+ )
+
+ set_dir_permissions(path, uid, gid)
+
+ for root, dirs, files in os.walk(path):
+ for d in dirs:
+ set_dir_permissions(os.path.join(root, d), uid, gid)
+
+ for f in files:
+ # File may be a symlink that points to nowhere. In which case
+ # os.chown() would fail because it attempts to follow the
+ # symlink. We only care about directory entries, not what
+ # they point to. So setting the owner of the symlink should
+ # be sufficient.
+ os.lchown(os.path.join(root, f), uid, gid)
+
+
+def configure_cache_posix(cache, user, group, untrusted_caches, running_as_root):
+ """Configure a cache path on POSIX platforms.
+
+ For each cache, we write out a special file denoting attributes and
+ capabilities of run-task and the task being executed. These attributes
+ are used by subsequent run-task invocations to validate that use of
+ the cache is acceptable.
+
+ We /could/ blow away the cache data on requirements mismatch.
+ While this would be convenient, this could result in "competing" tasks
+ effectively undoing the other's work. This would slow down task
+ execution in aggregate. Without monitoring for this, people may not notice
+ the problem and tasks would be slower than they could be. We follow the
+ principle of "fail fast" to ensure optimal task execution.
+
+ We also write an audit log of who used the caches. This log is printed
+ during failures to help aid debugging.
+ """
+
+ our_requirements = {
+ # Include a version string that we can bump whenever to trigger
+ # fresh caches. The actual value is not relevant and doesn't need
+ # to follow any explicit order. Since taskgraph bakes this file's
+ # hash into cache names, any change to this file/version is sufficient
+ # to force the use of a new cache.
+ b"version=1",
+ # Include the UID and GID the task will run as to ensure that tasks
+ # with different UID and GID don't share the same cache.
+ b"uid=%d" % user.pw_uid,
+ b"gid=%d" % group.gr_gid,
+ }
+
+ requires_path = os.path.join(cache, ".cacherequires")
+ audit_path = os.path.join(cache, ".cachelog")
+
+ # The cache is empty. Configure it.
+ if not os.listdir(cache):
+ print_line(
+ b"cache",
+ b"cache %s is empty; writing requirements: "
+ b"%s\n" % (cache.encode("utf-8"), b" ".join(sorted(our_requirements))),
+ )
+
+ # We write a requirements file so future invocations know what the
+ # requirements are.
+ with open(requires_path, "wb") as fh:
+ fh.write(b"\n".join(sorted(our_requirements)))
+
+ # And make it read-only as a precaution against deletion.
+ os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+
+ write_audit_entry(
+ audit_path,
+ b"created; requirements: %s" % b", ".join(sorted(our_requirements)),
+ )
+
+ set_dir_permissions(cache, user.pw_uid, group.gr_gid)
+ return
+
+ # The cache has content and we have a requirements file. Validate
+ # requirements alignment.
+ if os.path.exists(requires_path):
+ with open(requires_path, "rb") as fh:
+ wanted_requirements = set(fh.read().splitlines())
+
+ print_line(
+ b"cache",
+ b"cache %s exists; requirements: %s\n"
+ % (cache.encode("utf-8"), b" ".join(sorted(wanted_requirements))),
+ )
+
+ missing = wanted_requirements - our_requirements
+
+ # Allow requirements mismatch for uid/gid if and only if caches
+ # are untrusted. This allows cache behavior on Try to be
+ # reasonable. Otherwise, random tasks could "poison" cache
+ # usability by introducing uid/gid mismatches. For untrusted
+ # environments like Try, this is a perfectly reasonable thing to
+ # allow.
+ if (
+ missing
+ and untrusted_caches
+ and running_as_root
+ and all(s.startswith((b"uid=", b"gid=")) for s in missing)
+ ):
+ print_line(
+ b"cache",
+ b"cache %s uid/gid mismatch; this is acceptable "
+ b"because caches for this task are untrusted; "
+ b"changing ownership to facilitate cache use\n" % cache.encode("utf-8"),
+ )
+ chown_recursive(
+ cache, user.pw_name, group.gr_name, user.pw_uid, group.gr_gid
+ )
+
+ # And write out the updated reality.
+ with open(requires_path, "wb") as fh:
+ fh.write(b"\n".join(sorted(our_requirements)))
+
+ write_audit_entry(
+ audit_path,
+ b"chown; requirements: %s" % b", ".join(sorted(our_requirements)),
+ )
+
+ elif missing:
+ print(
+ "error: requirements for populated cache %s differ from "
+ "this task" % cache
+ )
+ print(
+ "cache requirements: %s"
+ % " ".join(sorted(s.decode("utf-8") for s in wanted_requirements))
+ )
+ print(
+ "our requirements: %s"
+ % " ".join(sorted(s.decode("utf-8") for s in our_requirements))
+ )
+ if any(s.startswith((b"uid=", b"gid=")) for s in missing):
+ print(CACHE_UID_GID_MISMATCH)
+
+ write_audit_entry(
+ audit_path,
+ b"requirements mismatch; wanted: %s"
+ % b", ".join(sorted(our_requirements)),
+ )
+
+ print("")
+ print("audit log:")
+ with open(audit_path, "r") as fh:
+ print(fh.read())
+
+ return True
+ else:
+ write_audit_entry(audit_path, b"used")
+
+ # We don't need to adjust permissions here because the cache is
+ # associated with a uid/gid and the first task should have set
+ # a proper owner/group.
+
+ return
+
+ # The cache has content and no requirements file. This shouldn't
+ # happen because run-task should be the first thing that touches a
+ # cache.
+ print(
+ "error: cache %s is not empty and is missing a "
+ ".cacherequires file; the cache names for this task are "
+ "likely mis-configured or TASKCLUSTER_CACHES is not set "
+ "properly" % cache
+ )
+
+ write_audit_entry(audit_path, b"missing .cacherequires")
+ return True
+
+
+def configure_volume_posix(volume, user, group, running_as_root):
+ # The only time we should see files in the volume is if the Docker
+ # image build put files there.
+ #
+ # For the sake of simplicity, our policy is that volumes should be
+ # empty. This also has the advantage that an empty volume looks
+ # a lot like an empty cache. Tasks can rely on caches being
+ # swapped in and out on any volume without any noticeable change
+ # of behavior.
+ volume_files = os.listdir(volume)
+ if volume_files:
+ print(NON_EMPTY_VOLUME % volume)
+ print("entries in root directory: %s" % " ".join(sorted(volume_files)))
+ sys.exit(1)
+
+ # The volume is almost certainly owned by root:root. Chown it so it
+ # is writable.
+
+ if running_as_root:
+ print_line(
+ b"volume",
+ b"changing ownership of volume %s "
+ b"to %d:%d\n" % (volume.encode("utf-8"), user.pw_uid, group.gr_gid),
+ )
+ set_dir_permissions(volume, user.pw_uid, group.gr_gid)
+
+
+def _clean_git_checkout(destination_path):
+ # Delete untracked files (i.e. build products)
+ print_line(b"vcs", b"cleaning git checkout...\n")
+ args = [
+ "git",
+ "clean",
+ # Two -f`s causes subdirectories with `.git`
+ # directories to be cleaned as well.
+ "-nxdff",
+ ]
+ print_line(b"vcs", b"executing %r\n" % args)
+ p = subprocess.Popen(
+ args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ stdin=sys.stdin.fileno(),
+ cwd=destination_path,
+ env=os.environ,
+ )
+ stdout = io.TextIOWrapper(p.stdout, encoding="latin1")
+ ret = p.wait()
+ if ret:
+ sys.exit(ret)
+ data = stdout.read()
+ prefix = "Would remove "
+ filenames = [
+ os.path.join(destination_path, line[len(prefix) :])
+ for line in data.splitlines()
+ ]
+ print_line(b"vcs", b"removing %r\n" % filenames)
+ for filename in filenames:
+ remove(filename)
+ print_line(b"vcs", b"successfully cleaned git checkout!\n")
+
+
+def git_checkout(
+ destination_path: str,
+ head_repo: str,
+ base_repo: Optional[str],
+ base_ref: Optional[str],
+ base_rev: Optional[str],
+ ref: Optional[str],
+ commit: Optional[str],
+ ssh_key_file: Optional[Path],
+ ssh_known_hosts_file: Optional[Path],
+):
+ env = {"PYTHONUNBUFFERED": "1"}
+
+ if ssh_key_file and ssh_known_hosts_file:
+ if not ssh_key_file.exists():
+ raise RuntimeError("Can't find specified ssh_key file.")
+ if not ssh_known_hosts_file.exists():
+ raise RuntimeError("Can't find specified known_hosts file.")
+ env["GIT_SSH_COMMAND"] = " ".join(
+ [
+ "ssh",
+ "-oIdentityFile={}".format(ssh_key_file.as_posix()),
+ "-oStrictHostKeyChecking=yes",
+ "-oUserKnownHostsFile={}".format(ssh_known_hosts_file.as_posix()),
+ ]
+ )
+ elif ssh_key_file or ssh_known_hosts_file:
+ raise RuntimeError(
+ "Must specify both ssh_key_file and ssh_known_hosts_file, if either are specified",
+ )
+
+ if not os.path.exists(destination_path):
+ # Repository doesn't already exist, needs to be cloned
+ args = [
+ "git",
+ "clone",
+ base_repo if base_repo else head_repo,
+ destination_path,
+ ]
+
+ retry_required_command(b"vcs", args, extra_env=env)
+
+ if base_ref:
+ args = ["git", "fetch", "origin", base_ref]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ # Create local branch so that taskgraph is able to compute differences
+ # between the head branch and the base one, if needed
+ args = ["git", "checkout", base_ref]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ # When commits are force-pushed (like on a testing branch), base_rev doesn't
+ # exist on base_ref. Fetching it allows taskgraph to compute differences
+ # between the previous state before the force-push and the current state.
+ #
+ # Unlike base_ref just above, there is no need to checkout the revision:
+ # it's immediately avaiable after the fetch.
+ if base_rev and base_rev != NULL_REVISION:
+ args = ["git", "fetch", "origin", base_rev]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ # If a ref isn't provided, we fetch all refs from head_repo, which may be slow
+ args = [
+ "git",
+ "fetch",
+ "--no-tags",
+ head_repo,
+ ref if ref else "+refs/heads/*:refs/remotes/work/*",
+ ]
+
+ retry_required_command(b"vcs", args, cwd=destination_path, extra_env=env)
+
+ args = [
+ "git",
+ "checkout",
+ "-f",
+ ]
+
+ if ref:
+ args.extend(["-B", ref])
+ args.append(commit if commit else ref)
+
+ run_required_command(b"vcs", args, cwd=destination_path)
+
+ if os.path.exists(os.path.join(destination_path, ".gitmodules")):
+ args = [
+ "git",
+ "submodule",
+ "init",
+ ]
+
+ run_required_command(b"vcs", args, cwd=destination_path)
+
+ args = [
+ "git",
+ "submodule",
+ "update",
+ ]
+
+ run_required_command(b"vcs", args, cwd=destination_path)
+
+ _clean_git_checkout(destination_path)
+
+ args = ["git", "rev-parse", "--verify", "HEAD"]
+
+ commit_hash = subprocess.check_output(
+ args, cwd=destination_path, universal_newlines=True
+ ).strip()
+ assert re.match("^[a-f0-9]{40}$", commit_hash)
+
+ if head_repo.startswith("https://github.com"):
+ if head_repo.endswith("/"):
+ head_repo = head_repo[:-1]
+
+ tinderbox_link = "{}/commit/{}".format(head_repo, commit_hash)
+ repo_name = head_repo.split("/")[-1]
+ else:
+ tinderbox_link = head_repo
+ repo_name = head_repo
+
+ msg = (
+ "TinderboxPrint:<a href='{link}' "
+ "title='Built from {name} commit {commit_hash}'>"
+ "{commit_hash}</a>\n".format(
+ commit_hash=commit_hash, link=tinderbox_link, name=repo_name
+ )
+ )
+
+ print_line(b"vcs", msg.encode("utf-8"))
+
+ return commit_hash
+
+
+def fetch_ssh_secret(secret_name):
+ """Retrieves the private ssh key, and returns it as a StringIO object"""
+ secret_url = SECRET_BASEURL_TPL.format(secret_name)
+ try:
+ print_line(
+ b"vcs",
+ b"fetching secret %s from %s\n"
+ % (secret_name.encode("utf-8"), secret_url.encode("utf-8")),
+ )
+ res = urllib.request.urlopen(secret_url, timeout=10)
+ secret = res.read()
+ try:
+ secret = json.loads(secret.decode("utf-8"))
+ except ValueError:
+ print_line(b"vcs", b"invalid JSON in secret")
+ sys.exit(1)
+ except (urllib.error.URLError, socket.timeout):
+ print_line(b"vcs", b"Unable to retrieve ssh secret. aborting...")
+ sys.exit(1)
+
+ return secret["secret"]["ssh_privkey"]
+
+
+def hg_checkout(
+ destination_path: str,
+ head_repo: str,
+ base_repo: Optional[str],
+ store_path: str,
+ sparse_profile: Optional[str],
+ branch: Optional[str],
+ revision: Optional[str],
+):
+ if IS_MACOSX:
+ hg_bin = "/tools/python27-mercurial/bin/hg"
+ elif IS_POSIX:
+ hg_bin = "hg"
+ elif IS_WINDOWS:
+ # This is where OCC installs it in the AMIs.
+ hg_bin = r"C:\Program Files\Mercurial\hg.exe"
+ if not os.path.exists(hg_bin):
+ print("could not find Mercurial executable: %s" % hg_bin)
+ sys.exit(1)
+ else:
+ raise RuntimeError("Must be running on mac, posix or windows")
+
+ args = [
+ hg_bin,
+ "robustcheckout",
+ "--sharebase",
+ store_path,
+ "--purge",
+ ]
+
+ if base_repo:
+ args.extend(["--upstream", base_repo])
+ if sparse_profile:
+ args.extend(["--sparseprofile", sparse_profile])
+
+ # Specify method to checkout a revision. This defaults to revisions as
+ # SHA-1 strings, but also supports symbolic revisions like `tip` via the
+ # branch flag.
+ args.extend(
+ [
+ "--branch" if branch else "--revision",
+ branch or revision,
+ head_repo,
+ destination_path,
+ ]
+ )
+
+ run_required_command(b"vcs", args, extra_env={"PYTHONUNBUFFERED": "1"})
+
+ # Update the current revision hash and ensure that it is well formed.
+ revision = subprocess.check_output(
+ [hg_bin, "log", "--rev", ".", "--template", "{node}"],
+ cwd=destination_path,
+ # Triggers text mode on Python 3.
+ universal_newlines=True,
+ )
+
+ assert re.match("^[a-f0-9]{40}$", revision)
+
+ msg = (
+ "TinderboxPrint:<a href={head_repo}/rev/{revision} "
+ "title='Built from {repo_name} revision {revision}'>"
+ "{revision}</a>\n".format(
+ revision=revision, head_repo=head_repo, repo_name=head_repo.split("/")[-1]
+ )
+ )
+
+ print_line(b"vcs", msg.encode("utf-8"))
+
+ return revision
+
+
+def fetch_artifacts():
+ print_line(b"fetches", b"fetching artifacts\n")
+
+ fetch_content = shutil.which("fetch-content")
+
+ if not fetch_content or not os.path.isfile(fetch_content):
+ fetch_content = os.path.join(os.path.dirname(__file__), "fetch-content")
+
+ if not os.path.isfile(fetch_content):
+ print(FETCH_CONTENT_NOT_FOUND)
+ sys.exit(1)
+
+ cmd = [sys.executable, "-u", fetch_content, "task-artifacts"]
+ print_line(b"fetches", b"executing %r\n" % cmd)
+ subprocess.run(cmd, check=True, env=os.environ)
+ print_line(b"fetches", b"finished fetching artifacts\n")
+
+
+def add_vcs_arguments(parser, project, name):
+ """Adds arguments to ArgumentParser to control VCS options for a project."""
+
+ parser.add_argument(
+ "--%s-checkout" % project,
+ help="Directory where %s checkout should be created" % name,
+ )
+ parser.add_argument(
+ "--%s-sparse-profile" % project,
+ help="Path to sparse profile for %s checkout" % name,
+ )
+
+
+def collect_vcs_options(args, project, name):
+ checkout = getattr(args, "%s_checkout" % project)
+ sparse_profile = getattr(args, "%s_sparse_profile" % project)
+
+ env_prefix = project.upper()
+
+ repo_type = os.environ.get("%s_REPOSITORY_TYPE" % env_prefix)
+ base_repo = os.environ.get("%s_BASE_REPOSITORY" % env_prefix)
+ base_ref = os.environ.get("%s_BASE_REF" % env_prefix)
+ base_rev = os.environ.get("%s_BASE_REV" % env_prefix)
+ head_repo = os.environ.get("%s_HEAD_REPOSITORY" % env_prefix)
+ revision = os.environ.get("%s_HEAD_REV" % env_prefix)
+ ref = os.environ.get("%s_HEAD_REF" % env_prefix)
+ pip_requirements = os.environ.get("%s_PIP_REQUIREMENTS" % env_prefix)
+ private_key_secret = os.environ.get("%s_SSH_SECRET_NAME" % env_prefix)
+
+ store_path = os.environ.get("HG_STORE_PATH")
+
+ # Expand ~ in some paths.
+ if checkout:
+ checkout = os.path.abspath(os.path.expanduser(checkout))
+ if store_path:
+ store_path = os.path.abspath(os.path.expanduser(store_path))
+
+ if pip_requirements:
+ pip_requirements = os.path.join(checkout, pip_requirements)
+
+ # Some callers set the base repository to mozilla-central for historical
+ # reasons. Switch to mozilla-unified because robustcheckout works best
+ # with it.
+ if base_repo == "https://hg.mozilla.org/mozilla-central":
+ base_repo = "https://hg.mozilla.org/mozilla-unified"
+
+ return {
+ "store-path": store_path,
+ "project": project,
+ "name": name,
+ "env-prefix": env_prefix,
+ "checkout": checkout,
+ "sparse-profile": sparse_profile,
+ "base-repo": base_repo,
+ "base-ref": base_ref,
+ "base-rev": base_rev,
+ "head-repo": head_repo,
+ "revision": revision,
+ "ref": ref,
+ "repo-type": repo_type,
+ "ssh-secret-name": private_key_secret,
+ "pip-requirements": pip_requirements,
+ }
+
+
+def vcs_checkout_from_args(options):
+
+ if not options["checkout"]:
+ if options["ref"] and not options["revision"]:
+ print("task should be defined in terms of non-symbolic revision")
+ sys.exit(1)
+ return
+
+ revision = options["revision"]
+ ref = options["ref"]
+ ssh_key_file = None
+ ssh_known_hosts_file = None
+ ssh_dir = None
+
+ try:
+ if options.get("ssh-secret-name"):
+ ssh_dir = Path("~/.ssh-run-task").expanduser()
+ os.makedirs(ssh_dir, 0o700)
+ ssh_key_file = ssh_dir.joinpath("private_ssh_key")
+ ssh_key = fetch_ssh_secret(options["ssh-secret-name"])
+ # We don't use write_text here, to avoid \n -> \r\n on windows
+ ssh_key_file.write_bytes(ssh_key.encode("ascii"))
+ ssh_key_file.chmod(0o600)
+ # TODO: We should pull this from a secret, so it can be updated on old trees
+ ssh_known_hosts_file = ssh_dir.joinpath("known_hosts")
+ ssh_known_hosts_file.write_bytes(GITHUB_SSH_FINGERPRINT)
+
+ if options["repo-type"] == "git":
+ if not revision and not ref:
+ raise RuntimeError(
+ "Git requires that either a ref, a revision, or both are provided"
+ )
+
+ if not ref:
+ print("Providing a ref will improve the performance of this checkout")
+
+ revision = git_checkout(
+ options["checkout"],
+ options["head-repo"],
+ options["base-repo"],
+ options["base-ref"],
+ options["base-rev"],
+ ref,
+ revision,
+ ssh_key_file,
+ ssh_known_hosts_file,
+ )
+ elif options["repo-type"] == "hg":
+ if not revision and not ref:
+ raise RuntimeError(
+ "Hg requires that at least one of a ref or revision " "is provided"
+ )
+
+ revision = hg_checkout(
+ options["checkout"],
+ options["head-repo"],
+ options["base-repo"],
+ options["store-path"],
+ options["sparse-profile"],
+ ref,
+ revision,
+ )
+ else:
+ raise RuntimeError('Type of VCS must be either "git" or "hg"')
+ finally:
+ if ssh_dir:
+ shutil.rmtree(ssh_dir, ignore_errors=True)
+ pass
+
+ os.environ["%s_HEAD_REV" % options["env-prefix"]] = revision
+
+
+def install_pip_requirements(repositories):
+ """Install pip requirements files from specified repositories, if necessary."""
+ requirements = [
+ r["pip-requirements"] for r in repositories if r["pip-requirements"]
+ ]
+ if not requirements:
+ return
+
+ cmd = [sys.executable, "-mpip", "install"]
+ if os.environ.get("PIP_DISABLE_REQUIRE_HASHES") != "1":
+ cmd.append("--require-hashes")
+
+ for path in requirements:
+ cmd.extend(["-r", path])
+
+ run_required_command(b"pip-install", cmd)
+
+
+def maybe_run_resource_monitoring():
+ """Run the resource monitor if available.
+
+ Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160
+ and https://bugzil.la/1648051
+
+ """
+ if "MOZ_FETCHES" not in os.environ:
+ return
+ if "RESOURCE_MONITOR_OUTPUT" not in os.environ:
+ return
+
+ prefix = b"resource_monitor"
+
+ executable = "{}/resource-monitor/resource-monitor{}".format(
+ os.environ.get("MOZ_FETCHES_DIR"), ".exe" if IS_WINDOWS else ""
+ )
+
+ if not os.path.exists(executable) or not os.access(executable, os.X_OK):
+ print_line(prefix, b"%s not executable\n" % executable.encode("utf-8"))
+ return
+ args = [
+ executable,
+ "-process",
+ str(os.getpid()),
+ "-output",
+ os.environ["RESOURCE_MONITOR_OUTPUT"],
+ ]
+ print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode("utf-8"))
+ # Avoid environment variables the payload doesn't need.
+ del os.environ["RESOURCE_MONITOR_OUTPUT"]
+
+ # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too.
+ process = subprocess.Popen(
+ args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0,
+ cwd=os.getcwd(),
+ )
+
+ def capture_output():
+ fh = io.TextIOWrapper(process.stdout, encoding="latin1")
+ while True:
+ data = fh.readline().encode("latin1")
+ if data == b"":
+ break
+ print_line(prefix, data)
+
+ monitor_process = Thread(target=capture_output)
+ monitor_process.start()
+ return process
+
+
+def main(args):
+ os.environ["TASK_WORKDIR"] = os.getcwd()
+ print_line(
+ b"setup",
+ b"run-task started in %s\n" % os.environ["TASK_WORKDIR"].encode("utf-8"),
+ )
+ running_as_root = IS_POSIX and os.getuid() == 0
+
+ # Arguments up to '--' are ours. After are for the main task
+ # to be executed.
+ try:
+ i = args.index("--")
+ our_args = args[0:i]
+ task_args = args[i + 1 :]
+ except ValueError:
+ our_args = args
+ task_args = []
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--user", default="worker", help="user to run as")
+ parser.add_argument("--group", default="worker", help="group to run as")
+ parser.add_argument("--task-cwd", help="directory to run the provided command in")
+
+ repositories = os.environ.get("REPOSITORIES")
+ if repositories:
+ repositories = json.loads(repositories)
+ else:
+ repositories = {"vcs": "repository"}
+
+ for repository, name in repositories.items():
+ add_vcs_arguments(parser, repository, name)
+
+ parser.add_argument(
+ "--fetch-hgfingerprint", action="store_true", help=argparse.SUPPRESS
+ )
+
+ args = parser.parse_args(our_args)
+
+ repositories = [
+ collect_vcs_options(args, repository, name)
+ for (repository, name) in repositories.items()
+ ]
+ # Sort repositories so that parent checkout paths come before children
+ repositories.sort(key=lambda repo: Path(repo["checkout"] or "/").parts)
+
+ uid = gid = gids = None
+ if IS_POSIX and running_as_root:
+ user, group, gids = get_posix_user_group(args.user, args.group)
+ uid = user.pw_uid
+ gid = group.gr_gid
+
+ if running_as_root and os.path.exists("/dev/kvm"):
+ # Ensure kvm permissions for worker, required for Android x86
+ st = os.stat("/dev/kvm")
+ os.chmod("/dev/kvm", st.st_mode | 0o666)
+
+ # Validate caches.
+ #
+ # Taskgraph should pass in a list of paths that are caches via an
+ # environment variable (which we don't want to pass down to child
+ # processes).
+
+ if "TASKCLUSTER_CACHES" in os.environ:
+ caches = os.environ["TASKCLUSTER_CACHES"].split(";")
+ del os.environ["TASKCLUSTER_CACHES"]
+ else:
+ caches = []
+
+ if "TASKCLUSTER_UNTRUSTED_CACHES" in os.environ:
+ untrusted_caches = True
+ del os.environ["TASKCLUSTER_UNTRUSTED_CACHES"]
+ else:
+ untrusted_caches = False
+
+ for cache in caches:
+ if not os.path.isdir(cache):
+ print(
+ "error: cache %s is not a directory; this should never "
+ "happen" % cache
+ )
+ return 1
+
+ purge = configure_cache_posix(
+ cache, user, group, untrusted_caches, running_as_root
+ )
+
+ if purge:
+ return EXIT_PURGE_CACHE
+
+ if "TASKCLUSTER_VOLUMES" in os.environ:
+ volumes = os.environ["TASKCLUSTER_VOLUMES"].split(";")
+ del os.environ["TASKCLUSTER_VOLUMES"]
+ else:
+ volumes = []
+
+ if volumes and not IS_POSIX:
+ print("assertion failed: volumes not expected on Windows")
+ return 1
+
+ # Sanitize volumes.
+ for volume in volumes:
+ # If a volume is a cache, it was dealt with above.
+ if volume in caches:
+ print_line(b"volume", b"volume %s is a cache\n" % volume.encode("utf-8"))
+ continue
+
+ configure_volume_posix(volume, user, group, running_as_root)
+
+ all_caches_and_volumes = set(map(os.path.normpath, caches))
+ all_caches_and_volumes |= set(map(os.path.normpath, volumes))
+
+ def path_in_cache_or_volume(path):
+ path = os.path.normpath(path)
+
+ while path:
+ if path in all_caches_and_volumes:
+ return True
+
+ path, child = os.path.split(path)
+ if not child:
+ break
+
+ return False
+
+ def prepare_checkout_dir(checkout):
+ if not checkout:
+ return
+
+ # The checkout path becomes the working directory. Since there are
+ # special cache files in the cache's root directory and working
+ # directory purging could blow them away, disallow this scenario.
+ if os.path.exists(os.path.join(checkout, ".cacherequires")):
+ print("error: cannot perform vcs checkout into cache root: %s" % checkout)
+ sys.exit(1)
+
+ # TODO given the performance implications, consider making this a fatal
+ # error.
+ if not path_in_cache_or_volume(checkout):
+ print_line(
+ b"vcs",
+ b"WARNING: vcs checkout path (%s) not in cache "
+ b"or volume; performance will likely suffer\n"
+ % checkout.encode("utf-8"),
+ )
+
+ # Ensure the directory for the source checkout exists.
+ try:
+ os.makedirs(os.path.dirname(checkout))
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ # And that it is owned by the appropriate user/group.
+ if running_as_root:
+ os.chown(os.path.dirname(checkout), uid, gid)
+
+ def prepare_hg_store_path():
+ # And ensure the shared store path exists and has proper permissions.
+ if "HG_STORE_PATH" not in os.environ:
+ print("error: HG_STORE_PATH environment variable not set")
+ sys.exit(1)
+
+ store_path = os.environ["HG_STORE_PATH"]
+
+ if not path_in_cache_or_volume(store_path):
+ print_line(
+ b"vcs",
+ b"WARNING: HG_STORE_PATH (%s) not in cache or "
+ b"volume; performance will likely suffer\n"
+ % store_path.encode("utf-8"),
+ )
+
+ try:
+ os.makedirs(store_path)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ if running_as_root:
+ os.chown(store_path, uid, gid)
+
+ repository_paths = [
+ Path(repo["checkout"]) for repo in repositories if repo["checkout"]
+ ]
+ for repo in repositories:
+ if not repo["checkout"]:
+ continue
+ parents = Path(repo["checkout"]).parents
+ if any((path in repository_paths) for path in parents):
+ # Skip creating any checkouts that are inside other checokuts
+ continue
+ prepare_checkout_dir(repo["checkout"])
+
+ if any(repo["checkout"] and repo["repo-type"] == "hg" for repo in repositories):
+ prepare_hg_store_path()
+
+ if IS_POSIX and running_as_root:
+ # Drop permissions to requested user.
+ # This code is modeled after what `sudo` was observed to do in a Docker
+ # container. We do not bother calling setrlimit() because containers have
+ # their own limits.
+ print_line(
+ b"setup",
+ b"running as %s:%s\n"
+ % (args.user.encode("utf-8"), args.group.encode("utf-8")),
+ )
+
+ os.setgroups(gids)
+ os.umask(0o22)
+ os.setresgid(gid, gid, gid)
+ os.setresuid(uid, uid, uid)
+
+ for repo in repositories:
+ vcs_checkout_from_args(repo)
+
+ resource_process = None
+
+ try:
+ for k in ["MOZ_FETCHES_DIR", "UPLOAD_DIR"] + [
+ "{}_PATH".format(repository["project"].upper())
+ for repository in repositories
+ ]:
+ if k in os.environ:
+ os.environ[k] = os.path.abspath(os.environ[k])
+ print_line(
+ b"setup",
+ b"%s is %s\n" % (k.encode("utf-8"), os.environ[k].encode("utf-8")),
+ )
+
+ if "MOZ_FETCHES" in os.environ:
+ fetch_artifacts()
+
+ # Install Python requirements after fetches in case tasks want to use
+ # fetches to grab dependencies.
+ install_pip_requirements(repositories)
+
+ resource_process = maybe_run_resource_monitoring()
+
+ return run_command(b"task", task_args, cwd=args.task_cwd)
+ finally:
+ if resource_process:
+ print_line(b"resource_monitor", b"terminating\n")
+ if IS_WINDOWS:
+ # .terminate() on Windows is not a graceful shutdown, due to
+ # differences in signals. CTRL_BREAK_EVENT will work provided
+ # the subprocess is in a different process group, so this script
+ # isn't also killed.
+ os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT)
+ else:
+ resource_process.terminate()
+ resource_process.wait()
+ fetches_dir = os.environ.get("MOZ_FETCHES_DIR")
+ if fetches_dir and os.path.isdir(fetches_dir):
+ print_line(b"fetches", b"removing %s\n" % fetches_dir.encode("utf-8"))
+ remove(fetches_dir)
+ print_line(b"fetches", b"finished\n")
+
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv[1:]))