1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function
import os
import json
from datetime import datetime, timedelta
TASK_DURATION_CACHE = "task_duration_history.json"
GRAPH_QUANTILE_CACHE = "graph_quantile_cache.csv"
TASK_DURATION_TAG_FILE = "task_duration_tag.json"
def find_all_dependencies(graph, tasklist):
    """Return the sorted transitive dependencies of the requested tasks.

    Args:
        graph: Mapping of task -> iterable of the tasks it depends on.
        tasklist: The tasks that were explicitly requested.

    Returns:
        A sorted list of every task reachable from ``tasklist`` through
        ``graph`` that is itself a key of ``graph``, excluding the tasks in
        ``tasklist``.

    The traversal is iterative, so long dependency chains cannot exhaust the
    interpreter's recursion limit and a cyclic graph still terminates.
    """
    requested = set(tasklist)
    reachable = set()
    pending = list(tasklist)

    while pending:
        task = pending.pop()
        if task in reachable:
            continue
        if task not in graph:
            # Don't add tasks (and so durations) for
            # things optimised out.
            continue
        reachable.add(task)
        pending.extend(graph[task])

    # Since these have been asked for, they're not inherited dependencies.
    return sorted(reachable - requested)
def find_longest_path(graph, tasklist, duration_data):
    """Return the duration of the longest dependency chain for the tasks.

    Args:
        graph: Mapping of task -> iterable of the tasks it depends on.
        tasklist: The tasks to start from.
        duration_data: Mapping of task -> duration; tasks missing from it
            count as 0.0.

    Returns:
        The largest accumulated duration over all chains starting at a task
        in ``tasklist`` (the task's own duration plus that of its costliest
        dependency chain), or 0 when ``tasklist`` is empty.
    """
    # Accumulated duration per task, so shared dependencies are only
    # evaluated once.
    chain_cost = {}

    def cost_of(task):
        if task not in chain_cost:
            upstream = [cost_of(dep) for dep in graph.get(task, list())]
            # Tasks without dependencies contribute nothing upstream.
            upstream.append(0.0)
            chain_cost[task] = max(upstream) + duration_data.get(task, 0.0)
        return chain_cost[task]

    candidates = [cost_of(task) for task in tasklist]

    # Default in case there are no tasks
    if not candidates:
        return 0
    return max(candidates)
def determine_quantile(quantiles_file, duration):
    """Locate ``duration`` among the boundaries stored in ``quantiles_file``.

    Args:
        quantiles_file: Path to a text file with one header line followed by
            one numeric boundary per line. Blank lines are ignored.
        duration: An object exposing ``total_seconds()`` (e.g. a
            ``datetime.timedelta``).

    Returns:
        An int in the range 0..100: the position of the first boundary that
        exceeds the duration, scaled by the number of boundaries. A duration
        at or beyond every boundary maps to the last boundary's position.
        Returns 0 when the file contains no boundaries at all.
    """
    seconds = duration.total_seconds()
    with open(quantiles_file) as f:
        f.readline()  # skip header
        # Skip blank lines (e.g. a trailing newline) instead of failing on
        # float('').
        boundaries = sorted(
            float(line.strip()) for line in f.readlines() if line.strip()
        )

    if not boundaries:
        # Nothing to compare against; avoid an unbound index and a division
        # by zero.
        return 0

    # Falls back to the last boundary when the duration exceeds them all.
    position = len(boundaries) - 1
    for index, boundary in enumerate(boundaries):
        if seconds < boundary:
            position = index
            break

    # In case we weren't given 100 elements
    return int(100 * position / len(boundaries))
def task_duration_data(cache_dir):
    """Load and return the parsed JSON task-duration cache from ``cache_dir``.

    The file read is ``TASK_DURATION_CACHE`` inside ``cache_dir``.
    """
    cache_path = os.path.join(cache_dir, TASK_DURATION_CACHE)
    with open(cache_path) as cache_fh:
        history = json.load(cache_fh)
    return history
def duration_summary(graph_cache_file, tasklist, cache_dir):
    """Build a summary of estimated durations for the requested tasks.

    Args:
        graph_cache_file: Path to a JSON file holding the task graph; when
            falsy an empty graph is used.
        tasklist: The tasks that were explicitly requested.
        cache_dir: Directory holding the task duration cache and, optionally,
            the graph quantile cache.

    Returns:
        A dict with the keys ``selected_duration`` and ``dependency_duration``
        (timedeltas), ``dependency_count``, ``selected_count``,
        ``wall_duration_seconds`` (timedelta of the longest path),
        ``eta_datetime`` (now plus the longest path) and ``task_durations``
        (per requested task, in whole seconds). ``quantile`` is added only
        when the quantile cache file exists and the computed value is
        non-zero.
    """
    durations = task_duration_data(cache_dir)

    if graph_cache_file:
        with open(graph_cache_file) as graph_fh:
            graph = json.load(graph_fh)
    else:
        graph = dict()

    dependencies = find_all_dependencies(graph, tasklist)
    longest_path = find_longest_path(graph, tasklist, durations)

    def whole_seconds(task):
        # Unknown tasks count as zero; known ones are truncated to an int.
        return int(durations.get(task, 0.0))

    dependency_seconds = sum((whole_seconds(task) for task in dependencies), 0.0)
    selected_seconds = sum((whole_seconds(task) for task in tasklist), 0.0)

    total_requested_duration = timedelta(seconds=selected_seconds)
    total_dependency_duration = timedelta(seconds=dependency_seconds)

    output = {
        "selected_duration": total_requested_duration,
        "dependency_duration": total_dependency_duration,
        "dependency_count": len(dependencies),
        "selected_count": len(tasklist),
    }

    graph_quantile_cache = os.path.join(cache_dir, GRAPH_QUANTILE_CACHE)
    if os.path.isfile(graph_quantile_cache):
        combined = total_dependency_duration + total_requested_duration
        quantile = 100 - determine_quantile(graph_quantile_cache, combined)
        # A zero quantile is deliberately left out of the summary.
        if quantile:
            output["quantile"] = quantile

    output["wall_duration_seconds"] = timedelta(seconds=int(longest_path))
    output["eta_datetime"] = datetime.now() + timedelta(seconds=longest_path)
    output["task_durations"] = dict(
        (task, whole_seconds(task)) for task in tasklist
    )

    return output
|