diff options
Diffstat (limited to 'src/debputy/commands/debputy_cmd/output.py')
-rw-r--r-- | src/debputy/commands/debputy_cmd/output.py | 335 |
1 files changed, 335 insertions, 0 deletions
diff --git a/src/debputy/commands/debputy_cmd/output.py b/src/debputy/commands/debputy_cmd/output.py new file mode 100644 index 0000000..131338a --- /dev/null +++ b/src/debputy/commands/debputy_cmd/output.py @@ -0,0 +1,335 @@ +import argparse +import contextlib +import itertools +import os +import re +import shutil +import subprocess +import sys +from typing import ( + Union, + Sequence, + Iterable, + Iterator, + IO, + Mapping, + Tuple, + Optional, + Any, +) + +from debputy.util import assume_not_none + +try: + import colored +except ImportError: + colored = None + + +def _pager() -> Optional[str]: + pager = os.environ.get("DEBPUTY_PAGER") + if pager is None: + pager = os.environ.get("PAGER") + if pager is None and shutil.which("less") is not None: + pager = "less" + return pager + + +URL_START = "\033]8;;" +URL_END = "\033]8;;\a" +MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]") + +_SUPPORTED_COLORS = { + "black", + "red", + "green", + "yellow", + "blue", + "magenta", + "cyan", + "white", +} +_SUPPORTED_STYLES = {"none", "bold"} + + +class OutputStylingBase: + def __init__( + self, + stream: IO[str], + output_format: str, + *, + optimize_for_screen_reader: bool = False, + ) -> None: + self.stream = stream + self.output_format = output_format + self.optimize_for_screen_reader = optimize_for_screen_reader + self._color_support = None + + def colored( + self, + text: str, + *, + fg: Optional[Union[str]] = None, + bg: Optional[str] = None, + style: Optional[str] = None, + ) -> str: + self._check_color(fg) + self._check_color(bg) + self._check_text_style(style) + return text + + @property + def supports_colors(self) -> bool: + return False + + def print_list_table( + self, + headers: Sequence[Union[str, Tuple[str, str]]], + rows: Sequence[Sequence[str]], + ) -> None: + if rows: + if any(len(r) != len(rows[0]) for r in rows): + raise ValueError( + "Unbalanced table: All rows must have the same column count" + ) + if len(rows[0]) != len(headers): + raise ValueError( + "Unbalanced table: header list does not agree with row list on number of columns" + ) + + if not headers: + raise ValueError("No headers provided!?") + + cadjust = {} + header_names = [] + for c in headers: + if isinstance(c, str): + header_names.append(c) + else: + cname, adjust = c + header_names.append(cname) + cadjust[cname] = adjust + + if self.output_format == "csv": + from csv import writer + + w = writer(self.stream) + w.writerow(header_names) + w.writerows(rows) + return + + column_lengths = [ + max((len(h), max(len(r[i]) for r in rows))) + for i, h in enumerate(header_names) + ] + # divider => "+---+---+-...-+" + divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+" + # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths + row_format_inner = " | ".join( + f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}" + for cn, x in zip(header_names, column_lengths) + ) + + row_format = f"| {row_format_inner} |" + + if self.supports_colors: + c = self._color_support + assert c is not None + header_color = c.Style.bold + header_color_reset = c.Style.reset + else: + header_color = "" + header_color_reset = "" + + self.print_visual_formatting(divider) + self.print( + row_format.format( + *header_names, + CELL_COLOR=header_color, + CELL_COLOR_RESET=header_color_reset, + ) + ) + self.print_visual_formatting(divider) + for row in rows: + self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET="")) + self.print_visual_formatting(divider) + + def print(self, /, string: str = "", **kwargs) -> None: + if "file" in kwargs: + raise ValueError("Unsupported kwarg file") + print(string, file=self.stream, **kwargs) + + def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None: + if self.optimize_for_screen_reader: + return + self.print(format_sequence, **kwargs) + + def print_for_screen_reader(self, /, text: str, **kwargs) -> None: + if not self.optimize_for_screen_reader: + return + self.print(text, **kwargs) + + def _check_color(self, color: Optional[str]) -> None: + if color is not None and color not in _SUPPORTED_COLORS: + raise ValueError( + f"Unsupported color: {color}. Only the following are supported {','.join(_SUPPORTED_COLORS)}" + ) + + def _check_text_style(self, style: Optional[str]) -> None: + if style is not None and style not in _SUPPORTED_STYLES: + raise ValueError( + f"Unsupported style: {style}. Only the following are supported {','.join(_SUPPORTED_STYLES)}" + ) + + def render_url(self, link_url: str) -> str: + return link_url + + +class ANSIOutputStylingBase(OutputStylingBase): + def __init__( + self, + stream: IO[str], + output_format: str, + *, + support_colors: bool = True, + support_clickable_urls: bool = True, + **kwargs: Any, + ) -> None: + super().__init__(stream, output_format, **kwargs) + self._stream = stream + self._color_support = colored + self._support_colors = ( + support_colors if self._color_support is not None else False + ) + self._support_clickable_urls = support_clickable_urls + + @property + def supports_colors(self) -> bool: + return self._support_colors + + def colored( + self, + text: str, + *, + fg: Optional[str] = None, + bg: Optional[str] = None, + style: Optional[str] = None, + ) -> str: + self._check_color(fg) + self._check_color(bg) + self._check_text_style(style) + if not self.supports_colors: + return text + _colored = self._color_support + codes = [] + if style is not None: + code = getattr(_colored.Style, style) + assert code is not None + codes.append(code) + if fg is not None: + code = getattr(_colored.Fore, fg) + assert code is not None + codes.append(code) + if bg is not None: + code = getattr(_colored.Back, bg) + assert code is not None + codes.append(code) + if not codes: + return text + return "".join(codes) + text + _colored.Style.reset + + def render_url(self, link_url: str) -> str: + if not self._support_clickable_urls: + return super().render_url(link_url) + link_text = link_url + if not self.optimize_for_screen_reader and link_url.startswith("man:"): + # Rewrite manpage to a clickable link by default. I am not sure how the hyperlink + # ANSI code works with screen readers, so lets not rewrite the manpage link by + # default. My fear is that both the link url and the link text gets read out. + m = MAN_URL_REWRITE.match(link_url) + if m: + page, section = m.groups() + link_url = f"https://manpages.debian.org/{page}.{section}" + return URL_START + f"{link_url}\a{link_text}" + URL_END + + +def _output_styling( + parsed_args: argparse.Namespace, + stream: IO[str], +) -> OutputStylingBase: + output_format = getattr(parsed_args, "output_format", None) + if output_format is None: + output_format = "text" + optimize_for_screen_reader = os.environ.get("OPTIMIZE_FOR_SCREEN_READER", "") != "" + if not stream.isatty(): + return OutputStylingBase( + stream, output_format, optimize_for_screen_reader=optimize_for_screen_reader + ) + return ANSIOutputStylingBase( + stream, output_format, optimize_for_screen_reader=optimize_for_screen_reader + ) + + +@contextlib.contextmanager +def _stream_to_pager( + parsed_args: argparse.Namespace, +) -> Iterator[Tuple[IO[str], OutputStylingBase]]: + fancy_output = _output_styling(parsed_args, sys.stdout) + if ( + not parsed_args.pager + or not sys.stdout.isatty() + or fancy_output.output_format != "text" + ): + yield sys.stdout, fancy_output + return + + pager = _pager() + if pager is None: + yield sys.stdout, fancy_output + return + + env: Mapping[str, str] = os.environ + if "LESS" not in env: + env_copy = dict(os.environ) + env_copy["LESS"] = "-FRSXMQ" + env = env_copy + + cmd = subprocess.Popen( + pager, + stdin=subprocess.PIPE, + encoding="utf-8", + env=env, + ) + stdin = assume_not_none(cmd.stdin) + try: + fancy_output.stream = stdin + yield stdin, fancy_output + except Exception: + stdin.close() + cmd.kill() + cmd.wait() + raise + finally: + fancy_output.stream = sys.stdin + stdin.close() + cmd.wait() + + +def _normalize_cell(cell: Union[str, Sequence[str]], times: int) -> Iterable[str]: + if isinstance(cell, str): + return itertools.chain([cell], itertools.repeat("", times=times - 1)) + if not cell: + return itertools.repeat("", times=times) + return itertools.chain(cell, itertools.repeat("", times=times - len(cell))) + + +def _expand_rows( + rows: Sequence[Sequence[Union[str, Sequence[str]]]] +) -> Iterator[Sequence[str]]: + for row in rows: + if all(isinstance(c, str) for c in row): + yield row + else: + longest = max(len(c) if isinstance(c, list) else 1 for c in row) + cells = [_normalize_cell(c, times=longest) for c in row] + yield from zip(*cells) |