Coverage for src/debputy/commands/debputy_cmd/output.py: 17%
191 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import argparse
2import contextlib
3import itertools
4import os
5import re
6import shutil
7import subprocess
8import sys
9from typing import (
10 Union,
11 Sequence,
12 Iterable,
13 Iterator,
14 IO,
15 Mapping,
16 Tuple,
17 Optional,
18 Any,
19)
21from debputy.util import assume_not_none
23try:
24 import colored
25except ImportError:
26 colored = None
29def _pager() -> Optional[str]:
30 pager = os.environ.get("DEBPUTY_PAGER")
31 if pager is None:
32 pager = os.environ.get("PAGER")
33 if pager is None and shutil.which("less") is not None:
34 pager = "less"
35 return pager
# Terminal hyperlink (OSC 8) escape sequences. A clickable link is emitted as
# URL_START + "<url>\a<link text>" + URL_END.
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"

# Matches "man:<page>(<section>)" and captures the page name and the numeric section.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Names accepted as foreground / background colors by the output styling classes.
_SUPPORTED_COLORS = {
    "black", "red", "green", "yellow",
    "blue", "magenta", "cyan", "white",
}
# Names accepted as text styles by the output styling classes.
_SUPPORTED_STYLES = {"none", "bold"}
class OutputStylingBase:
    """Plain output helper that never emits colors or terminal escape codes.

    It validates styling requests and renders tables and URLs as plain text.
    Subclasses can override `colored`, `supports_colors` and `render_url` to
    add terminal features.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """Create the styling helper.

        :param stream: Text stream that all `print*` methods write to.
        :param output_format: Output format name; ``"csv"`` makes
          `print_list_table` emit CSV, any other value renders a text table.
        :param optimize_for_screen_reader: When true, purely visual output
          (such as table dividers) is suppressed.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Set by subclasses that support colors; this base class never uses it
        # because `supports_colors` is always False here.
        self._color_support: Optional[Any] = None

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the styling arguments and return `text` unchanged.

        :raises ValueError: If `fg`, `bg` or `style` is not a supported name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether color codes are emitted; always False for the base class."""
        return False

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print `rows` as a table (or as CSV when the output format is "csv").

        :param headers: Column headers. Each entry is either the column name
          or a ``(name, adjust)`` pair, where ``adjust`` is a format-spec
          alignment character (the default is ``"<"``).
        :param rows: Table rows; every row must have one cell per header.
          An empty sequence yields a table with only the header.
        :raises ValueError: If no headers are given or the column counts of
          the headers and rows disagree.
        """
        if rows:
            expected_columns = len(rows[0])
            if any(len(r) != expected_columns for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if expected_columns != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        cadjust = {}
        header_names = []
        for c in headers:
            if isinstance(c, str):
                header_names.append(c)
            else:
                cname, adjust = c
                header_names.append(cname)
                cadjust[cname] = adjust

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # `default=0` keeps this working for an empty `rows`, where the column
        # width is then determined by the header alone.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}"
            for cn, x in zip(header_names, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            color_support = self._color_support
            assert color_support is not None
            header_color = color_support.Style.bold
            header_color_reset = color_support.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs) -> None:
        """Print `string` to the configured stream.

        Extra keyword arguments are forwarded to the built-in ``print``.

        :raises ValueError: If a ``file`` keyword argument is passed; the
          destination is always ``self.stream``.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
        """Print purely visual output; skipped when optimizing for screen readers."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
        """Print text that is only emitted when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def _check_color(self, color: Optional[str]) -> None:
        """Raise ValueError unless `color` is None or a supported color name."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise ValueError unless `style` is None or a supported style name."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def render_url(self, link_url: str) -> str:
        """Return the text used to display `link_url`; here the URL itself."""
        return link_url
class ANSIOutputStylingBase(OutputStylingBase):
    """Output helper for terminals: optional colors and clickable hyperlinks.

    Colors are produced via the optional ``colored`` module; when that module
    could not be imported, color support is disabled regardless of the
    `support_colors` argument.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """Create the styling helper.

        :param stream: Text stream that output is written to.
        :param output_format: Output format name, passed to the base class.
        :param support_colors: Whether color codes may be emitted (only
          honored when the ``colored`` module is available).
        :param support_clickable_urls: Whether `render_url` may emit
          terminal hyperlink escape sequences.
        :param kwargs: Forwarded to the base class constructor.
        """
        super().__init__(stream, output_format, **kwargs)
        self._stream = stream
        # `colored` is None when the optional dependency is not installed.
        self._color_support = colored
        self._support_colors = (
            support_colors if self._color_support is not None else False
        )
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether `colored` will actually emit color/style codes."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Wrap `text` in the codes for the requested colors and style.

        :param text: The text to decorate.
        :param fg: Foreground color name, or None for the default.
        :param bg: Background color name, or None for the default.
        :param style: Text style name, or None. The style ``"none"`` is
          accepted and means that no style code is emitted.
        :return: The decorated text followed by a style reset, or `text`
          unchanged when colors are unsupported or nothing was requested.
        :raises ValueError: If `fg`, `bg` or `style` is not a supported name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        if not self.supports_colors:
            return text
        _colored = self._color_support
        # "none" is a valid style name that requests no styling, so it is
        # skipped here rather than looked up as an attribute on `Style`.
        requested = (
            (_colored.Style, None if style == "none" else style),
            (_colored.Fore, fg),
            (_colored.Back, bg),
        )
        codes = [
            getattr(namespace, name)
            for namespace, name in requested
            if name is not None
        ]
        if not codes:
            return text
        return "".join(codes) + text + _colored.Style.reset

    def render_url(self, link_url: str) -> str:
        """Render `link_url` as a clickable terminal hyperlink when enabled.

        The visible link text is always the original `link_url`. Unless
        optimizing for screen readers, a ``man:page(section)`` URL gets a
        manpages.debian.org address as its link target.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        link_text = link_url
        if not self.optimize_for_screen_reader and link_url.startswith("man:"):
            # Rewrite man page to a clickable link by default. I am not sure how the hyperlink
            # ANSI code works with screen readers, so lets not rewrite the man page link by
            # default. My fear is that both the link url and the link text gets read out.
            m = MAN_URL_REWRITE.match(link_url)
            if m:
                page, section = m.groups()
                link_url = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{link_url}\a{link_text}" + URL_END
def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> OutputStylingBase:
    """Create the output styling helper suited to `stream`.

    :param parsed_args: Parsed command line arguments. Its ``output_format``
      attribute is used when present and not None; otherwise the format is
      ``"text"``.
    :param stream: Stream the output will be written to.
    :return: An `ANSIOutputStylingBase` when `stream` is a TTY, otherwise a
      plain `OutputStylingBase`. Screen reader optimization is enabled when
      the ``OPTIMIZE_FOR_SCREEN_READER`` environment variable is non-empty.
    """
    requested_format = getattr(parsed_args, "output_format", None)
    output_format = "text" if requested_format is None else requested_format
    screen_reader = bool(os.environ.get("OPTIMIZE_FOR_SCREEN_READER", ""))
    # Escape codes only make sense when writing directly to a terminal.
    styling_class = ANSIOutputStylingBase if stream.isatty() else OutputStylingBase
    return styling_class(
        stream,
        output_format,
        optimize_for_screen_reader=screen_reader,
    )
@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], OutputStylingBase]]:
    """Context manager yielding an output stream, routed via a pager if possible.

    A pager is only used when ``parsed_args.pager`` is true, stdout is a TTY,
    the output format is ``"text"`` and a pager command is available. In all
    other cases ``sys.stdout`` is yielded directly.

    :param parsed_args: Parsed command line arguments; must have a ``pager``
      attribute.
    :return: Yields ``(stream, styling)`` where ``stream`` is where output
      should be written and ``styling`` writes to that same stream.
    """
    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    if pager is None:
        yield sys.stdout, fancy_output
        return

    env: Mapping[str, str] = os.environ
    if "LESS" not in env:
        # Provide default options for `less` without overriding a user's own
        # LESS setting; the copy avoids modifying our own environment.
        env_copy = dict(os.environ)
        env_copy["LESS"] = "-FRSXMQ"
        env = env_copy

    cmd = subprocess.Popen(
        pager,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    # Remember the stream so it can be put back once the pager is done.
    original_stream = fancy_output.stream
    try:
        fancy_output.stream = stdin
        yield stdin, fancy_output
    except Exception:
        stdin.close()
        cmd.kill()
        cmd.wait()
        raise
    finally:
        # Restore the stream the styling object had before paging (previously
        # this was mistakenly reset to sys.stdin, which is not an output stream).
        fancy_output.stream = original_stream
        stdin.close()
        cmd.wait()
318def _normalize_cell(cell: Union[str, Sequence[str]], times: int) -> Iterable[str]:
319 if isinstance(cell, str):
320 return itertools.chain([cell], itertools.repeat("", times=times - 1))
321 if not cell:
322 return itertools.repeat("", times=times)
323 return itertools.chain(cell, itertools.repeat("", times=times - len(cell)))
326def _expand_rows(
327 rows: Sequence[Sequence[Union[str, Sequence[str]]]]
328) -> Iterator[Sequence[str]]:
329 for row in rows:
330 if all(isinstance(c, str) for c in row):
331 yield row
332 else:
333 longest = max(len(c) if isinstance(c, list) else 1 for c in row)
334 cells = [_normalize_cell(c, times=longest) for c in row]
335 yield from zip(*cells)