summaryrefslogtreecommitdiffstats
path: root/yt_dlp/minicurses.py
blob: 7db02cb59c64b9656460b505554dc571aee6b1dc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import functools
from threading import Lock

from .utils import supports_terminal_sequences, write_string

CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    '''
    @param f    String representation of formatting to apply in the form:
                [style] [light] font_color [on [light] bg_color]
                E.g. "red", "bold green on light blue"
    '''
    f = f.upper()
    tokens = f.strip().split()

    bg_color = ''
    if 'ON' in tokens:
        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        if tokens[-1] == 'LIGHT':
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    else:
        return text


class MultilinePrinterBase:
    def __init__(self, stream=None, lines=1):
        self.stream = stream
        self.maximum = lines - 1
        self._HAVE_FULLCAP = supports_terminal_sequences(stream)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

    def print_at_line(self, text, pos):
        pass

    def end(self):
        pass

    def _add_line_number(self, text, line):
        if self.maximum:
            return f'{line + 1}: {text}'
        return text

    def write(self, *text):
        write_string(''.join(text), self.stream)


class QuietMultilinePrinter(MultilinePrinterBase):
    pass


class MultilineLogger(MultilinePrinterBase):
    def write(self, *text):
        self.stream.debug(''.join(text))

    def print_at_line(self, text, pos):
        # stream is the logger object, not an actual stream
        self.write(self._add_line_number(text, pos))


class BreaklineStatusPrinter(MultilinePrinterBase):
    def print_at_line(self, text, pos):
        self.write(self._add_line_number(text, pos), '\n')


class MultilinePrinter(MultilinePrinterBase):
    def __init__(self, stream=None, lines=1, preserve_output=True):
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        self._lastline = self._lastlength = 0
        self._movelock = Lock()

    def lock(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        current = min(self._lastline, self.maximum)
        yield '\r'
        distance = dest - current
        if distance < 0:
            yield CONTROL_SEQUENCES['UP'] * -distance
        elif distance > 0:
            yield CONTROL_SEQUENCES['DOWN'] * distance
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            return

        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            prefix = '\r'
            if self._lastlength > textlen:
                text += ' ' * (self._lastlength - textlen)
            self._lastlength = textlen
        else:
            # otherwise, break the line
            prefix = '\n'
            self._lastlength = textlen
        self.write(prefix, text)
        self._lastline = pos

    @lock
    def end(self):
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*text, '\n')
            return

        if self._HAVE_FULLCAP:
            self.write(
                *text, CONTROL_SEQUENCES['ERASE_LINE'],
                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            self.write('\r', ' ' * self._lastlength, '\r')