summaryrefslogtreecommitdiffstats
path: root/src/debputy/lsp/vendoring/_deb822_repro/parsing.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/lsp/vendoring/_deb822_repro/parsing.py')
-rw-r--r--src/debputy/lsp/vendoring/_deb822_repro/parsing.py3497
1 files changed, 3497 insertions, 0 deletions
diff --git a/src/debputy/lsp/vendoring/_deb822_repro/parsing.py b/src/debputy/lsp/vendoring/_deb822_repro/parsing.py
new file mode 100644
index 0000000..13e59b1
--- /dev/null
+++ b/src/debputy/lsp/vendoring/_deb822_repro/parsing.py
@@ -0,0 +1,3497 @@
+# -*- coding: utf-8 -*- vim: fileencoding=utf-8 :
+
+import collections.abc
+import contextlib
+import sys
+import textwrap
+import weakref
+from abc import ABC
+from types import TracebackType
+from weakref import ReferenceType
+
+from ._util import (
+ combine_into_replacement,
+ BufferingIterator,
+ len_check_iterator,
+)
+from .formatter import (
+ FormatterContentToken,
+ one_value_per_line_trailing_separator,
+ format_field,
+)
+from .locatable import Locatable, START_POSITION, Position, Range
+from .tokens import (
+ Deb822Token,
+ Deb822ValueToken,
+ Deb822SemanticallySignificantWhiteSpace,
+ Deb822SpaceSeparatorToken,
+ Deb822CommentToken,
+ Deb822WhitespaceToken,
+ Deb822ValueContinuationToken,
+ Deb822NewlineAfterValueToken,
+ Deb822CommaToken,
+ Deb822FieldNameToken,
+ Deb822FieldSeparatorToken,
+ Deb822ErrorToken,
+ tokenize_deb822_file,
+ comma_split_tokenizer,
+ whitespace_split_tokenizer,
+)
+from .types import AmbiguousDeb822FieldKeyError, SyntaxOrParseError
+from debian._util import (
+ resolve_ref,
+ LinkedList,
+ LinkedListNode,
+ OrderedSet,
+ _strI,
+ default_field_sort_key,
+)
+
+try:
+ from typing import (
+ Iterable,
+ Iterator,
+ List,
+ Union,
+ Dict,
+ Optional,
+ Callable,
+ Any,
+ Generic,
+ Type,
+ Tuple,
+ IO,
+ cast,
+ overload,
+ Mapping,
+ TYPE_CHECKING,
+ Sequence,
+ )
+ from debian._util import T
+
+ # for some reason, pylint does not see that Commentish is used in typing
+ from .types import ( # pylint: disable=unused-import
+ ST,
+ VE,
+ TE,
+ ParagraphKey,
+ TokenOrElement,
+ Commentish,
+ ParagraphKeyBase,
+ FormatterCallback,
+ )
+
+ if TYPE_CHECKING:
+ StreamingValueParser = Callable[
+ [Deb822Token, BufferingIterator[Deb822Token]], VE
+ ]
+ StrToValueParser = Callable[[str], Iterable[Union["Deb822Token", VE]]]
+ KVPNode = LinkedListNode["Deb822KeyValuePairElement"]
+ else:
+ StreamingValueParser = None
+ StrToValueParser = None
+ KVPNode = None
+except ImportError:
+ if not TYPE_CHECKING:
+ # pylint: disable=unnecessary-lambda-assignment
+ cast = lambda t, v: v
+ overload = lambda f: None
+
+
+class ValueReference(Generic[TE]):
+ """Reference to a value inside a Deb822 paragraph
+
+ This is useful for cases where want to modify values "in-place" or maybe
+ conditionally remove a value after looking at it.
+
+ ValueReferences can be invalidated by various changes or actions performed
+ to the underlying provider of the value reference. As an example, sorting
+ a list of values will generally invalidate all ValueReferences related to
+ that list.
+
+ The ValueReference will raise validity issues where it detects them but most
+ of the time it will not notice. As a means to this end, the ValueReference
+ will *not* keep a strong reference to the underlying value. This enables it
+ to detect when the container goes out of scope. However, keep in mind that
+ the timeliness of garbage collection is implementation defined (e.g., pypy
+ does not use ref-counting).
+ """
+
+ __slots__ = (
+ "_node",
+ "_render",
+ "_value_factory",
+ "_removal_handler",
+ "_mutation_notifier",
+ )
+
+ def __init__(
+ self,
+ node, # type: LinkedListNode[TE]
+ render, # type: Callable[[TE], str]
+ value_factory, # type: Callable[[str], TE]
+ removal_handler, # type: Callable[[LinkedListNode[TokenOrElement]], None]
+ mutation_notifier, # type: Optional[Callable[[], None]]
+ ):
+ self._node = weakref.ref(
+ node
+ ) # type: Optional[ReferenceType[LinkedListNode[TE]]]
+ self._render = render
+ self._value_factory = value_factory
+ self._removal_handler = removal_handler
+ self._mutation_notifier = mutation_notifier
+
+ def _resolve_node(self):
+ # type: () -> LinkedListNode[TE]
+ # NB: We check whether the "ref" itself is None (instead of the ref resolving to None)
+ # This enables us to tell the difference between "known removal" vs. "garbage collected"
+ if self._node is None:
+ raise RuntimeError("Cannot use ValueReference after remove()")
+ node = self._node()
+ if node is None:
+ raise RuntimeError("ValueReference is invalid (garbage collected)")
+ return node
+
+ @property
+ def value(self):
+ # type: () -> str
+ """Resolve the reference into a str"""
+ return self._render(self._resolve_node().value)
+
+ @value.setter
+ def value(self, new_value):
+ # type: (str) -> None
+ """Update the reference value
+
+ Updating the value via this method will *not* invalidate the reference (or other
+ references to the same container).
+
+ This can raise an exception if the new value does not follow the requirements
+ for the referenced values. As an example, values in whitespace separated
+ lists cannot contain spaces and would trigger an exception.
+ """
+ self._resolve_node().value = self._value_factory(new_value)
+ if self._mutation_notifier is not None:
+ self._mutation_notifier()
+
+ @property
+ def locatable(self):
+ # type: () -> Locatable
+ """Reference to a locatable that can be used to determine where this value is"""
+ return self._resolve_node().value
+
+ def remove(self):
+ # type: () -> None
+ """Remove the underlying value
+
+ This will invalidate the ValueReference (and any other ValueReferences pointing
+ to that exact value). The validity of other ValueReferences to that container
+ remains unaffected.
+ """
+ self._removal_handler(
+ cast("LinkedListNode[TokenOrElement]", self._resolve_node())
+ )
+ self._node = None
+
+
+if sys.version_info >= (3, 9) or TYPE_CHECKING:
+ _Deb822ParsedTokenList_ContextManager = contextlib.AbstractContextManager[T]
+else:
+ # Python 3.5 - 3.8 compat - we are not allowed to subscript the abc.Iterator
+ # - use this little hack to work around it
+ # Note that Python 3.5 is so old that it does not have AbstractContextManager,
+ # so we re-implement it here.
+ class _Deb822ParsedTokenList_ContextManager(Generic[T]):
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return None
+
+
+class Deb822ParsedTokenList(
+ Generic[VE, ST],
+ _Deb822ParsedTokenList_ContextManager["Deb822ParsedTokenList[VE, ST]"],
+):
+
+ def __init__(
+ self,
+ kvpair_element, # type: 'Deb822KeyValuePairElement'
+ interpreted_value_element, # type: Deb822InterpretationProxyElement
+ vtype, # type: Type[VE]
+ stype, # type: Type[ST]
+ str2value_parser, # type: StrToValueParser[VE]
+ default_separator_factory, # type: Callable[[], ST]
+ render, # type: Callable[[VE], str]
+ ):
+ # type: (...) -> None
+ self._kvpair_element = kvpair_element
+ self._proxy_element = interpreted_value_element
+ self._token_list = LinkedList(interpreted_value_element.parts)
+ self._vtype = vtype
+ self._stype = stype
+ self._str2value_parser = str2value_parser
+ self._default_separator_factory = default_separator_factory
+ self._value_factory = _parser_to_value_factory(str2value_parser, vtype)
+ self._render = render
+ self._format_preserve_original_formatting = True
+ self._formatter = (
+ one_value_per_line_trailing_separator
+ ) # type: FormatterCallback
+ self._changed = False
+ self.__continuation_line_char = None # type: Optional[str]
+ assert self._token_list
+ last_token = self._token_list.tail
+
+ if last_token is not None and isinstance(
+ last_token, Deb822NewlineAfterValueToken
+ ):
+ # We always remove the last newline (if present), because then
+ # adding values will happen after the last value rather than on
+ # a new line by default.
+ #
+ # On write, we always ensure the value ends on a newline (even
+ # if it did not before). This is simpler and should be a
+ # non-issue in practise.
+ self._token_list.pop()
+
+ def __iter__(self):
+ # type: () -> Iterator[str]
+ yield from (self._render(v) for v in self.value_parts)
+
+ def __bool__(self):
+ # type: () -> bool
+ return next(iter(self), None) is not None
+
+ def __exit__(
+ self,
+ exc_type, # type: Optional[Type[BaseException]]
+ exc_val, # type: Optional[BaseException]
+ exc_tb, # type: Optional[TracebackType]
+ ):
+ # type: (...) -> Optional[bool]
+ if exc_type is None and self._changed:
+ self._update_field()
+ return super().__exit__(exc_type, exc_val, exc_tb)
+
+ @property
+ def value_parts(self):
+ # type: () -> Iterator[VE]
+ yield from (v for v in self._token_list if isinstance(v, self._vtype))
+
+ def _mark_changed(self):
+ # type: () -> None
+ self._changed = True
+
+ def iter_value_references(self):
+ # type: () -> Iterator[ValueReference[VE]]
+ """Iterate over all values in the list (as ValueReferences)
+
+ This is useful for doing inplace modification of the values or even
+ streaming removal of field values. It is in general also more
+ efficient when more than one value is updated or removed.
+ """
+ yield from (
+ ValueReference(
+ cast("LinkedListNode[VE]", n),
+ self._render,
+ self._value_factory,
+ self._remove_node,
+ self._mark_changed,
+ )
+ for n in self._token_list.iter_nodes()
+ if isinstance(n.value, self._vtype)
+ )
+
+ def append_separator(self, space_after_separator=True):
+ # type: (bool) -> None
+
+ separator_token = self._default_separator_factory()
+ if separator_token.is_whitespace:
+ space_after_separator = False
+
+ self._changed = True
+ self._append_continuation_line_token_if_necessary()
+ self._token_list.append(separator_token)
+
+ if space_after_separator and not separator_token.is_whitespace:
+ self._token_list.append(Deb822WhitespaceToken(" "))
+
+ def replace(self, orig_value, new_value):
+ # type: (str, str) -> None
+ """Replace the first instance of a value with another
+
+ This method will *not* affect the validity of ValueReferences.
+ """
+ vtype = self._vtype
+ for node in self._token_list.iter_nodes():
+ if isinstance(node.value, vtype) and self._render(node.value) == orig_value:
+ node.value = self._value_factory(new_value)
+ self._changed = True
+ break
+ else:
+ raise ValueError("list.replace(x, y): x not in list")
+
+ def remove(self, value):
+ # type: (str) -> None
+ """Remove the first instance of a value
+
+ Removal will invalidate ValueReferences to the value being removed.
+ ValueReferences to other values will be unaffected.
+ """
+ vtype = self._vtype
+ for node in self._token_list.iter_nodes():
+ if isinstance(node.value, vtype) and self._render(node.value) == value:
+ node_to_remove = node
+ break
+ else:
+ raise ValueError("list.remove(x): x not in list")
+
+ return self._remove_node(node_to_remove)
+
+ def _remove_node(self, node_to_remove):
+ # type: (LinkedListNode[TokenOrElement]) -> None
+ vtype = self._vtype
+ self._changed = True
+
+ # We naively want to remove the node and every thing to the left of it
+ # until the previous value. That is the basic idea for now (ignoring
+ # special-cases for now).
+ #
+ # Example:
+ #
+ # """
+ # Multiline-Keywords: bar[
+ # # Comment about foo
+ # foo]
+ # baz
+ # Keywords: bar[ foo] baz
+ # Comma-List: bar[, foo], baz,
+ # Multiline-Comma-List: bar[,
+ # # Comment about foo
+ # foo],
+ # baz,
+ # """
+ #
+ # Assuming we want to remove "foo" for the lists, the []-markers
+ # show what we aim to remove. This has the nice side-effect of
+ # preserving whether nor not the value has a trailing separator.
+ # Note that we do *not* attempt to repair missing separators but
+ # it may fix duplicated separators by "accident".
+ #
+ # Now, there are two special cases to be aware of, where this approach
+ # has short comings:
+ #
+ # 1) If foo is the only value (in which case, "delete everything"
+ # is the only option).
+ # 2) If foo is the first value
+ # 3) If foo is not the only value on the line and we see a comment
+ # inside the deletion range.
+ #
+ # For 2) + 3), we attempt to flip and range to delete and every
+ # thing after it (up to but exclusion "baz") instead. This
+ # definitely fixes 3), but 2) has yet another corner case, namely:
+ #
+ # """
+ # Multiline-Comma-List: foo,
+ # # Remark about bar
+ # bar,
+ # Another-Case: foo
+ # # Remark, also we use leading separator
+ # , bar
+ # """
+ #
+ # The options include:
+ #
+ # A) Discard the comment - brain-dead simple
+ # B) Hoist the comment up to a field comment, but then what if the
+ # field already has a comment?
+ # C) Clear the first value line leaving just the newline and
+ # replace the separator before "bar" (if present) with a space.
+ # (leaving you with the value of the form "\n# ...\n bar")
+ #
+
+ first_value_on_lhs = None # type: Optional[LinkedListNode[TokenOrElement]]
+ first_value_on_rhs = None # type: Optional[LinkedListNode[TokenOrElement]]
+ comment_before_previous_value = False
+ comment_before_next_value = False
+ for past_node in node_to_remove.iter_previous(skip_current=True):
+ past_token = past_node.value
+ if isinstance(past_token, Deb822Token) and past_token.is_comment:
+ comment_before_previous_value = True
+ continue
+ if isinstance(past_token, vtype):
+ first_value_on_lhs = past_node
+ break
+
+ for future_node in node_to_remove.iter_next(skip_current=True):
+ future_token = future_node.value
+ if isinstance(future_token, Deb822Token) and future_token.is_comment:
+ comment_before_next_value = True
+ continue
+ if isinstance(future_token, vtype):
+ first_value_on_rhs = future_node
+ break
+
+ if first_value_on_rhs is None and first_value_on_lhs is None:
+ # This was the last value, just remove everything.
+ self._token_list.clear()
+ return
+
+ if first_value_on_lhs is not None and not comment_before_previous_value:
+ # Delete left
+ delete_lhs_of_node = True
+ elif first_value_on_rhs is not None and not comment_before_next_value:
+ # Delete right
+ delete_lhs_of_node = False
+ else:
+ # There is a comment on either side (or no value on one and a
+ # comment and the other). Keep it simple, we just delete to
+ # one side (preferring deleting to left if possible).
+ delete_lhs_of_node = first_value_on_lhs is not None
+
+ if delete_lhs_of_node:
+ first_remain_lhs = first_value_on_lhs
+ first_remain_rhs = node_to_remove.next_node
+ else:
+ first_remain_lhs = node_to_remove.previous_node
+ first_remain_rhs = first_value_on_rhs
+
+ # Actual deletion - with some manual labour to update HEAD/TAIL of
+ # the list in case we do a "delete everything left/right this node".
+ if first_remain_lhs is None:
+ self._token_list.head_node = first_remain_rhs
+ if first_remain_rhs is None:
+ self._token_list.tail_node = first_remain_lhs
+ LinkedListNode.link_nodes(first_remain_lhs, first_remain_rhs)
+
+ def append(self, value):
+ # type: (str) -> None
+ vt = self._value_factory(value)
+ self.append_value(vt)
+
+ def append_value(self, vt):
+ # type: (VE) -> None
+ value_parts = self._token_list
+ if value_parts:
+ needs_separator = False
+ stype = self._stype
+ vtype = self._vtype
+ for t in reversed(value_parts):
+ if isinstance(t, vtype):
+ needs_separator = True
+ break
+ if isinstance(t, stype):
+ break
+
+ if needs_separator:
+ self.append_separator()
+ else:
+ # Looks nicer if there is a space before the very first value
+ self._token_list.append(Deb822WhitespaceToken(" "))
+ self._append_continuation_line_token_if_necessary()
+ self._changed = True
+ value_parts.append(vt)
+
+ def _previous_is_newline(self):
+ # type: () -> bool
+ tail = self._token_list.tail
+ return tail is not None and tail.convert_to_text().endswith("\n")
+
+ def append_newline(self):
+ # type: () -> None
+ if self._previous_is_newline():
+ raise ValueError(
+ "Cannot add a newline after a token that ends on a newline"
+ )
+ self._token_list.append(Deb822NewlineAfterValueToken())
+
+ def append_comment(self, comment_text):
+ # type: (str) -> None
+ tail = self._token_list.tail
+ if tail is None or not tail.convert_to_text().endswith("\n"):
+ self.append_newline()
+ comment_token = Deb822CommentToken(_format_comment(comment_text))
+ self._token_list.append(comment_token)
+
+ @property
+ def _continuation_line_char(self):
+ # type: () -> str
+ char = self.__continuation_line_char
+ if char is None:
+ # Use ' ' by default but match the existing field if possible.
+ char = " "
+ for token in self._token_list:
+ if isinstance(token, Deb822ValueContinuationToken):
+ char = token.text
+ break
+ self.__continuation_line_char = char
+ return char
+
+ def _append_continuation_line_token_if_necessary(self):
+ # type: () -> None
+ tail = self._token_list.tail
+ if tail is not None and tail.convert_to_text().endswith("\n"):
+ self._token_list.append(
+ Deb822ValueContinuationToken(self._continuation_line_char)
+ )
+
+ def reformat_when_finished(self):
+ # type: () -> None
+ self._enable_reformatting()
+ self._changed = True
+
+ def _enable_reformatting(self):
+ # type: () -> None
+ self._format_preserve_original_formatting = False
+
+ def no_reformatting_when_finished(self):
+ # type: () -> None
+ self._format_preserve_original_formatting = True
+
+ def value_formatter(
+ self,
+ formatter, # type: FormatterCallback
+ force_reformat=False, # type: bool
+ ):
+ # type: (...) -> None
+ """Use a custom formatter when formatting the value
+
+ :param formatter: A formatter (see debian._deb822_repro.formatter.format_field
+ for details)
+ :param force_reformat: If True, always reformat the field even if there are
+ no (other) changes performed. By default, fields are only reformatted if
+ they are changed.
+ """
+ self._formatter = formatter
+ self._format_preserve_original_formatting = False
+ if force_reformat:
+ self._changed = True
+
+ def clear(self):
+ # type: () -> None
+ """Like list.clear() - removes all content (including comments and spaces)"""
+ if self._token_list:
+ self._changed = True
+ self._token_list.clear()
+
+ def _iter_content_as_tokens(self):
+ # type: () -> Iterable[Deb822Token]
+ for te in self._token_list:
+ if isinstance(te, Deb822Element):
+ yield from te.iter_tokens()
+ else:
+ yield te
+
+ def _generate_reformatted_field_content(self):
+ # type: () -> str
+ separator_token = self._default_separator_factory()
+ vtype = self._vtype
+ stype = self._stype
+ token_list = self._token_list
+
+ def _token_iter():
+ # type: () -> Iterator[FormatterContentToken]
+ text = "" # type: str
+ for te in token_list:
+ if isinstance(te, Deb822Token):
+ if te.is_comment:
+ yield FormatterContentToken.comment_token(te.text)
+ elif isinstance(te, stype):
+ text = te.text
+ yield FormatterContentToken.separator_token(text)
+ else:
+ assert isinstance(te, vtype)
+ text = te.convert_to_text()
+ yield FormatterContentToken.value_token(text)
+
+ return format_field(
+ self._formatter,
+ self._kvpair_element.field_name,
+ FormatterContentToken.separator_token(separator_token.text),
+ _token_iter(),
+ )
+
+ def _generate_field_content(self):
+ # type: () -> str
+ return "".join(t.text for t in self._iter_content_as_tokens())
+
+ def _update_field(self):
+ # type: () -> None
+ kvpair_element = self._kvpair_element
+ field_name = kvpair_element.field_name
+ token_list = self._token_list
+ tail = token_list.tail
+ had_tokens = False
+
+ for t in self._iter_content_as_tokens():
+ had_tokens = True
+ if not t.is_comment and not t.is_whitespace:
+ break
+ else:
+ if had_tokens:
+ raise ValueError(
+ "Field must be completely empty or have content "
+ "(i.e. non-whitespace and non-comments)"
+ )
+ if tail is not None:
+ if isinstance(tail, Deb822Token) and tail.is_comment:
+ raise ValueError("Fields must not end on a comment")
+ if not tail.convert_to_text().endswith("\n"):
+ # Always end on a newline
+ self.append_newline()
+
+ if self._format_preserve_original_formatting:
+ value_text = self._generate_field_content()
+ text = ":".join((field_name, value_text))
+ else:
+ text = self._generate_reformatted_field_content()
+
+ new_content = text.splitlines(keepends=True)
+ else:
+ # Special-case for the empty list which will be mapped to
+ # an empty field. Always end on a newline (avoids errors
+ # if there is a field after this)
+ new_content = [field_name + ":\n"]
+
+ # As absurd as it might seem, it is easier to just use the parser to
+ # construct the AST correctly
+ deb822_file = parse_deb822_file(iter(new_content))
+ error_token = deb822_file.find_first_error_element()
+ if error_token:
+ # _print_ast(deb822_file)
+ raise ValueError("Syntax error in new field value for " + field_name)
+ paragraph = next(iter(deb822_file))
+ assert isinstance(paragraph, Deb822NoDuplicateFieldsParagraphElement)
+ new_kvpair_element = paragraph.get_kvpair_element(field_name)
+ assert new_kvpair_element is not None
+ kvpair_element.value_element = new_kvpair_element.value_element
+ self._changed = False
+
+ def sort_elements(
+ self,
+ *,
+ key=None, # type: Optional[Callable[[VE], Any]]
+ reverse=False, # type: bool
+ ):
+ # type: (...) -> None
+ """Sort the elements (abstract values) in this list.
+
+ This method will sort the logical values of the list. It will
+ attempt to preserve comments associated with a given value where
+ possible. Whether space and separators are preserved depends on
+ the contents of the field as well as the formatting settings.
+
+ Sorting (without reformatting) is likely to leave you with "awkward"
+ whitespace. Therefore, you almost always want to apply reformatting
+ such as the reformat_when_finished() method.
+
+ Sorting will invalidate all ValueReferences.
+ """
+ comment_start_node = None
+ vtype = self._vtype
+ stype = self._stype
+
+ def key_func(x):
+ # type: (Tuple[VE, List[TokenOrElement]]) -> Any
+ if key:
+ return key(x[0])
+ return x[0].convert_to_text()
+
+ parts = []
+
+ for node in self._token_list.iter_nodes():
+ value = node.value
+ if isinstance(value, Deb822Token) and value.is_comment:
+ if comment_start_node is None:
+ comment_start_node = node
+ continue
+
+ if isinstance(value, vtype):
+ comments = []
+ if comment_start_node is not None:
+ for keep_node in comment_start_node.iter_next(skip_current=False):
+ if keep_node is node:
+ break
+ comments.append(keep_node.value)
+ parts.append((value, comments))
+ comment_start_node = None
+
+ parts.sort(key=key_func, reverse=reverse)
+
+ self._changed = True
+ self._token_list.clear()
+ first_value = True
+
+ separator_is_space = self._default_separator_factory().is_whitespace
+
+ for value, comments in parts:
+ if first_value:
+ first_value = False
+ if comments:
+ # While unlikely, there could be a separator between the comments.
+ # It would be in the way and we remove it.
+ comments = [x for x in comments if not isinstance(x, stype)]
+ # Comments cannot start the field, so inject a newline to
+ # work around that
+ self.append_newline()
+ else:
+ if not separator_is_space and not any(
+ isinstance(x, stype) for x in comments
+ ):
+ # While unlikely, you can hide a comma between two comments and expect
+ # us to preserve it. However, the more common case is that the separator
+ # appeared before the comments and was thus omitted (leaving us to re-add
+ # it here).
+ self.append_separator(space_after_separator=False)
+ if comments:
+ self.append_newline()
+ else:
+ self._token_list.append(Deb822WhitespaceToken(" "))
+
+ self._token_list.extend(comments)
+ self.append_value(value)
+
+ def sort(
+ self,
+ *,
+ key=None, # type: Optional[Callable[[str], Any]]
+ **kwargs, # type: Any
+ ):
+ # type: (...) -> None
+ """Sort the values (rendered as str) in this list.
+
+ This method will sort the logical values of the list. It will
+ attempt to preserve comments associated with a given value where
+ possible. Whether space and separators are preserved depends on
+ the contents of the field as well as the formatting settings.
+
+ Sorting (without reformatting) is likely to leave you with "awkward"
+ whitespace. Therefore, you almost always want to apply reformatting
+ such as the reformat_when_finished() method.
+
+ Sorting will invalidate all ValueReferences.
+ """
+ if key is not None:
+ render = self._render
+ kwargs["key"] = lambda vt: key(render(vt))
+ self.sort_elements(**kwargs)
+
+
+class Interpretation(Generic[T]):
+
+ def interpret(
+ self,
+ kvpair_element, # type: Deb822KeyValuePairElement
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> T
+ raise NotImplementedError # pragma: no cover
+
+
+class GenericContentBasedInterpretation(Interpretation[T], Generic[T, VE]):
+
+ def __init__(
+ self,
+ tokenizer, # type: Callable[[str], Iterable['Deb822Token']]
+ value_parser, # type: StreamingValueParser[VE]
+ ):
+ # type: (...) -> None
+ super().__init__()
+ self._tokenizer = tokenizer
+ self._value_parser = value_parser
+
+ def _high_level_interpretation(
+ self,
+ kvpair_element, # type: Deb822KeyValuePairElement
+ proxy_element, # type: Deb822InterpretationProxyElement
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> T
+ raise NotImplementedError # pragma: no cover
+
+ def _parse_stream(
+ self, buffered_iterator # type: BufferingIterator[Deb822Token]
+ ):
+ # type: (...) -> Iterable[Union[Deb822Token, VE]]
+
+ value_parser = self._value_parser
+ for token in buffered_iterator:
+ if isinstance(token, Deb822ValueToken):
+ yield value_parser(token, buffered_iterator)
+ else:
+ yield token
+
+ def _parse_kvpair(
+ self, kvpair # type: Deb822KeyValuePairElement
+ ):
+ # type: (...) -> Deb822InterpretationProxyElement
+ value_element = kvpair.value_element
+ content = value_element.convert_to_text()
+ token_list = [] # type: List['TokenOrElement']
+ token_list.extend(self._parse_str(content))
+ return Deb822InterpretationProxyElement(value_element, token_list)
+
+ def _parse_str(self, content):
+ # type: (str) -> Iterable[Union[Deb822Token, VE]]
+ content_len = len(content)
+ biter = BufferingIterator(
+ len_check_iterator(
+ content,
+ self._tokenizer(content),
+ content_len=content_len,
+ )
+ )
+ yield from len_check_iterator(
+ content,
+ self._parse_stream(biter),
+ content_len=content_len,
+ )
+
+ def interpret(
+ self,
+ kvpair_element, # type: Deb822KeyValuePairElement
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> T
+ proxy_element = self._parse_kvpair(kvpair_element)
+ return self._high_level_interpretation(
+ kvpair_element,
+ proxy_element,
+ discard_comments_on_read=discard_comments_on_read,
+ )
+
+
+def _parser_to_value_factory(
+ parser, # type: StrToValueParser[VE]
+ vtype, # type: Type[VE]
+):
+ # type: (...) -> Callable[[str], VE]
+ def _value_factory(v):
+ # type: (str) -> VE
+ if v == "":
+ raise ValueError("The empty string is not a value")
+ token_iter = iter(parser(v))
+ t1 = next(token_iter, None) # type: Optional[Union[TokenOrElement]]
+ t2 = next(token_iter, None)
+ assert t1 is not None, (
+ 'Bad parser - it returned None (or no TE) for "' + v + '"'
+ )
+ if t2 is not None:
+ msg = textwrap.dedent(
+ """\
+ The input "{v}" should have been exactly one element, but the parser provided at
+ least two. This can happen with unnecessary leading/trailing whitespace
+ or including commas the value for a comma list.
+ """
+ ).format(v=v)
+ raise ValueError(msg)
+ if not isinstance(t1, vtype):
+ if isinstance(t1, Deb822Token) and (t1.is_comment or t1.is_whitespace):
+ raise ValueError(
+ 'The input "{v}" is whitespace or a comment: Expected a value'
+ )
+ msg = (
+ 'The input "{v}" should have produced a element of type {vtype_name}, but'
+ " instead it produced {t1}"
+ )
+ raise ValueError(msg.format(v=v, vtype_name=vtype.__name__, t1=t1))
+
+ assert len(t1.convert_to_text()) == len(v), (
+ "Bad tokenizer - the token did not cover the input text"
+ " exactly ({t1_len} != {v_len}".format(
+ t1_len=len(t1.convert_to_text()), v_len=len(v)
+ )
+ )
+ return t1
+
+ return _value_factory
+
+
+class ListInterpretation(
+ GenericContentBasedInterpretation[Deb822ParsedTokenList[VE, ST], VE]
+):
+
+ def __init__(
+ self,
+ tokenizer, # type: Callable[[str], Iterable['Deb822Token']]
+ value_parser, # type: StreamingValueParser[VE]
+ vtype, # type: Type[VE]
+ stype, # type: Type[ST]
+ default_separator_factory, # type: Callable[[], ST]
+ render_factory, # type: Callable[[bool], Callable[[VE], str]]
+ ):
+ # type: (...) -> None
+ super().__init__(tokenizer, value_parser)
+ self._vtype = vtype
+ self._stype = stype
+ self._default_separator_factory = default_separator_factory
+ self._render_factory = render_factory
+
+ def _high_level_interpretation(
+ self,
+ kvpair_element, # type: Deb822KeyValuePairElement
+ proxy_element, # type: Deb822InterpretationProxyElement
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> Deb822ParsedTokenList[VE, ST]
+ return Deb822ParsedTokenList(
+ kvpair_element,
+ proxy_element,
+ self._vtype,
+ self._stype,
+ self._parse_str,
+ self._default_separator_factory,
+ self._render_factory(discard_comments_on_read),
+ )
+
+
+def _parse_whitespace_list_value(token, _):
+ # type: (Deb822Token, BufferingIterator[Deb822Token]) -> Deb822ParsedValueElement
+ return Deb822ParsedValueElement([token])
+
+
+def _is_comma_token(v):
+ # type: (TokenOrElement) -> bool
+ # Consume tokens until the next comma
+ return isinstance(v, Deb822CommaToken)
+
+
+def _parse_comma_list_value(token, buffered_iterator):
+ # type: (Deb822Token, BufferingIterator[Deb822Token]) -> Deb822ParsedValueElement
+ comma_offset = buffered_iterator.peek_find(_is_comma_token)
+ value_parts = [token]
+ if comma_offset is not None:
+ # The value is followed by a comma and now we know where it ends
+ value_parts.extend(buffered_iterator.peek_many(comma_offset - 1))
+ else:
+ # The value is the last value there is. Consume all remaining tokens
+ # and then trim from the right.
+ value_parts.extend(buffered_iterator.peek_buffer())
+ while value_parts and not isinstance(value_parts[-1], Deb822ValueToken):
+ value_parts.pop()
+
+ buffered_iterator.consume_many(len(value_parts) - 1)
+ return Deb822ParsedValueElement(value_parts)
+
+
+def _parse_uploaders_list_value(token, buffered_iterator):
+ # type: (Deb822Token, BufferingIterator[Deb822Token]) -> Deb822ParsedValueElement
+
+ # This is similar to _parse_comma_list_value *except* that there is an extra special
+ # case. Namely comma only counts as a true separator if it follows ">"
+ value_parts = [token]
+ comma_offset = -1 # type: Optional[int]
+ while comma_offset is not None:
+ comma_offset = buffered_iterator.peek_find(_is_comma_token)
+ if comma_offset is not None:
+ # The value is followed by a comma. Verify that this is a terminating
+ # comma (comma may appear in the name or email)
+ #
+ # We include value_parts[-1] to easily cope with the common case of
+ # "foo <a@b.com>," where we will have 0 peeked element to examine.
+ peeked_elements = [value_parts[-1]]
+ peeked_elements.extend(buffered_iterator.peek_many(comma_offset - 1))
+ comma_was_separator = False
+ i = len(peeked_elements) - 1
+ while i >= 0:
+ token = peeked_elements[i]
+ if isinstance(token, Deb822ValueToken):
+ if token.text.endswith(">"):
+ # The comma terminates the value
+ value_parts.extend(buffered_iterator.consume_many(i))
+ assert isinstance(
+ value_parts[-1], Deb822ValueToken
+ ) and value_parts[-1].text.endswith(">"), "Got: " + str(
+ value_parts
+ )
+ comma_was_separator = True
+ break
+ i -= 1
+ if comma_was_separator:
+ break
+ value_parts.extend(buffered_iterator.consume_many(comma_offset))
+ assert isinstance(value_parts[-1], Deb822CommaToken)
+ else:
+ # The value is the last value there is. Consume all remaining tokens
+ # and then trim from the right.
+ remaining_part = buffered_iterator.peek_buffer()
+ consume_elements = len(remaining_part)
+ value_parts.extend(remaining_part)
+ while value_parts and not isinstance(value_parts[-1], Deb822ValueToken):
+ value_parts.pop()
+ consume_elements -= 1
+ buffered_iterator.consume_many(consume_elements)
+
+ return Deb822ParsedValueElement(value_parts)
+
+
+class Deb822Element(Locatable):
+ """Composite elements (consists of 1 or more tokens)"""
+
+ __slots__ = ("_parent_element", "_full_size_cache", "__weakref__")
+
+ def __init__(self):
+ # type: () -> None
+ self._parent_element = None # type: Optional[ReferenceType['Deb822Element']]
+ self._full_size_cache = None # type: Optional[Range]
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ raise NotImplementedError # pragma: no cover
+
+ def iter_parts_of_type(self, only_element_or_token_type):
+ # type: (Type[TE]) -> Iterable[TE]
+ for part in self.iter_parts():
+ if isinstance(part, only_element_or_token_type):
+ yield part
+
+ def iter_tokens(self):
+ # type: () -> Iterable[Deb822Token]
+ for part in self.iter_parts():
+ # Control check to catch bugs early
+ assert part._parent_element is not None
+ if isinstance(part, Deb822Element):
+ yield from part.iter_tokens()
+ else:
+ yield part
+
+ def iter_recurse(
+ self, *, only_element_or_token_type=None # type: Optional[Type[TE]]
+ ):
+ # type: (...) -> Iterable[TE]
+ for part in self.iter_parts():
+ if only_element_or_token_type is None or isinstance(
+ part, only_element_or_token_type
+ ):
+ yield cast("TE", part)
+ if isinstance(part, Deb822Element):
+ yield from part.iter_recurse(
+ only_element_or_token_type=only_element_or_token_type
+ )
+
+ @property
+ def is_error(self):
+ # type: () -> bool
+ return False
+
+ @property
+ def is_comment(self):
+ # type: () -> bool
+ return False
+
+ @property
+ def parent_element(self):
+ # type: () -> Optional[Deb822Element]
+ return resolve_ref(self._parent_element)
+
+ @parent_element.setter
+ def parent_element(self, new_parent):
+ # type: (Optional[Deb822Element]) -> None
+ self._parent_element = (
+ weakref.ref(new_parent) if new_parent is not None else None
+ )
+
+ def _init_parent_of_parts(self):
+ # type: () -> None
+ for part in self.iter_parts():
+ part.parent_element = self
+
+ # Deliberately not a "text" property, to signal that it is not necessary cheap.
+ def convert_to_text(self):
+ # type: () -> str
+ return "".join(t.text for t in self.iter_tokens())
+
+ def clear_parent_if_parent(self, parent):
+ # type: (Deb822Element) -> None
+ if parent is self.parent_element:
+ self._parent_element = None
+
+ def size(self, *, skip_leading_comments: bool = True) -> Range:
+ size_cache = self._full_size_cache
+ if size_cache is None:
+ size_cache = Range.from_position_and_sizes(
+ START_POSITION,
+ (p.size(skip_leading_comments=False) for p in self.iter_parts()),
+ )
+ self._full_size_cache = size_cache
+ return size_cache
+
+
+class Deb822InterpretationProxyElement(Deb822Element):
+
+ __slots__ = ("parts",)
+
+ def __init__(
+ self, real_element: Deb822Element, parts: List[TokenOrElement]
+ ) -> None:
+ super().__init__()
+ self.parent_element = real_element
+ self.parts = parts
+ for p in parts:
+ p.parent_element = self
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ return iter(self.parts)
+
+ def position_in_parent(self, *, skip_leading_comments: bool = True) -> Position:
+ parent = self.parent_element
+ if parent is None:
+ raise RuntimeError("parent was garbage collected")
+ return parent.position_in_parent()
+
+ def position_in_file(self, *, skip_leading_comments: bool = True) -> Position:
+ parent = self.parent_element
+ if parent is None:
+ raise RuntimeError("parent was garbage collected")
+ return parent.position_in_file()
+
+ def size(self, *, skip_leading_comments: bool = True) -> Range:
+ # Same as parent except we never use a cache.
+ sizes = (p.size(skip_leading_comments=False) for p in self.iter_parts())
+ return Range.from_position_and_sizes(START_POSITION, sizes)
+
+
+class Deb822ErrorElement(Deb822Element):
+ """Element representing elements or tokens that are out of place
+
+ Commonly, it will just be instances of Deb822ErrorToken, but it can be other
+ things. As an example if a parser discovers out of order elements/tokens,
+ it can bundle them in a Deb822ErrorElement to signal that the sequence of
+ elements/tokens are invalid (even if the tokens themselves are valid).
+ """
+
+ __slots__ = ("_parts",)
+
+ def __init__(self, parts):
+ # type: (Sequence[TokenOrElement]) -> None
+ super().__init__()
+ self._parts = tuple(parts)
+ self._init_parent_of_parts()
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from self._parts
+
+ @property
+ def is_error(self):
+ # type: () -> bool
+ return True
+
+
+class Deb822ValueLineElement(Deb822Element):
+ """Consists of one "line" of a value"""
+
+ __slots__ = (
+ "_comment_element",
+ "_continuation_line_token",
+ "_leading_whitespace_token",
+ "_value_tokens",
+ "_trailing_whitespace_token",
+ "_newline_token",
+ )
+
+ def __init__(
+ self,
+ comment_element, # type: Optional[Deb822CommentElement]
+ continuation_line_token, # type: Optional[Deb822ValueContinuationToken]
+ leading_whitespace_token, # type: Optional[Deb822WhitespaceToken]
+ value_parts, # type: List[TokenOrElement]
+ trailing_whitespace_token, # type: Optional[Deb822WhitespaceToken]
+ # only optional if it is the last line of the file and the file does not
+ # end with a newline.
+ newline_token, # type: Optional[Deb822WhitespaceToken]
+ ):
+ # type: (...) -> None
+ super().__init__()
+ if comment_element is not None and continuation_line_token is None:
+ raise ValueError("Only continuation lines can have comments")
+ self._comment_element = comment_element # type: Optional[Deb822CommentElement]
+ self._continuation_line_token = continuation_line_token
+ self._leading_whitespace_token = (
+ leading_whitespace_token
+ ) # type: Optional[Deb822WhitespaceToken]
+ self._value_tokens = value_parts # type: List[TokenOrElement]
+ self._trailing_whitespace_token = trailing_whitespace_token
+ self._newline_token = newline_token # type: Optional[Deb822WhitespaceToken]
+ self._init_parent_of_parts()
+
+ @property
+ def comment_element(self):
+ # type: () -> Optional[Deb822CommentElement]
+ return self._comment_element
+
+ @property
+ def continuation_line_token(self):
+ # type: () -> Optional[Deb822ValueContinuationToken]
+ return self._continuation_line_token
+
+ @property
+ def newline_token(self):
+ # type: () -> Optional[Deb822WhitespaceToken]
+ return self._newline_token
+
+ def add_newline_if_missing(self):
+ # type: () -> bool
+ if self._newline_token is None:
+ self._newline_token = Deb822NewlineAfterValueToken()
+ self._newline_token.parent_element = self
+ self._full_size_cache = None
+ return True
+ return False
+
+ def _iter_content_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ if self._leading_whitespace_token:
+ yield self._leading_whitespace_token
+ yield from self._value_tokens
+ if self._trailing_whitespace_token:
+ yield self._trailing_whitespace_token
+
+ def _iter_content_tokens(self):
+ # type: () -> Iterable[Deb822Token]
+ for part in self._iter_content_parts():
+ if isinstance(part, Deb822Element):
+ yield from part.iter_tokens()
+ else:
+ yield part
+
+ def convert_content_to_text(self):
+ # type: () -> str
+ if (
+ len(self._value_tokens) == 1
+ and not self._leading_whitespace_token
+ and not self._trailing_whitespace_token
+ and isinstance(self._value_tokens[0], Deb822Token)
+ ):
+ # By default, we get a single value spanning the entire line
+ # (minus continuation line and newline, but we are supposed to
+ # exclude those)
+ return self._value_tokens[0].text
+
+ return "".join(t.text for t in self._iter_content_tokens())
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ if self._comment_element:
+ yield self._comment_element
+ if self._continuation_line_token:
+ yield self._continuation_line_token
+ yield from self._iter_content_parts()
+ if self._newline_token:
+ yield self._newline_token
+
+ def size(self, *, skip_leading_comments: bool = True) -> Range:
+ if skip_leading_comments:
+ return Range.from_position_and_sizes(
+ START_POSITION,
+ (
+ p.size(skip_leading_comments=False)
+ for p in self.iter_parts()
+ if not p.is_comment
+ ),
+ )
+ return super().size(skip_leading_comments=skip_leading_comments)
+
+ def position_in_parent(self, *, skip_leading_comments: bool = True) -> Position:
+ base_pos = super().position_in_parent(skip_leading_comments=False)
+ if skip_leading_comments:
+ for p in self.iter_parts():
+ if p.is_comment:
+ continue
+ non_comment_pos = p.position_in_parent(skip_leading_comments=False)
+ base_pos = non_comment_pos.relative_to(base_pos)
+ return base_pos
+
+
+class Deb822ValueElement(Deb822Element):
+ __slots__ = ("_value_entry_elements",)
+
+ def __init__(self, value_entry_elements):
+ # type: (Sequence[Deb822ValueLineElement]) -> None
+ super().__init__()
+ # Split over two lines due to line length issues
+ v = tuple(value_entry_elements)
+ self._value_entry_elements = v # type: Sequence[Deb822ValueLineElement]
+ self._init_parent_of_parts()
+
+ @property
+ def value_lines(self):
+ # type: () -> Sequence[Deb822ValueLineElement]
+ """Read-only list of value entries"""
+ return self._value_entry_elements
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from self._value_entry_elements
+
+ def add_final_newline_if_missing(self):
+ # type: () -> bool
+ if self._value_entry_elements:
+ changed = self._value_entry_elements[-1].add_newline_if_missing()
+ if changed:
+ self._full_size_cache = None
+ return changed
+ return False
+
+
+class Deb822ParsedValueElement(Deb822Element):
+
+ __slots__ = ("_text_cached", "_text_no_comments_cached", "_token_list")
+
+ def __init__(self, tokens):
+ # type: (List[Deb822Token]) -> None
+ super().__init__()
+ self._token_list = tokens
+ self._init_parent_of_parts()
+ if not isinstance(tokens[0], Deb822ValueToken) or not isinstance(
+ tokens[-1], Deb822ValueToken
+ ):
+ raise ValueError(
+ self.__class__.__name__ + " MUST start and end on a Deb822ValueToken"
+ )
+ if len(tokens) == 1:
+ token = tokens[0]
+ self._text_cached = token.text # type: Optional[str]
+ self._text_no_comments_cached = token.text # type: Optional[str]
+ else:
+ self._text_cached = None
+ self._text_no_comments_cached = None
+
+ def convert_to_text(self):
+ # type: () -> str
+ if self._text_no_comments_cached is None:
+ self._text_no_comments_cached = super().convert_to_text()
+ return self._text_no_comments_cached
+
+ def convert_to_text_without_comments(self):
+ # type: () -> str
+ if self._text_no_comments_cached is None:
+ self._text_no_comments_cached = "".join(
+ t.text for t in self.iter_tokens() if not t.is_comment
+ )
+ return self._text_no_comments_cached
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from self._token_list
+
+
+class Deb822CommentElement(Deb822Element):
+ __slots__ = ("_comment_tokens",)
+
+ def __init__(self, comment_tokens):
+ # type: (Sequence[Deb822CommentToken]) -> None
+ super().__init__()
+ self._comment_tokens = tuple(
+ comment_tokens
+ ) # type: Sequence[Deb822CommentToken]
+ if not comment_tokens: # pragma: no cover
+ raise ValueError("Comment elements must have at least one comment token")
+ self._init_parent_of_parts()
+
+ @property
+ def is_comment(self):
+ # type: () -> bool
+ return True
+
+ def __len__(self):
+ # type: () -> int
+ return len(self._comment_tokens)
+
+ def __getitem__(self, item):
+ # type: (int) -> Deb822CommentToken
+ return self._comment_tokens[item]
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from self._comment_tokens
+
+
+class Deb822KeyValuePairElement(Deb822Element):
+ __slots__ = (
+ "_comment_element",
+ "_field_token",
+ "_separator_token",
+ "_value_element",
+ )
+
+ def __init__(
+ self,
+ comment_element, # type: Optional[Deb822CommentElement]
+ field_token, # type: Deb822FieldNameToken
+ separator_token, # type: Deb822FieldSeparatorToken
+ value_element, # type: Deb822ValueElement
+ ):
+ # type: (...) -> None
+ super().__init__()
+ self._comment_element = comment_element # type: Optional[Deb822CommentElement]
+ self._field_token = field_token # type: Deb822FieldNameToken
+ self._separator_token = separator_token # type: Deb822FieldSeparatorToken
+ self._value_element = value_element # type: Deb822ValueElement
+ self._init_parent_of_parts()
+
+ @property
+ def field_name(self):
+ # type: () -> _strI
+ return self.field_token.text
+
+ @property
+ def field_token(self):
+ # type: () -> Deb822FieldNameToken
+ return self._field_token
+
+ @property
+ def value_element(self):
+ # type: () -> Deb822ValueElement
+ return self._value_element
+
+ @value_element.setter
+ def value_element(self, new_value):
+ # type: (Deb822ValueElement) -> None
+ self._full_size_cache = None
+ self._value_element.clear_parent_if_parent(self)
+ self._value_element = new_value
+ new_value.parent_element = self
+
+ def interpret_as(
+ self,
+ interpreter, # type: Interpretation[T]
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> T
+ return interpreter.interpret(
+ self, discard_comments_on_read=discard_comments_on_read
+ )
+
+ @property
+ def comment_element(self):
+ # type: () -> Optional[Deb822CommentElement]
+ return self._comment_element
+
+ @comment_element.setter
+ def comment_element(self, value):
+ # type: (Optional[Deb822CommentElement]) -> None
+ self._full_size_cache = None
+ if value is not None:
+ if not value[-1].text.endswith("\n"):
+ raise ValueError("Field comments must end with a newline")
+ if self._comment_element:
+ self._comment_element.clear_parent_if_parent(self)
+ if value is not None:
+ value.parent_element = self
+ self._comment_element = value
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ if self._comment_element:
+ yield self._comment_element
+ yield self._field_token
+ yield self._separator_token
+ yield self._value_element
+
+ def position_in_parent(
+ self,
+ *,
+ skip_leading_comments: bool = True,
+ ) -> Position:
+ position = super().position_in_parent(skip_leading_comments=False)
+ if skip_leading_comments:
+ if self._comment_element:
+ field_pos = self._field_token.position_in_parent()
+ position = field_pos.relative_to(position)
+ return position
+
+ def size(self, *, skip_leading_comments: bool = True) -> Range:
+ if skip_leading_comments:
+ return Range.from_position_and_sizes(
+ START_POSITION,
+ (
+ p.size(skip_leading_comments=False)
+ for p in self.iter_parts()
+ if not p.is_comment
+ ),
+ )
+ return super().size(skip_leading_comments=False)
+
+
+def _format_comment(c):
+ # type: (str) -> str
+ if c == "":
+ # Special-case: Empty strings are mapped to an empty comment line
+ return "#\n"
+ if "\n" in c[:-1]:
+ raise ValueError("Comment lines must not have embedded newlines")
+ if not c.endswith("\n"):
+ c = c.rstrip() + "\n"
+ if not c.startswith("#"):
+ c = "# " + c.lstrip()
+ return c
+
+
+def _unpack_key(
+ item, # type: ParagraphKey
+ raise_if_indexed=False, # type: bool
+):
+ # type: (...) -> Tuple[_strI, Optional[int], Optional[Deb822FieldNameToken]]
+ index = None # type: Optional[int]
+ name_token = None # type: Optional[Deb822FieldNameToken]
+ if isinstance(item, tuple):
+ key, index = item
+ if raise_if_indexed:
+ # Fudge "(key, 0)" into a "key" callers to defensively support
+ # both paragraph styles with the same key.
+ if index != 0:
+ msg = 'Cannot resolve key "{key}" with index {index}. The key is not indexed'
+ raise KeyError(msg.format(key=key, index=index))
+ index = None
+ key = _strI(key)
+ else:
+ index = None
+ if isinstance(item, Deb822FieldNameToken):
+ name_token = item
+ key = name_token.text
+ else:
+ key = _strI(item)
+
+ return key, index, name_token
+
+
+def _convert_value_lines_to_lines(
+ value_lines, # type: Iterable[Deb822ValueLineElement]
+ strip_comments, # type: bool
+):
+ # type: (...) -> Iterable[str]
+ if not strip_comments:
+ yield from (v.convert_to_text() for v in value_lines)
+ else:
+ for element in value_lines:
+ yield "".join(x.text for x in element.iter_tokens() if not x.is_comment)
+
+
+if sys.version_info >= (3, 9) or TYPE_CHECKING:
+ _ParagraphMapping_Base = collections.abc.Mapping[ParagraphKey, T]
+else:
+ # Python 3.5 - 3.8 compat - we are not allowed to subscript the abc.Iterator
+ # - use this little hack to work around it
+ class _ParagraphMapping_Base(collections.abc.Mapping, Generic[T], ABC):
+ pass
+
+
+# Deb822ParagraphElement uses this Mixin (by having `_paragraph` return self).
+# Therefore, the Mixin needs to call the "proper" methods on the paragraph to
+# avoid doing infinite recursion.
+class AutoResolvingMixin(Generic[T], _ParagraphMapping_Base[T]):
+
+ @property
+ def _auto_resolve_ambiguous_fields(self):
+ # type: () -> bool
+ return True
+
+ @property
+ def _paragraph(self):
+ # type: () -> Deb822ParagraphElement
+ raise NotImplementedError # pragma: no cover
+
+ def __len__(self):
+ # type: () -> int
+ return self._paragraph.kvpair_count
+
+ def __contains__(self, item):
+ # type: (object) -> bool
+ return self._paragraph.contains_kvpair_element(item)
+
+ def __iter__(self):
+ # type: () -> Iterator[ParagraphKey]
+ return iter(self._paragraph.iter_keys())
+
+ def __getitem__(self, item):
+ # type: (ParagraphKey) -> T
+ if self._auto_resolve_ambiguous_fields and isinstance(item, str):
+ v = self._paragraph.get_kvpair_element((item, 0))
+ else:
+ v = self._paragraph.get_kvpair_element(item)
+ assert v is not None
+ return self._interpret_value(item, v)
+
+ def __delitem__(self, item):
+ # type: (ParagraphKey) -> None
+ self._paragraph.remove_kvpair_element(item)
+
+ def _interpret_value(self, key, value):
+ # type: (ParagraphKey, Deb822KeyValuePairElement) -> T
+ raise NotImplementedError # pragma: no cover
+
+
+# Deb822ParagraphElement uses this Mixin (by having `_paragraph` return self).
+# Therefore, the Mixin needs to call the "proper" methods on the paragraph to
+# avoid doing infinite recursion.
+class Deb822ParagraphToStrWrapperMixin(AutoResolvingMixin[str], ABC):
+
+ @property
+ def _auto_map_initial_line_whitespace(self):
+ # type: () -> bool
+ return True
+
+ @property
+ def _discard_comments_on_read(self):
+ # type: () -> bool
+ return True
+
+ @property
+ def _auto_map_final_newline_in_multiline_values(self):
+ # type: () -> bool
+ return True
+
+ @property
+ def _preserve_field_comments_on_field_updates(self):
+ # type: () -> bool
+ return True
+
+ def _convert_value_to_str(self, kvpair_element):
+ # type: (Deb822KeyValuePairElement) -> str
+ value_element = kvpair_element.value_element
+ value_entries = value_element.value_lines
+ if len(value_entries) == 1:
+ # Special case single line entry (e.g. "Package: foo") as they never
+ # have comments and we can do some parts more efficient.
+ value_entry = value_entries[0]
+ t = value_entry.convert_to_text()
+ if self._auto_map_initial_line_whitespace:
+ t = t.strip()
+ return t
+
+ if self._auto_map_initial_line_whitespace or self._discard_comments_on_read:
+ converter = _convert_value_lines_to_lines(
+ value_entries,
+ self._discard_comments_on_read,
+ )
+
+ auto_map_space = self._auto_map_initial_line_whitespace
+
+ # Because we know there are more than one line, we can unconditionally inject
+ # the newline after the first line
+ as_text = "".join(
+ line.strip() + "\n" if auto_map_space and i == 1 else line
+ for i, line in enumerate(converter, start=1)
+ )
+ else:
+ # No rewrite necessary.
+ as_text = value_element.convert_to_text()
+
+ if self._auto_map_final_newline_in_multiline_values and as_text[-1] == "\n":
+ as_text = as_text[:-1]
+ return as_text
+
+ def __setitem__(self, item, value):
+ # type: (ParagraphKey, str) -> None
+ keep_comments = (
+ self._preserve_field_comments_on_field_updates
+ ) # type: Optional[bool]
+ comment = None
+ if keep_comments and self._auto_resolve_ambiguous_fields:
+ # For ambiguous fields, we have to resolve the original field as
+ # the set_field_* methods do not cope with ambiguous fields. This
+ # means we might as well clear the keep_comments flag as we have
+ # resolved the comment.
+ keep_comments = None
+ key_lookup = item
+ if isinstance(item, str):
+ key_lookup = (item, 0)
+ orig_kvpair = self._paragraph.get_kvpair_element(key_lookup, use_get=True)
+ if orig_kvpair is not None:
+ comment = orig_kvpair.comment_element
+
+ if self._auto_map_initial_line_whitespace:
+ try:
+ idx = value.index("\n")
+ except ValueError:
+ idx = -1
+ if idx == -1 or idx == len(value):
+ self._paragraph.set_field_to_simple_value(
+ item,
+ value.strip(),
+ preserve_original_field_comment=keep_comments,
+ field_comment=comment,
+ )
+ return
+ # Regenerate the first line with normalized whitespace if necessary
+ first_line, rest = value.split("\n", 1)
+ if first_line and first_line[:1] not in ("\t", " "):
+ value = "".join((" ", first_line.strip(), "\n", rest))
+ else:
+ value = "".join((first_line, "\n", rest))
+ if not value.endswith("\n"):
+ if not self._auto_map_final_newline_in_multiline_values:
+ raise ValueError(
+ "Values must end with a newline (or be single line"
+ " values and use the auto whitespace mapping feature)"
+ )
+ value += "\n"
+ self._paragraph.set_field_from_raw_string(
+ item,
+ value,
+ preserve_original_field_comment=keep_comments,
+ field_comment=comment,
+ )
+
+ def _interpret_value(self, key, value):
+ # type: (ParagraphKey, Deb822KeyValuePairElement) -> str
+ # mypy is a bit dense and cannot see that T == str
+ return self._convert_value_to_str(value)
+
+
+class AbstractDeb822ParagraphWrapper(AutoResolvingMixin[T], ABC):
+
+ def __init__(
+ self,
+ paragraph, # type: Deb822ParagraphElement
+ *,
+ auto_resolve_ambiguous_fields=False, # type: bool
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> None
+ self.__paragraph = paragraph
+ self.__auto_resolve_ambiguous_fields = auto_resolve_ambiguous_fields
+ self.__discard_comments_on_read = discard_comments_on_read
+
+ @property
+ def _paragraph(self):
+ # type: () -> Deb822ParagraphElement
+ return self.__paragraph
+
+ @property
+ def _discard_comments_on_read(self):
+ # type: () -> bool
+ return self.__discard_comments_on_read
+
+ @property
+ def _auto_resolve_ambiguous_fields(self):
+ # type: () -> bool
+ return self.__auto_resolve_ambiguous_fields
+
+
+class Deb822InterpretingParagraphWrapper(AbstractDeb822ParagraphWrapper[T]):
+
+ def __init__(
+ self,
+ paragraph, # type: Deb822ParagraphElement
+ interpretation, # type: Interpretation[T]
+ *,
+ auto_resolve_ambiguous_fields=False, # type: bool
+ discard_comments_on_read=True, # type: bool
+ ):
+ # type: (...) -> None
+ super().__init__(
+ paragraph,
+ auto_resolve_ambiguous_fields=auto_resolve_ambiguous_fields,
+ discard_comments_on_read=discard_comments_on_read,
+ )
+ self._interpretation = interpretation
+
+ def _interpret_value(self, key, value):
+ # type: (ParagraphKey, Deb822KeyValuePairElement) -> T
+ return self._interpretation.interpret(value)
+
+
+class Deb822DictishParagraphWrapper(
+ AbstractDeb822ParagraphWrapper[str], Deb822ParagraphToStrWrapperMixin
+):
+
+ def __init__(
+ self,
+ paragraph, # type: Deb822ParagraphElement
+ *,
+ discard_comments_on_read=True, # type: bool
+ auto_map_initial_line_whitespace=True, # type: bool
+ auto_resolve_ambiguous_fields=False, # type: bool
+ preserve_field_comments_on_field_updates=True, # type: bool
+ auto_map_final_newline_in_multiline_values=True, # type: bool
+ ):
+ # type: (...) -> None
+ super().__init__(
+ paragraph,
+ auto_resolve_ambiguous_fields=auto_resolve_ambiguous_fields,
+ discard_comments_on_read=discard_comments_on_read,
+ )
+ self.__auto_map_initial_line_whitespace = auto_map_initial_line_whitespace
+ self.__preserve_field_comments_on_field_updates = (
+ preserve_field_comments_on_field_updates
+ )
+ self.__auto_map_final_newline_in_multiline_values = (
+ auto_map_final_newline_in_multiline_values
+ )
+
+ @property
+ def _auto_map_initial_line_whitespace(self):
+ # type: () -> bool
+ return self.__auto_map_initial_line_whitespace
+
+ @property
+ def _preserve_field_comments_on_field_updates(self):
+ # type: () -> bool
+ return self.__preserve_field_comments_on_field_updates
+
+ @property
+ def _auto_map_final_newline_in_multiline_values(self):
+ # type: () -> bool
+ return self.__auto_map_final_newline_in_multiline_values
+
+
+class Deb822ParagraphElement(Deb822Element, Deb822ParagraphToStrWrapperMixin, ABC):
+
+ @classmethod
+ def new_empty_paragraph(cls):
+ # type: () -> Deb822ParagraphElement
+ return Deb822NoDuplicateFieldsParagraphElement([], OrderedSet())
+
+ @classmethod
+ def from_dict(cls, mapping):
+ # type: (Mapping[str, str]) -> Deb822ParagraphElement
+ paragraph = cls.new_empty_paragraph()
+ for k, v in mapping.items():
+ paragraph[k] = v
+ return paragraph
+
+ @classmethod
+ def from_kvpairs(cls, kvpair_elements):
+ # type: (List[Deb822KeyValuePairElement]) -> Deb822ParagraphElement
+ if not kvpair_elements:
+ raise ValueError(
+ "A paragraph must consist of at least one field/value pair"
+ )
+ kvpair_order = OrderedSet(kv.field_name for kv in kvpair_elements)
+ if len(kvpair_order) == len(kvpair_elements):
+ # Each field occurs at most once, which is good because that
+ # means it is a valid paragraph and we can use the optimized
+ # implementation.
+ return Deb822NoDuplicateFieldsParagraphElement(
+ kvpair_elements, kvpair_order
+ )
+ # Fallback implementation, that can cope with the repeated field names
+ # at the cost of complexity.
+ return Deb822DuplicateFieldsParagraphElement(kvpair_elements)
+
+ @property
+ def has_duplicate_fields(self):
+ # type: () -> bool
+ """Tell whether this paragraph has duplicate fields"""
+ return False
+
+ def as_interpreted_dict_view(
+ self,
+ interpretation, # type: Interpretation[T]
+ *,
+ auto_resolve_ambiguous_fields=True, # type: bool
+ ):
+ # type: (...) -> Deb822InterpretingParagraphWrapper[T]
+ r"""Provide a Dict-like view of the paragraph
+
+ This method returns a dict-like object representing this paragraph and
+ is useful for accessing fields in a given interpretation. It is possible
+ to use multiple versions of this dict-like view with different interpretations
+ on the same paragraph at the same time (for different fields).
+
+ >>> example_deb822_paragraph = '''
+ ... Package: foo
+ ... # Field comment (because it becomes just before a field)
+ ... Architecture: amd64
+ ... # Inline comment (associated with the next line)
+ ... i386
+ ... # We also support arm
+ ... arm64
+ ... armel
+ ... '''
+ >>> dfile = parse_deb822_file(example_deb822_paragraph.splitlines())
+ >>> paragraph = next(iter(dfile))
+ >>> list_view = paragraph.as_interpreted_dict_view(LIST_SPACE_SEPARATED_INTERPRETATION)
+ >>> # With the defaults, you only deal with the semantic values
+ >>> # - no leading or trailing whitespace on the first part of the value
+ >>> list(list_view["Package"])
+ ['foo']
+ >>> with list_view["Architecture"] as arch_list:
+ ... orig_arch_list = list(arch_list)
+ ... arch_list.replace('i386', 'kfreebsd-amd64')
+ >>> orig_arch_list
+ ['amd64', 'i386', 'arm64', 'armel']
+ >>> list(list_view["Architecture"])
+ ['amd64', 'kfreebsd-amd64', 'arm64', 'armel']
+ >>> print(paragraph.dump(), end='')
+ Package: foo
+ # Field comment (because it becomes just before a field)
+ Architecture: amd64
+ # Inline comment (associated with the next line)
+ kfreebsd-amd64
+ # We also support arm
+ arm64
+ armel
+ >>> # Format preserved and architecture replaced
+ >>> with list_view["Architecture"] as arch_list:
+ ... # Prettify the result as sorting will cause awkward whitespace
+ ... arch_list.reformat_when_finished()
+ ... arch_list.sort()
+ >>> print(paragraph.dump(), end='')
+ Package: foo
+ # Field comment (because it becomes just before a field)
+ Architecture: amd64
+ # We also support arm
+ arm64
+ armel
+ # Inline comment (associated with the next line)
+ kfreebsd-amd64
+ >>> list(list_view["Architecture"])
+ ['amd64', 'arm64', 'armel', 'kfreebsd-amd64']
+ >>> # Format preserved and architecture values sorted
+
+ :param interpretation: Decides how the field values are interpreted. As an example,
+ use LIST_SPACE_SEPARATED_INTERPRETATION for fields such as Architecture in the
+ debian/control file.
+ :param auto_resolve_ambiguous_fields: This parameter is only relevant for paragraphs
+ that contain the same field multiple times (these are generally invalid). If the
+ caller requests an ambiguous field from an invalid paragraph via a plain field name,
+ the return dict-like object will refuse to resolve the field (not knowing which
+ version to pick). This parameter (if set to True) instead changes the error into
+ assuming the caller wants the *first* variant.
+ """
+ return Deb822InterpretingParagraphWrapper(
+ self,
+ interpretation,
+ auto_resolve_ambiguous_fields=auto_resolve_ambiguous_fields,
+ )
+
+ def configured_view(
+ self,
+ *,
+ discard_comments_on_read=True, # type: bool
+ auto_map_initial_line_whitespace=True, # type: bool
+ auto_resolve_ambiguous_fields=True, # type: bool
+ preserve_field_comments_on_field_updates=True, # type: bool
+ auto_map_final_newline_in_multiline_values=True, # type: bool
+ ):
+ # type: (...) -> Deb822DictishParagraphWrapper
+ r"""Provide a Dict[str, str]-like view of this paragraph with non-standard parameters
+
+ This method returns a dict-like object representing this paragraph that is
+ optionally configured differently from the default view.
+
+ >>> example_deb822_paragraph = '''
+ ... Package: foo
+ ... # Field comment (because it becomes just before a field)
+ ... Depends: libfoo,
+ ... # Inline comment (associated with the next line)
+ ... libbar,
+ ... '''
+ >>> dfile = parse_deb822_file(example_deb822_paragraph.splitlines())
+ >>> paragraph = next(iter(dfile))
+ >>> # With the defaults, you only deal with the semantic values
+ >>> # - no leading or trailing whitespace on the first part of the value
+ >>> paragraph["Package"]
+ 'foo'
+ >>> # - no inline comments in multiline values (but whitespace will be present
+ >>> # subsequent lines.)
+ >>> print(paragraph["Depends"])
+ libfoo,
+ libbar,
+ >>> paragraph['Foo'] = 'bar'
+ >>> paragraph.get('Foo')
+ 'bar'
+ >>> paragraph.get('Unknown-Field') is None
+ True
+ >>> # But you get asymmetric behaviour with set vs. get
+ >>> paragraph['Foo'] = ' bar\n'
+ >>> paragraph['Foo']
+ 'bar'
+ >>> paragraph['Bar'] = ' bar\n#Comment\n another value\n'
+ >>> # Note that the whitespace on the first line has been normalized.
+ >>> print("Bar: " + paragraph['Bar'])
+ Bar: bar
+ another value
+ >>> # The comment is present (in case you where wondering)
+ >>> print(paragraph.get_kvpair_element('Bar').convert_to_text(), end='')
+ Bar: bar
+ #Comment
+ another value
+ >>> # On the other hand, you can choose to see the values as they are
+ >>> # - We will just reset the paragraph as a "nothing up my sleeve"
+ >>> dfile = parse_deb822_file(example_deb822_paragraph.splitlines())
+ >>> paragraph = next(iter(dfile))
+ >>> nonstd_dictview = paragraph.configured_view(
+ ... discard_comments_on_read=False,
+ ... auto_map_initial_line_whitespace=False,
+ ... # For paragraphs with duplicate fields, you can choose to get an error
+ ... # rather than the dict picking the first value available.
+ ... auto_resolve_ambiguous_fields=False,
+ ... auto_map_final_newline_in_multiline_values=False,
+ ... )
+ >>> # Because we have reset the state, Foo and Bar are no longer there.
+ >>> 'Bar' not in paragraph and 'Foo' not in paragraph
+ True
+ >>> # We can now see the comments (discard_comments_on_read=False)
+ >>> # (The leading whitespace in front of "libfoo" is due to
+ >>> # auto_map_initial_line_whitespace=False)
+ >>> print(nonstd_dictview["Depends"], end='')
+ libfoo,
+ # Inline comment (associated with the next line)
+ libbar,
+ >>> # And all the optional whitespace on the first value line
+ >>> # (auto_map_initial_line_whitespace=False)
+ >>> nonstd_dictview["Package"] == ' foo\n'
+ True
+ >>> # ... which will give you symmetric behaviour with set vs. get
+ >>> nonstd_dictview['Foo'] = ' bar \n'
+ >>> nonstd_dictview['Foo']
+ ' bar \n'
+ >>> nonstd_dictview['Bar'] = ' bar \n#Comment\n another value\n'
+ >>> nonstd_dictview['Bar']
+ ' bar \n#Comment\n another value\n'
+ >>> # But then you get no help either.
+ >>> try:
+ ... nonstd_dictview["Baz"] = "foo"
+ ... except ValueError:
+ ... print("Rejected")
+ Rejected
+ >>> # With auto_map_initial_line_whitespace=False, you have to include minimum a newline
+ >>> nonstd_dictview["Baz"] = "foo\n"
+ >>> # The absence of leading whitespace gives you the terse variant at the expensive
+ >>> # readability
+ >>> paragraph.get_kvpair_element('Baz').convert_to_text()
+ 'Baz:foo\n'
+ >>> # But because they are views, changes performed via one view is visible in the other
+ >>> paragraph['Foo']
+ 'bar'
+ >>> # The views show the values according to their own rules. Therefore, there is an
+ >>> # asymmetric between paragraph['Foo'] and nonstd_dictview['Foo']
+ >>> # Nevertheless, you can read or write the fields via either - enabling you to use
+ >>> # the view that best suit your use-case for the given field.
+ >>> 'Baz' in paragraph and nonstd_dictview.get('Baz') is not None
+ True
+ >>> # Deletion via the view also works
+ >>> del nonstd_dictview['Baz']
+ >>> 'Baz' not in paragraph and nonstd_dictview.get('Baz') is None
+ True
+
+
+ :param discard_comments_on_read: When getting a field value from the dict,
+ this parameter decides how in-line comments are handled. When setting
+ the value, inline comments are still allowed and will be retained.
+ However, keep in mind that this option makes getter and setter assymetric
+ as a "get" following a "set" with inline comments will omit the comments
+ even if they are there (see the code example).
+ :param auto_map_initial_line_whitespace: Special-case the first value line
+ by trimming unnecessary whitespace leaving only the value. For single-line
+ values, all space including newline is pruned. For multi-line values, the
+ newline is preserved / needed to distinguish the first line from the
+ following lines. When setting a value, this option normalizes the
+ whitespace of the initial line of the value field.
+ When this option is set to True makes the dictionary behave more like the
+ original Deb822 module.
+ :param preserve_field_comments_on_field_updates: Whether to preserve the field
+ comments when mutating the field.
+ :param auto_resolve_ambiguous_fields: This parameter is only relevant for paragraphs
+ that contain the same field multiple times (these are generally invalid). If the
+ caller requests an ambiguous field from an invalid paragraph via a plain field name,
+ the return dict-like object will refuse to resolve the field (not knowing which
+ version to pick). This parameter (if set to True) instead changes the error into
+ assuming the caller wants the *first* variant.
+ :param auto_map_final_newline_in_multiline_values: This parameter controls whether
+ a multiline field with have / need a trailing newline. If True, the trailing
+ newline is hidden on get and automatically added in set (if missing).
+ When this option is set to True makes the dictionary behave more like the
+ original Deb822 module.
+ """
+ return Deb822DictishParagraphWrapper(
+ self,
+ discard_comments_on_read=discard_comments_on_read,
+ auto_map_initial_line_whitespace=auto_map_initial_line_whitespace,
+ auto_resolve_ambiguous_fields=auto_resolve_ambiguous_fields,
+ preserve_field_comments_on_field_updates=preserve_field_comments_on_field_updates,
+ auto_map_final_newline_in_multiline_values=auto_map_final_newline_in_multiline_values,
+ )
+
+ @property
+ def _paragraph(self):
+ # type: () -> Deb822ParagraphElement
+ return self
+
+ def order_last(self, field):
+ # type: (ParagraphKey) -> None
+ """Re-order the given field so it is "last" in the paragraph"""
+ raise NotImplementedError # pragma: no cover
+
+ def order_first(self, field):
+ # type: (ParagraphKey) -> None
+ """Re-order the given field so it is "first" in the paragraph"""
+ raise NotImplementedError # pragma: no cover
+
+ def order_before(self, field, reference_field):
+ # type: (ParagraphKey, ParagraphKey) -> None
+ """Re-order the given field so appears directly after the reference field in the paragraph
+
+ The reference field must be present."""
+ raise NotImplementedError # pragma: no cover
+
+ def order_after(self, field, reference_field):
+ # type: (ParagraphKey, ParagraphKey) -> None
+ """Re-order the given field so appears directly before the reference field in the paragraph
+
+ The reference field must be present.
+ """
+ raise NotImplementedError # pragma: no cover
+
+ @property
+ def kvpair_count(self):
+ # type: () -> int
+ raise NotImplementedError # pragma: no cover
+
+ def iter_keys(self):
+ # type: () -> Iterable[ParagraphKey]
+ raise NotImplementedError # pragma: no cover
+
+ def contains_kvpair_element(self, item):
+ # type: (object) -> bool
+ raise NotImplementedError # pragma: no cover
+
+ def get_kvpair_element(
+ self,
+ item, # type: ParagraphKey
+ use_get=False, # type: bool
+ ):
+ # type: (...) -> Optional[Deb822KeyValuePairElement]
+ raise NotImplementedError # pragma: no cover
+
+ def set_kvpair_element(self, key, value):
+ # type: (ParagraphKey, Deb822KeyValuePairElement) -> None
+ raise NotImplementedError # pragma: no cover
+
+ def remove_kvpair_element(self, key):
+ # type: (ParagraphKey) -> None
+ raise NotImplementedError # pragma: no cover
+
+ def sort_fields(
+ self, key=None # type: Optional[Callable[[str], Any]]
+ ):
+ # type: (...) -> None
+ """Re-order all fields
+
+ :param key: Provide a key function (same semantics as for sorted). Keep in mind that
+ the module preserve the cases for field names - in generally, callers are recommended
+ to use "lower()" to normalize the case.
+ """
+ raise NotImplementedError # pragma: no cover
+
+ def set_field_to_simple_value(
+ self,
+ item, # type: ParagraphKey
+ simple_value, # type: str
+ *,
+ preserve_original_field_comment=None, # type: Optional[bool]
+ field_comment=None, # type: Optional[Commentish]
+ ):
+ # type: (...) -> None
+ r"""Sets a field in this paragraph to a simple "word" or "phrase"
+
+ In many cases, it is better for callers to just use the paragraph as
+ if it was a dictionary. However, this method does enable to you choose
+ the field comment (if any), which can be a reason for using it.
+
+ This is suitable for "simple" fields like "Package". Example:
+
+ >>> example_deb822_paragraph = '''
+ ... Package: foo
+ ... '''
+ >>> dfile = parse_deb822_file(example_deb822_paragraph.splitlines())
+ >>> p = next(iter(dfile))
+ >>> p.set_field_to_simple_value("Package", "mscgen")
+ >>> p.set_field_to_simple_value("Architecture", "linux-any kfreebsd-any",
+ ... field_comment=['Only ported to linux and kfreebsd'])
+ >>> p.set_field_to_simple_value("Priority", "optional")
+ >>> print(p.dump(), end='')
+ Package: mscgen
+ # Only ported to linux and kfreebsd
+ Architecture: linux-any kfreebsd-any
+ Priority: optional
+ >>> # Values are formatted nicely by default, but it does not work with
+ >>> # multi-line values
+ >>> p.set_field_to_simple_value("Foo", "bar\nbin\n")
+ Traceback (most recent call last):
+ ...
+ ValueError: Cannot use set_field_to_simple_value for values with newlines
+
+ :param item: Name of the field to set. If the paragraph already
+ contains the field, then it will be replaced. If the field exists,
+ then it will preserve its order in the paragraph. Otherwise, it is
+ added to the end of the paragraph.
+ Note this can be a "paragraph key", which enables you to control
+ *which* instance of a field is being replaced (in case of duplicate
+ fields).
+ :param simple_value: The text to use as the value. The value must not
+ contain newlines. Leading and trailing will be stripped but space
+ within the value is preserved. The value cannot contain comments
+ (i.e. if the "#" token appears in the value, then it is considered
+ a value rather than "start of a comment)
+ :param preserve_original_field_comment: See the description for the
+ parameter with the same name in the set_field_from_raw_string method.
+ :param field_comment: See the description for the parameter with the same
+ name in the set_field_from_raw_string method.
+ """
+ if "\n" in simple_value:
+ raise ValueError(
+ "Cannot use set_field_to_simple_value for values with newlines"
+ )
+
+ # Reformat it with a leading space and trailing newline. The latter because it is
+ # necessary if there are any fields after it and the former because it looks nicer so
+ # have single space after the field separator
+ stripped = simple_value.strip()
+ if stripped:
+ raw_value = " " + stripped + "\n"
+ else:
+ # Special-case for empty values
+ raw_value = "\n"
+ self.set_field_from_raw_string(
+ item,
+ raw_value,
+ preserve_original_field_comment=preserve_original_field_comment,
+ field_comment=field_comment,
+ )
+
+ def set_field_from_raw_string(
+ self,
+ item, # type: ParagraphKey
+ raw_string_value, # type: str
+ *,
+ preserve_original_field_comment=None, # type: Optional[bool]
+ field_comment=None, # type: Optional[Commentish]
+ ):
+ # type: (...) -> None
+ """Sets a field in this paragraph to a given text value
+
+ In many cases, it is better for callers to just use the paragraph as
+ if it was a dictionary. However, this method does enable to you choose
+ the field comment (if any) and lets to have a higher degree of control
+ over whitespace (on the first line), which can be a reason for using it.
+
+ Example usage:
+
+ >>> example_deb822_paragraph = '''
+ ... Package: foo
+ ... '''
+ >>> dfile = parse_deb822_file(example_deb822_paragraph.splitlines())
+ >>> p = next(iter(dfile))
+ >>> raw_value = '''
+ ... Build-Depends: debhelper-compat (= 12),
+ ... some-other-bd,
+ ... # Comment
+ ... another-bd,
+ ... '''.lstrip() # Remove leading newline, but *not* the trailing newline
+ >>> fname, new_value = raw_value.split(':', 1)
+ >>> p.set_field_from_raw_string(fname, new_value)
+ >>> print(p.dump(), end='')
+ Package: foo
+ Build-Depends: debhelper-compat (= 12),
+ some-other-bd,
+ # Comment
+ another-bd,
+ >>> # Format preserved
+
+ :param item: Name of the field to set. If the paragraph already
+ contains the field, then it will be replaced. Otherwise, it is
+ added to the end of the paragraph.
+ Note this can be a "paragraph key", which enables you to control
+ *which* instance of a field is being replaced (in case of duplicate
+ fields).
+ :param raw_string_value: The text to use as the value. The text must
+ be valid deb822 syntax and is used *exactly* as it is given.
+ Accordingly, multi-line values must include mandatory leading space
+ on continuation lines, newlines after the value, etc. On the
+ flip-side, any optional space or comments will be included.
+
+ Note that the first line will *never* be read as a comment (if the
+ first line of the value starts with a "#" then it will result
+ in "Field-Name:#..." which is parsed as a value starting with "#"
+ rather than a comment).
+ :param preserve_original_field_comment: If True, then if there is an
+ existing field and that has a comment, then the comment will remain
+ after this operation. This is the default is the `field_comment`
+ parameter is omitted.
+ Note that if the parameter is True and the item is ambiguous, this
+ will raise an AmbiguousDeb822FieldKeyError. When the parameter is
+ omitted, the ambiguity is resolved automatically and if the resolved
+ field has a comment then that will be preserved (assuming
+ field_comment is None).
+ :param field_comment: If not None, add or replace the comment for
+ the field. Each string in the list will become one comment
+ line (inserted directly before the field name). Will appear in the
+ same order as they do in the list.
+
+ If you want complete control over the formatting of the comments,
+ then ensure that each line start with "#" and end with "\\n" before
+ the call. Otherwise, leading/trailing whitespace is normalized
+ and the missing "#"/"\\n" character is inserted.
+ """
+
+ new_content = [] # type: List[str]
+ if preserve_original_field_comment is not None:
+ if field_comment is not None:
+ raise ValueError(
+ 'The "preserve_original_field_comment" conflicts with'
+ ' "field_comment" parameter'
+ )
+ elif field_comment is not None:
+ if not isinstance(field_comment, Deb822CommentElement):
+ new_content.extend(_format_comment(x) for x in field_comment)
+ field_comment = None
+ preserve_original_field_comment = False
+
+ field_name, _, _ = _unpack_key(item)
+
+ cased_field_name = field_name
+ try:
+ original = self.get_kvpair_element(item, use_get=True)
+ except AmbiguousDeb822FieldKeyError:
+ if preserve_original_field_comment:
+ # If we were asked to preserve the original comment, then we
+ # require a strict lookup
+ raise
+ original = self.get_kvpair_element((field_name, 0), use_get=True)
+
+ if preserve_original_field_comment is None:
+ # We simplify preserve_original_field_comment after the lookup of the field.
+ # Otherwise, we can get ambiguous key errors when updating an ambiguous field
+ # when the caller did not explicitly ask for that behaviour.
+ preserve_original_field_comment = True
+
+ if original:
+ # If we already have the field, then preserve the original case
+ cased_field_name = original.field_name
+ raw = ":".join((cased_field_name, raw_string_value))
+ raw_lines = raw.splitlines(keepends=True)
+ for i, line in enumerate(raw_lines, start=1):
+ if not line.endswith("\n"):
+ raise ValueError(
+ "Line {i} in new value was missing trailing newline".format(i=i)
+ )
+ if i != 1 and line[0] not in (" ", "\t", "#"):
+ msg = (
+ "Line {i} in new value was invalid. It must either start"
+ ' with " " space (continuation line) or "#" (comment line).'
+ ' The line started with "{line}"'
+ )
+ raise ValueError(msg.format(i=i, line=line[0]))
+ if len(raw_lines) > 1 and raw_lines[-1].startswith("#"):
+ raise ValueError("The last line in a value field cannot be a comment")
+ new_content.extend(raw_lines)
+ # As absurd as it might seem, it is easier to just use the parser to
+ # construct the AST correctly
+ deb822_file = parse_deb822_file(iter(new_content))
+ error_token = deb822_file.find_first_error_element()
+ if error_token:
+ raise ValueError("Syntax error in new field value for " + field_name)
+ paragraph = next(iter(deb822_file))
+ assert isinstance(paragraph, Deb822NoDuplicateFieldsParagraphElement)
+ value = paragraph.get_kvpair_element(field_name)
+ assert value is not None
+ if preserve_original_field_comment:
+ if original:
+ value.comment_element = original.comment_element
+ original.comment_element = None
+ elif field_comment is not None:
+ value.comment_element = field_comment
+ self.set_kvpair_element(item, value)
+
+ @overload
+ def dump(
+ self, fd # type: IO[bytes]
+ ):
+ # type: (...) -> None
+ pass
+
+ @overload
+ def dump(self):
+ # type: () -> str
+ pass
+
+ def dump(
+ self, fd=None # type: Optional[IO[bytes]]
+ ):
+ # type: (...) -> Optional[str]
+ if fd is None:
+ return "".join(t.text for t in self.iter_tokens())
+ for token in self.iter_tokens():
+ fd.write(token.text.encode("utf-8"))
+ return None
+
+
+class Deb822NoDuplicateFieldsParagraphElement(Deb822ParagraphElement):
+ """Paragraph implementation optimized for valid deb822 files
+
+ When there are no duplicated fields, we can use simpler and faster
+ datastructures for common operations.
+ """
+
+ def __init__(
+ self,
+ kvpair_elements, # type: List[Deb822KeyValuePairElement]
+ kvpair_order, # type: OrderedSet
+ ):
+ # type: (...) -> None
+ super().__init__()
+ self._kvpair_elements = {kv.field_name: kv for kv in kvpair_elements}
+ self._kvpair_order = kvpair_order
+ self._init_parent_of_parts()
+
+ @property
+ def kvpair_count(self):
+ # type: () -> int
+ return len(self._kvpair_elements)
+
+ def order_last(self, field):
+ # type: (ParagraphKey) -> None
+ """Re-order the given field so it is "last" in the paragraph"""
+ unpacked_field, _, _ = _unpack_key(field, raise_if_indexed=True)
+ self._kvpair_order.order_last(unpacked_field)
+
+ def order_first(self, field):
+ # type: (ParagraphKey) -> None
+ """Re-order the given field so it is "first" in the paragraph"""
+ unpacked_field, _, _ = _unpack_key(field, raise_if_indexed=True)
+ self._kvpair_order.order_first(unpacked_field)
+
+ def order_before(self, field, reference_field):
+ # type: (ParagraphKey, ParagraphKey) -> None
+ """Re-order the given field so appears directly after the reference field in the paragraph
+
+ The reference field must be present."""
+ unpacked_field, _, _ = _unpack_key(field, raise_if_indexed=True)
+ unpacked_ref_field, _, _ = _unpack_key(reference_field, raise_if_indexed=True)
+ self._kvpair_order.order_before(unpacked_field, unpacked_ref_field)
+
+ def order_after(self, field, reference_field):
+ # type: (ParagraphKey, ParagraphKey) -> None
+ """Re-order the given field so appears directly before the reference field in the paragraph
+
+ The reference field must be present.
+ """
+ unpacked_field, _, _ = _unpack_key(field, raise_if_indexed=True)
+ unpacked_ref_field, _, _ = _unpack_key(reference_field, raise_if_indexed=True)
+ self._kvpair_order.order_after(unpacked_field, unpacked_ref_field)
+
+ # Overload to narrow the type to just str.
+ def __iter__(self):
+ # type: () -> Iterator[str]
+ return iter(str(k) for k in self._kvpair_order)
+
+ def iter_keys(self):
+ # type: () -> Iterable[str]
+ yield from (str(k) for k in self._kvpair_order)
+
+ def remove_kvpair_element(self, key):
+ # type: (ParagraphKey) -> None
+ self._full_size_cache = None
+ key, _, _ = _unpack_key(key, raise_if_indexed=True)
+ del self._kvpair_elements[key]
+ self._kvpair_order.remove(key)
+
+ def contains_kvpair_element(self, item):
+ # type: (object) -> bool
+ if not isinstance(item, (str, tuple, Deb822FieldNameToken)):
+ return False
+ item = cast("ParagraphKey", item)
+ key, _, _ = _unpack_key(item, raise_if_indexed=True)
+ return key in self._kvpair_elements
+
+ def get_kvpair_element(
+ self,
+ item, # type: ParagraphKey
+ use_get=False, # type: bool
+ ):
+ # type: (...) -> Optional[Deb822KeyValuePairElement]
+ item, _, _ = _unpack_key(item, raise_if_indexed=True)
+ if use_get:
+ return self._kvpair_elements.get(item)
+ return self._kvpair_elements[item]
+
+ def set_kvpair_element(self, key, value):
+ # type: (ParagraphKey, Deb822KeyValuePairElement) -> None
+ key, _, _ = _unpack_key(key, raise_if_indexed=True)
+ if isinstance(key, Deb822FieldNameToken):
+ if key is not value.field_token:
+ raise ValueError(
+ "Key is a Deb822FieldNameToken, but not *the* Deb822FieldNameToken"
+ " for the value"
+ )
+ key = value.field_name
+ else:
+ if key != value.field_name:
+ raise ValueError(
+ "Cannot insert value under a different field value than field name"
+ " from its Deb822FieldNameToken implies"
+ )
+ # Use the string from the Deb822FieldNameToken as we need to keep that in memory either
+ # way
+ key = value.field_name
+ original_value = self._kvpair_elements.get(key)
+ self._full_size_cache = None
+ self._kvpair_elements[key] = value
+ self._kvpair_order.append(key)
+ if original_value is not None:
+ original_value.parent_element = None
+ value.parent_element = self
+
+ def sort_fields(self, key=None):
+ # type: (Optional[Callable[[str], Any]]) -> None
+ """Re-order all fields
+
+ :param key: Provide a key function (same semantics as for sorted). Keep in mind that
+ the module preserve the cases for field names - in generally, callers are recommended
+ to use "lower()" to normalize the case.
+ """
+ for last_field_name in reversed(self._kvpair_order):
+ last_kvpair = self._kvpair_elements[cast("_strI", last_field_name)]
+ if last_kvpair.value_element.add_final_newline_if_missing():
+ self._full_size_cache = None
+ break
+
+ if key is None:
+ key = default_field_sort_key
+
+ self._kvpair_order = OrderedSet(sorted(self._kvpair_order, key=key))
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from (
+ self._kvpair_elements[x]
+ for x in cast("Iterable[_strI]", self._kvpair_order)
+ )
+
+
+class Deb822DuplicateFieldsParagraphElement(Deb822ParagraphElement):
+
+ def __init__(self, kvpair_elements):
+ # type: (List[Deb822KeyValuePairElement]) -> None
+ super().__init__()
+ self._kvpair_order = LinkedList() # type: LinkedList[Deb822KeyValuePairElement]
+ self._kvpair_elements = {} # type: Dict[_strI, List[KVPNode]]
+ self._init_kvpair_fields(kvpair_elements)
+ self._init_parent_of_parts()
+
+ @property
+ def has_duplicate_fields(self):
+ # type: () -> bool
+ # Most likely, the answer is "True" but if the caller "fixes" the problem
+ # then this can return "False"
+ return len(self._kvpair_order) > len(self._kvpair_elements)
+
+ def _init_kvpair_fields(self, kvpairs):
+ # type: (Iterable[Deb822KeyValuePairElement]) -> None
+ assert not self._kvpair_order
+ assert not self._kvpair_elements
+ for kv in kvpairs:
+ field_name = kv.field_name
+ node = self._kvpair_order.append(kv)
+ if field_name not in self._kvpair_elements:
+ self._kvpair_elements[field_name] = [node]
+ else:
+ self._kvpair_elements[field_name].append(node)
+
+ def _nodes_being_relocated(self, field):
+ # type: (ParagraphKey) -> Tuple[List[KVPNode], List[KVPNode]]
+ key, index, name_token = _unpack_key(field)
+ nodes = self._kvpair_elements[key]
+ nodes_being_relocated = []
+
+ if name_token is not None or index is not None:
+ single_node = self._resolve_to_single_node(nodes, key, index, name_token)
+ assert single_node is not None
+ nodes_being_relocated.append(single_node)
+ else:
+ nodes_being_relocated = nodes
+ return nodes, nodes_being_relocated
+
+ def order_last(self, field):
+ # type: (ParagraphKey) -> None
+ """Re-order the given field so it is "last" in the paragraph"""
+ nodes, nodes_being_relocated = self._nodes_being_relocated(field)
+ assert len(nodes_being_relocated) == 1 or len(nodes) == len(
+ nodes_being_relocated
+ )
+
+ kvpair_order = self._kvpair_order
+ for node in nodes_being_relocated:
+ if kvpair_order.tail_node is node:
+ # Special case for relocating a single node that happens to be the last.
+ continue
+ kvpair_order.remove_node(node)
+ # assertion for mypy
+ assert kvpair_order.tail_node is not None
+ kvpair_order.insert_node_after(node, kvpair_order.tail_node)
+
+ if (
+ len(nodes_being_relocated) == 1
+ and nodes_being_relocated[0] is not nodes[-1]
+ ):
+ single_node = nodes_being_relocated[0]
+ nodes.remove(single_node)
+ nodes.append(single_node)
+
+ def order_first(self, field):
+ # type: (ParagraphKey) -> None
+ """Re-order the given field so it is "first" in the paragraph"""
+ nodes, nodes_being_relocated = self._nodes_being_relocated(field)
+ assert len(nodes_being_relocated) == 1 or len(nodes) == len(
+ nodes_being_relocated
+ )
+
+ kvpair_order = self._kvpair_order
+ for node in nodes_being_relocated:
+ if kvpair_order.head_node is node:
+ # Special case for relocating a single node that happens to be the first.
+ continue
+ kvpair_order.remove_node(node)
+ # assertion for mypy
+ assert kvpair_order.head_node is not None
+ kvpair_order.insert_node_before(node, kvpair_order.head_node)
+
+ if len(nodes_being_relocated) == 1 and nodes_being_relocated[0] is not nodes[0]:
+ single_node = nodes_being_relocated[0]
+ nodes.remove(single_node)
+ nodes.insert(0, single_node)
+
+ def order_before(self, field, reference_field):
+ # type: (ParagraphKey, ParagraphKey) -> None
+ """Re-order the given field so appears directly after the reference field in the paragraph
+
+ The reference field must be present."""
+ nodes, nodes_being_relocated = self._nodes_being_relocated(field)
+ assert len(nodes_being_relocated) == 1 or len(nodes) == len(
+ nodes_being_relocated
+ )
+ # For "before" we always use the "first" variant as reference in case of doubt
+ _, reference_nodes = self._nodes_being_relocated(reference_field)
+ reference_node = reference_nodes[0]
+ if reference_node in nodes_being_relocated:
+ raise ValueError("Cannot re-order a field relative to itself")
+
+ kvpair_order = self._kvpair_order
+ for node in nodes_being_relocated:
+ kvpair_order.remove_node(node)
+ kvpair_order.insert_node_before(node, reference_node)
+
+ if len(nodes_being_relocated) == 1 and len(nodes) > 1:
+ # Regenerate the (new) relative field order.
+ field_name = nodes_being_relocated[0].value.field_name
+ self._regenerate_relative_kvapir_order(field_name)
+
+ def order_after(self, field, reference_field):
+ # type: (ParagraphKey, ParagraphKey) -> None
+ """Re-order the given field so appears directly before the reference field in the paragraph
+
+ The reference field must be present.
+ """
+ nodes, nodes_being_relocated = self._nodes_being_relocated(field)
+ assert len(nodes_being_relocated) == 1 or len(nodes) == len(
+ nodes_being_relocated
+ )
+ _, reference_nodes = self._nodes_being_relocated(reference_field)
+ # For "after" we always use the "last" variant as reference in case of doubt
+ reference_node = reference_nodes[-1]
+ if reference_node in nodes_being_relocated:
+ raise ValueError("Cannot re-order a field relative to itself")
+
+ kvpair_order = self._kvpair_order
+ # Use "reversed" to preserve the relative order of the nodes assuming a bulk reorder
+ for node in reversed(nodes_being_relocated):
+ kvpair_order.remove_node(node)
+ kvpair_order.insert_node_after(node, reference_node)
+
+ if len(nodes_being_relocated) == 1 and len(nodes) > 1:
+ # Regenerate the (new) relative field order.
+ field_name = nodes_being_relocated[0].value.field_name
+ self._regenerate_relative_kvapir_order(field_name)
+
+ def _regenerate_relative_kvapir_order(self, field_name):
+ # type: (_strI) -> None
+ nodes = []
+ for node in self._kvpair_order.iter_nodes():
+ if node.value.field_name == field_name:
+ nodes.append(node)
+ self._kvpair_elements[field_name] = nodes
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from self._kvpair_order
+
+ @property
+ def kvpair_count(self):
+ # type: () -> int
+ return len(self._kvpair_order)
+
+ def iter_keys(self):
+ # type: () -> Iterable[ParagraphKey]
+ yield from (kv.field_name for kv in self._kvpair_order)
+
+ def _resolve_to_single_node(
+ self,
+ nodes, # type: List[KVPNode]
+ key, # type: str
+ index, # type: Optional[int]
+ name_token, # type: Optional[Deb822FieldNameToken]
+ use_get=False, # type: bool
+ ):
+ # type: (...) -> Optional[KVPNode]
+ if index is None:
+ if len(nodes) != 1:
+ if name_token is not None:
+ node = self._find_node_via_name_token(name_token, nodes)
+ if node is not None:
+ return node
+ msg = (
+ "Ambiguous key {key} - the field appears {res_len} times. Use"
+ " ({key}, index) to denote which instance of the field you want. (Index"
+ " can be 0..{res_len_1} or e.g. -1 to denote the last field)"
+ )
+ raise AmbiguousDeb822FieldKeyError(
+ msg.format(key=key, res_len=len(nodes), res_len_1=len(nodes) - 1)
+ )
+ index = 0
+ try:
+ return nodes[index]
+ except IndexError:
+ if use_get:
+ return None
+ msg = 'Field "{key}" was present but the index "{index}" was invalid.'
+ raise KeyError(msg.format(key=key, index=index))
+
+ def get_kvpair_element(
+ self,
+ item, # type: ParagraphKey
+ use_get=False, # type: bool
+ ):
+ # type: (...) -> Optional[Deb822KeyValuePairElement]
+ key, index, name_token = _unpack_key(item)
+ if use_get:
+ nodes = self._kvpair_elements.get(key)
+ if nodes is None:
+ return None
+ else:
+ nodes = self._kvpair_elements[key]
+ node = self._resolve_to_single_node(
+ nodes, key, index, name_token, use_get=use_get
+ )
+ if node is not None:
+ return node.value
+ return None
+
+ @staticmethod
+ def _find_node_via_name_token(
+ name_token, # type: Deb822FieldNameToken
+ elements, # type: Iterable[KVPNode]
+ ):
+ # type: (...) -> Optional[KVPNode]
+ # if we are given a name token, then it is non-ambiguous if we have exactly
+ # that name token in our list of nodes. It will be an O(n) lookup but we
+ # probably do not have that many duplicate fields (and even if do, it is not
+ # exactly a valid file, so there little reason to optimize for it)
+ for node in elements:
+ if name_token is node.value.field_token:
+ return node
+ return None
+
+ def contains_kvpair_element(self, item):
+ # type: (object) -> bool
+ if not isinstance(item, (str, tuple, Deb822FieldNameToken)):
+ return False
+ item = cast("ParagraphKey", item)
+ try:
+ return self.get_kvpair_element(item, use_get=True) is not None
+ except AmbiguousDeb822FieldKeyError:
+ return True
+
+ def set_kvpair_element(self, key, value):
+ # type: (ParagraphKey, Deb822KeyValuePairElement) -> None
+ key, index, name_token = _unpack_key(key)
+ if name_token:
+ if name_token is not value.field_token:
+ original_nodes = self._kvpair_elements.get(value.field_name)
+ original_node = None
+ if original_nodes is not None:
+ original_node = self._find_node_via_name_token(
+ name_token, original_nodes
+ )
+
+ if original_node is None:
+ raise ValueError(
+ "Key is a Deb822FieldNameToken, but not *the*"
+ " Deb822FieldNameToken for the value nor the"
+ " Deb822FieldNameToken for an existing field in the paragraph"
+ )
+ # Primarily for mypy's sake
+ assert original_nodes is not None
+ # Rely on the index-based code below to handle update.
+ index = original_nodes.index(original_node)
+ key = value.field_name
+ else:
+ if key != value.field_name:
+ raise ValueError(
+ "Cannot insert value under a different field value than field name"
+ " from its Deb822FieldNameToken implies"
+ )
+ # Use the string from the Deb822FieldNameToken as it is a _strI and has the same value
+ # (memory optimization)
+ key = value.field_name
+ self._full_size_cache = None
+ original_nodes = self._kvpair_elements.get(key)
+ if original_nodes is None or not original_nodes:
+ if index is not None and index != 0:
+ msg = (
+ "Cannot replace field ({key}, {index}) as the field does not exist"
+ " in the first place. Please index-less key or ({key}, 0) if you"
+ " want to add the field."
+ )
+ raise KeyError(msg.format(key=key, index=index))
+ node = self._kvpair_order.append(value)
+ if key not in self._kvpair_elements:
+ self._kvpair_elements[key] = [node]
+ else:
+ self._kvpair_elements[key].append(node)
+ return
+
+ replace_all = False
+ if index is None:
+ replace_all = True
+ node = original_nodes[0]
+ if len(original_nodes) != 1:
+ self._kvpair_elements[key] = [node]
+ else:
+ # We insist on there being an original node, which as a side effect ensures
+ # you cannot add additional copies of the field. This means that you cannot
+ # make the problem worse.
+ node = original_nodes[index]
+
+ # Replace the value of the existing node plus do a little dance
+ # for the parent element part.
+ node.value.parent_element = None
+ value.parent_element = self
+ node.value = value
+
+ if replace_all and len(original_nodes) != 1:
+ # If we were in a replace-all mode, discard any remaining nodes
+ for n in original_nodes[1:]:
+ n.value.parent_element = None
+ self._kvpair_order.remove_node(n)
+
+ def remove_kvpair_element(self, key):
+ # type: (ParagraphKey) -> None
+ key, idx, name_token = _unpack_key(key)
+ field_list = self._kvpair_elements[key]
+
+ if name_token is None and idx is None:
+ self._full_size_cache = None
+ # Remove all case
+ for node in field_list:
+ node.value.parent_element = None
+ self._kvpair_order.remove_node(node)
+ del self._kvpair_elements[key]
+ return
+
+ if name_token is not None:
+ # Indirection between original_node and node for mypy's sake
+ original_node = self._find_node_via_name_token(name_token, field_list)
+ if original_node is None:
+ msg = 'The field "{key}" is present but key used to access it is not.'
+ raise KeyError(msg.format(key=key))
+ node = original_node
+ else:
+ assert idx is not None
+ try:
+ node = field_list[idx]
+ except KeyError:
+ msg = 'The field "{key}" is present, but the index "{idx}" was invalid.'
+ raise KeyError(msg.format(key=key, idx=idx))
+
+ self._full_size_cache = None
+ if len(field_list) == 1:
+ del self._kvpair_elements[key]
+ else:
+ field_list.remove(node)
+ node.value.parent_element = None
+ self._kvpair_order.remove_node(node)
+
+ def sort_fields(self, key=None):
+ # type: (Optional[Callable[[str], Any]]) -> None
+ """Re-order all fields
+
+ :param key: Provide a key function (same semantics as for sorted). Keep in mind that
+ the module preserve the cases for field names - in generally, callers are recommended
+ to use "lower()" to normalize the case.
+ """
+
+ if key is None:
+ key = default_field_sort_key
+
+ # Work around mypy that cannot seem to shred the Optional notion
+ # without this little indirection
+ key_impl = key
+
+ def _actual_key(kvpair):
+ # type: (Deb822KeyValuePairElement) -> Any
+ return key_impl(kvpair.field_name)
+
+ for last_kvpair in reversed(self._kvpair_order):
+ if last_kvpair.value_element.add_final_newline_if_missing():
+ self._full_size_cache = None
+ break
+
+ sorted_kvpair_list = sorted(self._kvpair_order, key=_actual_key)
+ self._kvpair_order = LinkedList()
+ self._kvpair_elements = {}
+ self._init_kvpair_fields(sorted_kvpair_list)
+
+
+class Deb822FileElement(Deb822Element):
+ """Represents the entire deb822 file"""
+
+ def __init__(self, token_and_elements):
+ # type: (LinkedList[TokenOrElement]) -> None
+ super().__init__()
+ self._token_and_elements = token_and_elements
+ self._init_parent_of_parts()
+
+ @classmethod
+ def new_empty_file(cls):
+ # type: () -> Deb822FileElement
+ """Creates a new Deb822FileElement with no contents
+
+ Note that a deb822 file must be non-empty to be considered valid
+ """
+ return cls(LinkedList())
+
+ @property
+ def is_valid_file(self):
+ # type: () -> bool
+ """Returns true if the file is valid
+
+ Invalid elements include error elements (Deb822ErrorElement) but also
+ issues such as paragraphs with duplicate fields or "empty" files
+ (a valid deb822 file contains at least one paragraph).
+ """
+ had_paragraph = False
+ for paragraph in self:
+ had_paragraph = True
+ if not paragraph or paragraph.has_duplicate_fields:
+ return False
+
+ if not had_paragraph:
+ return False
+
+ return self.find_first_error_element() is None
+
+ def find_first_error_element(self):
+ # type: () -> Optional[Deb822ErrorElement]
+ """Returns the first Deb822ErrorElement (or None) in the file"""
+ return next(
+ iter(self.iter_recurse(only_element_or_token_type=Deb822ErrorElement)), None
+ )
+
+ def __iter__(self):
+ # type: () -> Iterator[Deb822ParagraphElement]
+ return iter(self.iter_parts_of_type(Deb822ParagraphElement))
+
+ def iter_parts(self):
+ # type: () -> Iterable[TokenOrElement]
+ yield from self._token_and_elements
+
+ def insert(self, idx, para):
+ # type: (int, Deb822ParagraphElement) -> None
+ """Inserts a paragraph into the file at the given "index" of paragraphs
+
+ Note that if the index is between two paragraphs containing a "free
+ floating" comment (e.g. paragrah/start-of-file, empty line, comment,
+ empty line, paragraph) then it is unspecified which "side" of the
+ comment the new paragraph will appear and this may change between
+ versions of python-debian.
+
+
+ >>> original = '''
+ ... Package: libfoo-dev
+ ... Depends: libfoo1 (= ${binary:Version}), ${shlib:Depends}, ${misc:Depends}
+ ... '''.lstrip()
+ >>> deb822_file = parse_deb822_file(original.splitlines())
+ >>> para1 = Deb822ParagraphElement.new_empty_paragraph()
+ >>> para1["Source"] = "foo"
+ >>> para1["Build-Depends"] = "debhelper-compat (= 13)"
+ >>> para2 = Deb822ParagraphElement.new_empty_paragraph()
+ >>> para2["Package"] = "libfoo1"
+ >>> para2["Depends"] = "${shlib:Depends}, ${misc:Depends}"
+ >>> deb822_file.insert(0, para1)
+ >>> deb822_file.insert(1, para2)
+ >>> expected = '''
+ ... Source: foo
+ ... Build-Depends: debhelper-compat (= 13)
+ ...
+ ... Package: libfoo1
+ ... Depends: ${shlib:Depends}, ${misc:Depends}
+ ...
+ ... Package: libfoo-dev
+ ... Depends: libfoo1 (= ${binary:Version}), ${shlib:Depends}, ${misc:Depends}
+ ... '''.lstrip()
+ >>> deb822_file.dump() == expected
+ True
+ """
+
+ anchor_node = None
+ needs_newline = True
+ self._full_size_cache = None
+ if idx == 0:
+ # Special-case, if idx is 0, then we insert it before everything else.
+ # This is mostly a cosmetic choice for corner cases involving free-floating
+ # comments in the file.
+ if not self._token_and_elements:
+ self.append(para)
+ return
+ anchor_node = self._token_and_elements.head_node
+ needs_newline = bool(self._token_and_elements)
+ else:
+ i = 0
+ for node in self._token_and_elements.iter_nodes():
+ entry = node.value
+ if isinstance(entry, Deb822ParagraphElement):
+ i += 1
+ if idx == i - 1:
+ anchor_node = node
+ break
+
+ if anchor_node is None:
+ # Empty list or idx after the last paragraph both degenerate into append
+ self.append(para)
+ else:
+ if needs_newline:
+ # Remember to inject the "separating" newline between two paragraphs
+ nl_token = self._set_parent(Deb822WhitespaceToken("\n"))
+ anchor_node = self._token_and_elements.insert_before(
+ nl_token, anchor_node
+ )
+ self._token_and_elements.insert_before(self._set_parent(para), anchor_node)
+
+ def append(self, paragraph):
+ # type: (Deb822ParagraphElement) -> None
+ """Appends a paragraph to the file
+
+ >>> deb822_file = Deb822FileElement.new_empty_file()
+ >>> para1 = Deb822ParagraphElement.new_empty_paragraph()
+ >>> para1["Source"] = "foo"
+ >>> para1["Build-Depends"] = "debhelper-compat (= 13)"
+ >>> para2 = Deb822ParagraphElement.new_empty_paragraph()
+ >>> para2["Package"] = "foo"
+ >>> para2["Depends"] = "${shlib:Depends}, ${misc:Depends}"
+ >>> deb822_file.append(para1)
+ >>> deb822_file.append(para2)
+ >>> expected = '''
+ ... Source: foo
+ ... Build-Depends: debhelper-compat (= 13)
+ ...
+ ... Package: foo
+ ... Depends: ${shlib:Depends}, ${misc:Depends}
+ ... '''.lstrip()
+ >>> deb822_file.dump() == expected
+ True
+ """
+ tail_element = self._token_and_elements.tail
+ if paragraph.parent_element is not None:
+ if paragraph.parent_element is self:
+ raise ValueError("Paragraph is already a part of this file")
+ raise ValueError("Paragraph is already part of another Deb822File")
+
+ self._full_size_cache = None
+ # We need a separating newline if there is not a whitespace token at the end of the file.
+ # Note the special case where the file ends on a comment; here we insert a whitespace too
+ # to be sure. Otherwise, we would have to check that there is an empty line before that
+ # comment and that is too much effort.
+ if tail_element and not isinstance(tail_element, Deb822WhitespaceToken):
+ self._token_and_elements.append(
+ self._set_parent(Deb822WhitespaceToken("\n"))
+ )
+ self._token_and_elements.append(self._set_parent(paragraph))
+
+ def remove(self, paragraph):
+ # type: (Deb822ParagraphElement) -> None
+ if paragraph.parent_element is not self:
+ raise ValueError("Paragraph is part of a different file")
+ node = None
+ for node in self._token_and_elements.iter_nodes():
+ if node.value is paragraph:
+ break
+ if node is None:
+ raise RuntimeError("unable to find paragraph")
+ self._full_size_cache = None
+ previous_node = node.previous_node
+ next_node = node.next_node
+ self._token_and_elements.remove_node(node)
+ if next_node is None:
+ if previous_node and isinstance(previous_node.value, Deb822WhitespaceToken):
+ self._token_and_elements.remove_node(previous_node)
+ else:
+ if isinstance(next_node.value, Deb822WhitespaceToken):
+ self._token_and_elements.remove_node(next_node)
+ paragraph.parent_element = None
+
+ def _set_parent(self, t):
+ # type: (TE) -> TE
+ t.parent_element = self
+ return t
+
+ def position_in_parent(self, *, skip_leading_comments: bool = True) -> Position:
+ # Recursive base-case
+ return START_POSITION
+
+ def position_in_file(self, *, skip_leading_comments: bool = True) -> Position:
+ # By definition
+ return START_POSITION
+
+ @overload
+ def dump(
+ self, fd # type: IO[bytes]
+ ):
+ # type: (...) -> None
+ pass
+
+ @overload
+ def dump(self):
+ # type: () -> str
+ pass
+
+ def dump(
+ self, fd=None # type: Optional[IO[bytes]]
+ ):
+ # type: (...) -> Optional[str]
+ if fd is None:
+ return "".join(t.text for t in self.iter_tokens())
+ for token in self.iter_tokens():
+ fd.write(token.text.encode("utf-8"))
+ return None
+
+
+_combine_error_tokens_into_elements = combine_into_replacement(
+ Deb822ErrorToken, Deb822ErrorElement
+)
+_combine_comment_tokens_into_elements = combine_into_replacement(
+ Deb822CommentToken, Deb822CommentElement
+)
+_combine_vl_elements_into_value_elements = combine_into_replacement(
+ Deb822ValueLineElement, Deb822ValueElement
+)
+_combine_kvp_elements_into_paragraphs = combine_into_replacement(
+ Deb822KeyValuePairElement,
+ Deb822ParagraphElement,
+ constructor=Deb822ParagraphElement.from_kvpairs,
+)
+
+
+def _parsed_value_render_factory(discard_comments):
+ # type: (bool) -> Callable[[Deb822ParsedValueElement], str]
+ return (
+ Deb822ParsedValueElement.convert_to_text_without_comments
+ if discard_comments
+ else Deb822ParsedValueElement.convert_to_text
+ )
+
+
+LIST_SPACE_SEPARATED_INTERPRETATION = ListInterpretation(
+ whitespace_split_tokenizer,
+ _parse_whitespace_list_value,
+ Deb822ParsedValueElement,
+ Deb822SemanticallySignificantWhiteSpace,
+ lambda: Deb822SpaceSeparatorToken(" "),
+ _parsed_value_render_factory,
+)
+LIST_COMMA_SEPARATED_INTERPRETATION = ListInterpretation(
+ comma_split_tokenizer,
+ _parse_comma_list_value,
+ Deb822ParsedValueElement,
+ Deb822CommaToken,
+ Deb822CommaToken,
+ _parsed_value_render_factory,
+)
+LIST_UPLOADERS_INTERPRETATION = ListInterpretation(
+ comma_split_tokenizer,
+ _parse_uploaders_list_value,
+ Deb822ParsedValueElement,
+ Deb822CommaToken,
+ Deb822CommaToken,
+ _parsed_value_render_factory,
+)
+
+
+def _non_end_of_line_token(v):
+ # type: (TokenOrElement) -> bool
+ # Consume tokens until the newline
+ return not isinstance(v, Deb822WhitespaceToken) or v.text != "\n"
+
+
+def _build_value_line(
+ token_stream, # type: Iterable[Union[TokenOrElement, Deb822CommentElement]]
+):
+ # type: (...) -> Iterable[Union[TokenOrElement, Deb822ValueLineElement]]
+ """Parser helper - consumes tokens part of a Deb822ValueEntryElement and turns them into one"""
+ buffered_stream = BufferingIterator(token_stream)
+
+ # Deb822ValueLineElement is a bit tricky because of how we handle whitespace
+ # and comments.
+ #
+ # In relation to comments, then only continuation lines can have comments.
+ # If there is a comment before a "K: V" line, then the comment is associated
+ # with the field rather than the value.
+ #
+ # On the whitespace front, then we separate syntactical mandatory whitespace
+ # from optional whitespace. As an example:
+ #
+ # """
+ # # some comment associated with the Depends field
+ # Depends:_foo_$
+ # # some comment associated with the line containing "bar"
+ # !________bar_$
+ # """
+ #
+ # Where "$" and "!" represents mandatory whitespace (the newline and the first
+ # space are required for the file to be parsed correctly), where as "_" is
+ # "optional" whitespace (from a syntactical point of view).
+ #
+ # This distinction enable us to facilitate APIs for easy removal/normalization
+ # of redundant whitespaces without having programmers worry about trashing
+ # the file.
+ #
+ #
+
+ comment_element = None
+ continuation_line_token = None
+ token = None # type: Optional[TokenOrElement]
+
+ for token in buffered_stream:
+ start_of_value_entry = False
+ if isinstance(token, Deb822ValueContinuationToken):
+ continuation_line_token = token
+ start_of_value_entry = True
+ token = None
+ elif isinstance(token, Deb822FieldSeparatorToken):
+ start_of_value_entry = True
+ elif isinstance(token, Deb822CommentElement):
+ next_token = buffered_stream.peek()
+ # If the next token is a continuation line token, then this comment
+ # belong to a value and we might as well just start the value
+ # parsing now.
+ #
+ # Note that we rely on this behaviour to avoid emitting the comment
+ # token (failing to do so would cause the comment to appear twice
+ # in the file).
+ if isinstance(next_token, Deb822ValueContinuationToken):
+ start_of_value_entry = True
+ comment_element = token
+ token = None
+ # Use next with None to avoid raising StopIteration inside a generator
+ # It won't happen, but pylint cannot see that, so we do this instead.
+ continuation_line_token = cast(
+ "Deb822ValueContinuationToken", next(buffered_stream, None)
+ )
+ assert continuation_line_token is not None
+
+ if token is not None:
+ yield token
+ if start_of_value_entry:
+ tokens_in_value = list(buffered_stream.takewhile(_non_end_of_line_token))
+ eol_token = cast("Deb822WhitespaceToken", next(buffered_stream, None))
+ assert eol_token is None or eol_token.text == "\n"
+ leading_whitespace = None
+ trailing_whitespace = None
+ # "Depends:\n foo" would cause tokens_in_value to be empty for the
+ # first "value line" (the empty part between ":" and "\n")
+ if tokens_in_value:
+ # Another special-case, "Depends: \n foo" (i.e. space after colon)
+ # should not introduce an IndexError
+ if isinstance(tokens_in_value[-1], Deb822WhitespaceToken):
+ trailing_whitespace = cast(
+ "Deb822WhitespaceToken", tokens_in_value.pop()
+ )
+ if tokens_in_value and isinstance(
+ tokens_in_value[-1], Deb822WhitespaceToken
+ ):
+ leading_whitespace = cast(
+ "Deb822WhitespaceToken", tokens_in_value[0]
+ )
+ tokens_in_value = tokens_in_value[1:]
+ yield Deb822ValueLineElement(
+ comment_element,
+ continuation_line_token,
+ leading_whitespace,
+ tokens_in_value,
+ trailing_whitespace,
+ eol_token,
+ )
+ comment_element = None
+ continuation_line_token = None
+
+
+def _build_field_with_value(
+ token_stream, # type: Iterable[Union[TokenOrElement, Deb822ValueElement]]
+):
+ # type: (...) -> Iterable[Union[TokenOrElement, Deb822KeyValuePairElement]]
+ buffered_stream = BufferingIterator(token_stream)
+ for token_or_element in buffered_stream:
+ start_of_field = False
+ comment_element = None
+ if isinstance(token_or_element, Deb822FieldNameToken):
+ start_of_field = True
+ elif isinstance(token_or_element, Deb822CommentElement):
+ comment_element = token_or_element
+ next_token = buffered_stream.peek()
+ start_of_field = isinstance(next_token, Deb822FieldNameToken)
+ if start_of_field:
+ # Remember to consume the field token
+ try:
+ token_or_element = next(buffered_stream)
+ except StopIteration: # pragma: no cover
+ raise AssertionError
+
+ if start_of_field:
+ field_name = token_or_element
+ separator = next(buffered_stream, None)
+ value_element = next(buffered_stream, None)
+ if separator is None or value_element is None:
+ # Early EOF - should not be possible with how the tokenizer works
+ # right now, but now it is future-proof.
+ if comment_element:
+ yield comment_element
+ error_elements = [field_name]
+ if separator is not None:
+ error_elements.append(separator)
+ yield Deb822ErrorElement(error_elements)
+ return
+
+ if isinstance(separator, Deb822FieldSeparatorToken) and isinstance(
+ value_element, Deb822ValueElement
+ ):
+ yield Deb822KeyValuePairElement(
+ comment_element,
+ cast("Deb822FieldNameToken", field_name),
+ separator,
+ value_element,
+ )
+ else:
+ # We had a parse error, consume until the newline.
+ error_tokens = [token_or_element] # type: List[TokenOrElement]
+ error_tokens.extend(buffered_stream.takewhile(_non_end_of_line_token))
+ nl = buffered_stream.peek()
+ # Take the newline as well if present
+ if nl and isinstance(nl, Deb822NewlineAfterValueToken):
+ next(buffered_stream, None)
+ error_tokens.append(nl)
+ yield Deb822ErrorElement(error_tokens)
+ else:
+ # Token is not part of a field, emit it as-is
+ yield token_or_element
+
+
+def _abort_on_error_tokens(sequence):
+ # type: (Iterable[TokenOrElement]) -> Iterable[TokenOrElement]
+ line_no = 1
+ for token in sequence:
+ # We are always called while the sequence consists entirely of tokens
+ if token.is_error:
+ error_as_text = token.convert_to_text().replace("\n", "\\n")
+ raise SyntaxOrParseError(
+ 'Syntax or Parse error on or near line {line_no}: "{error_as_text}"'.format(
+ error_as_text=error_as_text, line_no=line_no
+ )
+ )
+ line_no += token.convert_to_text().count("\n")
+ yield token
+
+
+def parse_deb822_file(
+ sequence, # type: Union[Iterable[Union[str, bytes]], str]
+ *,
+ accept_files_with_error_tokens=False, # type: bool
+ accept_files_with_duplicated_fields=False, # type: bool
+ encoding="utf-8", # type: str
+):
+ # type: (...) -> Deb822FileElement
+ """
+
+ :param sequence: An iterable over lines of str or bytes (an open file for
+ reading will do). If line endings are provided in the input, then they
+ must be present on every line (except the last) will be preserved as-is.
+ If omitted and the content is at least 2 lines, then parser will assume
+ implicit newlines.
+ :param accept_files_with_error_tokens: If True, files with critical syntax
+ or parse errors will be returned as "successfully" parsed. Usually,
+ working on files with this kind of errors are not desirable as it is
+ hard to make sense of such files (and they might in fact not be a deb822
+ file at all). When set to False (the default) a ValueError is raised if
+ there is a critical syntax or parse error.
+ Note that duplicated fields in a paragraph is not considered a critical
+ parse error by this parser as the implementation can gracefully cope
+ with these. Use accept_files_with_duplicated_fields to determine if
+ such files should be accepted.
+ :param accept_files_with_duplicated_fields: If True, then
+ files containing paragraphs with duplicated fields will be returned as
+ "successfully" parsed even though they are invalid according to the
+ specification. The paragraphs will prefer the first appearance of the
+ field unless caller explicitly requests otherwise (e.g., via
+ Deb822ParagraphElement.configured_view). If False, then this method
+ will raise a ValueError if any duplicated fields are seen inside any
+ paragraph.
+ :param encoding: The encoding to use (this is here to support Deb822-like
+ APIs, new code should not use this parameter).
+ """
+
+ if isinstance(sequence, (str, bytes)):
+ # Match the deb822 API.
+ sequence = sequence.splitlines(True)
+
+ # The order of operations are important here. As an example,
+ # _build_value_line assumes that all comment tokens have been merged
+ # into comment elements. Likewise, _build_field_and_value assumes
+ # that value tokens (along with their comments) have been combined
+ # into elements.
+ tokens = tokenize_deb822_file(
+ sequence, encoding=encoding
+ ) # type: Iterable[TokenOrElement]
+ if not accept_files_with_error_tokens:
+ tokens = _abort_on_error_tokens(tokens)
+ tokens = _combine_comment_tokens_into_elements(tokens)
+ tokens = _build_value_line(tokens)
+ tokens = _combine_vl_elements_into_value_elements(tokens)
+ tokens = _build_field_with_value(tokens)
+ tokens = _combine_kvp_elements_into_paragraphs(tokens)
+ # Combine any free-floating error tokens into error elements. We do
+ # this last as it enables other parts of the parser to include error
+ # tokens in their error elements if they discover something is wrong.
+ tokens = _combine_error_tokens_into_elements(tokens)
+
+ deb822_file = Deb822FileElement(LinkedList(tokens))
+
+ if not accept_files_with_duplicated_fields:
+ for no, paragraph in enumerate(deb822_file):
+ if isinstance(paragraph, Deb822DuplicateFieldsParagraphElement):
+ field_names = set()
+ dup_field = None
+ for field in paragraph.keys():
+ field_name, _, _ = _unpack_key(field)
+ # assert for mypy
+ assert isinstance(field_name, str)
+ if field_name in field_names:
+ dup_field = field_name
+ break
+ field_names.add(field_name)
+ if dup_field is not None:
+ msg = 'Duplicate field "{dup_field}" in paragraph number {no}'
+ raise ValueError(msg.format(dup_field=dup_field, no=no))
+
+ return deb822_file
+
+
+if __name__ == "__main__": # pragma: no cover
+ import doctest
+
+ doctest.testmod()