summaryrefslogtreecommitdiffstats
path: root/js/src/tests/lib/progressbar.py
blob: a37f74af19c85e02cc5529e0552a7975e9b1a7f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Text progress bar library, like curl or scp.

import math
import sys
from datetime import datetime, timedelta

if sys.platform.startswith("win"):
    from .terminal_win import Terminal
else:
    from .terminal_unix import Terminal


class NullProgressBar(object):
    def update(self, current, data):
        pass

    def poke(self):
        pass

    def finish(self, complete=True):
        pass

    def beginline(self):
        pass

    def message(self, msg):
        sys.stdout.write(msg + "\n")

    @staticmethod
    def update_granularity():
        return timedelta.max


class ProgressBar(object):
    def __init__(self, limit, fmt):
        assert self.conservative_isatty()

        self.prior = None
        self.atLineStart = True
        # [{str:str}] Describtion of how to lay out each field in the counters map.
        self.counters_fmt = fmt
        # int: The value of 'current' equal to 100%.
        self.limit = limit
        # int: max digits in limit
        self.limit_digits = int(math.ceil(math.log10(self.limit)))
        # datetime: The start time.
        self.t0 = datetime.now()
        # datetime: Optional, the last time update() ran.
        self.last_update_time = None

        # Compute the width of the counters and build the format string.
        self.counters_width = 1  # [
        for layout in self.counters_fmt:
            self.counters_width += self.limit_digits
            # | (or ']' for the last one)
            self.counters_width += 1

        self.barlen = 64 - self.counters_width

    @staticmethod
    def update_granularity():
        return timedelta(seconds=0.1)

    def update(self, current, data):
        # Record prior for poke.
        self.prior = (current, data)
        self.atLineStart = False

        # Build counters string.
        sys.stdout.write("\r[")
        for layout in self.counters_fmt:
            Terminal.set_color(layout["color"])
            sys.stdout.write(
                ("{:" + str(self.limit_digits) + "d}").format(data[layout["value"]])
            )
            Terminal.reset_color()
            if layout != self.counters_fmt[-1]:
                sys.stdout.write("|")
            else:
                sys.stdout.write("] ")

        # Build the bar.
        pct = int(100.0 * current / self.limit)
        sys.stdout.write("{:3d}% ".format(pct))

        barlen = int(1.0 * self.barlen * current / self.limit) - 1
        bar = "=" * barlen + ">" + " " * (self.barlen - barlen - 1)
        sys.stdout.write(bar + "|")

        # Update the bar.
        now = datetime.now()
        dt = now - self.t0
        dt = dt.seconds + dt.microseconds * 1e-6
        sys.stdout.write("{:6.1f}s".format(dt))
        Terminal.clear_right()

        # Force redisplay, since we didn't write a \n.
        sys.stdout.flush()

        self.last_update_time = now

    def poke(self):
        if not self.prior:
            return
        if datetime.now() - self.last_update_time < self.update_granularity():
            return
        self.update(*self.prior)

    def finish(self, complete=True):
        if not self.prior:
            sys.stdout.write(
                "No test run... You can try adding"
                " --run-slow-tests or --run-skipped to run more tests\n"
            )
            return
        final_count = self.limit if complete else self.prior[0]
        self.update(final_count, self.prior[1])
        sys.stdout.write("\n")

    def beginline(self):
        if not self.atLineStart:
            sys.stdout.write("\n")
            self.atLineStart = True

    def message(self, msg):
        self.beginline()
        sys.stdout.write(msg)
        sys.stdout.write("\n")

    @staticmethod
    def conservative_isatty():
        """
        Prefer erring on the side of caution and not using terminal commands if
        the current output stream may be a file.
        """
        return sys.stdout.isatty()