summaryrefslogtreecommitdiffstats
path: root/lib/ruyaml/comments.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/ruyaml/comments.py1280
1 files changed, 1280 insertions, 0 deletions
diff --git a/lib/ruyaml/comments.py b/lib/ruyaml/comments.py
new file mode 100644
index 0000000..db701c8
--- /dev/null
+++ b/lib/ruyaml/comments.py
@@ -0,0 +1,1280 @@
+# coding: utf-8
+
+"""
+stuff to deal with comments and formatting on dict/list/ordereddict/set
+these are not really related, formatting could be factored out as
+a separate base
+"""
+
+import copy
+import sys
+from collections.abc import Mapping, MutableSet, Set, Sized
+from typing import Any, Dict, Iterator, List, Optional, Union
+
+from ruyaml.anchor import Anchor
+from ruyaml.compat import nprintf # NOQA
+from ruyaml.compat import ordereddict # NOQA; type: ignore
+from ruyaml.compat import _F, MutableSliceableSequence
+from ruyaml.scalarstring import ScalarString
+
+if False: # MYPY
+ from typing import Any, Dict, Iterator, List, Optional, Union # NOQA
+
+# fmt: off
+__all__ = ['CommentedSeq', 'CommentedKeySeq',
+ 'CommentedMap', 'CommentedOrderedMap',
+ 'CommentedSet', 'comment_attrib', 'merge_attrib',
+ 'C_POST', 'C_PRE', 'C_SPLIT_ON_FIRST_BLANK', 'C_BLANK_LINE_PRESERVE_SPACE',
+ ]
+# fmt: on
+
+# splitting of comments by the scanner
+# an EOLC (End-Of-Line Comment) is preceded by some token
+# an FLC (Full Line Comment) is a comment not preceded by a token, i.e. # is
+# the first non-blank on line
+# a BL is a blank line i.e. empty or spaces/tabs only
+# bits 0 and 1 are combined, you can choose only one
+C_POST = 0b00
+C_PRE = 0b01
+C_SPLIT_ON_FIRST_BLANK = (
+ 0b10 # as C_POST, but if blank line then C_PRE all lines before
+)
+# first blank goes to POST even if no following real FLC
+# (first blank -> first of post)
+# 0b11 -> reserved for future use
+C_BLANK_LINE_PRESERVE_SPACE = 0b100
+# C_EOL_PRESERVE_SPACE2 = 0b1000
+
+
+class IDX:
+ # temporary auto increment, so rearranging is easier
+ def __init__(self):
+ # type: () -> None
+ self._idx = 0
+
+ def __call__(self):
+ # type: () -> Any
+ x = self._idx
+ self._idx += 1
+ return x
+
+ def __str__(self):
+ # type: () -> Any
+ return str(self._idx)
+
+
+cidx = IDX()
+
+# more or less in order of subjective expected likelyhood
+# the _POST and _PRE ones are lists themselves
+C_VALUE_EOL = C_ELEM_EOL = cidx()
+C_KEY_EOL = cidx()
+C_KEY_PRE = C_ELEM_PRE = cidx() # not this is not value
+C_VALUE_POST = C_ELEM_POST = cidx() # not this is not value
+C_VALUE_PRE = cidx()
+C_KEY_POST = cidx()
+C_TAG_EOL = cidx()
+C_TAG_POST = cidx()
+C_TAG_PRE = cidx()
+C_ANCHOR_EOL = cidx()
+C_ANCHOR_POST = cidx()
+C_ANCHOR_PRE = cidx()
+
+
+comment_attrib = '_yaml_comment'
+format_attrib = '_yaml_format'
+line_col_attrib = '_yaml_line_col'
+merge_attrib = '_yaml_merge'
+tag_attrib = '_yaml_tag'
+
+
+class Comment:
+ # using sys.getsize tested the Comment objects, __slots__ makes them bigger
+ # and adding self.end did not matter
+ __slots__ = 'comment', '_items', '_post', '_pre'
+ attrib = comment_attrib
+
+ def __init__(self, old=True):
+ # type: (bool) -> None
+ self._pre = None if old else [] # type: ignore
+ self.comment = None # [post, [pre]]
+ # map key (mapping/omap/dict) or index (sequence/list) to a list of
+ # dict: post_key, pre_key, post_value, pre_value
+ # list: pre item, post item
+ self._items = {} # type: Dict[Any, Any]
+ # self._start = [] # should not put these on first item
+ self._post = [] # type: List[Any] # end of document comments
+
+ def __str__(self):
+ # type: () -> str
+ if bool(self._post):
+ end = ',\n end=' + str(self._post)
+ else:
+ end = ""
+ return 'Comment(comment={0},\n items={1}{2})'.format(
+ self.comment, self._items, end
+ )
+
+ def _old__repr__(self):
+ # type: () -> str
+ if bool(self._post):
+ end = ',\n end=' + str(self._post)
+ else:
+ end = ""
+ ln = '' # type: Union[str,int]
+ try:
+ ln = max([len(str(k)) for k in self._items]) + 1
+ except ValueError:
+ pass
+ it = ' '.join(
+ ['{:{}} {}\n'.format(str(k) + ':', ln, v) for k, v in self._items.items()]
+ )
+ if it:
+ it = '\n ' + it + ' '
+ return 'Comment(\n start={},\n items={{{}}}{})'.format(self.comment, it, end)
+
+ def __repr__(self):
+ # type: () -> str
+ if self._pre is None:
+ return self._old__repr__()
+ if bool(self._post):
+ end = ',\n end=' + repr(self._post)
+ else:
+ end = ""
+ try:
+ ln = max([len(str(k)) for k in self._items]) + 1
+ except ValueError:
+ ln = '' # type: ignore
+ it = ' '.join(
+ ['{:{}} {}\n'.format(str(k) + ':', ln, v) for k, v in self._items.items()]
+ )
+ if it:
+ it = '\n ' + it + ' '
+ return 'Comment(\n pre={},\n items={{{}}}{})'.format(self.pre, it, end)
+
+ @property
+ def items(self):
+ # type: () -> Any
+ return self._items
+
+ @property
+ def end(self):
+ # type: () -> Any
+ return self._post
+
+ @end.setter
+ def end(self, value):
+ # type: (Any) -> None
+ self._post = value
+
+ @property
+ def pre(self):
+ # type: () -> Any
+ return self._pre
+
+ @pre.setter
+ def pre(self, value):
+ # type: (Any) -> None
+ self._pre = value
+
+ def get(self, item, pos):
+ # type: (Any, Any) -> Any
+ x = self._items.get(item)
+ if x is None or len(x) < pos:
+ return None
+ return x[pos] # can be None
+
+ def set(self, item, pos, value):
+ # type: (Any, Any, Any) -> Any
+ x = self._items.get(item)
+ if x is None:
+ self._items[item] = x = [None] * (pos + 1)
+ else:
+ while len(x) <= pos:
+ x.append(None)
+ assert x[pos] is None
+ x[pos] = value
+
+ def __contains__(self, x):
+ # type: (Any) -> Any
+ # test if a substring is in any of the attached comments
+ if self.comment:
+ if self.comment[0] and x in self.comment[0].value:
+ return True
+ if self.comment[1]:
+ for c in self.comment[1]:
+ if x in c.value:
+ return True
+ for value in self.items.values():
+ if not value:
+ continue
+ for c in value:
+ if c and x in c.value:
+ return True
+ if self.end:
+ for c in self.end:
+ if x in c.value:
+ return True
+ return False
+
+
+# to distinguish key from None
+def NoComment():
+ # type: () -> None
+ pass
+
+
+class Format:
+ __slots__ = ('_flow_style',)
+ attrib = format_attrib
+
+ def __init__(self):
+ # type: () -> None
+ self._flow_style = None # type: Any
+
+ def set_flow_style(self):
+ # type: () -> None
+ self._flow_style = True
+
+ def set_block_style(self):
+ # type: () -> None
+ self._flow_style = False
+
+ def flow_style(self, default=None):
+ # type: (Optional[Any]) -> Any
+ """if default (the flow_style) is None, the flow style tacked on to
+ the object explicitly will be taken. If that is None as well the
+ default flow style rules the format down the line, or the type
+ of the constituent values (simple -> flow, map/list -> block)"""
+ if self._flow_style is None:
+ return default
+ return self._flow_style
+
+
+class LineCol:
+ """
+ line and column information wrt document, values start at zero (0)
+ """
+
+ attrib = line_col_attrib
+
+ def __init__(self):
+ # type: () -> None
+ self.line = None
+ self.col = None
+ self.data = None # type: Optional[Dict[Any, Any]]
+
+ def add_kv_line_col(self, key, data):
+ # type: (Any, Any) -> None
+ if self.data is None:
+ self.data = {}
+ self.data[key] = data
+
+ def key(self, k):
+ # type: (Any) -> Any
+ return self._kv(k, 0, 1)
+
+ def value(self, k):
+ # type: (Any) -> Any
+ return self._kv(k, 2, 3)
+
+ def _kv(self, k, x0, x1):
+ # type: (Any, Any, Any) -> Any
+ if self.data is None:
+ return None
+ data = self.data[k]
+ return data[x0], data[x1]
+
+ def item(self, idx):
+ # type: (Any) -> Any
+ if self.data is None:
+ return None
+ return self.data[idx][0], self.data[idx][1]
+
+ def add_idx_line_col(self, key, data):
+ # type: (Any, Any) -> None
+ if self.data is None:
+ self.data = {}
+ self.data[key] = data
+
+ def __repr__(self):
+ # type: () -> str
+ return _F('LineCol({line}, {col})', line=self.line, col=self.col) # type: ignore
+
+
+class Tag:
+ """store tag information for roundtripping"""
+
+ __slots__ = ('value',)
+ attrib = tag_attrib
+
+ def __init__(self):
+ # type: () -> None
+ self.value = None
+
+ def __repr__(self):
+ # type: () -> Any
+ return '{0.__class__.__name__}({0.value!r})'.format(self)
+
+
+class CommentedBase:
+ @property
+ def ca(self):
+ # type: () -> Any
+ if not hasattr(self, Comment.attrib):
+ setattr(self, Comment.attrib, Comment())
+ return getattr(self, Comment.attrib)
+
+ def yaml_end_comment_extend(self, comment, clear=False):
+ # type: (Any, bool) -> None
+ if comment is None:
+ return
+ if clear or self.ca.end is None:
+ self.ca.end = []
+ self.ca.end.extend(comment)
+
+ def yaml_key_comment_extend(self, key, comment, clear=False):
+ # type: (Any, Any, bool) -> None
+ r = self.ca._items.setdefault(key, [None, None, None, None])
+ if clear or r[1] is None:
+ if comment[1] is not None:
+ assert isinstance(comment[1], list)
+ r[1] = comment[1]
+ else:
+ r[1].extend(comment[0])
+ r[0] = comment[0]
+
+ def yaml_value_comment_extend(self, key, comment, clear=False):
+ # type: (Any, Any, bool) -> None
+ r = self.ca._items.setdefault(key, [None, None, None, None])
+ if clear or r[3] is None:
+ if comment[1] is not None:
+ assert isinstance(comment[1], list)
+ r[3] = comment[1]
+ else:
+ r[3].extend(comment[0])
+ r[2] = comment[0]
+
+ def yaml_set_start_comment(self, comment, indent=0):
+ # type: (Any, Any) -> None
+ """overwrites any preceding comment lines on an object
+ expects comment to be without `#` and possible have multiple lines
+ """
+ from ruyaml.error import CommentMark
+ from ruyaml.tokens import CommentToken
+
+ pre_comments = self._yaml_clear_pre_comment() # type: ignore
+ if comment[-1] == '\n':
+ comment = comment[:-1] # strip final newline if there
+ start_mark = CommentMark(indent)
+ for com in comment.split('\n'):
+ c = com.strip()
+ if len(c) > 0 and c[0] != '#':
+ com = '# ' + com
+ pre_comments.append(CommentToken(com + '\n', start_mark))
+
+ def yaml_set_comment_before_after_key(
+ self, key, before=None, indent=0, after=None, after_indent=None
+ ):
+ # type: (Any, Any, Any, Any, Any) -> None
+ """
+ expects comment (before/after) to be without `#` and possible have multiple lines
+ """
+ from ruyaml.error import CommentMark
+ from ruyaml.tokens import CommentToken
+
+ def comment_token(s, mark):
+ # type: (Any, Any) -> Any
+ # handle empty lines as having no comment
+ return CommentToken(('# ' if s else "") + s + '\n', mark)
+
+ if after_indent is None:
+ after_indent = indent + 2
+ if before and (len(before) > 1) and before[-1] == '\n':
+ before = before[:-1] # strip final newline if there
+ if after and after[-1] == '\n':
+ after = after[:-1] # strip final newline if there
+ start_mark = CommentMark(indent)
+ c = self.ca.items.setdefault(key, [None, [], None, None])
+ if before is not None:
+ if c[1] is None:
+ c[1] = []
+ if before == '\n':
+ c[1].append(comment_token("", start_mark)) # type: ignore
+ else:
+ for com in before.split('\n'):
+ c[1].append(comment_token(com, start_mark)) # type: ignore
+ if after:
+ start_mark = CommentMark(after_indent)
+ if c[3] is None:
+ c[3] = []
+ for com in after.split('\n'):
+ c[3].append(comment_token(com, start_mark)) # type: ignore
+
+ @property
+ def fa(self):
+ # type: () -> Any
+ """format attribute
+
+ set_flow_style()/set_block_style()"""
+ if not hasattr(self, Format.attrib):
+ setattr(self, Format.attrib, Format())
+ return getattr(self, Format.attrib)
+
+ def yaml_add_eol_comment(self, comment, key=NoComment, column=None):
+ # type: (Any, Optional[Any], Optional[Any]) -> None
+ """
+ there is a problem as eol comments should start with ' #'
+ (but at the beginning of the line the space doesn't have to be before
+ the #. The column index is for the # mark
+ """
+ from ruyaml.error import CommentMark
+ from ruyaml.tokens import CommentToken
+
+ if column is None:
+ try:
+ column = self._yaml_get_column(key)
+ except AttributeError:
+ column = 0
+ if comment[0] != '#':
+ comment = '# ' + comment
+ if column is None:
+ if comment[0] == '#':
+ comment = ' ' + comment
+ column = 0
+ start_mark = CommentMark(column)
+ ct = [CommentToken(comment, start_mark), None]
+ self._yaml_add_eol_comment(ct, key=key)
+
+ @property
+ def lc(self):
+ # type: () -> Any
+ if not hasattr(self, LineCol.attrib):
+ setattr(self, LineCol.attrib, LineCol())
+ return getattr(self, LineCol.attrib)
+
+ def _yaml_set_line_col(self, line, col):
+ # type: (Any, Any) -> None
+ self.lc.line = line
+ self.lc.col = col
+
+ def _yaml_set_kv_line_col(self, key, data):
+ # type: (Any, Any) -> None
+ self.lc.add_kv_line_col(key, data)
+
+ def _yaml_set_idx_line_col(self, key, data):
+ # type: (Any, Any) -> None
+ self.lc.add_idx_line_col(key, data)
+
+ @property
+ def anchor(self):
+ # type: () -> Any
+ if not hasattr(self, Anchor.attrib):
+ setattr(self, Anchor.attrib, Anchor())
+ return getattr(self, Anchor.attrib)
+
+ def yaml_anchor(self):
+ # type: () -> Any
+ if not hasattr(self, Anchor.attrib):
+ return None
+ return self.anchor
+
+ def yaml_set_anchor(self, value, always_dump=False):
+ # type: (Any, bool) -> None
+ self.anchor.value = value
+ self.anchor.always_dump = always_dump
+
+ @property
+ def tag(self):
+ # type: () -> Any
+ if not hasattr(self, Tag.attrib):
+ setattr(self, Tag.attrib, Tag())
+ return getattr(self, Tag.attrib)
+
+ def yaml_set_tag(self, value):
+ # type: (Any) -> None
+ self.tag.value = value
+
+ def copy_attributes(self, t, memo=None):
+ # type: (Any, Any) -> None
+ # fmt: off
+ for a in [Comment.attrib, Format.attrib, LineCol.attrib, Anchor.attrib,
+ Tag.attrib, merge_attrib]:
+ if hasattr(self, a):
+ if memo is not None:
+ setattr(t, a, copy.deepcopy(getattr(self, a, memo)))
+ else:
+ setattr(t, a, getattr(self, a))
+ # fmt: on
+
+ def _yaml_add_eol_comment(self, comment, key):
+ # type: (Any, Any) -> None
+ raise NotImplementedError
+
+ def _yaml_get_pre_comment(self):
+ # type: () -> Any
+ raise NotImplementedError
+
+ def _yaml_clear_pre_comment(self):
+ # type: () -> Any
+ raise NotImplementedError
+
+ def _yaml_get_column(self, key):
+ # type: (Any) -> Any
+ raise NotImplementedError
+
+
+class CommentedSeq(MutableSliceableSequence, list, CommentedBase): # type: ignore
+ __slots__ = (Comment.attrib, '_lst')
+
+ def __init__(self, *args, **kw):
+ # type: (Any, Any) -> None
+ list.__init__(self, *args, **kw)
+
+ def __getsingleitem__(self, idx):
+ # type: (Any) -> Any
+ return list.__getitem__(self, idx)
+
+ def __setsingleitem__(self, idx, value):
+ # type: (Any, Any) -> None
+ # try to preserve the scalarstring type if setting an existing key to a new value
+ if idx < len(self):
+ if (
+ isinstance(value, str)
+ and not isinstance(value, ScalarString)
+ and isinstance(self[idx], ScalarString)
+ ):
+ value = type(self[idx])(value)
+ list.__setitem__(self, idx, value)
+
+ def __delsingleitem__(self, idx=None):
+ # type: (Any) -> Any
+ list.__delitem__(self, idx) # type: ignore
+ self.ca.items.pop(idx, None) # might not be there -> default value
+ for list_index in sorted(self.ca.items):
+ if list_index < idx:
+ continue
+ self.ca.items[list_index - 1] = self.ca.items.pop(list_index)
+
+ def __len__(self):
+ # type: () -> int
+ return list.__len__(self)
+
+ def insert(self, idx, val):
+ # type: (Any, Any) -> None
+ """the comments after the insertion have to move forward"""
+ list.insert(self, idx, val)
+ for list_index in sorted(self.ca.items, reverse=True):
+ if list_index < idx:
+ break
+ self.ca.items[list_index + 1] = self.ca.items.pop(list_index)
+
+ def extend(self, val):
+ # type: (Any) -> None
+ list.extend(self, val)
+
+ def __eq__(self, other):
+ # type: (Any) -> bool
+ return list.__eq__(self, other)
+
+ def _yaml_add_comment(self, comment, key=NoComment):
+ # type: (Any, Optional[Any]) -> None
+ if key is not NoComment:
+ self.yaml_key_comment_extend(key, comment, clear=True)
+ else:
+ self.ca.comment = comment
+
+ def _yaml_add_eol_comment(self, comment, key):
+ # type: (Any, Any) -> None
+ self._yaml_add_comment(comment, key=key)
+
+ def _yaml_get_columnX(self, key):
+ # type: (Any) -> Any
+ return self.ca.items[key][0].start_mark.column
+
+ def _yaml_get_column(self, key):
+ # type: (Any) -> Any
+ column = None
+ sel_idx = None
+ pre, post = key - 1, key + 1
+ if pre in self.ca.items:
+ sel_idx = pre
+ elif post in self.ca.items:
+ sel_idx = post
+ else:
+ # self.ca.items is not ordered
+ for row_idx, _k1 in enumerate(self):
+ if row_idx >= key:
+ break
+ if row_idx not in self.ca.items:
+ continue
+ sel_idx = row_idx
+ if sel_idx is not None:
+ column = self._yaml_get_columnX(sel_idx)
+ return column
+
+ def _yaml_get_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ pre_comments = self.ca.comment[1]
+ return pre_comments
+
+ def _yaml_clear_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ self.ca.comment[1] = pre_comments
+ return pre_comments
+
+ def __deepcopy__(self, memo):
+ # type: (Any) -> Any
+ res = self.__class__()
+ memo[id(self)] = res
+ for k in self:
+ res.append(copy.deepcopy(k, memo))
+ self.copy_attributes(res, memo=memo)
+ return res
+
+ def __add__(self, other):
+ # type: (Any) -> Any
+ return list.__add__(self, other)
+
+ def sort(self, key=None, reverse=False):
+ # type: (Any, bool) -> None
+ if key is None:
+ tmp_lst = sorted(zip(self, range(len(self))), reverse=reverse)
+ list.__init__(self, [x[0] for x in tmp_lst])
+ else:
+ tmp_lst = sorted(
+ zip(map(key, list.__iter__(self)), range(len(self))), reverse=reverse
+ )
+ list.__init__(self, [list.__getitem__(self, x[1]) for x in tmp_lst])
+ itm = self.ca.items
+ self.ca._items = {}
+ for idx, x in enumerate(tmp_lst):
+ old_index = x[1]
+ if old_index in itm:
+ self.ca.items[idx] = itm[old_index]
+
+ def __repr__(self):
+ # type: () -> Any
+ return list.__repr__(self)
+
+
+class CommentedKeySeq(tuple, CommentedBase): # type: ignore
+ """This primarily exists to be able to roundtrip keys that are sequences"""
+
+ def _yaml_add_comment(self, comment, key=NoComment):
+ # type: (Any, Optional[Any]) -> None
+ if key is not NoComment:
+ self.yaml_key_comment_extend(key, comment)
+ else:
+ self.ca.comment = comment
+
+ def _yaml_add_eol_comment(self, comment, key):
+ # type: (Any, Any) -> None
+ self._yaml_add_comment(comment, key=key)
+
+ def _yaml_get_columnX(self, key):
+ # type: (Any) -> Any
+ return self.ca.items[key][0].start_mark.column
+
+ def _yaml_get_column(self, key):
+ # type: (Any) -> Any
+ column = None
+ sel_idx = None
+ pre, post = key - 1, key + 1
+ if pre in self.ca.items:
+ sel_idx = pre
+ elif post in self.ca.items:
+ sel_idx = post
+ else:
+ # self.ca.items is not ordered
+ for row_idx, _k1 in enumerate(self):
+ if row_idx >= key:
+ break
+ if row_idx not in self.ca.items:
+ continue
+ sel_idx = row_idx
+ if sel_idx is not None:
+ column = self._yaml_get_columnX(sel_idx)
+ return column
+
+ def _yaml_get_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ pre_comments = self.ca.comment[1]
+ return pre_comments
+
+ def _yaml_clear_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ self.ca.comment[1] = pre_comments
+ return pre_comments
+
+
+class CommentedMapView(Sized):
+ __slots__ = ('_mapping',)
+
+ def __init__(self, mapping):
+ # type: (Any) -> None
+ self._mapping = mapping
+
+ def __len__(self):
+ # type: () -> int
+ count = len(self._mapping)
+ return count
+
+
+class CommentedMapKeysView(CommentedMapView, Set): # type: ignore
+ __slots__ = ()
+
+ @classmethod
+ def _from_iterable(self, it):
+ # type: (Any) -> Any
+ return set(it)
+
+ def __contains__(self, key):
+ # type: (Any) -> Any
+ return key in self._mapping
+
+ def __iter__(self):
+ # type: () -> Any # yield from self._mapping # not in pypy
+ # for x in self._mapping._keys():
+ for x in self._mapping:
+ yield x
+
+
+class CommentedMapItemsView(CommentedMapView, Set): # type: ignore
+ __slots__ = ()
+
+ @classmethod
+ def _from_iterable(self, it):
+ # type: (Any) -> Any
+ return set(it)
+
+ def __contains__(self, item):
+ # type: (Any) -> Any
+ key, value = item
+ try:
+ v = self._mapping[key]
+ except KeyError:
+ return False
+ else:
+ return v == value
+
+ def __iter__(self):
+ # type: () -> Any
+ for key in self._mapping._keys():
+ yield (key, self._mapping[key])
+
+
+class CommentedMapValuesView(CommentedMapView):
+ __slots__ = ()
+
+ def __contains__(self, value):
+ # type: (Any) -> Any
+ for key in self._mapping:
+ if value == self._mapping[key]:
+ return True
+ return False
+
+ def __iter__(self):
+ # type: () -> Any
+ for key in self._mapping._keys():
+ yield self._mapping[key]
+
+
+class CommentedMap(ordereddict, CommentedBase):
+ __slots__ = (Comment.attrib, '_ok', '_ref')
+
+ def __init__(self, *args, **kw):
+ # type: (Any, Any) -> None
+ self._ok = set() # type: MutableSet[Any] # own keys
+ self._ref = [] # type: List[CommentedMap]
+ ordereddict.__init__(self, *args, **kw)
+
+ def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
+ # type: (Any, Optional[Any], Optional[Any]) -> None
+ """values is set to key to indicate a value attachment of comment"""
+ if key is not NoComment:
+ self.yaml_key_comment_extend(key, comment)
+ return
+ if value is not NoComment:
+ self.yaml_value_comment_extend(value, comment)
+ else:
+ self.ca.comment = comment
+
+ def _yaml_add_eol_comment(self, comment, key):
+ # type: (Any, Any) -> None
+ """add on the value line, with value specified by the key"""
+ self._yaml_add_comment(comment, value=key)
+
+ def _yaml_get_columnX(self, key):
+ # type: (Any) -> Any
+ return self.ca.items[key][2].start_mark.column
+
+ def _yaml_get_column(self, key):
+ # type: (Any) -> Any
+ column = None
+ sel_idx = None
+ pre, post, last = None, None, None
+ for x in self:
+ if pre is not None and x != key:
+ post = x
+ break
+ if x == key:
+ pre = last
+ last = x
+ if pre in self.ca.items:
+ sel_idx = pre
+ elif post in self.ca.items:
+ sel_idx = post
+ else:
+ # self.ca.items is not ordered
+ for k1 in self:
+ if k1 >= key:
+ break
+ if k1 not in self.ca.items:
+ continue
+ sel_idx = k1
+ if sel_idx is not None:
+ column = self._yaml_get_columnX(sel_idx)
+ return column
+
+ def _yaml_get_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ pre_comments = self.ca.comment[1]
+ return pre_comments
+
+ def _yaml_clear_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ self.ca.comment[1] = pre_comments
+ return pre_comments
+
+ def update(self, *vals, **kw):
+ # type: (Any, Any) -> None
+ try:
+ ordereddict.update(self, *vals, **kw)
+ except TypeError:
+ # probably a dict that is used
+ for x in vals[0]:
+ self[x] = vals[0][x]
+ if vals:
+ try:
+ self._ok.update(vals[0].keys()) # type: ignore
+ except AttributeError:
+ # assume one argument that is a list/tuple of two element lists/tuples
+ for x in vals[0]:
+ self._ok.add(x[0])
+ if kw:
+ self._ok.add(*kw.keys())
+
+ def insert(self, pos, key, value, comment=None):
+ # type: (Any, Any, Any, Optional[Any]) -> None
+ """insert key value into given position
+ attach comment if provided
+ """
+ keys = list(self.keys()) + [key]
+ ordereddict.insert(self, pos, key, value)
+ for keytmp in keys:
+ self._ok.add(keytmp)
+ for referer in self._ref:
+ for keytmp in keys:
+ referer.update_key_value(keytmp)
+ if comment is not None:
+ self.yaml_add_eol_comment(comment, key=key)
+
+ def mlget(self, key, default=None, list_ok=False):
+ # type: (Any, Any, Any) -> Any
+ """multi-level get that expects dicts within dicts"""
+ if not isinstance(key, list):
+ return self.get(key, default)
+ # assume that the key is a list of recursively accessible dicts
+
+ def get_one_level(key_list, level, d):
+ # type: (Any, Any, Any) -> Any
+ if not list_ok:
+ assert isinstance(d, dict)
+ if level >= len(key_list):
+ if level > len(key_list):
+ raise IndexError
+ return d[key_list[level - 1]]
+ return get_one_level(key_list, level + 1, d[key_list[level - 1]])
+
+ try:
+ return get_one_level(key, 1, self)
+ except KeyError:
+ return default
+ except (TypeError, IndexError):
+ if not list_ok:
+ raise
+ return default
+
+ def __getitem__(self, key):
+ # type: (Any) -> Any
+ try:
+ return ordereddict.__getitem__(self, key)
+ except KeyError:
+ for merged in getattr(self, merge_attrib, []):
+ if key in merged[1]:
+ return merged[1][key]
+ raise
+
+ def __setitem__(self, key, value):
+ # type: (Any, Any) -> None
+ # try to preserve the scalarstring type if setting an existing key to a new value
+ if key in self:
+ if (
+ isinstance(value, str)
+ and not isinstance(value, ScalarString)
+ and isinstance(self[key], ScalarString)
+ ):
+ value = type(self[key])(value)
+ ordereddict.__setitem__(self, key, value)
+ self._ok.add(key)
+
+ def _unmerged_contains(self, key):
+ # type: (Any) -> Any
+ if key in self._ok:
+ return True
+ return None
+
+ def __contains__(self, key):
+ # type: (Any) -> bool
+ return bool(ordereddict.__contains__(self, key))
+
+ def get(self, key, default=None):
+ # type: (Any, Any) -> Any
+ try:
+ return self.__getitem__(key)
+ except: # NOQA
+ return default
+
+ def __repr__(self):
+ # type: () -> Any
+ return ordereddict.__repr__(self).replace('CommentedMap', 'ordereddict')
+
+ def non_merged_items(self):
+ # type: () -> Any
+ for x in ordereddict.__iter__(self):
+ if x in self._ok:
+ yield x, ordereddict.__getitem__(self, x)
+
+ def __delitem__(self, key):
+ # type: (Any) -> None
+ # for merged in getattr(self, merge_attrib, []):
+ # if key in merged[1]:
+ # value = merged[1][key]
+ # break
+ # else:
+ # # not found in merged in stuff
+ # ordereddict.__delitem__(self, key)
+ # for referer in self._ref:
+ # referer.update=_key_value(key)
+ # return
+ #
+ # ordereddict.__setitem__(self, key, value) # merge might have different value
+ # self._ok.discard(key)
+ self._ok.discard(key)
+ ordereddict.__delitem__(self, key)
+ for referer in self._ref:
+ referer.update_key_value(key)
+
+ def __iter__(self):
+ # type: () -> Any
+ for x in ordereddict.__iter__(self):
+ yield x
+
+ def _keys(self):
+ # type: () -> Any
+ for x in ordereddict.__iter__(self):
+ yield x
+
+ def __len__(self):
+ # type: () -> int
+ return int(ordereddict.__len__(self))
+
+ def __eq__(self, other):
+ # type: (Any) -> bool
+ return bool(dict(self) == other)
+
+ def keys(self):
+ # type: () -> Any
+ return CommentedMapKeysView(self)
+
+ def values(self):
+ # type: () -> Any
+ return CommentedMapValuesView(self)
+
+ def _items(self):
+ # type: () -> Any
+ for x in ordereddict.__iter__(self):
+ yield x, ordereddict.__getitem__(self, x)
+
+ def items(self):
+ # type: () -> Any
+ return CommentedMapItemsView(self)
+
+ @property
+ def merge(self):
+ # type: () -> Any
+ if not hasattr(self, merge_attrib):
+ setattr(self, merge_attrib, [])
+ return getattr(self, merge_attrib)
+
+ def copy(self):
+ # type: () -> Any
+ x = type(self)() # update doesn't work
+ for k, v in self._items():
+ x[k] = v
+ self.copy_attributes(x)
+ return x
+
+ def add_referent(self, cm):
+ # type: (Any) -> None
+ if cm not in self._ref:
+ self._ref.append(cm)
+
+ def add_yaml_merge(self, value):
+ # type: (Any) -> None
+ for v in value:
+ v[1].add_referent(self)
+ for k, v in v[1].items():
+ if ordereddict.__contains__(self, k):
+ continue
+ ordereddict.__setitem__(self, k, v)
+ self.merge.extend(value)
+
+ def update_key_value(self, key):
+ # type: (Any) -> None
+ if key in self._ok:
+ return
+ for v in self.merge:
+ if key in v[1]:
+ ordereddict.__setitem__(self, key, v[1][key])
+ return
+ ordereddict.__delitem__(self, key)
+
+ def __deepcopy__(self, memo):
+ # type: (Any) -> Any
+ res = self.__class__()
+ memo[id(self)] = res
+ for k in self:
+ res[k] = copy.deepcopy(self[k], memo)
+ self.copy_attributes(res, memo=memo)
+ return res
+
+
+# based on brownie mappings
+@classmethod # type: ignore
+def raise_immutable(cls, *args, **kwargs):
+ # type: (Any, *Any, **Any) -> None
+ raise TypeError('{} objects are immutable'.format(cls.__name__))
+
+
+class CommentedKeyMap(CommentedBase, Mapping): # type: ignore
+ __slots__ = Comment.attrib, '_od'
+ """This primarily exists to be able to roundtrip keys that are mappings"""
+
+ def __init__(self, *args, **kw):
+ # type: (Any, Any) -> None
+ if hasattr(self, '_od'):
+ raise_immutable(self)
+ try:
+ self._od = ordereddict(*args, **kw)
+ except TypeError:
+ raise
+
+ __delitem__ = (
+ __setitem__
+ ) = clear = pop = popitem = setdefault = update = raise_immutable
+
+ # need to implement __getitem__, __iter__ and __len__
+ def __getitem__(self, index):
+ # type: (Any) -> Any
+ return self._od[index]
+
+ def __iter__(self):
+ # type: () -> Iterator[Any]
+ for x in self._od.__iter__():
+ yield x
+
+ def __len__(self):
+ # type: () -> int
+ return len(self._od)
+
+ def __hash__(self):
+ # type: () -> Any
+ return hash(tuple(self.items()))
+
+ def __repr__(self):
+ # type: () -> Any
+ if not hasattr(self, merge_attrib):
+ return self._od.__repr__()
+ return 'ordereddict(' + repr(list(self._od.items())) + ')'
+
+ @classmethod
+ def fromkeys(keys, v=None):
+ # type: (Any, Any) -> Any
+ return CommentedKeyMap(dict.fromkeys(keys, v))
+
+ def _yaml_add_comment(self, comment, key=NoComment):
+ # type: (Any, Optional[Any]) -> None
+ if key is not NoComment:
+ self.yaml_key_comment_extend(key, comment)
+ else:
+ self.ca.comment = comment
+
+ def _yaml_add_eol_comment(self, comment, key):
+ # type: (Any, Any) -> None
+ self._yaml_add_comment(comment, key=key)
+
+ def _yaml_get_columnX(self, key):
+ # type: (Any) -> Any
+ return self.ca.items[key][0].start_mark.column
+
+ def _yaml_get_column(self, key):
+ # type: (Any) -> Any
+ column = None
+ sel_idx = None
+ pre, post = key - 1, key + 1
+ if pre in self.ca.items:
+ sel_idx = pre
+ elif post in self.ca.items:
+ sel_idx = post
+ else:
+ # self.ca.items is not ordered
+ for row_idx, _k1 in enumerate(self):
+ if row_idx >= key:
+ break
+ if row_idx not in self.ca.items:
+ continue
+ sel_idx = row_idx
+ if sel_idx is not None:
+ column = self._yaml_get_columnX(sel_idx)
+ return column
+
+ def _yaml_get_pre_comment(self):
+ # type: () -> Any
+ pre_comments = [] # type: List[Any]
+ if self.ca.comment is None:
+ self.ca.comment = [None, pre_comments]
+ else:
+ self.ca.comment[1] = pre_comments
+ return pre_comments
+
+
+class CommentedOrderedMap(CommentedMap):
+ __slots__ = (Comment.attrib,)
+
+
+class CommentedSet(MutableSet, CommentedBase): # type: ignore # NOQA
+ __slots__ = Comment.attrib, 'odict'
+
+ def __init__(self, values=None):
+ # type: (Any) -> None
+ self.odict = ordereddict()
+ MutableSet.__init__(self)
+ if values is not None:
+ self |= values # type: ignore
+
+ def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
+ # type: (Any, Optional[Any], Optional[Any]) -> None
+ """values is set to key to indicate a value attachment of comment"""
+ if key is not NoComment:
+ self.yaml_key_comment_extend(key, comment)
+ return
+ if value is not NoComment:
+ self.yaml_value_comment_extend(value, comment)
+ else:
+ self.ca.comment = comment
+
+ def _yaml_add_eol_comment(self, comment, key):
+ # type: (Any, Any) -> None
+ """add on the value line, with value specified by the key"""
+ self._yaml_add_comment(comment, value=key)
+
+ def add(self, value):
+ # type: (Any) -> None
+ """Add an element."""
+ self.odict[value] = None
+
+ def discard(self, value):
+ # type: (Any) -> None
+ """Remove an element. Do not raise an exception if absent."""
+ del self.odict[value]
+
+ def __contains__(self, x):
+ # type: (Any) -> Any
+ return x in self.odict
+
+ def __iter__(self):
+ # type: () -> Any
+ for x in self.odict:
+ yield x
+
+ def __len__(self):
+ # type: () -> int
+ return len(self.odict)
+
+ def __repr__(self):
+ # type: () -> str
+ return 'set({0!r})'.format(self.odict.keys())
+
+
+class TaggedScalar(CommentedBase):
+ # the value and style attributes are set during roundtrip construction
+ def __init__(self, value=None, style=None, tag=None):
+ # type: (Any, Any, Any) -> None
+ self.value = value
+ self.style = style
+ if tag is not None:
+ self.yaml_set_tag(tag)
+
+ def __str__(self):
+ # type: () -> Any
+ return self.value
+
+
+def dump_comments(d, name="", sep='.', out=sys.stdout):
+ # type: (Any, str, str, Any) -> None
+ """
+ recursively dump comments, all but the toplevel preceded by the path
+ in dotted form x.0.a
+ """
+ if isinstance(d, dict) and hasattr(d, 'ca'):
+ if name:
+ out.write('{} {}\n'.format(name, type(d)))
+ out.write('{!r}\n'.format(d.ca)) # type: ignore
+ for k in d:
+ dump_comments(
+ d[k], name=(name + sep + str(k)) if name else k, sep=sep, out=out
+ )
+ elif isinstance(d, list) and hasattr(d, 'ca'):
+ if name:
+ out.write('{} {}\n'.format(name, type(d)))
+ out.write('{!r}\n'.format(d.ca)) # type: ignore
+ for idx, k in enumerate(d):
+ dump_comments(
+ k, name=(name + sep + str(idx)) if name else str(idx), sep=sep, out=out
+ )