summaryrefslogtreecommitdiffstats
path: root/taskcluster/scripts/run-task
diff options
context:
space:
mode:
Diffstat (limited to 'taskcluster/scripts/run-task')
-rwxr-xr-xtaskcluster/scripts/run-task1005
1 files changed, 1005 insertions, 0 deletions
diff --git a/taskcluster/scripts/run-task b/taskcluster/scripts/run-task
new file mode 100755
index 0000000000..fb0d32f8f5
--- /dev/null
+++ b/taskcluster/scripts/run-task
@@ -0,0 +1,1005 @@
+#!/usr/bin/python3 -u
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""Run a task after performing common actions.
+
+This script is meant to be the "driver" for TaskCluster based tasks.
+It receives some common arguments to control the run-time environment.
+
+It performs actions as requested from the arguments. Then it executes
+the requested process and prints its output, prefixing it with the
+current time to improve log usefulness.
+"""
+
+import sys
+
+
+if sys.version_info[0:2] < (3, 5):
+ print('run-task requires Python 3.5+')
+ sys.exit(1)
+
+
+import argparse
+import datetime
+import errno
+import io
+import json
+import os
+import random
+import re
+import shutil
+import signal
+import socket
+import stat
+import subprocess
+
+import urllib.error
+import urllib.request
+
+from threading import Thread
+
+FINGERPRINT_URL = 'http://taskcluster/secrets/v1/secret/project/taskcluster/gecko/hgfingerprint'
+FALLBACK_FINGERPRINT = {
+ 'fingerprints':
+ "sha256:4D:EB:21:6E:35:2F:99:C6:8F:C3:47:9B:57:B8:6C:17:15:8F:86:09:D4:6C:17:1D:87:B0:DE:F9:0E:51:70:FC,"
+ "sha256:90:85:39:A8:4F:47:20:58:98:0D:48:4D:8A:AC:71:DB:5C:AF:76:44:F1:B1:3E:56:92:FF:21:8C:C9:A9:F7:11"
+}
+
+HGMOINTERNAL_CONFIG_URL = 'http://taskcluster/secrets/v1/secret/project/taskcluster/gecko/hgmointernal'
+
+CACHE_UID_GID_MISMATCH = '''
+There is a UID/GID mismatch on the cache. This likely means:
+
+a) different tasks are running as a different user/group
+b) different Docker images have different UID/GID for the same user/group
+
+Our cache policy is that the UID/GID for ALL tasks must be consistent
+for the lifetime of the cache. This eliminates permissions problems due
+to file/directory user/group ownership.
+
+To make this error go away, ensure that all Docker images are use
+a consistent UID/GID and that all tasks using this cache are running as
+the same user/group.
+'''
+
+
+NON_EMPTY_VOLUME = '''
+error: volume %s is not empty
+
+Our Docker image policy requires volumes to be empty.
+
+The volume was likely populated as part of building the Docker image.
+Change the Dockerfile and anything run from it to not create files in
+any VOLUME.
+
+A lesser possibility is that you stumbled upon a TaskCluster platform bug
+where it fails to use new volumes for tasks.
+'''
+
+
+FETCH_CONTENT_NOT_FOUND = '''
+error: fetch-content script not found
+
+The script at `taskcluster/scripts/misc/fetch-content` could not be
+detected in the current environment.
+
+If this task clones gecko, make sure the GECKO_PATH environment variable
+is set to proper location. Otherwise, the script may need to be mounted
+or added to the task's docker image then added to the PATH.
+'''
+
+# The exit code to use when caches should be purged and the task retried.
+# This is EX_OSFILE (from sysexits.h):
+# Some system file does not exist, cannot be opened, or has some
+# sort of error (e.g., syntax error).
+EXIT_PURGE_CACHE = 72
+
+
+IS_MACOSX = sys.platform == 'darwin'
+IS_POSIX = os.name == 'posix'
+IS_WINDOWS = os.name == 'nt'
+
+
+def print_line(prefix, m):
+ now = datetime.datetime.utcnow().isoformat().encode('utf-8')
+ # slice microseconds to 3 decimals.
+ now = now[:-3] if now[-7:-6] == b'.' else now
+ bytes = b'[%s %sZ] %s' % (prefix, now, m)
+ written = 0
+ while written < len(bytes):
+ written += (sys.stdout.buffer.write(bytes[written:]) or 0)
+ sys.stdout.buffer.flush()
+
+
+def run_and_prefix_output(prefix, args, *, extra_env=None, cwd=None):
+ """Runs a process and prefixes its output with the time.
+
+ Returns the process exit code.
+ """
+ print_line(
+ prefix,
+ b"executing %r%s\n" % (args, b"in %s" % (cwd.encode("utf-8"),) if cwd else b""),
+ )
+
+ env = dict(os.environ)
+ env.update(extra_env or {})
+
+ # Note: TaskCluster's stdin is a TTY. This attribute is lost
+ # when we pass sys.stdin to the invoked process. If we cared
+ # to preserve stdin as a TTY, we could make this work. But until
+ # someone needs it, don't bother.
+
+ # We want stdout to be bytes on Python 3. That means we can't use
+ # universal_newlines=True (because it implies text mode). But
+ # p.stdout.readline() won't work for bytes text streams. So, on Python 3,
+ # we manually install a latin1 stream wrapper. This allows us to readline()
+ # and preserves bytes, without losing any data.
+
+ p = subprocess.Popen(args,
+ # Disable buffering because we want to receive output
+ # as it is generated so timestamps in logs are
+ # accurate.
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ stdin=sys.stdin.fileno(),
+ env=env,
+ cwd=cwd)
+
+ stdout = io.TextIOWrapper(p.stdout, encoding='latin1')
+
+ while True:
+ data = stdout.readline().encode('latin1')
+
+ if data == b'':
+ break
+
+ print_line(prefix, data)
+
+ return p.wait()
+
+
+def get_posix_user_group(user, group):
+ import grp
+ import pwd
+
+ try:
+ user_record = pwd.getpwnam(user)
+ except KeyError:
+ print('could not find user %s; specify a valid user with --user' % user)
+ sys.exit(1)
+
+ try:
+ group_record = grp.getgrnam(group)
+ except KeyError:
+ print('could not find group %s; specify a valid group with --group' %
+ group)
+ sys.exit(1)
+
+ # Most tasks use worker:worker. We require they have a specific numeric ID
+ # because otherwise it is too easy for files written to caches to have
+ # mismatched numeric IDs, which results in permissions errors.
+ if user_record.pw_name == 'worker' and user_record.pw_uid != 1000:
+ print('user `worker` must have uid=1000; got %d' % user_record.pw_uid)
+ sys.exit(1)
+
+ if group_record.gr_name == 'worker' and group_record.gr_gid != 1000:
+ print('group `worker` must have gid=1000; got %d' % group_record.gr_gid)
+ sys.exit(1)
+
+ # Find all groups to which this user is a member.
+ gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem]
+
+ return user_record, group_record, gids
+
+
+def write_audit_entry(path, msg):
+ now = datetime.datetime.utcnow().isoformat().encode('utf-8')
+ with open(path, 'ab') as fh:
+ fh.write(b'[%sZ %s] %s\n' % (
+ now, os.environb.get(b'TASK_ID', b'UNKNOWN'), msg))
+
+
+WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR
+
+
+def set_dir_permissions(path, uid, gid):
+ st = os.lstat(path)
+
+ if st.st_uid != uid or st.st_gid != gid:
+ os.chown(path, uid, gid)
+
+ # Also make sure dirs are writable in case we need to delete
+ # them.
+ if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE:
+ os.chmod(path, st.st_mode | WANTED_DIR_MODE)
+
+
+def chown_recursive(path, user, group, uid, gid):
+ print_line(b'chown',
+ b'recursively changing ownership of %s to %s:%s\n' %
+ (path.encode('utf-8'), user.encode('utf-8'), group.encode(
+ 'utf-8')))
+
+ set_dir_permissions(path, uid, gid)
+
+ for root, dirs, files in os.walk(path):
+ for d in dirs:
+ set_dir_permissions(os.path.join(root, d), uid, gid)
+
+ for f in files:
+ # File may be a symlink that points to nowhere. In which case
+ # os.chown() would fail because it attempts to follow the
+ # symlink. We only care about directory entries, not what
+ # they point to. So setting the owner of the symlink should
+ # be sufficient.
+ os.lchown(os.path.join(root, f), uid, gid)
+
+
+def configure_cache_posix(cache, user, group,
+ untrusted_caches, running_as_root):
+ """Configure a cache path on POSIX platforms.
+
+ For each cache, we write out a special file denoting attributes and
+ capabilities of run-task and the task being executed. These attributes
+ are used by subsequent run-task invocations to validate that use of
+ the cache is acceptable.
+
+ We /could/ blow away the cache data on requirements mismatch.
+ While this would be convenient, this could result in "competing" tasks
+ effectively undoing the other's work. This would slow down task
+ execution in aggregate. Without monitoring for this, people may not notice
+ the problem and tasks would be slower than they could be. We follow the
+ principle of "fail fast" to ensure optimal task execution.
+
+ We also write an audit log of who used the caches. This log is printed
+ during failures to help aid debugging.
+ """
+
+ our_requirements = {
+ # Include a version string that we can bump whenever to trigger
+ # fresh caches. The actual value is not relevant and doesn't need
+ # to follow any explicit order. Since taskgraph bakes this file's
+ # hash into cache names, any change to this file/version is sufficient
+ # to force the use of a new cache.
+ b'version=1',
+ # Include the UID and GID the task will run as to ensure that tasks
+ # with different UID and GID don't share the same cache.
+ b'uid=%d' % user.pw_uid,
+ b'gid=%d' % group.gr_gid,
+ }
+
+ requires_path = os.path.join(cache, '.cacherequires')
+ audit_path = os.path.join(cache, '.cachelog')
+
+ # The cache is empty. Configure it.
+ if not os.listdir(cache):
+ print_line(b'cache', b'cache %s is empty; writing requirements: '
+ b'%s\n' % (
+ cache.encode('utf-8'), b' '.join(sorted(our_requirements))))
+
+ # We write a requirements file so future invocations know what the
+ # requirements are.
+ with open(requires_path, 'wb') as fh:
+ fh.write(b'\n'.join(sorted(our_requirements)))
+
+ # And make it read-only as a precaution against deletion.
+ os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+
+ write_audit_entry(audit_path,
+ b'created; requirements: %s' %
+ b', '.join(sorted(our_requirements)))
+
+ set_dir_permissions(cache, user.pw_uid, group.gr_gid)
+ return
+
+ # The cache has content and we have a requirements file. Validate
+ # requirements alignment.
+ if os.path.exists(requires_path):
+ with open(requires_path, 'rb') as fh:
+ wanted_requirements = set(fh.read().splitlines())
+
+ print_line(b'cache', b'cache %s exists; requirements: %s\n' % (
+ cache.encode('utf-8'), b' '.join(sorted(wanted_requirements))))
+
+ missing = wanted_requirements - our_requirements
+
+ # Allow requirements mismatch for uid/gid if and only if caches
+ # are untrusted. This allows cache behavior on Try to be
+ # reasonable. Otherwise, random tasks could "poison" cache
+ # usability by introducing uid/gid mismatches. For untrusted
+ # environments like Try, this is a perfectly reasonable thing to
+ # allow.
+ if missing and untrusted_caches and running_as_root and \
+ all(s.startswith((b'uid=', b'gid=')) for s in missing):
+ print_line(b'cache',
+ b'cache %s uid/gid mismatch; this is acceptable '
+ b'because caches for this task are untrusted; '
+ b'changing ownership to facilitate cache use\n' %
+ cache.encode('utf-8'))
+ chown_recursive(cache, user.pw_name, group.gr_name, user.pw_uid,
+ group.gr_gid)
+
+ # And write out the updated reality.
+ with open(requires_path, 'wb') as fh:
+ fh.write(b'\n'.join(sorted(our_requirements)))
+
+ write_audit_entry(audit_path,
+ b'chown; requirements: %s' %
+ b', '.join(sorted(our_requirements)))
+
+ elif missing:
+ print('error: requirements for populated cache %s differ from '
+ 'this task' % cache)
+ print('cache requirements: %s' % ' '.join(sorted(
+ s.decode('utf-8') for s in wanted_requirements)))
+ print('our requirements: %s' % ' '.join(sorted(
+ s.decode('utf-8') for s in our_requirements)))
+ if any(s.startswith((b'uid=', b'gid=')) for s in missing):
+ print(CACHE_UID_GID_MISMATCH)
+
+ write_audit_entry(audit_path,
+ b'requirements mismatch; wanted: %s' %
+ b', '.join(sorted(our_requirements)))
+
+ print('')
+ print('audit log:')
+ with open(audit_path, 'r') as fh:
+ print(fh.read())
+
+ return True
+ else:
+ write_audit_entry(audit_path, b'used')
+
+ # We don't need to adjust permissions here because the cache is
+ # associated with a uid/gid and the first task should have set
+ # a proper owner/group.
+
+ return
+
+ # The cache has content and no requirements file. This shouldn't
+ # happen because run-task should be the first thing that touches a
+ # cache.
+ print('error: cache %s is not empty and is missing a '
+ '.cacherequires file; the cache names for this task are '
+ 'likely mis-configured or TASKCLUSTER_CACHES is not set '
+ 'properly' % cache)
+
+ write_audit_entry(audit_path, b'missing .cacherequires')
+ return True
+
+
+def configure_volume_posix(volume, user, group, running_as_root):
+ # The only time we should see files in the volume is if the Docker
+ # image build put files there.
+ #
+ # For the sake of simplicity, our policy is that volumes should be
+ # empty. This also has the advantage that an empty volume looks
+ # a lot like an empty cache. Tasks can rely on caches being
+ # swapped in and out on any volume without any noticeable change
+ # of behavior.
+ volume_files = os.listdir(volume)
+ if volume_files:
+ print(NON_EMPTY_VOLUME % volume)
+ print('entries in root directory: %s' %
+ ' '.join(sorted(volume_files)))
+ sys.exit(1)
+
+ # The volume is almost certainly owned by root:root. Chown it so it
+ # is writable.
+
+ if running_as_root:
+ print_line(b'volume', b'changing ownership of volume %s '
+ b'to %d:%d\n' % (volume.encode('utf-8'),
+ user.pw_uid,
+ group.gr_gid))
+ set_dir_permissions(volume, user.pw_uid, group.gr_gid)
+
+
+def vcs_checkout(source_repo, dest, store_path,
+ base_repo=None, revision=None, branch=None,
+ fetch_hgfingerprint=False, sparse_profile=None):
+ # Specify method to checkout a revision. This defaults to revisions as
+ # SHA-1 strings, but also supports symbolic revisions like `tip` via the
+ # branch flag.
+ if revision:
+ revision_flag = '--revision'
+ revision_value = revision
+ elif branch:
+ revision_flag = '--branch'
+ revision_value = branch
+ else:
+ print('revision is not specified for checkout')
+ sys.exit(1)
+
+ if IS_MACOSX or IS_POSIX:
+ hg_bin = 'hg'
+ elif IS_WINDOWS:
+ # This is where OCC installs it in the AMIs.
+ hg_bin = r'C:\Program Files\Mercurial\hg.exe'
+ if not os.path.exists(hg_bin):
+ print('could not find Mercurial executable: %s' % hg_bin)
+ sys.exit(1)
+
+ store_path = os.path.abspath(store_path)
+ args = [
+ hg_bin,
+ 'robustcheckout',
+ '--sharebase', store_path,
+ '--purge',
+ ]
+
+ # Obtain certificate fingerprints. Without this, the checkout will use the fingerprint
+ # on the system, which is managed some other way (such as puppet)
+ if fetch_hgfingerprint:
+ try:
+ print_line(b'vcs', b'fetching hg.mozilla.org fingerprint from %s\n' %
+ FINGERPRINT_URL.encode('utf-8'))
+ res = urllib.request.urlopen(FINGERPRINT_URL, timeout=10)
+ secret = res.read()
+ try:
+ secret = json.loads(secret.decode('utf-8'))
+ except ValueError:
+ print_line(b'vcs', b'invalid JSON in hg fingerprint secret')
+ sys.exit(1)
+ except (urllib.error.URLError, socket.timeout):
+ print_line(b'vcs', b'Unable to retrieve current hg.mozilla.org fingerprint'
+ b'using the secret service, using fallback instead.')
+ # XXX This fingerprint will not be accurate if running on an old
+ # revision after the server fingerprint has changed.
+ secret = {'secret': FALLBACK_FINGERPRINT}
+
+ hgmo_fingerprint = secret['secret']['fingerprints']
+ args.extend([
+ '--config', 'hostsecurity.hg.mozilla.org:fingerprints=%s' % hgmo_fingerprint,
+ ])
+
+ if base_repo:
+ args.extend(['--upstream', base_repo])
+ if sparse_profile:
+ args.extend(['--sparseprofile', sparse_profile])
+
+ dest = os.path.abspath(dest)
+ args.extend([
+ revision_flag, revision_value,
+ source_repo, dest,
+ ])
+
+ res = run_and_prefix_output(b'vcs', args,
+ extra_env={'PYTHONUNBUFFERED': '1'})
+ if res:
+ sys.exit(res)
+
+ # Update the current revision hash and ensure that it is well formed.
+ revision = subprocess.check_output(
+ [hg_bin, 'log',
+ '--rev', '.',
+ '--template', '{node}'],
+ cwd=dest,
+ # Triggers text mode on Python 3.
+ universal_newlines=True)
+
+ assert re.match('^[a-f0-9]{40}$', revision)
+
+ msg = ("TinderboxPrint:<a href={source_repo}/rev/{revision} "
+ "title='Built from {repo_name} revision {revision}'>"
+ "{revision}</a>\n".format(revision=revision,
+ source_repo=source_repo,
+ repo_name=source_repo.split('/')[-1]))
+
+ print_line(b'vcs', msg.encode('utf-8'))
+
+ return revision
+
+
+def fetch_artifacts():
+ print_line(b'fetches', b'fetching artifacts\n')
+
+ fetch_content = shutil.which('fetch-content')
+ if not fetch_content and os.environ.get('GECKO_PATH'):
+ fetch_content = os.path.join(os.environ['GECKO_PATH'], 'taskcluster',
+ 'scripts', 'misc', 'fetch-content')
+
+ if not fetch_content or not os.path.isfile(fetch_content):
+ fetch_content = os.path.join(os.path.dirname(__file__),
+ 'fetch-content')
+
+ if not os.path.isfile(fetch_content):
+ print(FETCH_CONTENT_NOT_FOUND)
+ sys.exit(1)
+
+ cmd = [sys.executable, '-u', fetch_content, 'task-artifacts']
+ res = run_and_prefix_output(b'fetches', cmd)
+ if res:
+ sys.exit(res)
+
+ print_line(b'fetches', b'finished fetching artifacts\n')
+
+
+def add_vcs_arguments(parser, project, name):
+ """Adds arguments to ArgumentParser to control VCS options for a project."""
+
+ parser.add_argument('--%s-checkout' % project,
+ help='Directory where %s checkout should be created' %
+ name)
+ parser.add_argument('--%s-sparse-profile' % project,
+ help='Path to sparse profile for %s checkout' % name)
+
+
+def resolve_checkout_url(base_repo, head_repo):
+ """Resolve the Mercurial URL to perform a checkout against, either the
+ public hg.mozilla.org service or a CI-only regional mirror.
+
+ The config will be of the form:
+ {
+ "aws/us-west-2": { # key built from `TASKCLUSTER_WORKER_LOCATION` variable
+ "rate": 0.5,
+ "domain": "us-west-2.hgmointernal.net"
+ },
+ "google/us-central1": {...}
+ }
+ """
+ worker_location = os.getenv('TASKCLUSTER_WORKER_LOCATION')
+ if not worker_location:
+ print_line(b'vcs', b'TASKCLUSTER_WORKER_LOCATION environment variable not set; '
+ b'using public hg.mozilla.org service\n')
+ return base_repo, head_repo
+
+ try:
+ worker_location = json.loads(worker_location)
+ except json.JSONDecodeError:
+ print_line(b'vcs', b'Could not decode TASKCLUSTER_WORKER_LOCATION environment variable '
+ b'as JSON. Content: %s\n' % worker_location.encode('utf-8'))
+ print_line(b'vcs', b'using public hg.mozilla.org service\n')
+ return base_repo, head_repo
+
+ if 'cloud' not in worker_location or 'region' not in worker_location:
+ print_line(b'vcs', b'TASKCLUSTER_WORKER_LOCATION missing required keys; '
+ b'using public hg.mozilla.org service\n')
+ return base_repo, head_repo
+
+ config_key = '%(cloud)s/%(region)s' % worker_location
+
+ try:
+ print_line(b'vcs', b'fetching hgmointernal config from %s\n' %
+ HGMOINTERNAL_CONFIG_URL.encode('utf-8'))
+
+ # Get the hgmointernal config Taskcluster secret
+ res = urllib.request.urlopen(HGMOINTERNAL_CONFIG_URL, timeout=10)
+ hgmointernal_config = json.loads(res.read().decode('utf-8'))['secret']
+
+ # Use public hg service if region not yet supported
+ if config_key not in hgmointernal_config:
+ print_line(b'vcs', b'region %s not yet supported; using public '
+ b'hg.mozilla.org service\n' % config_key.encode('utf-8'))
+
+ return base_repo, head_repo
+
+ # Only send a percentage of traffic to the internal mirror
+ rate = float(hgmointernal_config[config_key]['rate'])
+
+ if random.random() > rate:
+ print_line(b'vcs', b'hgmointernal rate miss; using '
+ b'public hg.mozilla.org service\n')
+ return base_repo, head_repo
+
+ print_line(b'vcs', b'hgmointernal rate hit; cloning from '
+ b'private hgweb mirror\n')
+
+ mirror_domain = hgmointernal_config[config_key]['domain']
+
+ if base_repo and base_repo.startswith('https://hg.mozilla.org'):
+ base_repo = base_repo.replace('hg.mozilla.org', mirror_domain, 1)
+
+ if head_repo and head_repo.startswith('https://hg.mozilla.org'):
+ head_repo = head_repo.replace('hg.mozilla.org', mirror_domain, 1)
+
+ return base_repo, head_repo
+
+ except (KeyError, ValueError):
+ print_line(b'vcs', b'invalid JSON in hgmointernal config; '
+ b'falling back to public hg.mozilla.org service\n')
+
+ except (urllib.error.URLError, socket.timeout):
+ print_line(b'vcs', b'Unable to retrieve hgmointernal config using '
+ b'the secret service; falling back to public hg.mozilla.org '
+ b'service\n')
+
+ return base_repo, head_repo
+
+
+def collect_vcs_options(args, project):
+ checkout = getattr(args, '%s_checkout' % project)
+ sparse_profile = getattr(args, '%s_sparse_profile' % project)
+
+ env_prefix = project.upper()
+
+ base_repo = os.environ.get('%s_BASE_REPOSITORY' % env_prefix)
+ head_repo = os.environ.get('%s_HEAD_REPOSITORY' % env_prefix)
+ revision = os.environ.get('%s_HEAD_REV' % env_prefix)
+ branch = os.environ.get('%s_HEAD_REF' % env_prefix)
+
+ store_path = os.environ.get('HG_STORE_PATH')
+
+ # Expand ~ in some paths.
+ if checkout:
+ checkout = os.path.expanduser(checkout)
+ if store_path:
+ store_path = os.path.expanduser(store_path)
+
+ # Some callers set the base repository to mozilla-central for historical
+ # reasons. Switch to mozilla-unified because robustcheckout works best
+ # with it.
+ if base_repo == 'https://hg.mozilla.org/mozilla-central':
+ base_repo = 'https://hg.mozilla.org/mozilla-unified'
+
+ # No need to check the hgmointernal config if we aren't performing
+ # a checkout.
+ if checkout:
+ base_repo, head_repo = resolve_checkout_url(base_repo, head_repo)
+
+ return {
+ 'store-path': store_path,
+ 'project': project,
+ 'env-prefix': env_prefix,
+ 'checkout': checkout,
+ 'sparse-profile': sparse_profile,
+ 'base-repo': base_repo,
+ 'head-repo': head_repo,
+ 'revision': revision,
+ 'branch': branch,
+ }
+
+
+def vcs_checkout_from_args(args, project):
+ options = collect_vcs_options(args, project)
+
+ if not options['checkout']:
+ if options['branch'] and not options['revision']:
+ print('task should be defined in terms of non-symbolic revision')
+ sys.exit(1)
+ return
+
+ os.environ['%s_HEAD_REV' % options['env-prefix']] = vcs_checkout(
+ options['head-repo'],
+ options['checkout'],
+ options['store-path'],
+ base_repo=options['base-repo'],
+ revision=options['revision'],
+ fetch_hgfingerprint=args.fetch_hgfingerprint,
+ branch=options['branch'],
+ sparse_profile=options['sparse-profile'])
+
+
+def maybe_run_resource_monitoring():
+ """Run the resource monitor if available.
+
+ Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160
+ and https://bugzil.la/1648051
+ """
+ if 'MOZ_FETCHES' not in os.environ:
+ return
+ if 'RESOURCE_MONITOR_OUTPUT' not in os.environ:
+ return
+
+ prefix = b'resource_monitor'
+
+ executable = '{}/resource-monitor/resource-monitor{}'.format(
+ os.environ.get('MOZ_FETCHES_DIR'), '.exe' if IS_WINDOWS else '')
+
+ if not os.path.exists(executable) or not os.access(executable, os.X_OK):
+ print_line(prefix, b"%s not executable\n" % executable.encode('utf-8'))
+ return
+ args = [
+ executable,
+ '-process',
+ str(os.getpid()),
+ '-output',
+ os.environ["RESOURCE_MONITOR_OUTPUT"],
+ ]
+ print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode('utf-8'))
+ # Avoid environment variables the payload doesn't need.
+ del os.environ['RESOURCE_MONITOR_OUTPUT']
+
+ # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too.
+ process = subprocess.Popen(args,
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0,
+ cwd=os.getcwd())
+
+ def capture_output():
+ fh = io.TextIOWrapper(process.stdout, encoding='latin1')
+ while True:
+ data = fh.readline().encode('latin1')
+ if data == b'':
+ break
+ print_line(prefix, data)
+
+ monitor_process = Thread(target=capture_output)
+ monitor_process.start()
+ return process
+
+
+def main(args):
+ print_line(b'setup', b'run-task started in %s\n' % os.getcwd().encode('utf-8'))
+ running_as_root = IS_POSIX and os.getuid() == 0
+
+ # Set a reasonable limit to the number of open files.
+ # Running under docker inherits the system defaults, which are not subject
+ # to the "standard" limits set by pam_limits.so, and while they work well
+ # for servers that may receive a lot of connections, they cause performance
+ # problems for things that close file descriptors before forking (for good
+ # reasons), like python's `subprocess.Popen(..., close_fds=True)` (and while
+ # the default was close_fds=False in python2, that changed in python3).
+ # In some cases, Firefox does the same thing when spawning subprocesses.
+ # Processes spawned by this one will inherit the limit set here.
+ try:
+ import resource
+ # Keep the hard limit the same, though, allowing processes to change their
+ # soft limit if they need to (Firefox does, for instance).
+ (soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE)
+ limit = os.environ.get('MOZ_LIMIT_NOFILE')
+ if limit:
+ limit = int(limit)
+ else:
+ # If no explicit limit is given, use 1024 if it's less than the current
+ # soft limit. For instance, the default on macOS is 256, so we'd pick
+ # that rather than 1024.
+ limit = min(soft, 1024)
+ # Now apply the limit, if it's different from the original one.
+ if limit != soft:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard))
+ except ImportError:
+ # The resource module is UNIX only.
+ pass
+
+ # Arguments up to '--' are ours. After are for the main task
+ # to be executed.
+ try:
+ i = args.index('--')
+ our_args = args[0:i]
+ task_args = args[i + 1:]
+ except ValueError:
+ our_args = args
+ task_args = []
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--user', default='worker', help='user to run as')
+ parser.add_argument('--group', default='worker', help='group to run as')
+ parser.add_argument('--task-cwd', help='directory to run the provided command in')
+
+ add_vcs_arguments(parser, 'gecko', 'Firefox')
+ add_vcs_arguments(parser, 'comm', 'Comm')
+
+ parser.add_argument('--fetch-hgfingerprint', action='store_true',
+ help='Fetch the latest hgfingerprint from the secrets store, '
+ 'using the taskclsuerProxy')
+
+ args = parser.parse_args(our_args)
+
+ uid = gid = gids = None
+ if IS_POSIX and running_as_root:
+ user, group, gids = get_posix_user_group(args.user, args.group)
+ uid = user.pw_uid
+ gid = group.gr_gid
+
+ if running_as_root and os.path.exists("/dev/kvm"):
+ # Ensure kvm permissions for worker, required for Android x86
+ st = os.stat("/dev/kvm")
+ os.chmod("/dev/kvm", st.st_mode | 0o666)
+
+ # Validate caches.
+ #
+ # Taskgraph should pass in a list of paths that are caches via an
+ # environment variable (which we don't want to pass down to child
+ # processes).
+
+ if 'TASKCLUSTER_CACHES' in os.environ:
+ caches = os.environ['TASKCLUSTER_CACHES'].split(';')
+ del os.environ['TASKCLUSTER_CACHES']
+ else:
+ caches = []
+
+ if 'TASKCLUSTER_UNTRUSTED_CACHES' in os.environ:
+ untrusted_caches = True
+ del os.environ['TASKCLUSTER_UNTRUSTED_CACHES']
+ else:
+ untrusted_caches = False
+
+ for cache in caches:
+ if not os.path.isdir(cache):
+ print('error: cache %s is not a directory; this should never '
+ 'happen' % cache)
+ return 1
+
+ if running_as_root:
+ purge = configure_cache_posix(cache, user, group, untrusted_caches,
+ running_as_root)
+
+ if purge:
+ return EXIT_PURGE_CACHE
+
+ if 'TASKCLUSTER_VOLUMES' in os.environ:
+ volumes = os.environ['TASKCLUSTER_VOLUMES'].split(';')
+ del os.environ['TASKCLUSTER_VOLUMES']
+ else:
+ volumes = []
+
+ if volumes and not IS_POSIX:
+ print('assertion failed: volumes not expected on Windows')
+ return 1
+
+ # Sanitize volumes.
+ for volume in volumes:
+ # If a volume is a cache, it was dealt with above.
+ if volume in caches:
+ print_line(b'volume', b'volume %s is a cache\n' %
+ volume.encode('utf-8'))
+ continue
+
+ if running_as_root:
+ configure_volume_posix(volume, user, group, running_as_root)
+
+ all_caches_and_volumes = set(map(os.path.normpath, caches))
+ all_caches_and_volumes |= set(map(os.path.normpath, volumes))
+
+ def path_in_cache_or_volume(path):
+ path = os.path.normpath(path)
+
+ while path:
+ if path in all_caches_and_volumes:
+ return True
+
+ path, child = os.path.split(path)
+ if not child:
+ break
+
+ return False
+
+ def prepare_checkout_dir(checkout):
+ if not checkout:
+ return
+
+ # The checkout path becomes the working directory. Since there are
+ # special cache files in the cache's root directory and working
+ # directory purging could blow them away, disallow this scenario.
+ if os.path.exists(os.path.join(checkout, '.cacherequires')):
+ print('error: cannot perform vcs checkout into cache root: %s' %
+ checkout)
+ sys.exit(1)
+
+ # TODO given the performance implications, consider making this a fatal
+ # error.
+ if not path_in_cache_or_volume(checkout):
+ print_line(b'vcs', b'WARNING: vcs checkout path (%s) not in cache '
+ b'or volume; performance will likely suffer\n' %
+ checkout.encode('utf-8'))
+
+ # Ensure the directory for the source checkout exists.
+ try:
+ os.makedirs(os.path.dirname(checkout))
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ # And that it is owned by the appropriate user/group.
+ if running_as_root:
+ os.chown(os.path.dirname(checkout), uid, gid)
+
+ def prepare_hg_store_path():
+ # And ensure the shared store path exists and has proper permissions.
+ if 'HG_STORE_PATH' not in os.environ:
+ print('error: HG_STORE_PATH environment variable not set')
+ sys.exit(1)
+
+ store_path = os.environ['HG_STORE_PATH']
+
+ if not path_in_cache_or_volume(store_path):
+ print_line(b'vcs', b'WARNING: HG_STORE_PATH (%s) not in cache or '
+ b'volume; performance will likely suffer\n' %
+ store_path.encode('utf-8'))
+
+ try:
+ os.makedirs(store_path)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ if running_as_root:
+ os.chown(store_path, uid, gid)
+
+ prepare_checkout_dir(args.gecko_checkout)
+ if args.gecko_checkout or args.comm_checkout:
+ prepare_hg_store_path()
+
+ if IS_POSIX and running_as_root:
+ # Drop permissions to requested user.
+ # This code is modeled after what `sudo` was observed to do in a Docker
+ # container. We do not bother calling setrlimit() because containers have
+ # their own limits.
+ print_line(b'setup', b'running as %s:%s\n' % (
+ args.user.encode('utf-8'), args.group.encode('utf-8')))
+
+ os.setgroups(gids)
+ os.umask(0o22)
+ os.setresgid(gid, gid, gid)
+ os.setresuid(uid, uid, uid)
+
+ vcs_checkout_from_args(args, 'gecko')
+ vcs_checkout_from_args(args, 'comm')
+
+ resource_process = None
+
+ try:
+ for k in ('GECKO_PATH', 'MOZ_FETCHES_DIR', 'UPLOAD_DIR', 'MOZ_PYTHON_HOME'):
+ if k in os.environ:
+ # Normalize paths to use forward slashes. Some shell scripts
+ # tolerate that better on Windows.
+ os.environ[k] = os.path.abspath(os.environ[k]).replace(os.sep, '/')
+ print_line(b'setup', b'%s is %s\n' % (
+ k.encode('utf-8'),
+ os.environ[k].encode('utf-8')))
+
+ if 'MOZ_FETCHES' in os.environ:
+ fetch_artifacts()
+
+ # If Python is a fetch dependency, add it to the PATH and setting
+ # the mozilla-specific MOZ_PYTHON_HOME to relocate binaries.
+ if 'MOZ_PYTHON_HOME' in os.environ:
+
+ print_line(b'setup',
+ b'Setting up local python environment\n')
+ prev = [os.environ['PATH']] if 'PATH' in os.environ else []
+
+ moz_python_home = os.environ['MOZ_PYTHON_HOME']
+ if IS_WINDOWS:
+ ext = '.exe'
+ moz_python_bindir = moz_python_home
+ else:
+ ext = ''
+ moz_python_bindir = moz_python_home + '/bin'
+
+
+ # just a sanity check
+ candidate = os.path.join(moz_python_bindir, f'python3{ext}')
+ if not os.path.exists(candidate):
+ raise RuntimeError("Inconsistent Python installation: "
+ "archive found, but no python3 binary "
+ "detected")
+
+ new = os.environ['PATH'] = os.pathsep.join([moz_python_bindir]
+ + prev)
+
+ # Relocate the python binary. Standard way uses PYTHONHOME, but
+ # this conflicts with system python (e.g. used by hg) so we
+ # maintain a small patch to use MOZPYTHONHOME instead.
+ os.environ['MOZPYTHONHOME'] = moz_python_home
+
+ print_line(b'setup',
+ b'updated PATH with python artifact: '
+ + new.encode() + b'\n')
+
+
+ resource_process = maybe_run_resource_monitoring()
+
+ return run_and_prefix_output(b'task', task_args, cwd=args.task_cwd)
+ finally:
+ if resource_process:
+ print_line(b'resource_monitor', b'terminating\n')
+ if IS_WINDOWS:
+ # .terminate() on Windows is not a graceful shutdown, due to
+ # differences in signals. CTRL_BREAK_EVENT will work provided
+ # the subprocess is in a different process group, so this script
+ # isn't also killed.
+ os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT)
+ else:
+ resource_process.terminate()
+ resource_process.wait()
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv[1:]))