diff options
Diffstat (limited to 'third_party/python/taskcluster_taskgraph/taskgraph/optimize/base.py')
-rw-r--r-- | third_party/python/taskcluster_taskgraph/taskgraph/optimize/base.py | 551 |
1 files changed, 551 insertions, 0 deletions
diff --git a/third_party/python/taskcluster_taskgraph/taskgraph/optimize/base.py b/third_party/python/taskcluster_taskgraph/taskgraph/optimize/base.py new file mode 100644 index 0000000000..367b94e1de --- /dev/null +++ b/third_party/python/taskcluster_taskgraph/taskgraph/optimize/base.py @@ -0,0 +1,551 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +""" +The objective of optimization is to remove as many tasks from the graph as +possible, as efficiently as possible, thereby delivering useful results as +quickly as possible. For example, ideally if only a test script is modified in +a push, then the resulting graph contains only the corresponding test suite +task. + +See ``taskcluster/docs/optimization.rst`` for more information. +""" + +import datetime +import logging +from abc import ABCMeta, abstractmethod, abstractproperty +from collections import defaultdict + +from slugid import nice as slugid + +from taskgraph.graph import Graph +from taskgraph.taskgraph import TaskGraph +from taskgraph.util.parameterization import resolve_task_references, resolve_timestamps +from taskgraph.util.python_path import import_sibling_modules + +logger = logging.getLogger(__name__) +registry = {} + + +def register_strategy(name, args=()): + def wrap(cls): + if name not in registry: + registry[name] = cls(*args) + if not hasattr(registry[name], "description"): + registry[name].description = name + return cls + + return wrap + + +def optimize_task_graph( + target_task_graph, + requested_tasks, + params, + do_not_optimize, + decision_task_id, + existing_tasks=None, + strategy_override=None, +): + """ + Perform task optimization, returning a taskgraph and a map from label to + assigned taskId, including replacement tasks. + """ + label_to_taskid = {} + if not existing_tasks: + existing_tasks = {} + + # instantiate the strategies for this optimization process + strategies = registry.copy() + if strategy_override: + strategies.update(strategy_override) + + optimizations = _get_optimizations(target_task_graph, strategies) + + removed_tasks = remove_tasks( + target_task_graph=target_task_graph, + requested_tasks=requested_tasks, + optimizations=optimizations, + params=params, + do_not_optimize=do_not_optimize, + ) + + replaced_tasks = replace_tasks( + target_task_graph=target_task_graph, + optimizations=optimizations, + params=params, + do_not_optimize=do_not_optimize, + label_to_taskid=label_to_taskid, + existing_tasks=existing_tasks, + removed_tasks=removed_tasks, + ) + + return ( + get_subgraph( + target_task_graph, + removed_tasks, + replaced_tasks, + label_to_taskid, + decision_task_id, + ), + label_to_taskid, + ) + + +def _get_optimizations(target_task_graph, strategies): + def optimizations(label): + task = target_task_graph.tasks[label] + if task.optimization: + opt_by, arg = list(task.optimization.items())[0] + strategy = strategies[opt_by] + if hasattr(strategy, "description"): + opt_by += f" ({strategy.description})" + return (opt_by, strategy, arg) + else: + return ("never", strategies["never"], None) + + return optimizations + + +def _log_optimization(verb, opt_counts, opt_reasons=None): + if opt_reasons: + message = "optimize: {label} {action} because of {reason}" + for label, (action, reason) in opt_reasons.items(): + logger.debug(message.format(label=label, action=action, reason=reason)) + + if opt_counts: + logger.info( + f"{verb.title()} " + + ", ".join(f"{c} tasks by {b}" for b, c in sorted(opt_counts.items())) + + " during optimization." + ) + else: + logger.info(f"No tasks {verb} during optimization") + + +def remove_tasks( + target_task_graph, requested_tasks, params, optimizations, do_not_optimize +): + """ + Implement the "Removing Tasks" phase, returning a set of task labels of all removed tasks. + """ + opt_counts = defaultdict(int) + opt_reasons = {} + removed = set() + dependents_of = target_task_graph.graph.reverse_links_dict() + tasks = target_task_graph.tasks + prune_candidates = set() + + # Traverse graph so dependents (child nodes) are guaranteed to be processed + # first. + for label in target_task_graph.graph.visit_preorder(): + # Dependents that can be pruned away (shouldn't cause this task to run). + # Only dependents that either: + # A) Explicitly reference this task in their 'if_dependencies' list, or + # B) Don't have an 'if_dependencies' attribute (i.e are in 'prune_candidates' + # because they should be removed but have prune_deps themselves) + # should be considered. + prune_deps = { + l + for l in dependents_of[label] + if l in prune_candidates + if not tasks[l].if_dependencies or label in tasks[l].if_dependencies + } + + def _keep(reason): + """Mark a task as being kept in the graph. Also recursively removes + any dependents from `prune_candidates`, assuming they should be + kept because of this task. + """ + opt_reasons[label] = ("kept", reason) + + # Removes dependents that were in 'prune_candidates' from a task + # that ended up being kept (and therefore the dependents should + # also be kept). + queue = list(prune_deps) + while queue: + l = queue.pop() + + # If l is a prune_dep of multiple tasks it could be queued up + # multiple times. Guard against it being already removed. + if l not in prune_candidates: + continue + + # If a task doesn't set 'if_dependencies' itself (rather it was + # added to 'prune_candidates' due to one of its depenendents), + # then we shouldn't remove it. + if not tasks[l].if_dependencies: + continue + + prune_candidates.remove(l) + queue.extend([r for r in dependents_of[l] if r in prune_candidates]) + + def _remove(reason): + """Potentially mark a task as being removed from the graph. If the + task has dependents that can be pruned, add this task to + `prune_candidates` rather than removing it. + """ + if prune_deps: + # If there are prune_deps, unsure if we can remove this task yet. + prune_candidates.add(label) + else: + opt_reasons[label] = ("removed", reason) + opt_counts[reason] += 1 + removed.add(label) + + # if we're not allowed to optimize, that's easy.. + if label in do_not_optimize: + _keep("do not optimize") + continue + + # If there are remaining tasks depending on this one, do not remove. + if any( + l for l in dependents_of[label] if l not in removed and l not in prune_deps + ): + _keep("dependent tasks") + continue + + # Some tasks in the task graph only exist because they were required + # by a task that has just been optimized away. They can now be removed. + if label not in requested_tasks: + _remove("dependents optimized") + continue + + # Call the optimization strategy. + task = tasks[label] + opt_by, opt, arg = optimizations(label) + if opt.should_remove_task(task, params, arg): + _remove(opt_by) + continue + + # Some tasks should only run if their dependency was also run. Since we + # haven't processed dependencies yet, we add them to a list of + # candidate tasks for pruning. + if task.if_dependencies: + opt_reasons[label] = ("kept", opt_by) + prune_candidates.add(label) + else: + _keep(opt_by) + + if prune_candidates: + reason = "if-dependencies pruning" + for label in prune_candidates: + # There's an edge case where a triangle graph can cause a + # dependency to stay in 'prune_candidates' when the dependent + # remains. Do a final check to ensure we don't create any bad + # edges. + dependents = any( + d + for d in dependents_of[label] + if d not in prune_candidates + if d not in removed + ) + if dependents: + opt_reasons[label] = ("kept", "dependent tasks") + continue + removed.add(label) + opt_counts[reason] += 1 + opt_reasons[label] = ("removed", reason) + + _log_optimization("removed", opt_counts, opt_reasons) + return removed + + +def replace_tasks( + target_task_graph, + params, + optimizations, + do_not_optimize, + label_to_taskid, + removed_tasks, + existing_tasks, +): + """ + Implement the "Replacing Tasks" phase, returning a set of task labels of + all replaced tasks. The replacement taskIds are added to label_to_taskid as + a side-effect. + """ + opt_counts = defaultdict(int) + replaced = set() + dependents_of = target_task_graph.graph.reverse_links_dict() + dependencies_of = target_task_graph.graph.links_dict() + + for label in target_task_graph.graph.visit_postorder(): + # if we're not allowed to optimize, that's easy.. + if label in do_not_optimize: + continue + + # if this task depends on un-replaced, un-removed tasks, do not replace + if any( + l not in replaced and l not in removed_tasks for l in dependencies_of[label] + ): + continue + + # if the task already exists, that's an easy replacement + repl = existing_tasks.get(label) + if repl: + label_to_taskid[label] = repl + replaced.add(label) + opt_counts["existing_tasks"] += 1 + continue + + # call the optimization strategy + task = target_task_graph.tasks[label] + opt_by, opt, arg = optimizations(label) + + # compute latest deadline of dependents (if any) + dependents = [target_task_graph.tasks[l] for l in dependents_of[label]] + deadline = None + if dependents: + now = datetime.datetime.utcnow() + deadline = max( + resolve_timestamps(now, task.task["deadline"]) for task in dependents + ) + repl = opt.should_replace_task(task, params, deadline, arg) + if repl: + if repl is True: + # True means remove this task; get_subgraph will catch any + # problems with removed tasks being depended on + removed_tasks.add(label) + else: + label_to_taskid[label] = repl + replaced.add(label) + opt_counts[opt_by] += 1 + continue + + _log_optimization("replaced", opt_counts) + return replaced + + +def get_subgraph( + target_task_graph, + removed_tasks, + replaced_tasks, + label_to_taskid, + decision_task_id, +): + """ + Return the subgraph of target_task_graph consisting only of + non-optimized tasks and edges between them. + + To avoid losing track of taskIds for tasks optimized away, this method + simultaneously substitutes real taskIds for task labels in the graph, and + populates each task definition's `dependencies` key with the appropriate + taskIds. Task references are resolved in the process. + """ + + # check for any dependency edges from included to removed tasks + bad_edges = [ + (l, r, n) + for l, r, n in target_task_graph.graph.edges + if l not in removed_tasks and r in removed_tasks + ] + if bad_edges: + probs = ", ".join( + f"{l} depends on {r} as {n} but it has been removed" + for l, r, n in bad_edges + ) + raise Exception("Optimization error: " + probs) + + # fill in label_to_taskid for anything not removed or replaced + assert replaced_tasks <= set(label_to_taskid) + for label in sorted( + target_task_graph.graph.nodes - removed_tasks - set(label_to_taskid) + ): + label_to_taskid[label] = slugid() + + # resolve labels to taskIds and populate task['dependencies'] + tasks_by_taskid = {} + named_links_dict = target_task_graph.graph.named_links_dict() + omit = removed_tasks | replaced_tasks + for label, task in target_task_graph.tasks.items(): + if label in omit: + continue + task.task_id = label_to_taskid[label] + named_task_dependencies = { + name: label_to_taskid[label] + for name, label in named_links_dict.get(label, {}).items() + } + + # Add remaining soft dependencies + if task.soft_dependencies: + named_task_dependencies.update( + { + label: label_to_taskid[label] + for label in task.soft_dependencies + if label in label_to_taskid and label not in omit + } + ) + + task.task = resolve_task_references( + task.label, + task.task, + task_id=task.task_id, + decision_task_id=decision_task_id, + dependencies=named_task_dependencies, + ) + deps = task.task.setdefault("dependencies", []) + deps.extend(sorted(named_task_dependencies.values())) + tasks_by_taskid[task.task_id] = task + + # resolve edges to taskIds + edges_by_taskid = ( + (label_to_taskid.get(left), label_to_taskid.get(right), name) + for (left, right, name) in target_task_graph.graph.edges + ) + # ..and drop edges that are no longer entirely in the task graph + # (note that this omits edges to replaced tasks, but they are still in task.dependnecies) + edges_by_taskid = { + (left, right, name) + for (left, right, name) in edges_by_taskid + if left in tasks_by_taskid and right in tasks_by_taskid + } + + return TaskGraph(tasks_by_taskid, Graph(set(tasks_by_taskid), edges_by_taskid)) + + +@register_strategy("never") +class OptimizationStrategy: + def should_remove_task(self, task, params, arg): + """Determine whether to optimize this task by removing it. Returns + True to remove.""" + return False + + def should_replace_task(self, task, params, deadline, arg): + """Determine whether to optimize this task by replacing it. Returns a + taskId to replace this task, True to replace with nothing, or False to + keep the task.""" + return False + + +@register_strategy("always") +class Always(OptimizationStrategy): + def should_remove_task(self, task, params, arg): + return True + + +class CompositeStrategy(OptimizationStrategy, metaclass=ABCMeta): + def __init__(self, *substrategies, **kwargs): + self.substrategies = [] + missing = set() + for sub in substrategies: + if isinstance(sub, str): + if sub not in registry.keys(): + missing.add(sub) + continue + sub = registry[sub] + + self.substrategies.append(sub) + + if missing: + raise TypeError( + "substrategies aren't registered: {}".format( + ", ".join(sorted(missing)) + ) + ) + + self.split_args = kwargs.pop("split_args", None) + if not self.split_args: + self.split_args = lambda arg, substrategies: [arg] * len(substrategies) + if kwargs: + raise TypeError("unexpected keyword args") + + @abstractproperty + def description(self): + """A textual description of the combined substrategies.""" + + @abstractmethod + def reduce(self, results): + """Given all substrategy results as a generator, return the overall + result.""" + + def _generate_results(self, fname, *args): + *passthru, arg = args + for sub, arg in zip( + self.substrategies, self.split_args(arg, self.substrategies) + ): + yield getattr(sub, fname)(*passthru, arg) + + def should_remove_task(self, *args): + results = self._generate_results("should_remove_task", *args) + return self.reduce(results) + + def should_replace_task(self, *args): + results = self._generate_results("should_replace_task", *args) + return self.reduce(results) + + +class Any(CompositeStrategy): + """Given one or more optimization strategies, remove or replace a task if any of them + says to. + + Replacement will use the value returned by the first strategy that says to replace. + """ + + @property + def description(self): + return "-or-".join([s.description for s in self.substrategies]) + + @classmethod + def reduce(cls, results): + for rv in results: + if rv: + return rv + return False + + +class All(CompositeStrategy): + """Given one or more optimization strategies, remove or replace a task if all of them + says to. + + Replacement will use the value returned by the first strategy passed in. + Note the values used for replacement need not be the same, as long as they + all say to replace. + """ + + @property + def description(self): + return "-and-".join([s.description for s in self.substrategies]) + + @classmethod + def reduce(cls, results): + for rv in results: + if not rv: + return rv + return True + + +class Alias(CompositeStrategy): + """Provides an alias to an existing strategy. + + This can be useful to swap strategies in and out without needing to modify + the task transforms. + """ + + def __init__(self, strategy): + super().__init__(strategy) + + @property + def description(self): + return self.substrategies[0].description + + def reduce(self, results): + return next(results) + + +class Not(CompositeStrategy): + """Given a strategy, returns the opposite.""" + + def __init__(self, strategy): + super().__init__(strategy) + + @property + def description(self): + return "not-" + self.substrategies[0].description + + def reduce(self, results): + return not next(results) + + +# Trigger registration in sibling modules. +import_sibling_modules() |