diff options
Diffstat (limited to 'taskcluster/gecko_taskgraph/main.py')
-rw-r--r-- | taskcluster/gecko_taskgraph/main.py | 764 |
1 files changed, 764 insertions, 0 deletions
diff --git a/taskcluster/gecko_taskgraph/main.py b/taskcluster/gecko_taskgraph/main.py new file mode 100644 index 0000000000..42a7303b12 --- /dev/null +++ b/taskcluster/gecko_taskgraph/main.py @@ -0,0 +1,764 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import argparse +import atexit +import json +import logging +import os +import re +import shutil +import subprocess +import sys +import tempfile +import traceback +from collections import namedtuple +from concurrent.futures import ProcessPoolExecutor, as_completed +from pathlib import Path +from typing import Any, List + +import appdirs +import yaml + +Command = namedtuple("Command", ["func", "args", "kwargs", "defaults"]) +commands = {} + + +def command(*args, **kwargs): + defaults = kwargs.pop("defaults", {}) + + def decorator(func): + commands[args[0]] = Command(func, args, kwargs, defaults) + return func + + return decorator + + +def argument(*args, **kwargs): + def decorator(func): + if not hasattr(func, "args"): + func.args = [] + func.args.append((args, kwargs)) + return func + + return decorator + + +def format_taskgraph_labels(taskgraph): + return "\n".join( + sorted( + taskgraph.tasks[index].label for index in taskgraph.graph.visit_postorder() + ) + ) + + +def format_taskgraph_json(taskgraph): + return json.dumps( + taskgraph.to_json(), sort_keys=True, indent=2, separators=(",", ": ") + ) + + +def format_taskgraph_yaml(taskgraph): + return yaml.safe_dump(taskgraph.to_json(), default_flow_style=False) + + +def get_filtered_taskgraph(taskgraph, tasksregex): + """ + Filter all the tasks on basis of a regular expression + and returns a new TaskGraph object + """ + from taskgraph.graph import Graph + from taskgraph.taskgraph import TaskGraph + + # return original taskgraph if no regular expression is passed + if not tasksregex: + return taskgraph + named_links_dict = taskgraph.graph.named_links_dict() + filteredtasks = {} + filterededges = set() + regexprogram = re.compile(tasksregex) + + for key in taskgraph.graph.visit_postorder(): + task = taskgraph.tasks[key] + if regexprogram.match(task.label): + filteredtasks[key] = task + for depname, dep in named_links_dict[key].items(): + if regexprogram.match(dep): + filterededges.add((key, dep, depname)) + filtered_taskgraph = TaskGraph( + filteredtasks, Graph(set(filteredtasks), filterededges) + ) + return filtered_taskgraph + + +FORMAT_METHODS = { + "labels": format_taskgraph_labels, + "json": format_taskgraph_json, + "yaml": format_taskgraph_yaml, +} + + +def get_taskgraph_generator(root, parameters): + """Helper function to make testing a little easier.""" + from taskgraph.generator import TaskGraphGenerator + + return TaskGraphGenerator(root_dir=root, parameters=parameters) + + +def format_taskgraph(options, parameters, logfile=None): + import taskgraph + from taskgraph.parameters import parameters_loader + + if logfile: + handler = logging.FileHandler(logfile, mode="w") + if logging.root.handlers: + oldhandler = logging.root.handlers[-1] + logging.root.removeHandler(oldhandler) + handler.setFormatter(oldhandler.formatter) + logging.root.addHandler(handler) + + if options["fast"]: + taskgraph.fast = True + + if isinstance(parameters, str): + parameters = parameters_loader( + parameters, + overrides={"target-kind": options.get("target_kind")}, + strict=False, + ) + + tgg = get_taskgraph_generator(options.get("root"), parameters) + + tg = getattr(tgg, options["graph_attr"]) + tg = get_filtered_taskgraph(tg, options["tasks_regex"]) + format_method = FORMAT_METHODS[options["format"] or "labels"] + return format_method(tg) + + +def dump_output(out, path=None, params_spec=None): + from taskgraph.parameters import Parameters + + params_name = Parameters.format_spec(params_spec) + fh = None + if path: + # Substitute params name into file path if necessary + if params_spec and "{params}" not in path: + name, ext = os.path.splitext(path) + name += "_{params}" + path = name + ext + + path = path.format(params=params_name) + fh = open(path, "w") + else: + print( + "Dumping result with parameters from {}:".format(params_name), + file=sys.stderr, + ) + print(out + "\n", file=fh) + + +def generate_taskgraph(options, parameters, logdir): + from taskgraph.parameters import Parameters + + def logfile(spec): + """Determine logfile given a parameters specification.""" + if logdir is None: + return None + return os.path.join( + logdir, + "{}_{}.log".format(options["graph_attr"], Parameters.format_spec(spec)), + ) + + # Don't bother using futures if there's only one parameter. This can make + # tracebacks a little more readable and avoids additional process overhead. + if len(parameters) == 1: + spec = parameters[0] + out = format_taskgraph(options, spec, logfile(spec)) + dump_output(out, options["output_file"]) + return + + futures = {} + with ProcessPoolExecutor() as executor: + for spec in parameters: + f = executor.submit(format_taskgraph, options, spec, logfile(spec)) + futures[f] = spec + + for future in as_completed(futures): + output_file = options["output_file"] + spec = futures[future] + e = future.exception() + if e: + out = "".join(traceback.format_exception(type(e), e, e.__traceback__)) + if options["diff"]: + # Dump to console so we don't accidentally diff the tracebacks. + output_file = None + else: + out = future.result() + + dump_output( + out, + path=output_file, + params_spec=spec if len(parameters) > 1 else None, + ) + + +@command( + "tasks", + help="Show all tasks in the taskgraph.", + defaults={"graph_attr": "full_task_set"}, +) +@command( + "full", help="Show the full taskgraph.", defaults={"graph_attr": "full_task_graph"} +) +@command( + "target", + help="Show the set of target tasks.", + defaults={"graph_attr": "target_task_set"}, +) +@command( + "target-graph", + help="Show the target graph.", + defaults={"graph_attr": "target_task_graph"}, +) +@command( + "optimized", + help="Show the optimized graph.", + defaults={"graph_attr": "optimized_task_graph"}, +) +@command( + "morphed", + help="Show the morphed graph.", + defaults={"graph_attr": "morphed_task_graph"}, +) +@argument("--root", "-r", help="root of the taskgraph definition relative to topsrcdir") +@argument("--quiet", "-q", action="store_true", help="suppress all logging output") +@argument( + "--verbose", "-v", action="store_true", help="include debug-level logging output" +) +@argument( + "--json", + "-J", + action="store_const", + dest="format", + const="json", + help="Output task graph as a JSON object", +) +@argument( + "--yaml", + "-Y", + action="store_const", + dest="format", + const="yaml", + help="Output task graph as a YAML object", +) +@argument( + "--labels", + "-L", + action="store_const", + dest="format", + const="labels", + help="Output the label for each task in the task graph (default)", +) +@argument( + "--parameters", + "-p", + default=None, + action="append", + help="Parameters to use for the generation. Can be a path to file (.yml or " + ".json; see `taskcluster/docs/parameters.rst`), a directory (containing " + "parameters files), a url, of the form `project=mozilla-central` to download " + "latest parameters file for the specified project from CI, or of the form " + "`task-id=<decision task id>` to download parameters from the specified " + "decision task. Can be specified multiple times, in which case multiple " + "generations will happen from the same invocation (one per parameters " + "specified).", +) +@argument( + "--no-optimize", + dest="optimize", + action="store_false", + default="true", + help="do not remove tasks from the graph that are found in the " + "index (a.k.a. optimize the graph)", +) +@argument( + "-o", + "--output-file", + default=None, + help="file path to store generated output.", +) +@argument( + "--tasks-regex", + "--tasks", + default=None, + help="only return tasks with labels matching this regular " "expression.", +) +@argument( + "--target-kind", + default=None, + help="only return tasks that are of the given kind, or their dependencies.", +) +@argument( + "-F", + "--fast", + default=False, + action="store_true", + help="enable fast task generation for local debugging.", +) +@argument( + "--diff", + const="default", + nargs="?", + default=None, + help="Generate and diff the current taskgraph against another revision. " + "Without args the base revision will be used. A revision specifier such as " + "the hash or `.~1` (hg) or `HEAD~1` (git) can be used as well.", +) +def show_taskgraph(options): + from mozversioncontrol import get_repository_object as get_repository + from taskgraph.parameters import Parameters, parameters_loader + + if options.pop("verbose", False): + logging.root.setLevel(logging.DEBUG) + + repo = None + cur_ref = None + diffdir = None + output_file = options["output_file"] + + if options["diff"]: + repo = get_repository(os.getcwd()) + + if not repo.working_directory_clean(): + print( + "abort: can't diff taskgraph with dirty working directory", + file=sys.stderr, + ) + return 1 + + # We want to return the working directory to the current state + # as best we can after we're done. In all known cases, using + # branch or bookmark (which are both available on the VCS object) + # as `branch` is preferable to a specific revision. + cur_ref = repo.branch or repo.head_ref[:12] + + diffdir = tempfile.mkdtemp() + atexit.register( + shutil.rmtree, diffdir + ) # make sure the directory gets cleaned up + options["output_file"] = os.path.join( + diffdir, f"{options['graph_attr']}_{cur_ref}" + ) + print(f"Generating {options['graph_attr']} @ {cur_ref}", file=sys.stderr) + + parameters: List[Any[str, Parameters]] = options.pop("parameters") + if not parameters: + overrides = { + "target-kind": options.get("target_kind"), + } + parameters = [ + parameters_loader(None, strict=False, overrides=overrides) + ] # will use default values + + for param in parameters[:]: + if isinstance(param, str) and os.path.isdir(param): + parameters.remove(param) + parameters.extend( + [ + p.as_posix() + for p in Path(param).iterdir() + if p.suffix in (".yml", ".json") + ] + ) + + logdir = None + if len(parameters) > 1: + # Log to separate files for each process instead of stderr to + # avoid interleaving. + basename = os.path.basename(os.getcwd()) + logdir = os.path.join(appdirs.user_log_dir("taskgraph"), basename) + if not os.path.isdir(logdir): + os.makedirs(logdir) + else: + # Only setup logging if we have a single parameter spec. Otherwise + # logging will go to files. This is also used as a hook for Gecko + # to setup its `mach` based logging. + setup_logging() + + generate_taskgraph(options, parameters, logdir) + + if options["diff"]: + assert diffdir is not None + assert repo is not None + + # Reload taskgraph modules to pick up changes and clear global state. + for mod in sys.modules.copy(): + if mod != __name__ and mod.split(".", 1)[0].endswith("taskgraph"): + del sys.modules[mod] + + # Ensure gecko_taskgraph is ahead of taskcluster_taskgraph in sys.path. + # Without this, we may end up validating some things against the wrong + # schema. + import gecko_taskgraph # noqa + + if options["diff"] == "default": + base_ref = repo.base_ref + else: + base_ref = options["diff"] + + try: + repo.update(base_ref) + base_ref = repo.head_ref[:12] + options["output_file"] = os.path.join( + diffdir, f"{options['graph_attr']}_{base_ref}" + ) + print(f"Generating {options['graph_attr']} @ {base_ref}", file=sys.stderr) + generate_taskgraph(options, parameters, logdir) + finally: + repo.update(cur_ref) + + # Generate diff(s) + diffcmd = [ + "diff", + "-U20", + "--report-identical-files", + f"--label={options['graph_attr']}@{base_ref}", + f"--label={options['graph_attr']}@{cur_ref}", + ] + + non_fatal_failures = [] + for spec in parameters: + base_path = os.path.join(diffdir, f"{options['graph_attr']}_{base_ref}") + cur_path = os.path.join(diffdir, f"{options['graph_attr']}_{cur_ref}") + + params_name = None + if len(parameters) > 1: + params_name = Parameters.format_spec(spec) + base_path += f"_{params_name}" + cur_path += f"_{params_name}" + + # If the base or cur files are missing it means that generation + # failed. If one of them failed but not the other, the failure is + # likely due to the patch making changes to taskgraph in modules + # that don't get reloaded (safe to ignore). If both generations + # failed, there's likely a real issue. + base_missing = not os.path.isfile(base_path) + cur_missing = not os.path.isfile(cur_path) + if base_missing != cur_missing: # != is equivalent to XOR for booleans + non_fatal_failures.append(os.path.basename(base_path)) + continue + + try: + # If the output file(s) are missing, this command will raise + # CalledProcessError with a returncode > 1. + proc = subprocess.run( + diffcmd + [base_path, cur_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + check=True, + ) + diff_output = proc.stdout + returncode = 0 + except subprocess.CalledProcessError as e: + # returncode 1 simply means diffs were found + if e.returncode != 1: + print(e.stderr, file=sys.stderr) + raise + diff_output = e.output + returncode = e.returncode + + dump_output( + diff_output, + # Don't bother saving file if no diffs were found. Log to + # console in this case instead. + path=None if returncode == 0 else output_file, + params_spec=spec if len(parameters) > 1 else None, + ) + + if non_fatal_failures: + failstr = "\n ".join(sorted(non_fatal_failures)) + print( + "WARNING: Diff skipped for the following generation{s} " + "due to failures:\n {failstr}".format( + s="s" if len(non_fatal_failures) > 1 else "", failstr=failstr + ), + file=sys.stderr, + ) + + if options["format"] != "json": + print( + "If you were expecting differences in task bodies " + 'you should pass "-J"\n', + file=sys.stderr, + ) + + if len(parameters) > 1: + print("See '{}' for logs".format(logdir), file=sys.stderr) + + +@command("build-image", help="Build a Docker image") +@argument("image_name", help="Name of the image to build") +@argument( + "-t", "--tag", help="tag that the image should be built as.", metavar="name:tag" +) +@argument( + "--context-only", + help="File name the context tarball should be written to." + "with this option it will only build the context.tar.", + metavar="context.tar", +) +def build_image(args): + from gecko_taskgraph.docker import build_context, build_image + + if args["context_only"] is None: + build_image(args["image_name"], args["tag"], os.environ) + else: + build_context(args["image_name"], args["context_only"], os.environ) + + +@command( + "load-image", + help="Load a pre-built Docker image. Note that you need to " + "have docker installed and running for this to work.", +) +@argument( + "--task-id", + help="Load the image at public/image.tar.zst in this task, " + "rather than searching the index", +) +@argument( + "-t", + "--tag", + help="tag that the image should be loaded as. If not " + "image will be loaded with tag from the tarball", + metavar="name:tag", +) +@argument( + "image_name", + nargs="?", + help="Load the image of this name based on the current " + "contents of the tree (as built for mozilla-central " + "or mozilla-inbound)", +) +def load_image(args): + from gecko_taskgraph.docker import load_image_by_name, load_image_by_task_id + + if not args.get("image_name") and not args.get("task_id"): + print("Specify either IMAGE-NAME or TASK-ID") + sys.exit(1) + try: + if args["task_id"]: + ok = load_image_by_task_id(args["task_id"], args.get("tag")) + else: + ok = load_image_by_name(args["image_name"], args.get("tag")) + if not ok: + sys.exit(1) + except Exception: + traceback.print_exc() + sys.exit(1) + + +@command("image-digest", help="Print the digest of a docker image.") +@argument( + "image_name", + help="Print the digest of the image of this name based on the current " + "contents of the tree.", +) +def image_digest(args): + from gecko_taskgraph.docker import get_image_digest + + try: + digest = get_image_digest(args["image_name"]) + print(digest) + except Exception: + traceback.print_exc() + sys.exit(1) + + +@command("decision", help="Run the decision task") +@argument("--root", "-r", help="root of the taskgraph definition relative to topsrcdir") +@argument( + "--message", + required=False, + help=argparse.SUPPRESS, +) +@argument( + "--project", + required=True, + help="Project to use for creating task graph. Example: --project=try", +) +@argument("--pushlog-id", dest="pushlog_id", required=True, default="0") +@argument("--pushdate", dest="pushdate", required=True, type=int, default=0) +@argument("--owner", required=True, help="email address of who owns this graph") +@argument("--level", required=True, help="SCM level of this repository") +@argument( + "--target-tasks-method", help="method for selecting the target tasks to generate" +) +@argument( + "--repository-type", + required=True, + help='Type of repository, either "hg" or "git"', +) +@argument("--base-repository", required=True, help='URL for "base" repository to clone') +@argument( + "--base-ref", default="", help='Reference of the revision in the "base" repository' +) +@argument( + "--base-rev", + default="", + help="Taskgraph decides what to do based on the revision range between " + "`--base-rev` and `--head-rev`. Value is determined automatically if not provided", +) +@argument( + "--head-repository", + required=True, + help='URL for "head" repository to fetch revision from', +) +@argument( + "--head-ref", required=True, help="Reference (this is same as rev usually for hg)" +) +@argument( + "--head-rev", required=True, help="Commit revision to use from head repository" +) +@argument("--head-tag", help="Tag attached to the revision", default="") +@argument( + "--tasks-for", required=True, help="the tasks_for value used to generate this task" +) +@argument("--try-task-config-file", help="path to try task configuration file") +def decision(options): + from gecko_taskgraph.decision import taskgraph_decision + + taskgraph_decision(options) + + +@command("action-callback", description="Run action callback used by action tasks") +@argument( + "--root", + "-r", + default="taskcluster/ci", + help="root of the taskgraph definition relative to topsrcdir", +) +def action_callback(options): + from gecko_taskgraph.actions import trigger_action_callback + from gecko_taskgraph.actions.util import get_parameters + + try: + # the target task for this action (or null if it's a group action) + task_id = json.loads(os.environ.get("ACTION_TASK_ID", "null")) + # the target task group for this action + task_group_id = os.environ.get("ACTION_TASK_GROUP_ID", None) + input = json.loads(os.environ.get("ACTION_INPUT", "null")) + callback = os.environ.get("ACTION_CALLBACK", None) + root = options["root"] + + parameters = get_parameters(task_group_id) + + return trigger_action_callback( + task_group_id=task_group_id, + task_id=task_id, + input=input, + callback=callback, + parameters=parameters, + root=root, + test=False, + ) + except Exception: + traceback.print_exc() + sys.exit(1) + + +@command("test-action-callback", description="Run an action callback in a testing mode") +@argument( + "--root", + "-r", + default="taskcluster/ci", + help="root of the taskgraph definition relative to topsrcdir", +) +@argument( + "--parameters", + "-p", + default="", + help="parameters file (.yml or .json; see " "`taskcluster/docs/parameters.rst`)`", +) +@argument("--task-id", default=None, help="TaskId to which the action applies") +@argument( + "--task-group-id", default=None, help="TaskGroupId to which the action applies" +) +@argument("--input", default=None, help="Action input (.yml or .json)") +@argument("callback", default=None, help="Action callback name (Python function name)") +def test_action_callback(options): + import taskgraph.parameters + from taskgraph.config import load_graph_config + from taskgraph.util import yaml + + import gecko_taskgraph.actions + + def load_data(filename): + with open(filename) as f: + if filename.endswith(".yml"): + return yaml.load_stream(f) + elif filename.endswith(".json"): + return json.load(f) + else: + raise Exception(f"unknown filename {filename}") + + try: + task_id = options["task_id"] + + if options["input"]: + input = load_data(options["input"]) + else: + input = None + + root = options["root"] + graph_config = load_graph_config(root) + trust_domain = graph_config["trust-domain"] + graph_config.register() + + parameters = taskgraph.parameters.load_parameters_file( + options["parameters"], strict=False, trust_domain=trust_domain + ) + parameters.check() + + return gecko_taskgraph.actions.trigger_action_callback( + task_group_id=options["task_group_id"], + task_id=task_id, + input=input, + callback=options["callback"], + parameters=parameters, + root=root, + test=True, + ) + except Exception: + traceback.print_exc() + sys.exit(1) + + +def create_parser(): + parser = argparse.ArgumentParser(description="Interact with taskgraph") + subparsers = parser.add_subparsers() + for _, (func, args, kwargs, defaults) in commands.items(): + subparser = subparsers.add_parser(*args, **kwargs) + for arg in func.args: + subparser.add_argument(*arg[0], **arg[1]) + subparser.set_defaults(command=func, **defaults) + return parser + + +def setup_logging(): + logging.basicConfig( + format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO + ) + + +def main(args=sys.argv[1:]): + setup_logging() + parser = create_parser() + args = parser.parse_args(args) + try: + args.command(vars(args)) + except Exception: + traceback.print_exc() + sys.exit(1) |