diff options
Diffstat (limited to 'taskcluster/scripts/run-task')
-rwxr-xr-x | taskcluster/scripts/run-task | 1021 |
1 files changed, 1021 insertions, 0 deletions
diff --git a/taskcluster/scripts/run-task b/taskcluster/scripts/run-task new file mode 100755 index 0000000000..2f3f6460db --- /dev/null +++ b/taskcluster/scripts/run-task @@ -0,0 +1,1021 @@ +#!/usr/bin/python3 -u +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +"""Run a task after performing common actions. + +This script is meant to be the "driver" for TaskCluster based tasks. +It receives some common arguments to control the run-time environment. + +It performs actions as requested from the arguments. Then it executes +the requested process and prints its output, prefixing it with the +current time to improve log usefulness. +""" + +import sys + + +if sys.version_info[0:2] < (3, 5): + print('run-task requires Python 3.5+') + sys.exit(1) + + +import argparse +import datetime +import errno +import io +import json +import os +import random +import re +import shutil +import signal +import socket +import stat +import subprocess + +import urllib.error +import urllib.request + +from threading import Thread + +FINGERPRINT_URL = 'http://taskcluster/secrets/v1/secret/project/taskcluster/gecko/hgfingerprint' +FALLBACK_FINGERPRINT = { + 'fingerprints': + "sha256:4D:EB:21:6E:35:2F:99:C6:8F:C3:47:9B:57:B8:6C:17:15:8F:86:09:D4:6C:17:1D:87:B0:DE:F9:0E:51:70:FC," + "sha256:90:85:39:A8:4F:47:20:58:98:0D:48:4D:8A:AC:71:DB:5C:AF:76:44:F1:B1:3E:56:92:FF:21:8C:C9:A9:F7:11" +} + +HGMOINTERNAL_CONFIG_URL = 'http://taskcluster/secrets/v1/secret/project/taskcluster/gecko/hgmointernal' + +CACHE_UID_GID_MISMATCH = ''' +There is a UID/GID mismatch on the cache. This likely means: + +a) different tasks are running as a different user/group +b) different Docker images have different UID/GID for the same user/group + +Our cache policy is that the UID/GID for ALL tasks must be consistent +for the lifetime of the cache. This eliminates permissions problems due +to file/directory user/group ownership. + +To make this error go away, ensure that all Docker images are use +a consistent UID/GID and that all tasks using this cache are running as +the same user/group. +''' + + +NON_EMPTY_VOLUME = ''' +error: volume %s is not empty + +Our Docker image policy requires volumes to be empty. + +The volume was likely populated as part of building the Docker image. +Change the Dockerfile and anything run from it to not create files in +any VOLUME. + +A lesser possibility is that you stumbled upon a TaskCluster platform bug +where it fails to use new volumes for tasks. +''' + + +FETCH_CONTENT_NOT_FOUND = ''' +error: fetch-content script not found + +The script at `taskcluster/scripts/misc/fetch-content` could not be +detected in the current environment. + +If this task clones gecko, make sure the GECKO_PATH environment variable +is set to proper location. Otherwise, the script may need to be mounted +or added to the task's docker image then added to the PATH. +''' + +# The exit code to use when caches should be purged and the task retried. +# This is EX_OSFILE (from sysexits.h): +# Some system file does not exist, cannot be opened, or has some +# sort of error (e.g., syntax error). +EXIT_PURGE_CACHE = 72 + + +IS_MACOSX = sys.platform == 'darwin' +IS_POSIX = os.name == 'posix' +IS_WINDOWS = os.name == 'nt' + + +def print_line(prefix, m): + now = datetime.datetime.utcnow().isoformat().encode('utf-8') + # slice microseconds to 3 decimals. + now = now[:-3] if now[-7:-6] == b'.' else now + bytes = b'[%s %sZ] %s' % (prefix, now, m) + written = 0 + while written < len(bytes): + written += (sys.stdout.buffer.write(bytes[written:]) or 0) + sys.stdout.buffer.flush() + + +def run_and_prefix_output(prefix, args, *, extra_env=None, cwd=None): + """Runs a process and prefixes its output with the time. + + Returns the process exit code. + """ + print_line( + prefix, + b"executing %r%s\n" % (args, b"in %s" % (cwd.encode("utf-8"),) if cwd else b""), + ) + + env = dict(os.environ) + env.update(extra_env or {}) + + # Note: TaskCluster's stdin is a TTY. This attribute is lost + # when we pass sys.stdin to the invoked process. If we cared + # to preserve stdin as a TTY, we could make this work. But until + # someone needs it, don't bother. + + # We want stdout to be bytes on Python 3. That means we can't use + # universal_newlines=True (because it implies text mode). But + # p.stdout.readline() won't work for bytes text streams. So, on Python 3, + # we manually install a latin1 stream wrapper. This allows us to readline() + # and preserves bytes, without losing any data. + + p = subprocess.Popen(args, + # Disable buffering because we want to receive output + # as it is generated so timestamps in logs are + # accurate. + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=sys.stdin.fileno(), + env=env, + cwd=cwd) + + stdout = io.TextIOWrapper(p.stdout, encoding='latin1') + + while True: + data = stdout.readline().encode('latin1') + + if data == b'': + break + + print_line(prefix, data) + + return p.wait() + + +def get_posix_user_group(user, group): + import grp + import pwd + + try: + user_record = pwd.getpwnam(user) + except KeyError: + print('could not find user %s; specify a valid user with --user' % user) + sys.exit(1) + + try: + group_record = grp.getgrnam(group) + except KeyError: + print('could not find group %s; specify a valid group with --group' % + group) + sys.exit(1) + + # Most tasks use worker:worker. We require they have a specific numeric ID + # because otherwise it is too easy for files written to caches to have + # mismatched numeric IDs, which results in permissions errors. + if user_record.pw_name == 'worker' and user_record.pw_uid != 1000: + print('user `worker` must have uid=1000; got %d' % user_record.pw_uid) + sys.exit(1) + + if group_record.gr_name == 'worker' and group_record.gr_gid != 1000: + print('group `worker` must have gid=1000; got %d' % group_record.gr_gid) + sys.exit(1) + + # Find all groups to which this user is a member. + gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem] + + return user_record, group_record, gids + + +def write_audit_entry(path, msg): + now = datetime.datetime.utcnow().isoformat().encode('utf-8') + with open(path, 'ab') as fh: + fh.write(b'[%sZ %s] %s\n' % ( + now, os.environb.get(b'TASK_ID', b'UNKNOWN'), msg)) + + +WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR + + +def set_dir_permissions(path, uid, gid): + st = os.lstat(path) + + if st.st_uid != uid or st.st_gid != gid: + os.chown(path, uid, gid) + + # Also make sure dirs are writable in case we need to delete + # them. + if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE: + os.chmod(path, st.st_mode | WANTED_DIR_MODE) + + +def chown_recursive(path, user, group, uid, gid): + print_line(b'chown', + b'recursively changing ownership of %s to %s:%s\n' % + (path.encode('utf-8'), user.encode('utf-8'), group.encode( + 'utf-8'))) + + set_dir_permissions(path, uid, gid) + + for root, dirs, files in os.walk(path): + for d in dirs: + set_dir_permissions(os.path.join(root, d), uid, gid) + + for f in files: + # File may be a symlink that points to nowhere. In which case + # os.chown() would fail because it attempts to follow the + # symlink. We only care about directory entries, not what + # they point to. So setting the owner of the symlink should + # be sufficient. + os.lchown(os.path.join(root, f), uid, gid) + + +def configure_cache_posix(cache, user, group, + untrusted_caches, running_as_root): + """Configure a cache path on POSIX platforms. + + For each cache, we write out a special file denoting attributes and + capabilities of run-task and the task being executed. These attributes + are used by subsequent run-task invocations to validate that use of + the cache is acceptable. + + We /could/ blow away the cache data on requirements mismatch. + While this would be convenient, this could result in "competing" tasks + effectively undoing the other's work. This would slow down task + execution in aggregate. Without monitoring for this, people may not notice + the problem and tasks would be slower than they could be. We follow the + principle of "fail fast" to ensure optimal task execution. + + We also write an audit log of who used the caches. This log is printed + during failures to help aid debugging. + """ + + our_requirements = { + # Include a version string that we can bump whenever to trigger + # fresh caches. The actual value is not relevant and doesn't need + # to follow any explicit order. Since taskgraph bakes this file's + # hash into cache names, any change to this file/version is sufficient + # to force the use of a new cache. + b'version=1', + # Include the UID and GID the task will run as to ensure that tasks + # with different UID and GID don't share the same cache. + b'uid=%d' % user.pw_uid, + b'gid=%d' % group.gr_gid, + } + + requires_path = os.path.join(cache, '.cacherequires') + audit_path = os.path.join(cache, '.cachelog') + + # The cache is empty. Configure it. + if not os.listdir(cache): + print_line(b'cache', b'cache %s is empty; writing requirements: ' + b'%s\n' % ( + cache.encode('utf-8'), b' '.join(sorted(our_requirements)))) + + # We write a requirements file so future invocations know what the + # requirements are. + with open(requires_path, 'wb') as fh: + fh.write(b'\n'.join(sorted(our_requirements))) + + # And make it read-only as a precaution against deletion. + os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + + write_audit_entry(audit_path, + b'created; requirements: %s' % + b', '.join(sorted(our_requirements))) + + set_dir_permissions(cache, user.pw_uid, group.gr_gid) + return + + # The cache has content and we have a requirements file. Validate + # requirements alignment. + if os.path.exists(requires_path): + with open(requires_path, 'rb') as fh: + wanted_requirements = set(fh.read().splitlines()) + + print_line(b'cache', b'cache %s exists; requirements: %s\n' % ( + cache.encode('utf-8'), b' '.join(sorted(wanted_requirements)))) + + missing = wanted_requirements - our_requirements + + # Allow requirements mismatch for uid/gid if and only if caches + # are untrusted. This allows cache behavior on Try to be + # reasonable. Otherwise, random tasks could "poison" cache + # usability by introducing uid/gid mismatches. For untrusted + # environments like Try, this is a perfectly reasonable thing to + # allow. + if missing and untrusted_caches and running_as_root and \ + all(s.startswith((b'uid=', b'gid=')) for s in missing): + print_line(b'cache', + b'cache %s uid/gid mismatch; this is acceptable ' + b'because caches for this task are untrusted; ' + b'changing ownership to facilitate cache use\n' % + cache.encode('utf-8')) + chown_recursive(cache, user.pw_name, group.gr_name, user.pw_uid, + group.gr_gid) + + # And write out the updated reality. + with open(requires_path, 'wb') as fh: + fh.write(b'\n'.join(sorted(our_requirements))) + + write_audit_entry(audit_path, + b'chown; requirements: %s' % + b', '.join(sorted(our_requirements))) + + elif missing: + print('error: requirements for populated cache %s differ from ' + 'this task' % cache) + print('cache requirements: %s' % ' '.join(sorted( + s.decode('utf-8') for s in wanted_requirements))) + print('our requirements: %s' % ' '.join(sorted( + s.decode('utf-8') for s in our_requirements))) + if any(s.startswith((b'uid=', b'gid=')) for s in missing): + print(CACHE_UID_GID_MISMATCH) + + write_audit_entry(audit_path, + b'requirements mismatch; wanted: %s' % + b', '.join(sorted(our_requirements))) + + print('') + print('audit log:') + with open(audit_path, 'r') as fh: + print(fh.read()) + + return True + else: + write_audit_entry(audit_path, b'used') + + # We don't need to adjust permissions here because the cache is + # associated with a uid/gid and the first task should have set + # a proper owner/group. + + return + + # The cache has content and no requirements file. This shouldn't + # happen because run-task should be the first thing that touches a + # cache. + print('error: cache %s is not empty and is missing a ' + '.cacherequires file; the cache names for this task are ' + 'likely mis-configured or TASKCLUSTER_CACHES is not set ' + 'properly' % cache) + + write_audit_entry(audit_path, b'missing .cacherequires') + return True + + +def configure_volume_posix(volume, user, group, running_as_root): + # The only time we should see files in the volume is if the Docker + # image build put files there. + # + # For the sake of simplicity, our policy is that volumes should be + # empty. This also has the advantage that an empty volume looks + # a lot like an empty cache. Tasks can rely on caches being + # swapped in and out on any volume without any noticeable change + # of behavior. + volume_files = os.listdir(volume) + if volume_files: + print(NON_EMPTY_VOLUME % volume) + print('entries in root directory: %s' % + ' '.join(sorted(volume_files))) + sys.exit(1) + + # The volume is almost certainly owned by root:root. Chown it so it + # is writable. + + if running_as_root: + print_line(b'volume', b'changing ownership of volume %s ' + b'to %d:%d\n' % (volume.encode('utf-8'), + user.pw_uid, + group.gr_gid)) + set_dir_permissions(volume, user.pw_uid, group.gr_gid) + + +def vcs_checkout(source_repo, dest, store_path, + base_repo=None, revision=None, branch=None, + fetch_hgfingerprint=False, sparse_profile=None): + # Specify method to checkout a revision. This defaults to revisions as + # SHA-1 strings, but also supports symbolic revisions like `tip` via the + # branch flag. + if revision: + revision_flag = '--revision' + revision_value = revision + elif branch: + revision_flag = '--branch' + revision_value = branch + else: + print('revision is not specified for checkout') + sys.exit(1) + + if IS_MACOSX or IS_POSIX: + hg_bin = 'hg' + elif IS_WINDOWS: + # This is where OCC installs it in the AMIs. + hg_bin = r'C:\Program Files\Mercurial\hg.exe' + if not os.path.exists(hg_bin): + print('could not find Mercurial executable: %s' % hg_bin) + sys.exit(1) + + store_path = os.path.abspath(store_path) + args = [ + hg_bin, + 'robustcheckout', + '--sharebase', store_path, + '--purge', + ] + + # Obtain certificate fingerprints. Without this, the checkout will use the fingerprint + # on the system, which is managed some other way (such as puppet) + if fetch_hgfingerprint: + try: + print_line(b'vcs', b'fetching hg.mozilla.org fingerprint from %s\n' % + FINGERPRINT_URL.encode('utf-8')) + res = urllib.request.urlopen(FINGERPRINT_URL, timeout=10) + secret = res.read() + try: + secret = json.loads(secret.decode('utf-8')) + except ValueError: + print_line(b'vcs', b'invalid JSON in hg fingerprint secret') + sys.exit(1) + except (urllib.error.URLError, socket.timeout): + print_line(b'vcs', b'Unable to retrieve current hg.mozilla.org fingerprint' + b'using the secret service, using fallback instead.') + # XXX This fingerprint will not be accurate if running on an old + # revision after the server fingerprint has changed. + secret = {'secret': FALLBACK_FINGERPRINT} + + hgmo_fingerprint = secret['secret']['fingerprints'] + args.extend([ + '--config', 'hostsecurity.hg.mozilla.org:fingerprints=%s' % hgmo_fingerprint, + ]) + + if base_repo: + args.extend(['--upstream', base_repo]) + if sparse_profile: + args.extend(['--sparseprofile', sparse_profile]) + + dest = os.path.abspath(dest) + args.extend([ + revision_flag, revision_value, + source_repo, dest, + ]) + + res = run_and_prefix_output(b'vcs', args, + extra_env={'PYTHONUNBUFFERED': '1'}) + if res: + # Mitigation for bug 1539681: if for some reason the clone failed, + # we just remove it, so that its possible incomplete state doesn't + # interfere with cloning in subsequent tasks. + shutil.rmtree(dest, ignore_errors=True) + sys.exit(res) + + # Update the current revision hash and ensure that it is well formed. + revision = subprocess.check_output( + [hg_bin, 'log', + '--rev', '.', + '--template', '{node}'], + cwd=dest, + # Triggers text mode on Python 3. + universal_newlines=True) + + assert re.match('^[a-f0-9]{40}$', revision) + + msg = ("TinderboxPrint:<a href={source_repo}/rev/{revision} " + "title='Built from {repo_name} revision {revision}'>" + "{revision}</a>\n".format(revision=revision, + source_repo=source_repo, + repo_name=source_repo.split('/')[-1])) + + print_line(b'vcs', msg.encode('utf-8')) + + return revision + + +def fetch_artifacts(): + print_line(b'fetches', b'fetching artifacts\n') + + fetch_content = shutil.which('fetch-content') + if not fetch_content and os.environ.get('GECKO_PATH'): + fetch_content = os.path.join(os.environ['GECKO_PATH'], 'taskcluster', + 'scripts', 'misc', 'fetch-content') + + if not fetch_content or not os.path.isfile(fetch_content): + fetch_content = os.path.join(os.path.dirname(__file__), + 'fetch-content') + + if not os.path.isfile(fetch_content): + print(FETCH_CONTENT_NOT_FOUND) + sys.exit(1) + + cmd = [sys.executable, '-u', fetch_content, 'task-artifacts'] + res = run_and_prefix_output(b'fetches', cmd) + if res: + sys.exit(res) + + print_line(b'fetches', b'finished fetching artifacts\n') + + +def add_vcs_arguments(parser, project, name): + """Adds arguments to ArgumentParser to control VCS options for a project.""" + + parser.add_argument('--%s-checkout' % project, + help='Directory where %s checkout should be created' % + name) + parser.add_argument('--%s-sparse-profile' % project, + help='Path to sparse profile for %s checkout' % name) + + +def resolve_checkout_url(base_repo, head_repo): + """Resolve the Mercurial URL to perform a checkout against, either the + public hg.mozilla.org service or a CI-only regional mirror. + + The config will be of the form: + { + "aws/us-west-2": { # key built from `TASKCLUSTER_WORKER_LOCATION` variable + "rate": 0.5, + "domain": "us-west-2.hgmointernal.net" + }, + "google/us-central1": {...} + } + """ + worker_location = os.getenv('TASKCLUSTER_WORKER_LOCATION') + if not worker_location: + print_line(b'vcs', b'TASKCLUSTER_WORKER_LOCATION environment variable not set; ' + b'using public hg.mozilla.org service\n') + return base_repo, head_repo + + try: + worker_location = json.loads(worker_location) + except json.JSONDecodeError: + print_line(b'vcs', b'Could not decode TASKCLUSTER_WORKER_LOCATION environment variable ' + b'as JSON. Content: %s\n' % worker_location.encode('utf-8')) + print_line(b'vcs', b'using public hg.mozilla.org service\n') + return base_repo, head_repo + + if 'cloud' not in worker_location or 'region' not in worker_location: + print_line(b'vcs', b'TASKCLUSTER_WORKER_LOCATION missing required keys; ' + b'using public hg.mozilla.org service\n') + return base_repo, head_repo + + config_key = '%(cloud)s/%(region)s' % worker_location + + try: + print_line(b'vcs', b'fetching hgmointernal config from %s\n' % + HGMOINTERNAL_CONFIG_URL.encode('utf-8')) + + # Get the hgmointernal config Taskcluster secret + res = urllib.request.urlopen(HGMOINTERNAL_CONFIG_URL, timeout=10) + hgmointernal_config = json.loads(res.read().decode('utf-8'))['secret'] + + # Use public hg service if region not yet supported + if config_key not in hgmointernal_config: + print_line(b'vcs', b'region %s not yet supported; using public ' + b'hg.mozilla.org service\n' % config_key.encode('utf-8')) + + return base_repo, head_repo + + # Only send a percentage of traffic to the internal mirror + rate = float(hgmointernal_config[config_key]['rate']) + + if random.random() > rate: + print_line(b'vcs', b'hgmointernal rate miss; using ' + b'public hg.mozilla.org service\n') + return base_repo, head_repo + + print_line(b'vcs', b'hgmointernal rate hit; cloning from ' + b'private hgweb mirror\n') + + mirror_domain = hgmointernal_config[config_key]['domain'] + + if base_repo and base_repo.startswith('https://hg.mozilla.org'): + base_repo = base_repo.replace('hg.mozilla.org', mirror_domain, 1) + + if head_repo and head_repo.startswith('https://hg.mozilla.org'): + head_repo = head_repo.replace('hg.mozilla.org', mirror_domain, 1) + + return base_repo, head_repo + + except (KeyError, ValueError): + print_line(b'vcs', b'invalid JSON in hgmointernal config; ' + b'falling back to public hg.mozilla.org service\n') + + except (urllib.error.URLError, socket.timeout): + print_line(b'vcs', b'Unable to retrieve hgmointernal config using ' + b'the secret service; falling back to public hg.mozilla.org ' + b'service\n') + + return base_repo, head_repo + + +def collect_vcs_options(args, project): + checkout = getattr(args, '%s_checkout' % project) + sparse_profile = getattr(args, '%s_sparse_profile' % project) + + env_prefix = project.upper() + + base_repo = os.environ.get('%s_BASE_REPOSITORY' % env_prefix) + head_repo = os.environ.get('%s_HEAD_REPOSITORY' % env_prefix) + revision = os.environ.get('%s_HEAD_REV' % env_prefix) + branch = os.environ.get('%s_HEAD_REF' % env_prefix) + + store_path = os.environ.get('HG_STORE_PATH') + + # Expand ~ in some paths. + if checkout: + checkout = os.path.expanduser(checkout) + if store_path: + store_path = os.path.expanduser(store_path) + + # Some callers set the base repository to mozilla-central for historical + # reasons. Switch to mozilla-unified because robustcheckout works best + # with it. + if base_repo == 'https://hg.mozilla.org/mozilla-central': + base_repo = 'https://hg.mozilla.org/mozilla-unified' + + # No need to check the hgmointernal config if we aren't performing + # a checkout. + if checkout: + base_repo, head_repo = resolve_checkout_url(base_repo, head_repo) + + return { + 'store-path': store_path, + 'project': project, + 'env-prefix': env_prefix, + 'checkout': checkout, + 'sparse-profile': sparse_profile, + 'base-repo': base_repo, + 'head-repo': head_repo, + 'revision': revision, + 'branch': branch, + } + + +def vcs_checkout_from_args(args, project): + options = collect_vcs_options(args, project) + + if not options['checkout']: + if options['branch'] and not options['revision']: + print('task should be defined in terms of non-symbolic revision') + sys.exit(1) + return + + os.environ['%s_HEAD_REV' % options['env-prefix']] = vcs_checkout( + options['head-repo'], + options['checkout'], + options['store-path'], + base_repo=options['base-repo'], + revision=options['revision'], + fetch_hgfingerprint=args.fetch_hgfingerprint, + branch=options['branch'], + sparse_profile=options['sparse-profile']) + + +def maybe_run_resource_monitoring(): + """Run the resource monitor if available. + + Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160 + and https://bugzil.la/1648051 + """ + if 'MOZ_FETCHES' not in os.environ: + return + if 'RESOURCE_MONITOR_OUTPUT' not in os.environ: + return + + prefix = b'resource_monitor' + + executable = '{}/resource-monitor/resource-monitor{}'.format( + os.environ.get('MOZ_FETCHES_DIR'), '.exe' if IS_WINDOWS else '') + + if not os.path.exists(executable) or not os.access(executable, os.X_OK): + print_line(prefix, b"%s not executable\n" % executable.encode('utf-8')) + return + args = [ + executable, + '-process', + str(os.getpid()), + '-output', + os.environ["RESOURCE_MONITOR_OUTPUT"], + ] + print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode('utf-8')) + # Avoid environment variables the payload doesn't need. + del os.environ['RESOURCE_MONITOR_OUTPUT'] + + # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too. + process = subprocess.Popen(args, + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0, + cwd=os.getcwd()) + + def capture_output(): + fh = io.TextIOWrapper(process.stdout, encoding='latin1') + while True: + data = fh.readline().encode('latin1') + if data == b'': + break + print_line(prefix, data) + + monitor_process = Thread(target=capture_output) + monitor_process.start() + return process + + +def main(args): + print_line(b'setup', b'run-task started in %s\n' % os.getcwd().encode('utf-8')) + running_as_root = IS_POSIX and os.getuid() == 0 + + # Set a reasonable limit to the number of open files. + # Running under docker inherits the system defaults, which are not subject + # to the "standard" limits set by pam_limits.so, and while they work well + # for servers that may receive a lot of connections, they cause performance + # problems for things that close file descriptors before forking (for good + # reasons), like python's `subprocess.Popen(..., close_fds=True)` (and while + # the default was close_fds=False in python2, that changed in python3). + # In some cases, Firefox does the same thing when spawning subprocesses. + # Processes spawned by this one will inherit the limit set here. + try: + import resource + # Keep the hard limit the same, though, allowing processes to change their + # soft limit if they need to (Firefox does, for instance). + (soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE) + limit = os.environ.get('MOZ_LIMIT_NOFILE') + if limit: + limit = int(limit) + else: + # If no explicit limit is given, use 1024 if it's less than the current + # soft limit. For instance, the default on macOS is 256, so we'd pick + # that rather than 1024. + limit = min(soft, 1024) + # Now apply the limit, if it's different from the original one. + if limit != soft: + resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard)) + except ImportError: + # The resource module is UNIX only. + pass + + # Arguments up to '--' are ours. After are for the main task + # to be executed. + try: + i = args.index('--') + our_args = args[0:i] + task_args = args[i + 1:] + except ValueError: + our_args = args + task_args = [] + + parser = argparse.ArgumentParser() + parser.add_argument('--user', default='worker', help='user to run as') + parser.add_argument('--group', default='worker', help='group to run as') + parser.add_argument('--task-cwd', help='directory to run the provided command in') + + add_vcs_arguments(parser, 'gecko', 'Firefox') + add_vcs_arguments(parser, 'comm', 'Comm') + + parser.add_argument('--fetch-hgfingerprint', action='store_true', + help='Fetch the latest hgfingerprint from the secrets store, ' + 'using the taskclsuerProxy') + + args = parser.parse_args(our_args) + + uid = gid = gids = None + if IS_POSIX and running_as_root: + user, group, gids = get_posix_user_group(args.user, args.group) + uid = user.pw_uid + gid = group.gr_gid + + if running_as_root and os.path.exists("/dev/kvm"): + # Ensure kvm permissions for worker, required for Android x86 + st = os.stat("/dev/kvm") + os.chmod("/dev/kvm", st.st_mode | 0o666) + + # Validate caches. + # + # Taskgraph should pass in a list of paths that are caches via an + # environment variable (which we don't want to pass down to child + # processes). + + if 'TASKCLUSTER_CACHES' in os.environ: + caches = os.environ['TASKCLUSTER_CACHES'].split(';') + del os.environ['TASKCLUSTER_CACHES'] + else: + caches = [] + + if 'TASKCLUSTER_UNTRUSTED_CACHES' in os.environ: + untrusted_caches = True + del os.environ['TASKCLUSTER_UNTRUSTED_CACHES'] + else: + untrusted_caches = False + + for cache in caches: + if not os.path.isdir(cache): + print('error: cache %s is not a directory; this should never ' + 'happen' % cache) + return 1 + + if running_as_root: + purge = configure_cache_posix(cache, user, group, untrusted_caches, + running_as_root) + + if purge: + return EXIT_PURGE_CACHE + + if 'TASKCLUSTER_VOLUMES' in os.environ: + volumes = os.environ['TASKCLUSTER_VOLUMES'].split(';') + del os.environ['TASKCLUSTER_VOLUMES'] + else: + volumes = [] + + if volumes and not IS_POSIX: + print('assertion failed: volumes not expected on Windows') + return 1 + + # Sanitize volumes. + for volume in volumes: + # If a volume is a cache, it was dealt with above. + if volume in caches: + print_line(b'volume', b'volume %s is a cache\n' % + volume.encode('utf-8')) + continue + + if running_as_root: + configure_volume_posix(volume, user, group, running_as_root) + + all_caches_and_volumes = set(map(os.path.normpath, caches)) + all_caches_and_volumes |= set(map(os.path.normpath, volumes)) + + def path_in_cache_or_volume(path): + path = os.path.normpath(path) + + while path: + if path in all_caches_and_volumes: + return True + + path, child = os.path.split(path) + if not child: + break + + return False + + def prepare_checkout_dir(checkout): + if not checkout: + return + + # The checkout path becomes the working directory. Since there are + # special cache files in the cache's root directory and working + # directory purging could blow them away, disallow this scenario. + if os.path.exists(os.path.join(checkout, '.cacherequires')): + print('error: cannot perform vcs checkout into cache root: %s' % + checkout) + sys.exit(1) + + # TODO given the performance implications, consider making this a fatal + # error. + if not path_in_cache_or_volume(checkout): + print_line(b'vcs', b'WARNING: vcs checkout path (%s) not in cache ' + b'or volume; performance will likely suffer\n' % + checkout.encode('utf-8')) + + # Ensure the directory for the source checkout exists. + try: + os.makedirs(os.path.dirname(checkout)) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + # And that it is owned by the appropriate user/group. + if running_as_root: + os.chown(os.path.dirname(checkout), uid, gid) + + def prepare_hg_store_path(): + # And ensure the shared store path exists and has proper permissions. + if 'HG_STORE_PATH' not in os.environ: + print('error: HG_STORE_PATH environment variable not set') + sys.exit(1) + + store_path = os.environ['HG_STORE_PATH'] + + if not path_in_cache_or_volume(store_path): + print_line(b'vcs', b'WARNING: HG_STORE_PATH (%s) not in cache or ' + b'volume; performance will likely suffer\n' % + store_path.encode('utf-8')) + + try: + os.makedirs(store_path) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + if running_as_root: + os.chown(store_path, uid, gid) + + prepare_checkout_dir(args.gecko_checkout) + if args.gecko_checkout or args.comm_checkout: + prepare_hg_store_path() + + if IS_POSIX and running_as_root: + # Drop permissions to requested user. + # This code is modeled after what `sudo` was observed to do in a Docker + # container. We do not bother calling setrlimit() because containers have + # their own limits. + print_line(b'setup', b'running as %s:%s\n' % ( + args.user.encode('utf-8'), args.group.encode('utf-8'))) + + os.setgroups(gids) + os.umask(0o22) + os.setresgid(gid, gid, gid) + os.setresuid(uid, uid, uid) + + vcs_checkout_from_args(args, 'gecko') + vcs_checkout_from_args(args, 'comm') + + resource_process = None + + try: + for k in ('GECKO_PATH', 'MOZ_FETCHES_DIR', 'UPLOAD_DIR', 'MOZ_PYTHON_HOME'): + if k in os.environ: + # Normalize paths to use forward slashes. Some shell scripts + # tolerate that better on Windows. + os.environ[k] = os.path.abspath(os.environ[k]).replace(os.sep, '/') + print_line(b'setup', b'%s is %s\n' % ( + k.encode('utf-8'), + os.environ[k].encode('utf-8'))) + + if 'MOZ_FETCHES' in os.environ: + fetch_artifacts() + + # If Python is a fetch dependency, add it to the PATH and setting + # the mozilla-specific MOZ_PYTHON_HOME to relocate binaries. + if 'MOZ_PYTHON_HOME' in os.environ: + + print_line(b'setup', + b'Setting up local python environment\n') + prev = [os.environ['PATH']] if 'PATH' in os.environ else [] + + moz_python_home = os.environ['MOZ_PYTHON_HOME'] + if IS_WINDOWS: + ext = '.exe' + moz_python_bindir = moz_python_home + else: + ext = '' + moz_python_bindir = moz_python_home + '/bin' + + + new = os.environ['PATH'] = os.pathsep.join([moz_python_bindir] + + prev) + + # Relocate the python binary. Standard way uses PYTHONHOME, but + # this conflicts with system python (e.g. used by hg) so we + # maintain a small patch to use MOZPYTHONHOME instead. + os.environ['MOZPYTHONHOME'] = moz_python_home + + pyinterp = os.path.join(moz_python_bindir, f'python3{ext}') + # just a sanity check + if not os.path.exists(pyinterp): + raise RuntimeError("Inconsistent Python installation: " + "archive found, but no python3 binary " + "detected") + + if IS_MACOSX: + # On OSX, we may not have access to the system certificate, + # so use the certifi ones. + certifi_cert_file = subprocess.check_output( + [pyinterp, '-c', + 'import certifi; print(certifi.where())'], + text=True + ) + os.environ['SSL_CERT_FILE'] = certifi_cert_file.strip() + print_line(b'setup', + b'patching ssl certificate\n') + + print_line(b'setup', + b'updated PATH with python artifact: ' + + new.encode() + b'\n') + + + resource_process = maybe_run_resource_monitoring() + + return run_and_prefix_output(b'task', task_args, cwd=args.task_cwd) + finally: + if resource_process: + print_line(b'resource_monitor', b'terminating\n') + if IS_WINDOWS: + # .terminate() on Windows is not a graceful shutdown, due to + # differences in signals. CTRL_BREAK_EVENT will work provided + # the subprocess is in a different process group, so this script + # isn't also killed. + os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT) + else: + resource_process.terminate() + resource_process.wait() + + +if __name__ == '__main__': + sys.exit(main(sys.argv[1:])) |