# mypy: allow-untyped-defs
import argparse
import json
import logging
import os
import re
import subprocess
from collections import OrderedDict

import taskcluster

from . import taskgraph


here = os.path.abspath(os.path.dirname(__file__))

logging.basicConfig()
logger = logging.getLogger()


def get_triggers(event):
    # Set some variables that we use to get the commits on the current branch
    ref_prefix = "refs/heads/"
    is_pr = "pull_request" in event
    branch = None
    if not is_pr and "ref" in event:
        branch = event["ref"]
        if branch.startswith(ref_prefix):
            branch = branch[len(ref_prefix):]

    return is_pr, branch


def fetch_event_data(queue):
    try:
        task_id = os.environ["TASK_ID"]
    except KeyError:
        logger.warning("Missing TASK_ID environment variable")
        # For example under local testing
        return None

    task_data = queue.task(task_id)

    return task_data.get("extra", {}).get("github_event")


def filter_triggers(event, all_tasks):
    is_pr, branch = get_triggers(event)
    triggered = OrderedDict()
    for name, task in all_tasks.items():
        if "trigger" in task:
            if is_pr and "pull-request" in task["trigger"]:
                triggered[name] = task
            elif branch is not None and "branch" in task["trigger"]:
                for trigger_branch in task["trigger"]["branch"]:
                    if (trigger_branch == branch or
                        trigger_branch.endswith("*") and
                        branch.startswith(trigger_branch[:-1])):
                        triggered[name] = task
    logger.info("Triggers match tasks:\n * %s" % "\n * ".join(triggered.keys()))
    return triggered


def get_run_jobs(event):
    from tools.ci import jobs
    revish = "%s..%s" % (event["pull_request"]["base"]["sha"]
                         if "pull_request" in event
                         else event["before"],
                         event["pull_request"]["head"]["sha"]
                         if "pull_request" in event
                         else event["after"])
    logger.info("Looking for changes in range %s" % revish)
    paths = jobs.get_paths(revish=revish)
    logger.info("Found changes in paths:%s" % "\n".join(paths))
    path_jobs = jobs.get_jobs(paths)
    all_jobs = path_jobs | get_extra_jobs(event)
    logger.info("Including jobs:\n * %s" % "\n * ".join(all_jobs))
    return all_jobs


def get_extra_jobs(event):
    body = None
    jobs = set()
    if "commits" in event and event["commits"]:
        body = event["commits"][0]["message"]
    elif "pull_request" in event:
        body = event["pull_request"]["body"]

    if not body:
        return jobs

    regexp = re.compile(r"\s*tc-jobs:(.*)$")

    for line in body.splitlines():
        m = regexp.match(line)
        if m:
            items = m.group(1)
            for item in items.split(","):
                jobs.add(item.strip())
            break
    return jobs


def filter_excluded_users(tasks, event):
    # Some users' pull requests are excluded from tasks,
    # such as pull requests from automated exports.
    try:
        submitter = event["pull_request"]["user"]["login"]
    except KeyError:
        # Just ignore excluded users if the
        # username cannot be pulled from the event.
        logger.debug("Unable to read username from event. Continuing.")
        return
    excluded_tasks = []
    # A separate list of items for tasks is needed to iterate over
    # because removing an item during iteration will raise an error.
    for name, task in list(tasks.items()):
        if submitter in task.get("exclude-users", []):
            excluded_tasks.append(name)
            tasks.pop(name)  # removing excluded task
    if excluded_tasks:
        logger.info(
            f"Tasks excluded for user {submitter}:\n * " +
            "\n * ".join(excluded_tasks))


def filter_schedule_if(event, tasks):
    scheduled = OrderedDict()
    run_jobs = None
    for name, task in tasks.items():
        if "schedule-if" in task:
            if "run-job" in task["schedule-if"]:
                if run_jobs is None:
                    run_jobs = get_run_jobs(event)
                if "all" in run_jobs or any(item in run_jobs
                                            for item in task["schedule-if"]["run-job"]):
                    scheduled[name] = task
        else:
            scheduled[name] = task
    logger.info("Scheduling rules match tasks:\n * %s" % "\n * ".join(scheduled.keys()))
    return scheduled


def get_fetch_rev(event):
    is_pr, _ = get_triggers(event)

    if is_pr:
        # Try to get the actual rev so that all non-decision tasks are pinned to that
        rv = ["refs/pull/%s/merge" % event["pull_request"]["number"]]
        # For every PR GitHub maintains a 'head' branch with commits from the
        # PR, and a 'merge' branch containing a merge commit between the base
        # branch and the PR.
        for ref_type in ["head", "merge"]:
            ref = "refs/pull/%s/%s" % (event["pull_request"]["number"], ref_type)
            sha = None
            try:
                output = subprocess.check_output(["git", "ls-remote", "origin", ref])
            except subprocess.CalledProcessError:
                import traceback
                logger.error(traceback.format_exc())
                logger.error("Failed to get commit sha1 for %s" % ref)
            else:
                if not output:
                    logger.error("Failed to get commit for %s" % ref)
                else:
                    sha = output.decode("utf-8").split()[0]
            rv.append(sha)
        rv = tuple(rv)
    else:
        # For a branch push we have a ref and a head but no merge SHA
        rv = (event["ref"], event["after"], None)

    assert len(rv) == 3
    return rv


def build_full_command(event, task):
    fetch_ref, head_sha, merge_sha = get_fetch_rev(event)

    cmd_args = {
        "task_name": task["name"],
        "repo_url": event["repository"]["clone_url"],
        "fetch_ref": fetch_ref,
        "task_cmd": task["command"],
        "install_str": "",
    }

    options = task.get("options", {})
    options_args = []
    options_args.append("--ref=%s" % fetch_ref)
    if head_sha is not None:
        options_args.append("--head-rev=%s" % head_sha)
    if merge_sha is not None:
        options_args.append("--merge-rev=%s" % merge_sha)
    if options.get("oom-killer"):
        options_args.append("--oom-killer")
    if options.get("xvfb"):
        options_args.append("--xvfb")
    if not options.get("hosts"):
        options_args.append("--no-hosts")
    else:
        options_args.append("--hosts")
    # Check out the expected SHA unless it is overridden (e.g. to base_head).
if options.get("checkout"): options_args.append("--checkout=%s" % options["checkout"]) for browser in options.get("browser", []): options_args.append("--browser=%s" % browser) if options.get("channel"): options_args.append("--channel=%s" % options["channel"]) if options.get("install-certificates"): options_args.append("--install-certificates") cmd_args["options_str"] = " ".join(str(item) for item in options_args) install_packages = task.get("install") if install_packages: install_items = ["apt update -qqy"] install_items.extend("apt install -qqy %s" % item for item in install_packages) cmd_args["install_str"] = "\n".join("sudo %s;" % item for item in install_items) return ["/bin/bash", "--login", "-xc", """ ~/start.sh \ %(repo_url)s \ %(fetch_ref)s; %(install_str)s cd web-platform-tests; ./tools/ci/run_tc.py %(options_str)s -- %(task_cmd)s; """ % cmd_args] def get_owner(event): if "pusher" in event: pusher = event.get("pusher", {}).get("email", "") if pusher and "@" in pusher: return pusher return "web-platform-tests@users.noreply.github.com" def create_tc_task(event, task, taskgroup_id, depends_on_ids, env_extra=None): command = build_full_command(event, task) task_id = taskcluster.slugId() task_data = { "taskGroupId": taskgroup_id, "created": taskcluster.fromNowJSON(""), "deadline": taskcluster.fromNowJSON(task["deadline"]), "provisionerId": task["provisionerId"], "schedulerId": task["schedulerId"], "workerType": task["workerType"], "scopes": task.get("scopes", []), "metadata": { "name": task["name"], "description": task.get("description", ""), "owner": get_owner(event), "source": event["repository"]["clone_url"] }, "payload": { "artifacts": task.get("artifacts"), "command": command, "image": task.get("image"), "maxRunTime": task.get("maxRunTime"), "env": task.get("env", {}), }, "extra": { "github_event": json.dumps(event) }, "routes": ["checks"] } if "extra" in task: task_data["extra"].update(task["extra"]) if task.get("privileged"): if "capabilities" not in task_data["payload"]: task_data["payload"]["capabilities"] = {} task_data["payload"]["capabilities"]["privileged"] = True if env_extra: task_data["payload"]["env"].update(env_extra) if depends_on_ids: task_data["dependencies"] = depends_on_ids task_data["requires"] = task.get("requires", "all-completed") return task_id, task_data def get_artifact_data(artifact, task_id_map): task_id, data = task_id_map[artifact["task"]] return { "task": task_id, "glob": artifact["glob"], "dest": artifact["dest"], "extract": artifact.get("extract", False) } def build_task_graph(event, all_tasks, tasks): task_id_map = OrderedDict() taskgroup_id = os.environ.get("TASK_ID", taskcluster.slugId()) sink_task_depends_on = [] def add_task(task_name, task): depends_on_ids = [] if "depends-on" in task: for depends_name in task["depends-on"]: if depends_name not in task_id_map: add_task(depends_name, all_tasks[depends_name]) depends_on_ids.append(task_id_map[depends_name][0]) env_extra = {} if "download-artifacts" in task: env_extra["TASK_ARTIFACTS"] = json.dumps( [get_artifact_data(artifact, task_id_map) for artifact in task["download-artifacts"]]) task_id, task_data = create_tc_task(event, task, taskgroup_id, depends_on_ids, env_extra=env_extra) task_id_map[task_name] = (task_id, task_data) # The string conversion here is because if we use variables they are # converted to a string, so it's easier to use a string always if str(task.get("required", "True")) != "False" and task_name != "sink-task": sink_task_depends_on.append(task_id) for task_name, task in 
    for task_name, task in tasks.items():
        if task_name == "sink-task":
            # sink-task will be created below at the end of the ordered dict,
            # so that it can depend on all other tasks.
            continue
        add_task(task_name, task)

    # GitHub branch protection for pull requests needs us to name explicit
    # required tasks - which doesn't suffice when using a dynamic task graph.
    # To work around this we declare a sink task that depends on all the other
    # tasks completing, and checks if they have succeeded. We can then
    # make the sink task the sole required task for pull requests.
    sink_task = tasks.get("sink-task")
    if sink_task:
        logger.info("Scheduling sink-task")

        sink_task["command"] += " {}".format(" ".join(sink_task_depends_on))
        task_id_map["sink-task"] = create_tc_task(
            event, sink_task, taskgroup_id, sink_task_depends_on)
    else:
        logger.info("sink-task is not scheduled")

    return task_id_map


def create_tasks(queue, task_id_map):
    for (task_id, task_data) in task_id_map.values():
        queue.createTask(task_id, task_data)


def get_event(queue, event_path):
    if event_path is not None:
        try:
            with open(event_path) as f:
                event_str = f.read()
        except OSError:
            logger.error("Missing event file at path %s" % event_path)
            raise
    elif "TASK_EVENT" in os.environ:
        event_str = os.environ["TASK_EVENT"]
    else:
        event_str = fetch_event_data(queue)
    if not event_str:
        raise ValueError("Can't find GitHub event definition; for local testing pass --event-path")
    try:
        return json.loads(event_str)
    except ValueError:
        logger.error("Event was not valid JSON")
        raise


def decide(event):
    all_tasks = taskgraph.load_tasks_from_path(os.path.join(here, "tasks", "test.yml"))

    triggered_tasks = filter_triggers(event, all_tasks)
    scheduled_tasks = filter_schedule_if(event, triggered_tasks)
    filter_excluded_users(scheduled_tasks, event)

    logger.info("UNSCHEDULED TASKS:\n %s" % "\n ".join(sorted(set(all_tasks.keys()) -
                                                              set(scheduled_tasks.keys()))))

    logger.info("SCHEDULED TASKS:\n %s" % "\n ".join(sorted(scheduled_tasks.keys())))

    task_id_map = build_task_graph(event, all_tasks, scheduled_tasks)
    return task_id_map


def get_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--event-path",
                        help="Path to file containing serialized GitHub event")
    parser.add_argument("--dry-run", action="store_true",
                        help="Don't actually create the tasks, just output the tasks that "
                             "would be created")
    parser.add_argument("--tasks-path",
                        help="Path to file in which to write payload for all scheduled tasks")
    return parser


def run(venv, **kwargs):
    queue = taskcluster.Queue({'rootUrl': os.environ['TASKCLUSTER_PROXY_URL']})
    event = get_event(queue, event_path=kwargs["event_path"])

    task_id_map = decide(event)

    try:
        if not kwargs["dry_run"]:
            create_tasks(queue, task_id_map)
        else:
            print(json.dumps(task_id_map, indent=2))
    finally:
        if kwargs["tasks_path"]:
            with open(kwargs["tasks_path"], "w") as f:
                json.dump(task_id_map, f, indent=2)