1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
import functools
from threading import Lock
from .utils import supports_terminal_sequences, write_string
CONTROL_SEQUENCES = {
'DOWN': '\n',
'UP': '\033[A',
'ERASE_LINE': '\033[K',
'RESET': '\033[0m',
}
_COLORS = {
'BLACK': '0',
'RED': '1',
'GREEN': '2',
'YELLOW': '3',
'BLUE': '4',
'PURPLE': '5',
'CYAN': '6',
'WHITE': '7',
}
_TEXT_STYLES = {
'NORMAL': '0',
'BOLD': '1',
'UNDERLINED': '4',
}
def format_text(text, f):
'''
@param f String representation of formatting to apply in the form:
[style] [light] font_color [on [light] bg_color]
E.g. "red", "bold green on light blue"
'''
f = f.upper()
tokens = f.strip().split()
bg_color = ''
if 'ON' in tokens:
if tokens[-1] == 'ON':
raise SyntaxError(f'Empty background format specified in {f!r}')
if tokens[-1] not in _COLORS:
raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
bg_color = f'4{_COLORS[tokens.pop()]}'
if tokens[-1] == 'LIGHT':
bg_color = f'0;10{bg_color[1:]}'
tokens.pop()
if tokens[-1] != 'ON':
raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
bg_color = f'\033[{bg_color}m'
tokens.pop()
if not tokens:
fg_color = ''
elif tokens[-1] not in _COLORS:
raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
else:
fg_color = f'3{_COLORS[tokens.pop()]}'
if tokens and tokens[-1] == 'LIGHT':
fg_color = f'9{fg_color[1:]}'
tokens.pop()
fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
if tokens:
raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
if fg_color or bg_color:
text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
else:
return text
class MultilinePrinterBase:
def __init__(self, stream=None, lines=1):
self.stream = stream
self.maximum = lines - 1
self._HAVE_FULLCAP = supports_terminal_sequences(stream)
def __enter__(self):
return self
def __exit__(self, *args):
self.end()
def print_at_line(self, text, pos):
pass
def end(self):
pass
def _add_line_number(self, text, line):
if self.maximum:
return f'{line + 1}: {text}'
return text
def write(self, *text):
write_string(''.join(text), self.stream)
class QuietMultilinePrinter(MultilinePrinterBase):
pass
class MultilineLogger(MultilinePrinterBase):
def write(self, *text):
self.stream.debug(''.join(text))
def print_at_line(self, text, pos):
# stream is the logger object, not an actual stream
self.write(self._add_line_number(text, pos))
class BreaklineStatusPrinter(MultilinePrinterBase):
def print_at_line(self, text, pos):
self.write(self._add_line_number(text, pos), '\n')
class MultilinePrinter(MultilinePrinterBase):
def __init__(self, stream=None, lines=1, preserve_output=True):
super().__init__(stream, lines)
self.preserve_output = preserve_output
self._lastline = self._lastlength = 0
self._movelock = Lock()
def lock(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self._movelock:
return func(self, *args, **kwargs)
return wrapper
def _move_cursor(self, dest):
current = min(self._lastline, self.maximum)
yield '\r'
distance = dest - current
if distance < 0:
yield CONTROL_SEQUENCES['UP'] * -distance
elif distance > 0:
yield CONTROL_SEQUENCES['DOWN'] * distance
self._lastline = dest
@lock
def print_at_line(self, text, pos):
if self._HAVE_FULLCAP:
self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
return
text = self._add_line_number(text, pos)
textlen = len(text)
if self._lastline == pos:
# move cursor at the start of progress when writing to same line
prefix = '\r'
if self._lastlength > textlen:
text += ' ' * (self._lastlength - textlen)
self._lastlength = textlen
else:
# otherwise, break the line
prefix = '\n'
self._lastlength = textlen
self.write(prefix, text)
self._lastline = pos
@lock
def end(self):
# move cursor to the end of the last line, and write line break
# so that other to_screen calls can precede
text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
if self.preserve_output:
self.write(*text, '\n')
return
if self._HAVE_FULLCAP:
self.write(
*text, CONTROL_SEQUENCES['ERASE_LINE'],
f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
else:
self.write('\r', ' ' * self._lastlength, '\r')
|