Coverage for src/debputy/commands/debputy_cmd/output.py: 17%

191 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import argparse 

2import contextlib 

3import itertools 

4import os 

5import re 

6import shutil 

7import subprocess 

8import sys 

9from typing import ( 

10 Union, 

11 Sequence, 

12 Iterable, 

13 Iterator, 

14 IO, 

15 Mapping, 

16 Tuple, 

17 Optional, 

18 Any, 

19) 

20 

21from debputy.util import assume_not_none 

22 

23try: 

24 import colored 

25except ImportError: 

26 colored = None 

27 

28 

29def _pager() -> Optional[str]: 

30 pager = os.environ.get("DEBPUTY_PAGER") 

31 if pager is None: 

32 pager = os.environ.get("PAGER") 

33 if pager is None and shutil.which("less") is not None: 

34 pager = "less" 

35 return pager 

36 

37 

# Escape sequences wrapped around a clickable terminal hyperlink.  The link is
# emitted as URL_START + <url> + "\a" + <link text> + URL_END.
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"
# Matches references of the form "man:<page>(<section>)", capturing the page
# name and the numeric section.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Color names accepted for the `fg` and `bg` arguments of `colored`.
_SUPPORTED_COLORS = {
    "black", "red", "green", "yellow",
    "blue", "magenta", "cyan", "white",
}
# Text styles accepted for the `style` argument of `colored`.
_SUPPORTED_STYLES = {"none", "bold"}

53 

54 

class OutputStylingBase:
    """Plain output helper: no colors and no clickable links.

    All output is written to ``stream``.  Subclasses may override
    ``colored``, ``supports_colors`` and ``render_url`` to add terminal
    styling.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """Create the styling helper.

        :param stream: Text stream that all ``print*`` methods write to.
        :param output_format: Name of the output format; ``"csv"`` selects
          CSV output in ``print_list_table``, any other value a text table.
        :param optimize_for_screen_reader: When true, purely visual
          formatting (such as table dividers) is omitted.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Object providing the color codes (read as `.Style.bold` and
        # `.Style.reset` by print_list_table); the base class has none.
        self._color_support: Optional[Any] = None

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the styling arguments and return ``text`` unchanged.

        :raises ValueError: If ``fg``/``bg`` is not a supported color or
          ``style`` is not a supported style.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether color codes are emitted; always ``False`` here."""
        return False

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print ``rows`` as a table (or as CSV when the format is "csv").

        :param headers: One entry per column; either the column name or a
          ``(name, alignment)`` pair where the alignment is a format-spec
          alignment character (for example ``"<"`` or ``">"``).  Columns
          without an explicit alignment are left-aligned.
        :param rows: The table body; every row must have one cell per
          header.  May be empty, in which case only the header is printed.
        :raises ValueError: If there are no headers, or if the rows and the
          headers disagree on the number of columns.
        """
        if rows:
            if any(len(r) != len(rows[0]) for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if len(rows[0]) != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        cadjust = {}
        header_names = []
        for c in headers:
            if isinstance(c, str):
                header_names.append(c)
            else:
                cname, adjust = c
                header_names.append(cname)
                cadjust[cname] = adjust

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # `default=0` keeps this working for an empty table: the column is
        # then exactly as wide as its header.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}"
            for cn, x in zip(header_names, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            c = self._color_support
            assert c is not None
            header_color = c.Style.bold
            header_color_reset = c.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs) -> None:
        """Print ``string`` to the stream; ``kwargs`` go to builtin ``print``.

        :raises ValueError: If ``file`` is passed; the target is always the
          stream of this instance.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
        """Print purely visual output; skipped when optimizing for screen readers."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
        """Print text that is only emitted when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def _check_color(self, color: Optional[str]) -> None:
        """Raise ``ValueError`` unless ``color`` is ``None`` or a supported color."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise ``ValueError`` unless ``style`` is ``None`` or a supported style."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def render_url(self, link_url: str) -> str:
        """Return the text used to present ``link_url``; here the URL itself."""
        return link_url

186 

187 

class ANSIOutputStylingBase(OutputStylingBase):
    """Output helper that can emit color codes and clickable hyperlinks.

    Color codes are taken from the optional ``colored`` module; when that
    module could not be imported, color support is disabled.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """Create the styling helper.

        :param stream: Text stream that output is written to.
        :param output_format: Name of the output format.
        :param support_colors: Whether ``colored`` may emit color codes;
          ignored (treated as false) when the ``colored`` module is missing.
        :param support_clickable_urls: Whether ``render_url`` emits
          hyperlink escape sequences.
        :param kwargs: Passed on to the parent class constructor.
        """
        super().__init__(stream, output_format, **kwargs)
        self._stream = stream
        self._color_support = colored
        # Colors require the optional module, whatever the caller asked for.
        self._support_colors = self._color_support is not None and support_colors
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether ``colored`` emits color codes for this instance."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Wrap ``text`` in the codes for the requested style and colors.

        The arguments are validated even when colors are disabled.  When
        colors are disabled or nothing was requested, ``text`` is returned
        unchanged; otherwise the codes are prepended (style, foreground,
        background) and a style reset is appended.

        :raises ValueError: If a color or the style is not supported.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        if not self.supports_colors:
            return text
        palette = self._color_support
        prefix = []
        # (namespace on the color module, requested attribute), in output order.
        for namespace, attribute in (("Style", style), ("Fore", fg), ("Back", bg)):
            if attribute is None:
                continue
            code = getattr(getattr(palette, namespace), attribute)
            assert code is not None
            prefix.append(code)
        if not prefix:
            return text
        return "".join(prefix) + text + palette.Style.reset

    def render_url(self, link_url: str) -> str:
        """Render ``link_url`` as a clickable terminal hyperlink.

        When clickable URLs are disabled, this defers to the parent class.
        The visible link text is always the original ``link_url``.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        target = link_url
        # man: references are only rewritten to a web URL when not optimizing
        # for screen readers; the concern noted by the original author is that
        # a screen reader might read out both the link URL and the link text.
        if not self.optimize_for_screen_reader and link_url.startswith("man:"):
            match = MAN_URL_REWRITE.match(link_url)
            if match:
                page = match.group(1)
                section = match.group(2)
                target = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{target}\a{link_url}" + URL_END

254 

255 

def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> OutputStylingBase:
    """Create the styling helper matching ``stream`` and the parsed arguments.

    The output format is read from ``parsed_args.output_format`` and falls
    back to ``"text"`` when the attribute is missing or ``None``.  Screen
    reader optimization is enabled when the ``OPTIMIZE_FOR_SCREEN_READER``
    environment variable is set to a non-empty value.

    :return: An ``ANSIOutputStylingBase`` when ``stream`` is a TTY, otherwise
      a plain ``OutputStylingBase``.
    """
    requested_format = getattr(parsed_args, "output_format", None)
    output_format = "text" if requested_format is None else requested_format
    screen_reader = bool(os.environ.get("OPTIMIZE_FOR_SCREEN_READER", ""))
    styling_class = ANSIOutputStylingBase if stream.isatty() else OutputStylingBase
    return styling_class(
        stream,
        output_format,
        optimize_for_screen_reader=screen_reader,
    )

271 

272 

@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], OutputStylingBase]]:
    """Context manager yielding an output stream, possibly piped to a pager.

    A pager is only started when ``parsed_args.pager`` is true, stdout is a
    TTY, the output format is "text" and a pager command is available.  In
    every other case ``sys.stdout`` is yielded directly.

    While the pager is running, the yielded styling object writes to the
    pager's stdin; on exit it is pointed back at its original stream, the
    pipe is closed and the pager is waited for.  If the managed block raises
    an ``Exception``, the pager is killed and the exception is re-raised.

    :return: A ``(stream, styling)`` pair.
    """
    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    if pager is None:
        yield sys.stdout, fancy_output
        return

    env: Mapping[str, str] = os.environ
    if "LESS" not in env:
        # Only provide default `less` options when the user has not set any.
        env_copy = dict(os.environ)
        env_copy["LESS"] = "-FRSXMQ"
        env = env_copy

    # NOTE(review): the pager is passed as a single string without a shell,
    # so a value containing arguments (e.g. "less -R") will presumably fail
    # to start - confirm whether that should be supported.
    cmd = subprocess.Popen(
        pager,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    original_stream = fancy_output.stream
    fancy_output.stream = stdin
    try:
        yield stdin, fancy_output
    except Exception:
        # Abort the pager.  Closing flushes buffered output, which fails with
        # BrokenPipeError if the pager is already gone; that must not mask
        # the exception being propagated.
        with contextlib.suppress(BrokenPipeError):
            stdin.close()
        cmd.kill()
        raise
    finally:
        # Point the styling object back at the stream it had before paging
        # (previously this was mistakenly set to sys.stdin).
        fancy_output.stream = original_stream
        # The pager may have exited before reading everything (e.g. the user
        # quit early); closing an already closed stream is a no-op.
        with contextlib.suppress(BrokenPipeError):
            stdin.close()
        cmd.wait()

316 

317 

318def _normalize_cell(cell: Union[str, Sequence[str]], times: int) -> Iterable[str]: 

319 if isinstance(cell, str): 

320 return itertools.chain([cell], itertools.repeat("", times=times - 1)) 

321 if not cell: 

322 return itertools.repeat("", times=times) 

323 return itertools.chain(cell, itertools.repeat("", times=times - len(cell))) 

324 

325 

326def _expand_rows( 

327 rows: Sequence[Sequence[Union[str, Sequence[str]]]] 

328) -> Iterator[Sequence[str]]: 

329 for row in rows: 

330 if all(isinstance(c, str) for c in row): 

331 yield row 

332 else: 

333 longest = max(len(c) if isinstance(c, list) else 1 for c in row) 

334 cells = [_normalize_cell(c, times=longest) for c in row] 

335 yield from zip(*cells)