1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# Text progress bar library, like curl or scp.
import math
import sys
from datetime import datetime, timedelta
if sys.platform.startswith("win"):
from .terminal_win import Terminal
else:
from .terminal_unix import Terminal
class NullProgressBar(object):
    """Progress bar stand-in that never draws anything.

    It offers the same methods as ProgressBar so the two can be used
    interchangeably; every method is a no-op except message(), which still
    prints its text.
    """

    @staticmethod
    def update_granularity():
        """Return the largest representable timedelta as the redraw interval."""
        return timedelta.max

    def message(self, msg):
        """Write msg to stdout, terminated by a newline."""
        line = msg + "\n"
        sys.stdout.write(line)

    def beginline(self):
        """Do nothing: there is never a partial bar line to terminate."""

    def update(self, current, data):
        """Ignore the progress report."""

    def poke(self):
        """Do nothing: there is nothing to redraw."""

    def finish(self, complete=True):
        """Do nothing: there is no final state to draw."""
class ProgressBar(object):
    """Single-line text progress bar that is redrawn in place on stdout.

    Every redraw starts with a carriage return and then writes, in order:
    one right-aligned counter per entry of the layout list (wrapped in
    ``[...]`` and separated by ``|``), the percentage of ``limit`` reached,
    the bar itself, and the seconds elapsed since construction, e.g.::

        [ 12|  3]  40% ======>          |  12.3s
    """

    def __init__(self, limit, fmt):
        """Create a bar that reads 100% when ``current == limit``.

        limit -- int: the value of 'current' equal to 100%.
        fmt   -- [{str:str}]: one dict per counter. update() reads its
                 "color" entry (handed to Terminal.set_color) and its
                 "value" entry (the key looked up in update()'s data).

        Raises AssertionError when stdout is not a terminal.
        """
        assert self.conservative_isatty()

        # (current, data) from the most recent update(), or None before the
        # first one; poke() and finish() redraw from it.
        self.prior = None
        # False while the cursor sits at the end of a drawn bar.
        self.atLineStart = True

        # [{str:str}] Description of how to lay out each field in the counters map.
        self.counters_fmt = fmt

        # int: The value of 'current' equal to 100%.
        self.limit = limit

        # int: max digits in limit.  Counted from the decimal text because
        # ceil(log10(limit)) is one short for exact powers of ten
        # (log10(100) == 2) and raises for limit <= 0.
        self.limit_digits = len(str(max(1, int(self.limit))))

        # datetime: The start time.
        self.t0 = datetime.now()
        # datetime: Optional, the last time update() ran.
        self.last_update_time = None

        # Width of the counters section: the opening '[' plus, per counter,
        # its digits and the '|' (or ']' for the last one) that follows it.
        self.counters_width = 1 + len(self.counters_fmt) * (self.limit_digits + 1)

        # The bar gets whatever is left of a 64-column budget.
        self.barlen = 64 - self.counters_width

    @staticmethod
    def update_granularity():
        """Return the minimum interval poke() leaves between two redraws."""
        return timedelta(seconds=0.1)

    def _format_bar(self, current):
        """Return the 'NNN% ====>   |' section of the line for ``current``."""
        if self.limit > 0:
            pct = int(100.0 * current / self.limit)
            filled = int(1.0 * self.barlen * current / self.limit) - 1
        else:
            # A non-positive limit leaves nothing to wait for; show a full
            # bar instead of dividing by zero.
            pct = 100
            filled = self.barlen - 1

        # Keep the '=' run inside the bar so the section has the same width
        # on every redraw (the raw value is -1 when current is 0 and exceeds
        # the bar when current overshoots limit).
        filled = max(0, min(filled, self.barlen - 1))
        padding = self.barlen - filled - 1
        return "{:3d}% ".format(pct) + "=" * filled + ">" + " " * padding + "|"

    def update(self, current, data):
        """Redraw the whole line for progress ``current`` and counters ``data``.

        current -- progress so far, on the same scale as ``limit``.
        data    -- mapping indexed with each layout's "value" entry; the
                   looked-up values are formatted as integers.
        """
        # Record prior for poke.
        self.prior = (current, data)
        self.atLineStart = False

        # Counters.  The closing bracket is chosen by position, not by
        # comparing layout dicts, so two equal layouts cannot end the
        # section early.
        sys.stdout.write("\r[")
        cell = "{:" + str(self.limit_digits) + "d}"
        last_index = len(self.counters_fmt) - 1
        for index, layout in enumerate(self.counters_fmt):
            Terminal.set_color(layout["color"])
            sys.stdout.write(cell.format(data[layout["value"]]))
            Terminal.reset_color()
            sys.stdout.write("] " if index == last_index else "|")

        # Percentage and bar.
        sys.stdout.write(self._format_bar(current))

        # Elapsed time.  total_seconds() includes the days component, which
        # the seconds/microseconds fields alone leave out.
        now = datetime.now()
        elapsed = (now - self.t0).total_seconds()
        sys.stdout.write("{:6.1f}s".format(elapsed))
        # NOTE(review): presumably erases what a longer previous line left
        # to the right of the cursor -- confirm against Terminal.
        Terminal.clear_right()

        # Force redisplay, since we didn't write a \n.
        sys.stdout.flush()

        self.last_update_time = now

    def poke(self):
        """Redraw with the last reported values so the timer keeps moving.

        Does nothing before the first update() or when the previous redraw
        happened less than update_granularity() ago.
        """
        if not self.prior:
            return
        if datetime.now() - self.last_update_time < self.update_granularity():
            return
        self.update(*self.prior)

    def finish(self, complete=True):
        """Draw the final state of the bar and end its line.

        complete -- when true the bar is drawn at ``limit``; otherwise it is
                    left at the last reported position.

        If update() was never called, a hint about running more tests is
        printed instead of a bar.
        """
        if not self.prior:
            sys.stdout.write(
                "No test run... You can try adding"
                " --run-slow-tests or --run-skipped to run more tests\n"
            )
            return
        final_count = self.limit if complete else self.prior[0]
        self.update(final_count, self.prior[1])
        sys.stdout.write("\n")
        # The newline put the cursor back at a line start; record that so a
        # later message() does not emit an extra blank line.
        self.atLineStart = True

    def beginline(self):
        """Move to a fresh line if a bar is currently drawn on this one."""
        if not self.atLineStart:
            sys.stdout.write("\n")
            self.atLineStart = True

    def message(self, msg):
        """Print msg on a line of its own, below any bar currently drawn."""
        self.beginline()
        sys.stdout.write(msg)
        sys.stdout.write("\n")

    @staticmethod
    def conservative_isatty():
        """
        Prefer erring on the side of caution and not using terminal commands if
        the current output stream may be a file.
        """
        return sys.stdout.isatty()
|