Coverage for src/debputy/lsp/vendoring/_deb822_repro/_util.py: 57%
154 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import collections
2import collections.abc
3import logging
4import sys
5import textwrap
6from abc import ABC
8try:
9 from typing import (
10 Optional,
11 Union,
12 Iterable,
13 Callable,
14 TYPE_CHECKING,
15 Iterator,
16 Type,
17 cast,
18 List,
19 Generic,
20 )
21 from debian._util import T
22 from .types import TE, R, TokenOrElement
24 _combine_parts_ret_type = Callable[
25 [Iterable[Union[TokenOrElement, TE]]], Iterable[Union[TokenOrElement, R]]
26 ]
27except ImportError:
28 # pylint: disable=unnecessary-lambda-assignment
29 TYPE_CHECKING = False
30 cast = lambda t, v: v
33if TYPE_CHECKING:
34 from .parsing import Deb822Element
35 from .tokens import Deb822Token
def print_ast(
    ast_tree, # type: Union[Iterable[TokenOrElement], 'Deb822Element']
    *,
    end_marker_after=5, # type: Optional[int]
    output_function=None # type: Optional[Callable[[str], None]]
):
    # type: (...) -> None
    """Debugging aid, which can dump a Deb822Element or a list of tokens/elements

    :param ast_tree: Either a Deb822Element or an iterable of Deb822Token/Deb822Element
        entries (both types may be mixed in the same iterable, which enables dumping
        the ast tree at different stages of the parse_deb822_file method)
    :param end_marker_after: The dump will add "end of element" markers if a
        given element spans at least this many tokens/elements. Can be disabled
        by passing None as value. Use 0 for unconditionally marking all
        elements (note that tokens never get an "end of element" marker as they
        are not elements).
    :param output_function: Callable that receives a single str argument and is responsible
        for "displaying" that line. The callable may be invoked multiple times (one per line
        of output). Defaults to logging.info if omitted.
    """
    # Avoid circular dependency
    # pylint: disable=import-outside-toplevel
    # Import from the *vendored* parsing module (relative import), matching the
    # TYPE_CHECKING import above.  Importing debian._deb822_repro.parsing here
    # would make the isinstance() check below compare against the upstream
    # classes, which the elements produced by this vendored copy never are.
    from .parsing import Deb822Element
    prefix = None
    if isinstance(ast_tree, Deb822Element):
        # Normalize a single element into an iterable for uniform handling.
        ast_tree = [ast_tree]
    # Stack of (token number at which the element started, element name, iterator
    # over the element's parts) — drives an iterative depth-first dump.
    stack = [(0, "", iter(ast_tree))]
    current_no = 0
    if output_function is None:
        output_function = logging.info
    while stack:
        start_no, name, current_iter = stack[-1]
        for current in current_iter:
            current_no += 1
            if prefix is None:
                prefix = " " * len(stack)
            if isinstance(current, Deb822Element):
                # Descend into the element; resume this iterator later.
                stack.append(
                    (current_no, current.__class__.__name__, iter(current.iter_parts()))
                )
                output_function(prefix + current.__class__.__name__)
                prefix = None
                break
            output_function(prefix + str(current))
        else:
            # current_iter is depleted
            stack.pop()
            prefix = None
            if (
                end_marker_after is not None
                and start_no + end_marker_after <= current_no
                and name
            ):
                if prefix is None:
                    prefix = " " * len(stack)
                output_function(prefix + "# <-- END OF " + name)
def combine_into_replacement(
    source_class, # type: Type[TE]
    replacement_class, # type: Type[R]
    *,
    constructor=None # type: Optional[Callable[[List[TE]], R]]
):
    # type: (...) -> _combine_parts_ret_type[TE, R]
    """Build a stream transformer that folds runs of one type into another type

    This is primarily useful for transforming tokens (e.g, Comment tokens) into
    the relevant element (such as the Comment element).
    """
    if constructor is not None:
        # Already concrete; assigning to a new name helps mypy see it is
        # no longer Optional.
        _factory = constructor
    else:
        # Default factory is the replacement class itself.
        _factory = cast("Callable[[List[TE]], R]", replacement_class)

    def _transform(token_stream):
        # type: (Iterable[Union[TokenOrElement, TE]]) -> Iterable[Union[TokenOrElement, R]]
        run = []
        for part in token_stream:
            if isinstance(part, source_class):
                run.append(part)
            else:
                # Run (if any) ends here: emit the combined replacement
                # before passing the non-matching part through unchanged.
                if run:
                    yield _factory(list(run))
                    run.clear()
                yield part
        # Flush a trailing run at end of stream.
        if run:
            yield _factory(run)

    return _transform
if sys.version_info < (3, 9) and not TYPE_CHECKING:
    # Python 3.5 - 3.8 compat: collections.abc.Iterator cannot be subscripted
    # on these versions, so provide a small generic ABC shim instead.
    class _bufferingIterator_Base(collections.abc.Iterator, Generic[T], ABC):
        pass
else:
    # On 3.9+ (and for type checkers) the abc type is directly subscriptable.
    _bufferingIterator_Base = collections.abc.Iterator[T]
class BufferingIterator(_bufferingIterator_Base[T], Generic[T]):
    """Iterator wrapper with buffered look-ahead

    Wraps an arbitrary iterable and exposes peek/consume primitives backed by
    an internal deque, so callers can inspect upcoming items without
    committing to consuming them.
    """

    def __init__(self, stream):
        # type: (Iterable[T]) -> None
        self._stream = iter(stream)  # type: Iterator[T]
        self._buffer = collections.deque()  # type: collections.deque[T]
        self._expired = False  # type: bool

    def __next__(self):
        # type: () -> T
        queue = self._buffer
        if queue:
            # Serve previously buffered (peeked) items first.
            return queue.popleft()
        if self._expired:
            raise StopIteration
        return next(self._stream)

    def takewhile(self, predicate):
        # type: (Callable[[T], bool]) -> Iterable[T]
        """Variant of itertools.takewhile except it does not discard the first non-matching token"""
        queue = self._buffer
        # Refill in small batches; the first failing item stays buffered.
        while queue or self._fill_buffer(5):
            head = queue[0]
            if not predicate(head):
                break
            queue.popleft()
            yield head

    def consume_many(self, count):
        # type: (int) -> List[T]
        """Remove and return up to `count` items from the front of the stream."""
        self._fill_buffer(count)
        queue = self._buffer
        if len(queue) == count:
            # Exact fit: hand over the entire buffer in one go.
            consumed = list(queue)
            queue.clear()
            return consumed
        consumed = []
        while queue and count:
            consumed.append(queue.popleft())
            count -= 1
        return consumed

    def peek_buffer(self):
        # type: () -> List[T]
        """Return a copy of the currently buffered items without consuming them."""
        return list(self._buffer)

    def peek_find(
        self,
        predicate, # type: Callable[[T], bool]
        limit=None, # type: Optional[int]
    ):
        # type: (...) -> Optional[int]
        """Return the 1-based position of the first upcoming item matching predicate

        Scans at most `limit` items ahead (unbounded when None); returns None
        when no match is found within the limit or the stream runs out.
        """
        queue = self._buffer
        idx = 0
        while limit is None or idx < limit:
            if idx >= len(queue):
                # Grow the buffer in small steps as the scan advances.
                self._fill_buffer(idx + 5)
                if idx >= len(queue):
                    return None
            if predicate(queue[idx]):
                return idx + 1
            idx += 1
        return None

    def _fill_buffer(self, number):
        # type: (int) -> bool
        """Buffer up to `number` items from the stream; True if buffer is non-empty."""
        if not self._expired:
            queue = self._buffer
            stream = self._stream
            while len(queue) < number:
                try:
                    queue.append(next(stream))
                except StopIteration:
                    self._expired = True
                    break
        return bool(self._buffer)

    def peek(self):
        # type: () -> Optional[T]
        """Return the next item without consuming it (None when exhausted)."""
        return self.peek_at(1)

    def peek_at(self, tokens_ahead):
        # type: (int) -> Optional[T]
        """Return the item `tokens_ahead` positions ahead (1-based), or None."""
        self._fill_buffer(tokens_ahead)
        queue = self._buffer
        if len(queue) >= tokens_ahead:
            return queue[tokens_ahead - 1]
        return None

    def peek_many(self, number):
        # type: (int) -> List[T]
        """Return up to the next `number` items without consuming them."""
        self._fill_buffer(number)
        queue = self._buffer
        if len(queue) == number:
            return list(queue)
        if not number:
            return []
        head = []
        for item in queue:
            head.append(item)
            number -= 1
            if not number:
                break
        return head
def len_check_iterator(
    content, # type: str
    stream, # type: Iterable[TE]
    content_len=None, # type: Optional[int]
):
    # type: (...) -> Iterable[TE]
    """Flatten a parser's output into tokens and verify it covers the entire line/text

    :param content: The text that was parsed.
    :param stream: Tokens/elements the parser produced for that text.
    :param content_len: Expected number of characters covered by the stream;
        defaults to len(content).
    :raises ValueError: After the stream is exhausted, if the tokens cover
        fewer or more characters than content_len.
    """
    if content_len is None:
        content_len = len(content)
    # Fail-safe to ensure none of the value parsers incorrectly parse a value.
    covered = 0
    for token_or_element in stream:
        # We use the AttributeError to discriminate between elements and tokens
        # The cast()s are here to assist / workaround mypy not realizing that.
        try:
            tokens = cast("Deb822Element", token_or_element).iter_tokens()
        except AttributeError:
            token = cast("Deb822Token", token_or_element)
            covered += len(token.text)
        else:
            for token in tokens:
                covered += len(token.text)
        yield token_or_element
    if covered != content_len:
        if covered < content_len:
            # BUGFIX: the format call previously passed `line=content`, so the
            # "{content}" placeholder raised KeyError instead of producing the
            # intended ValueError message.
            msg = textwrap.dedent(
                """\
            Value parser did not fully cover the entire line with tokens (
            missing range {covered}..{content_len}). Occurred when parsing "{content}"
            """
            ).format(covered=covered, content_len=content_len, content=content)
            raise ValueError(msg)
        msg = textwrap.dedent(
            """\
            Value parser emitted tokens for more text than was present? Should have
            emitted {content_len} characters, got {covered}. Occurred when parsing
            "{content}"
            """
        ).format(covered=covered, content_len=content_len, content=content)
        raise ValueError(msg)