summary | refs | log | tree | commit | diff | stats
path: root/src/debputy/commands/debputy_cmd/output.py
blob: 131338aeeb999d0f3b743016c17f1e66cd70f191 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import argparse
import contextlib
import itertools
import os
import re
import shutil
import subprocess
import sys
from typing import (
    Union,
    Sequence,
    Iterable,
    Iterator,
    IO,
    Mapping,
    Tuple,
    Optional,
    Any,
)

from debputy.util import assume_not_none

try:
    import colored
except ImportError:
    colored = None


def _pager() -> Optional[str]:
    """Pick the pager command to use for paged output.

    The ``DEBPUTY_PAGER`` environment variable is consulted first, then
    ``PAGER``.  When neither is set, ``"less"`` is used provided it can be
    found on ``PATH``.

    :return: The configured pager value as-is (it may be an empty string when
      the variable is set but empty), or ``None`` when no pager is available.
    """
    for variable in ("DEBPUTY_PAGER", "PAGER"):
        configured = os.environ.get(variable)
        if configured is not None:
            return configured
    # Nothing configured; only fall back to "less" when it is actually installed.
    return "less" if shutil.which("less") is not None else None


# Opening of a terminal hyperlink escape sequence (ESC ] 8 ; ;).  In
# `ANSIOutputStylingBase.render_url` the link target is appended directly
# after it, followed by BEL and the visible link text.
URL_START = "\033]8;;"
# The same escape sequence with an empty target, terminated by BEL; emitted
# after the link text to end the hyperlink.
URL_END = "\033]8;;\a"
# Matches "man:<page>(<section>)" references, capturing the page name and the
# numeric section as two groups.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Color names accepted for the `fg` / `bg` arguments (checked by `_check_color`).
_SUPPORTED_COLORS = {
    "black",
    "red",
    "green",
    "yellow",
    "blue",
    "magenta",
    "cyan",
    "white",
}
# Text style names accepted for the `style` argument (checked by `_check_text_style`).
_SUPPORTED_STYLES = {"none", "bold"}


class OutputStylingBase:
    """Plain output helper: writes text and tables to a stream without styling.

    This base implementation validates styling arguments but never emits any
    color codes or hyperlink escapes.  Subclasses can override `colored`,
    `supports_colors` and `render_url` to add terminal styling.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """Create the helper.

        :param stream: Text stream that all output is written to.
        :param output_format: Name of the output format; `print_list_table`
          special-cases ``"csv"``.
        :param optimize_for_screen_reader: When true, purely visual formatting
          (such as table dividers) is suppressed and screen-reader-only text
          is emitted instead.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Color backend; stays None here since this class never emits colors.
        self._color_support: Optional[Any] = None

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the styling arguments and return `text` unchanged.

        :raises ValueError: If `fg`, `bg` or `style` is not a supported name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether `colored` can emit color codes; always false for this class."""
        return False

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print `rows` as a table with the given `headers`.

        With the ``"csv"`` output format the table is written via `csv.writer`.
        Otherwise an ASCII table with ``+---+`` dividers is printed; dividers
        are omitted when optimizing for screen readers.

        :param headers: One entry per column.  Each entry is either the column
          name or a ``(name, alignment)`` pair where `alignment` is the
          alignment character placed in the cell's format spec (for instance
          ``"<"`` or ``">"``).  Columns without an alignment use ``"<"``.
        :param rows: The table cells; every row must have one cell per header.
          May be empty, in which case only the header is printed.
        :raises ValueError: If there are no headers or if the rows / headers
          disagree on the number of columns.
        """
        if rows:
            if any(len(r) != len(rows[0]) for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if len(rows[0]) != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        cadjust = {}
        header_names = []
        for header in headers:
            if isinstance(header, str):
                header_names.append(header)
            else:
                cname, adjust = header
                header_names.append(cname)
                cadjust[cname] = adjust

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # Each column is as wide as its widest cell or its header, whichever is
        # longer.  `default=0` keeps this working for a table without rows.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}"
            for cn, x in zip(header_names, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            color_support = self._color_support
            assert color_support is not None
            header_color = color_support.Style.bold
            header_color_reset = color_support.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs) -> None:
        """Print `string` to the configured stream.

        Extra keyword arguments are forwarded to the builtin `print`.

        :raises ValueError: If a ``file`` keyword argument is passed; the
          destination is always `self.stream`.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
        """Print purely visual output; skipped when optimizing for screen readers."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
        """Print `text` only when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def _check_color(self, color: Optional[str]) -> None:
        """Raise `ValueError` unless `color` is ``None`` or a supported color name."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise `ValueError` unless `style` is ``None`` or a supported style name."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def render_url(self, link_url: str) -> str:
        """Return the text to display for `link_url`; here the URL itself."""
        return link_url


class ANSIOutputStylingBase(OutputStylingBase):
    """Output helper that can emit color codes and clickable terminal hyperlinks.

    Colors come from the module-level ``colored`` import; when that import
    failed (``colored`` is ``None``) color support is disabled regardless of
    the `support_colors` argument.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """Create the helper.

        :param stream: Text stream that all output is written to.
        :param output_format: Name of the output format (passed to the base class).
        :param support_colors: Whether color codes may be emitted; ignored
          (treated as false) when the color backend is unavailable.
        :param support_clickable_urls: Whether `render_url` should wrap URLs
          in hyperlink escape sequences.
        :param kwargs: Forwarded to `OutputStylingBase.__init__`.
        """
        super().__init__(stream, output_format, **kwargs)
        self._stream = stream
        self._color_support = colored
        self._support_colors = (
            support_colors if self._color_support is not None else False
        )
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether `colored` will actually emit color codes."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Wrap `text` in the requested style / foreground / background codes.

        The codes are looked up by name on the backend's ``Style``, ``Fore``
        and ``Back`` namespaces and the result is terminated with the
        backend's ``Style.reset``.  `text` is returned unchanged when colors
        are unsupported or when no styling was requested.

        :raises ValueError: If `fg`, `bg` or `style` is not a supported name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        if not self.supports_colors:
            return text
        _colored = self._color_support
        codes = []
        # "none" passes the style check but requests no text style, so it is
        # never looked up on the backend's ``Style`` namespace.
        # NOTE(review): this assumes the backend has no ``Style.none``
        # attribute (the lookup would otherwise raise AttributeError) - confirm.
        if style is not None and style != "none":
            code = getattr(_colored.Style, style)
            assert code is not None
            codes.append(code)
        if fg is not None:
            code = getattr(_colored.Fore, fg)
            assert code is not None
            codes.append(code)
        if bg is not None:
            code = getattr(_colored.Back, bg)
            assert code is not None
            codes.append(code)
        if not codes:
            return text
        return "".join(codes) + text + _colored.Style.reset

    def render_url(self, link_url: str) -> str:
        """Render `link_url` as a clickable terminal hyperlink.

        When clickable URLs are disabled, the base class rendering (the plain
        URL) is returned.  Otherwise the original `link_url` is kept as the
        visible text; ``man:<page>(<section>)`` links get a
        ``https://manpages.debian.org/`` target unless optimizing for screen
        readers.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        link_text = link_url
        if not self.optimize_for_screen_reader and link_url.startswith("man:"):
            # Rewrite manpage to a clickable link by default. I am not sure how the hyperlink
            # ANSI code works with screen readers, so lets not rewrite the manpage link by
            # default. My fear is that both the link url and the link text gets read out.
            m = MAN_URL_REWRITE.match(link_url)
            if m:
                page, section = m.groups()
                link_url = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{link_url}\a{link_text}" + URL_END


def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> OutputStylingBase:
    """Build the output styling helper matching `stream` and the parsed arguments.

    The output format is taken from ``parsed_args.output_format`` and defaults
    to ``"text"`` when that attribute is missing or ``None``.  Screen reader
    optimization is enabled when the ``OPTIMIZE_FOR_SCREEN_READER``
    environment variable is set to a non-empty value.

    :return: An `ANSIOutputStylingBase` when `stream` is a terminal, otherwise
      a plain `OutputStylingBase`.
    """
    requested_format = getattr(parsed_args, "output_format", None)
    output_format = "text" if requested_format is None else requested_format
    screen_reader_mode = bool(os.environ.get("OPTIMIZE_FOR_SCREEN_READER", ""))
    # Only a terminal gets the ANSI-capable helper.
    styling_class = ANSIOutputStylingBase if stream.isatty() else OutputStylingBase
    return styling_class(
        stream,
        output_format,
        optimize_for_screen_reader=screen_reader_mode,
    )


@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], OutputStylingBase]]:
    """Context manager providing an output stream that is paged when appropriate.

    Yields a ``(stream, styling)`` pair.  Output goes straight to
    ``sys.stdout`` when paging was not requested (``parsed_args.pager`` is
    false), stdout is not a terminal, the output format is not ``"text"`` or
    no pager command is available.  Otherwise the pager is started and the
    yielded stream is its stdin; on exit the stream is closed and the pager
    is waited for.  If the body raises an `Exception`, the pager is killed
    and the exception is re-raised.
    """
    import shlex

    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    # The pager setting may carry arguments (e.g. PAGER="less -R").  Without a
    # shell, Popen treats a plain string as the program name, so split it into
    # an argument list first.  An unset or empty setting means "no pager".
    pager_cmd = shlex.split(pager) if pager else []
    if not pager_cmd:
        yield sys.stdout, fancy_output
        return

    env: Mapping[str, str] = os.environ
    if "LESS" not in env:
        # Provide default options for "less" without touching our own environment.
        env_copy = dict(os.environ)
        env_copy["LESS"] = "-FRSXMQ"
        env = env_copy

    cmd = subprocess.Popen(
        pager_cmd,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    # Remember where the styling helper was writing so it can be restored.
    original_stream = fancy_output.stream
    try:
        fancy_output.stream = stdin
        yield stdin, fancy_output
    except Exception:
        stdin.close()
        cmd.kill()
        cmd.wait()
        raise
    finally:
        # Point the helper back at its original output stream (not stdin).
        fancy_output.stream = original_stream
    stdin.close()
    cmd.wait()


def _normalize_cell(cell: Union[str, Sequence[str]], times: int) -> Iterable[str]:
    """Turn a table cell into an iterable of lines padded with empty strings.

    A string cell counts as a single line; any other cell is treated as a
    sequence of lines.  Empty strings are appended until `times` lines have
    been produced.  A cell that already has `times` or more lines is returned
    without padding (and without truncation).
    """
    if isinstance(cell, str):
        lines = (cell,)
    elif cell:
        lines = cell
    else:
        # Empty (falsy) cell: nothing but padding.
        lines = ()
    padding = itertools.repeat("", times=times - len(lines))
    return itertools.chain(lines, padding)


def _expand_rows(
    rows: Sequence[Sequence[Union[str, Sequence[str]]]]
) -> Iterator[Sequence[str]]:
    """Expand rows with multi-line cells into rows containing only strings.

    A row whose cells are all strings is yielded unchanged.  Otherwise each
    non-string cell is treated as a sequence of lines and the row is expanded
    into as many output rows (tuples) as its tallest cell has lines, with
    shorter cells padded by empty strings.  A row consisting solely of empty
    sequences yields a single row of empty strings so it is not lost.

    :param rows: Rows whose cells are either a string or a sequence of strings.
    :return: An iterator over rows of plain strings.
    """
    for row in rows:
        if all(isinstance(cell, str) for cell in row):
            yield row
            continue
        # Any non-string sequence (list, tuple, ...) counts as a multi-line cell.
        columns = [
            (cell,) if isinstance(cell, str) else tuple(cell) for cell in row
        ]
        if any(columns):
            yield from itertools.zip_longest(*columns, fillvalue="")
        else:
            # Every cell is an empty sequence; emit one blank row for it.
            yield ("",) * len(columns)