Coverage for src/debputy/lsp/vendoring/_deb822_repro/_util.py: 57%

154 statements  


import collections
import collections.abc
import logging
import sys
import textwrap
from abc import ABC

try:
    from typing import (
        Optional,
        Union,
        Iterable,
        Callable,
        TYPE_CHECKING,
        Iterator,
        Type,
        cast,
        List,
        Generic,
    )
    from debian._util import T
    from .types import TE, R, TokenOrElement

    _combine_parts_ret_type = Callable[
        [Iterable[Union[TokenOrElement, TE]]], Iterable[Union[TokenOrElement, R]]
    ]
except ImportError:
    # pylint: disable=unnecessary-lambda-assignment
    TYPE_CHECKING = False
    cast = lambda t, v: v


if TYPE_CHECKING:
    from .parsing import Deb822Element
    from .tokens import Deb822Token


def print_ast(
    ast_tree,  # type: Union[Iterable[TokenOrElement], 'Deb822Element']
    *,
    end_marker_after=5,  # type: Optional[int]
    output_function=None  # type: Optional[Callable[[str], None]]
):
    # type: (...) -> None
    """Debugging aid, which can dump a Deb822Element or a list of tokens/elements

    :param ast_tree: Either a Deb822Element or an iterable of Deb822Token/Deb822Element
        entries (both types may be mixed in the same iterable, which enables dumping
        the AST at different stages of the parse_deb822_file method)
    :param end_marker_after: The dump will add "end of element" markers if a
        given element spans at least this many tokens/elements. Can be disabled
        by passing None as the value. Use 0 to unconditionally mark all
        elements (note that tokens never get an "end of element" marker, as they
        are not elements).
    :param output_function: Callable that receives a single str argument and is responsible
        for "displaying" that line. The callable may be invoked multiple times (once per line
        of output). Defaults to logging.info if omitted.
    """

    # Avoid circular dependency
    # pylint: disable=import-outside-toplevel
    from debian._deb822_repro.parsing import Deb822Element

    prefix = None
    if isinstance(ast_tree, Deb822Element):
        ast_tree = [ast_tree]
    stack = [(0, "", iter(ast_tree))]
    current_no = 0
    if output_function is None:
        output_function = logging.info
    while stack:
        start_no, name, current_iter = stack[-1]
        for current in current_iter:
            current_no += 1
            if prefix is None:
                prefix = " " * len(stack)
            if isinstance(current, Deb822Element):
                stack.append(
                    (current_no, current.__class__.__name__, iter(current.iter_parts()))
                )
                output_function(prefix + current.__class__.__name__)
                prefix = None
                break
            output_function(prefix + str(current))
        else:
            # current_iter is depleted
            stack.pop()
            prefix = None
            if (
                end_marker_after is not None
                and start_no + end_marker_after <= current_no
                and name
            ):
                if prefix is None:
                    prefix = " " * len(stack)
                output_function(prefix + "# <-- END OF " + name)
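
# Illustrative usage (not part of the original module): dumping the AST of a
# parsed deb822 file. The parse_deb822_file import path below is an assumption
# based on this vendored package's layout.
#
#     from debputy.lsp.vendoring._deb822_repro import parse_deb822_file
#
#     with open("debian/control") as fd:
#         print_ast(parse_deb822_file(fd), output_function=print)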


def combine_into_replacement(
    source_class,  # type: Type[TE]
    replacement_class,  # type: Type[R]
    *,
    constructor=None  # type: Optional[Callable[[List[TE]], R]]
):
    # type: (...) -> _combine_parts_ret_type[TE, R]
    """Combines runs of one type into another type

    This is primarily useful for transforming tokens (e.g., Comment tokens) into
    the relevant element (such as the Comment element).
    """

    if constructor is None:
        _constructor = cast("Callable[[List[TE]], R]", replacement_class)
    else:
        # Force mypy to see that constructor is no longer optional
        _constructor = constructor

    def _impl(token_stream):
        # type: (Iterable[Union[TokenOrElement, TE]]) -> Iterable[Union[TokenOrElement, R]]
        tokens = []
        for token in token_stream:
            if isinstance(token, source_class):
                tokens.append(token)
                continue

            if tokens:
                yield _constructor(list(tokens))
                tokens.clear()
            yield token

        if tokens:
            yield _constructor(tokens)

    return _impl
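
# Illustrative usage (not part of the original module): collapsing runs of
# comment tokens into a single comment element. The concrete token/element
# class names are assumptions borrowed from the sibling tokens/parsing modules.
#
#     _combine_comments = combine_into_replacement(
#         Deb822CommentToken, Deb822CommentElement
#     )
#     tokens_and_elements = list(_combine_comments(tokenizer_output))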


if sys.version_info >= (3, 9) or TYPE_CHECKING:
    _bufferingIterator_Base = collections.abc.Iterator[T]
else:
    # Python 3.5 - 3.8 compat - we are not allowed to subscript the abc.Iterator
    # - use this little hack to work around it
    class _bufferingIterator_Base(collections.abc.Iterator, Generic[T], ABC):
        pass


class BufferingIterator(_bufferingIterator_Base[T], Generic[T]):

    def __init__(self, stream):
        # type: (Iterable[T]) -> None
        self._stream = iter(stream)  # type: Iterator[T]
        self._buffer = collections.deque()  # type: collections.deque[T]
        self._expired = False  # type: bool

    def __next__(self):
        # type: () -> T
        if self._buffer:
            return self._buffer.popleft()
        if self._expired:
            raise StopIteration
        return next(self._stream)

    def takewhile(self, predicate):
        # type: (Callable[[T], bool]) -> Iterable[T]
        """Variant of itertools.takewhile except it does not discard the first non-matching token"""
        buffer = self._buffer
        while buffer or self._fill_buffer(5):
            v = buffer[0]
            if predicate(v):
                buffer.popleft()
                yield v
            else:
                break
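
    # Illustrative usage (not part of the original module): consume a run of
    # whitespace tokens without losing the first non-matching token. The token
    # class name is an assumption for the sake of the example.
    #
    #     buf_iter = BufferingIterator(token_stream)
    #     leading_ws = list(
    #         buf_iter.takewhile(lambda t: isinstance(t, Deb822WhitespaceToken))
    #     )
    #     # The first non-whitespace token is still available via next(buf_iter).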

    def consume_many(self, count):
        # type: (int) -> List[T]
        self._fill_buffer(count)
        buffer = self._buffer
        if len(buffer) == count:
            ret = list(buffer)
            buffer.clear()
        else:
            ret = []
            while buffer and count:
                ret.append(buffer.popleft())
                count -= 1
        return ret

    def peek_buffer(self):
        # type: () -> List[T]
        return list(self._buffer)

    def peek_find(
        self,
        predicate,  # type: Callable[[T], bool]
        limit=None,  # type: Optional[int]
    ):
        # type: (...) -> Optional[int]
        buffer = self._buffer
        i = 0
        while limit is None or i < limit:
            if i >= len(buffer):
                self._fill_buffer(i + 5)
                if i >= len(buffer):
                    return None
            v = buffer[i]
            if predicate(v):
                return i + 1
            i += 1
        return None
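
    # Illustrative usage (not part of the original module): peek_find returns a
    # 1-based offset (None when nothing matched), which pairs directly with
    # consume_many. The token class name is an assumption for the example.
    #
    #     n = buf_iter.peek_find(lambda t: isinstance(t, Deb822NewlineAfterValueToken))
    #     if n is not None:
    #         line_tokens = buf_iter.consume_many(n)  # includes the newline token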

    def _fill_buffer(self, number):
        # type: (int) -> bool
        if not self._expired:
            while len(self._buffer) < number:
                try:
                    self._buffer.append(next(self._stream))
                except StopIteration:
                    self._expired = True
                    break
        return bool(self._buffer)

    def peek(self):
        # type: () -> Optional[T]
        return self.peek_at(1)

    def peek_at(self, tokens_ahead):
        # type: (int) -> Optional[T]
        self._fill_buffer(tokens_ahead)
        return (
            self._buffer[tokens_ahead - 1]
            if len(self._buffer) >= tokens_ahead
            else None
        )

    def peek_many(self, number):
        # type: (int) -> List[T]
        self._fill_buffer(number)
        buffer = self._buffer
        if len(buffer) == number:
            ret = list(buffer)
        elif number:
            ret = []
            for t in buffer:
                ret.append(t)
                number -= 1
                if not number:
                    break
        else:
            ret = []
        return ret
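
    # Illustrative usage (not part of the original module): non-destructive
    # lookahead; none of these calls consume items from the stream.
    #
    #     first = buf_iter.peek()         # same as buf_iter.peek_at(1)
    #     second = buf_iter.peek_at(2)    # None if the stream is too short
    #     window = buf_iter.peek_many(3)  # up to 3 items; fewer near the end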


def len_check_iterator(
    content,  # type: str
    stream,  # type: Iterable[TE]
    content_len=None,  # type: Optional[int]
):
    # type: (...) -> Iterable[TE]
    """Flatten a parser's output into tokens and verify it covers the entire line/text"""
    if content_len is None:
        content_len = len(content)
    # Fail-safe to ensure none of the value parsers incorrectly parse a value.
    covered = 0
    for token_or_element in stream:
        # We use the AttributeError to discriminate between elements and tokens.
        # The cast()s are here to assist / work around mypy not realizing that.
        try:
            tokens = cast("Deb822Element", token_or_element).iter_tokens()
        except AttributeError:
            token = cast("Deb822Token", token_or_element)
            covered += len(token.text)
        else:
            for token in tokens:
                covered += len(token.text)
        yield token_or_element

    if covered != content_len:
        if covered < content_len:
            msg = textwrap.dedent(
                """\
                Value parser did not fully cover the entire line with tokens
                (missing range {covered}..{content_len}). Occurred when parsing "{content}"
                """
            ).format(covered=covered, content_len=content_len, content=content)
            raise ValueError(msg)
        msg = textwrap.dedent(
            """\
            Value parser emitted tokens for more text than was present? Should have
            emitted {content_len} characters, got {covered}. Occurred when parsing
            "{content}"
            """
        ).format(covered=covered, content_len=content_len, content=content)
        raise ValueError(msg)
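
# Illustrative usage (not part of the original module): wrapping a value
# parser's output so that a token/length mismatch fails loudly. The parser
# function in the example is hypothetical.
#
#     def _parse_value(line):  # hypothetical value parser
#         # type: (str) -> Iterable[TokenOrElement]
#         ...
#
#     checked = len_check_iterator(line, _parse_value(line))
#     elements = list(checked)  # raises ValueError on a length mismatch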