diff options
Diffstat (limited to 'taskcluster/taskgraph/generator.py')
-rw-r--r-- | taskcluster/taskgraph/generator.py | 492 |
1 files changed, 492 insertions, 0 deletions
diff --git a/taskcluster/taskgraph/generator.py b/taskcluster/taskgraph/generator.py new file mode 100644 index 0000000000..3a5a53f5ce --- /dev/null +++ b/taskcluster/taskgraph/generator.py @@ -0,0 +1,492 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import, print_function, unicode_literals +import logging +import os +import copy +import attr +import six +from six import text_type, ensure_text + +from . import filter_tasks +from .graph import Graph +from .taskgraph import TaskGraph +from .task import Task +from .optimize import optimize_task_graph +from .morph import morph +from .parameters import Parameters +from .util.python_path import find_object +from .transforms.base import TransformSequence, TransformConfig +from .util.verify import ( + verify_docs, + verifications, +) +from .util.yaml import load_yaml +from .config import load_graph_config, GraphConfig + +logger = logging.getLogger(__name__) + + +class KindNotFound(Exception): + """ + Raised when trying to load kind from a directory without a kind.yml. + """ + + +@attr.s(frozen=True) +class Kind(object): + + name = attr.ib(type=text_type) + path = attr.ib(type=text_type) + config = attr.ib(type=dict) + graph_config = attr.ib(type=GraphConfig) + + def _get_loader(self): + try: + loader = self.config["loader"] + except KeyError: + raise KeyError("{!r} does not define `loader`".format(self.path)) + return find_object(loader) + + def load_tasks(self, parameters, loaded_tasks, write_artifacts): + loader = self._get_loader() + config = copy.deepcopy(self.config) + + kind_dependencies = config.get("kind-dependencies", []) + kind_dependencies_tasks = { + task.label: task for task in loaded_tasks if task.kind in kind_dependencies + } + + inputs = loader(self.name, self.path, config, parameters, loaded_tasks) + + transforms = TransformSequence() + for xform_path in config["transforms"]: + transform = find_object(xform_path) + transforms.add(transform) + + # perform the transformations on the loaded inputs + trans_config = TransformConfig( + self.name, + self.path, + config, + parameters, + kind_dependencies_tasks, + self.graph_config, + write_artifacts=write_artifacts, + ) + tasks = [ + Task( + self.name, + label=task_dict["label"], + description=task_dict["description"], + attributes=task_dict["attributes"], + task=task_dict["task"], + optimization=task_dict.get("optimization"), + dependencies=task_dict.get("dependencies"), + soft_dependencies=task_dict.get("soft-dependencies"), + if_dependencies=task_dict.get("if-dependencies"), + release_artifacts=task_dict.get("release-artifacts"), + ) + for task_dict in transforms(trans_config, inputs) + ] + return tasks + + @classmethod + def load(cls, root_dir, graph_config, kind_name): + path = os.path.join(root_dir, kind_name) + kind_yml = os.path.join(path, "kind.yml") + if not os.path.exists(kind_yml): + raise KindNotFound(kind_yml) + + logger.debug("loading kind `{}` from `{}`".format(kind_name, path)) + config = load_yaml(kind_yml) + + return cls(kind_name, path, config, graph_config) + + +class TaskGraphGenerator(object): + """ + The central controller for taskgraph. This handles all phases of graph + generation. The task is generated from all of the kinds defined in + subdirectories of the generator's root directory. + + Access to the results of this generation, as well as intermediate values at + various phases of generation, is available via properties. This encourages + the provision of all generation inputs at instance construction time. + """ + + # Task-graph generation is implemented as a Python generator that yields + # each "phase" of generation. This allows some mach subcommands to short- + # circuit generation of the entire graph by never completing the generator. + + def __init__( + self, + root_dir, + parameters, + decision_task_id="DECISION-TASK", + write_artifacts=False, + ): + """ + @param root_dir: root directory, with subdirectories for each kind + @param paramaters: parameters for this task-graph generation, or callable + taking a `GraphConfig` and returning parameters + @type parameters: Union[Parameters, Callable[[GraphConfig], Parameters]] + """ + if root_dir is None: + root_dir = "taskcluster/ci" + self.root_dir = ensure_text(root_dir) + self._parameters = parameters + self._decision_task_id = decision_task_id + self._write_artifacts = write_artifacts + + # start the generator + self._run = self._run() + self._run_results = {} + + @property + def parameters(self): + """ + The properties used for this graph. + + @type: Properties + """ + return self._run_until("parameters") + + @property + def full_task_set(self): + """ + The full task set: all tasks defined by any kind (a graph without edges) + + @type: TaskGraph + """ + return self._run_until("full_task_set") + + @property + def full_task_graph(self): + """ + The full task graph: the full task set, with edges representing + dependencies. + + @type: TaskGraph + """ + return self._run_until("full_task_graph") + + @property + def target_task_set(self): + """ + The set of targetted tasks (a graph without edges) + + @type: TaskGraph + """ + return self._run_until("target_task_set") + + @property + def target_task_graph(self): + """ + The set of targetted tasks and all of their dependencies + + @type: TaskGraph + """ + return self._run_until("target_task_graph") + + @property + def optimized_task_graph(self): + """ + The set of targetted tasks and all of their dependencies; tasks that + have been optimized out are either omitted or replaced with a Task + instance containing only a task_id. + + @type: TaskGraph + """ + return self._run_until("optimized_task_graph") + + @property + def label_to_taskid(self): + """ + A dictionary mapping task label to assigned taskId. This property helps + in interpreting `optimized_task_graph`. + + @type: dictionary + """ + return self._run_until("label_to_taskid") + + @property + def morphed_task_graph(self): + """ + The optimized task graph, with any subsequent morphs applied. This graph + will have the same meaning as the optimized task graph, but be in a form + more palatable to TaskCluster. + + @type: TaskGraph + """ + return self._run_until("morphed_task_graph") + + @property + def graph_config(self): + """ + The configuration for this graph. + + @type: TaskGraph + """ + return self._run_until("graph_config") + + def _load_kinds(self, graph_config, target_kind=None): + if target_kind: + # docker-image is an implicit dependency that never appears in + # kind-dependencies. + queue = [target_kind, "docker-image"] + seen_kinds = set() + while queue: + kind_name = queue.pop() + if kind_name in seen_kinds: + continue + seen_kinds.add(kind_name) + kind = Kind.load(self.root_dir, graph_config, kind_name) + yield kind + queue.extend(kind.config.get("kind-dependencies", [])) + else: + for kind_name in os.listdir(self.root_dir): + try: + yield Kind.load(self.root_dir, graph_config, kind_name) + except KindNotFound: + continue + + def _run(self): + logger.info("Loading graph configuration.") + graph_config = load_graph_config(self.root_dir) + + yield ("graph_config", graph_config) + + graph_config.register() + + if callable(self._parameters): + parameters = self._parameters(graph_config) + else: + parameters = self._parameters + self.verify_parameters(parameters) + + filters = parameters.get("filters", []) + # Always add legacy target tasks method until we deprecate that API. + if "target_tasks_method" not in filters: + filters.insert(0, "target_tasks_method") + filters = [filter_tasks.filter_task_functions[f] for f in filters] + + yield ("parameters", parameters) + + logger.info("Loading kinds") + # put the kinds into a graph and sort topologically so that kinds are loaded + # in post-order + if parameters.get("target-kind"): + target_kind = parameters["target-kind"] + logger.info( + "Limiting kinds to {target_kind} and dependencies".format( + target_kind=target_kind + ) + ) + kinds = { + kind.name: kind + for kind in self._load_kinds(graph_config, parameters.get("target-kind")) + } + self.verify_kinds(kinds) + + edges = set() + for kind in six.itervalues(kinds): + for dep in kind.config.get("kind-dependencies", []): + edges.add((kind.name, dep, "kind-dependency")) + kind_graph = Graph(set(kinds), edges) + + if parameters.get("target-kind"): + kind_graph = kind_graph.transitive_closure({target_kind, "docker-image"}) + + logger.info("Generating full task set") + all_tasks = {} + for kind_name in kind_graph.visit_postorder(): + logger.debug("Loading tasks for kind {}".format(kind_name)) + kind = kinds[kind_name] + try: + new_tasks = kind.load_tasks( + parameters, + list(all_tasks.values()), + self._write_artifacts, + ) + except Exception: + logger.exception("Error loading tasks for kind {}:".format(kind_name)) + raise + for task in new_tasks: + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + logger.info( + "Generated {} tasks for kind {}".format(len(new_tasks), kind_name) + ) + full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set())) + self.verify_attributes(all_tasks) + self.verify_run_using() + yield verifications("full_task_set", full_task_set, graph_config, parameters) + + logger.info("Generating full task graph") + edges = set() + for t in full_task_set: + for depname, dep in six.iteritems(t.dependencies): + edges.add((t.label, dep, depname)) + + full_task_graph = TaskGraph(all_tasks, Graph(full_task_set.graph.nodes, edges)) + logger.info( + "Full task graph contains %d tasks and %d dependencies" + % (len(full_task_set.graph.nodes), len(edges)) + ) + yield verifications( + "full_task_graph", full_task_graph, graph_config, parameters + ) + + logger.info("Generating target task set") + target_task_set = TaskGraph( + dict(all_tasks), Graph(set(all_tasks.keys()), set()) + ) + for fltr in filters: + old_len = len(target_task_set.graph.nodes) + target_tasks = set(fltr(target_task_set, parameters, graph_config)) + target_task_set = TaskGraph( + {l: all_tasks[l] for l in target_tasks}, Graph(target_tasks, set()) + ) + logger.info( + "Filter %s pruned %d tasks (%d remain)" + % (fltr.__name__, old_len - len(target_tasks), len(target_tasks)) + ) + + yield verifications( + "target_task_set", target_task_set, graph_config, parameters + ) + + logger.info("Generating target task graph") + # include all docker-image build tasks here, in case they are needed for a graph morph + docker_image_tasks = set( + t.label + for t in six.itervalues(full_task_graph.tasks) + if t.attributes["kind"] == "docker-image" + ) + # include all tasks with `always_target` set + if parameters["tasks_for"] == "hg-push": + always_target_tasks = set( + t.label + for t in six.itervalues(full_task_graph.tasks) + if t.attributes.get("always_target") + ) + else: + always_target_tasks = set() + logger.info( + "Adding %d tasks with `always_target` attribute" + % (len(always_target_tasks) - len(always_target_tasks & target_tasks)) + ) + requested_tasks = target_tasks | docker_image_tasks | always_target_tasks + target_graph = full_task_graph.graph.transitive_closure(requested_tasks) + target_task_graph = TaskGraph( + {l: all_tasks[l] for l in target_graph.nodes}, target_graph + ) + yield verifications( + "target_task_graph", target_task_graph, graph_config, parameters + ) + + logger.info("Generating optimized task graph") + existing_tasks = parameters.get("existing_tasks") + do_not_optimize = set(parameters.get("do_not_optimize", [])) + if not parameters.get("optimize_target_tasks", True): + do_not_optimize = set(target_task_set.graph.nodes).union(do_not_optimize) + + # this is used for testing experimental optimization strategies + strategies = os.environ.get( + "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters.get("optimize_strategies") + ) + if strategies: + strategies = find_object(strategies) + + optimized_task_graph, label_to_taskid = optimize_task_graph( + target_task_graph, + requested_tasks, + parameters, + do_not_optimize, + self._decision_task_id, + existing_tasks=existing_tasks, + strategy_override=strategies, + ) + + yield verifications( + "optimized_task_graph", optimized_task_graph, graph_config, parameters + ) + + morphed_task_graph, label_to_taskid = morph( + optimized_task_graph, + label_to_taskid, + parameters, + graph_config, + self._decision_task_id, + ) + + yield "label_to_taskid", label_to_taskid + yield verifications( + "morphed_task_graph", morphed_task_graph, graph_config, parameters + ) + + def _run_until(self, name): + while name not in self._run_results: + try: + k, v = next(self._run) + except StopIteration: + raise AttributeError("No such run result {}".format(name)) + self._run_results[k] = v + return self._run_results[name] + + def verify_parameters(self, parameters): + if not parameters.strict: + return + + parameters_dict = dict(**parameters) + verify_docs( + filename="parameters.rst", + identifiers=list(parameters_dict), + appearing_as="inline-literal", + ) + + def verify_kinds(self, kinds): + verify_docs( + filename="kinds.rst", identifiers=kinds.keys(), appearing_as="heading" + ) + + def verify_attributes(self, all_tasks): + attribute_set = set() + for label, task in six.iteritems(all_tasks): + attribute_set.update(task.attributes.keys()) + verify_docs( + filename="attributes.rst", + identifiers=list(attribute_set), + appearing_as="heading", + ) + + def verify_run_using(self): + from .transforms.job import registry + + verify_docs( + filename="transforms.rst", + identifiers=registry.keys(), + appearing_as="inline-literal", + ) + + +def load_tasks_for_kind(parameters, kind, root_dir=None): + """ + Get all the tasks of a given kind. + + This function is designed to be called from outside of taskgraph. + """ + # make parameters read-write + parameters = dict(parameters) + parameters["target-kind"] = kind + parameters = Parameters(strict=False, **parameters) + tgg = TaskGraphGenerator(root_dir=root_dir, parameters=parameters) + return { + task.task["metadata"]["name"]: task + for task in tgg.full_task_set + if task.kind == kind + } |