diff options
Diffstat (limited to 'comments.py')
-rw-r--r-- | comments.py | 1175 |
1 files changed, 1175 insertions, 0 deletions
diff --git a/comments.py b/comments.py new file mode 100644 index 0000000..843b329 --- /dev/null +++ b/comments.py @@ -0,0 +1,1175 @@ +# coding: utf-8 + +""" +stuff to deal with comments and formatting on dict/list/ordereddict/set +these are not really related, formatting could be factored out as +a separate base +""" + +import sys +import copy + + +from ruamel.yaml.compat import ordereddict +from ruamel.yaml.compat import MutableSliceableSequence, nprintf # NOQA +from ruamel.yaml.scalarstring import ScalarString +from ruamel.yaml.anchor import Anchor +from ruamel.yaml.tag import Tag + +from collections.abc import MutableSet, Sized, Set, Mapping + +from typing import Any, Dict, Optional, List, Union, Optional, Iterator # NOQA + +# fmt: off +__all__ = ['CommentedSeq', 'CommentedKeySeq', + 'CommentedMap', 'CommentedOrderedMap', + 'CommentedSet', 'comment_attrib', 'merge_attrib', + 'TaggedScalar', + 'C_POST', 'C_PRE', 'C_SPLIT_ON_FIRST_BLANK', 'C_BLANK_LINE_PRESERVE_SPACE', + ] +# fmt: on + +# splitting of comments by the scanner +# an EOLC (End-Of-Line Comment) is preceded by some token +# an FLC (Full Line Comment) is a comment not preceded by a token, i.e. # is +# the first non-blank on line +# a BL is a blank line i.e. empty or spaces/tabs only +# bits 0 and 1 are combined, you can choose only one +C_POST = 0b00 +C_PRE = 0b01 +C_SPLIT_ON_FIRST_BLANK = 0b10 # as C_POST, but if blank line then C_PRE all lines before +# first blank goes to POST even if no following real FLC +# (first blank -> first of post) +# 0b11 -> reserved for future use +C_BLANK_LINE_PRESERVE_SPACE = 0b100 +# C_EOL_PRESERVE_SPACE2 = 0b1000 + + +class IDX: + # temporary auto increment, so rearranging is easier + def __init__(self) -> None: + self._idx = 0 + + def __call__(self) -> Any: + x = self._idx + self._idx += 1 + return x + + def __str__(self) -> Any: + return str(self._idx) + + +cidx = IDX() + +# more or less in order of subjective expected likelyhood +# the _POST and _PRE ones are lists themselves +C_VALUE_EOL = C_ELEM_EOL = cidx() +C_KEY_EOL = cidx() +C_KEY_PRE = C_ELEM_PRE = cidx() # not this is not value +C_VALUE_POST = C_ELEM_POST = cidx() # not this is not value +C_VALUE_PRE = cidx() +C_KEY_POST = cidx() +C_TAG_EOL = cidx() +C_TAG_POST = cidx() +C_TAG_PRE = cidx() +C_ANCHOR_EOL = cidx() +C_ANCHOR_POST = cidx() +C_ANCHOR_PRE = cidx() + + +comment_attrib = '_yaml_comment' +format_attrib = '_yaml_format' +line_col_attrib = '_yaml_line_col' +merge_attrib = '_yaml_merge' + + +class Comment: + # using sys.getsize tested the Comment objects, __slots__ makes them bigger + # and adding self.end did not matter + __slots__ = 'comment', '_items', '_post', '_pre' + attrib = comment_attrib + + def __init__(self, old: bool = True) -> None: + self._pre = None if old else [] # type: ignore + self.comment = None # [post, [pre]] + # map key (mapping/omap/dict) or index (sequence/list) to a list of + # dict: post_key, pre_key, post_value, pre_value + # list: pre item, post item + self._items: Dict[Any, Any] = {} + # self._start = [] # should not put these on first item + self._post: List[Any] = [] # end of document comments + + def __str__(self) -> str: + if bool(self._post): + end = ',\n end=' + str(self._post) + else: + end = "" + return f'Comment(comment={self.comment},\n items={self._items}{end})' + + def _old__repr__(self) -> str: + if bool(self._post): + end = ',\n end=' + str(self._post) + else: + end = "" + try: + ln = max([len(str(k)) for k in self._items]) + 1 + except ValueError: + ln = '' # type: ignore + it = ' '.join([f'{str(k) + ":":{ln}} {v}\n' for k, v in self._items.items()]) + if it: + it = '\n ' + it + ' ' + return f'Comment(\n start={self.comment},\n items={{{it}}}{end})' + + def __repr__(self) -> str: + if self._pre is None: + return self._old__repr__() + if bool(self._post): + end = ',\n end=' + repr(self._post) + else: + end = "" + try: + ln = max([len(str(k)) for k in self._items]) + 1 + except ValueError: + ln = '' # type: ignore + it = ' '.join([f'{str(k) + ":":{ln}} {v}\n' for k, v in self._items.items()]) + if it: + it = '\n ' + it + ' ' + return f'Comment(\n pre={self.pre},\n items={{{it}}}{end})' + + @property + def items(self) -> Any: + return self._items + + @property + def end(self) -> Any: + return self._post + + @end.setter + def end(self, value: Any) -> None: + self._post = value + + @property + def pre(self) -> Any: + return self._pre + + @pre.setter + def pre(self, value: Any) -> None: + self._pre = value + + def get(self, item: Any, pos: Any) -> Any: + x = self._items.get(item) + if x is None or len(x) < pos: + return None + return x[pos] # can be None + + def set(self, item: Any, pos: Any, value: Any) -> Any: + x = self._items.get(item) + if x is None: + self._items[item] = x = [None] * (pos + 1) + else: + while len(x) <= pos: + x.append(None) + assert x[pos] is None + x[pos] = value + + def __contains__(self, x: Any) -> Any: + # test if a substring is in any of the attached comments + if self.comment: + if self.comment[0] and x in self.comment[0].value: + return True + if self.comment[1]: + for c in self.comment[1]: + if x in c.value: + return True + for value in self.items.values(): + if not value: + continue + for c in value: + if c and x in c.value: + return True + if self.end: + for c in self.end: + if x in c.value: + return True + return False + + +# to distinguish key from None +class NotNone: + pass # NOQA + + +class Format: + __slots__ = ('_flow_style',) + attrib = format_attrib + + def __init__(self) -> None: + self._flow_style: Any = None + + def set_flow_style(self) -> None: + self._flow_style = True + + def set_block_style(self) -> None: + self._flow_style = False + + def flow_style(self, default: Optional[Any] = None) -> Any: + """if default (the flow_style) is None, the flow style tacked on to + the object explicitly will be taken. If that is None as well the + default flow style rules the format down the line, or the type + of the constituent values (simple -> flow, map/list -> block)""" + if self._flow_style is None: + return default + return self._flow_style + + def __repr__(self) -> str: + return f'Format({self._flow_style})' + + +class LineCol: + """ + line and column information wrt document, values start at zero (0) + """ + + attrib = line_col_attrib + + def __init__(self) -> None: + self.line = None + self.col = None + self.data: Optional[Dict[Any, Any]] = None + + def add_kv_line_col(self, key: Any, data: Any) -> None: + if self.data is None: + self.data = {} + self.data[key] = data + + def key(self, k: Any) -> Any: + return self._kv(k, 0, 1) + + def value(self, k: Any) -> Any: + return self._kv(k, 2, 3) + + def _kv(self, k: Any, x0: Any, x1: Any) -> Any: + if self.data is None: + return None + data = self.data[k] + return data[x0], data[x1] + + def item(self, idx: Any) -> Any: + if self.data is None: + return None + return self.data[idx][0], self.data[idx][1] + + def add_idx_line_col(self, key: Any, data: Any) -> None: + if self.data is None: + self.data = {} + self.data[key] = data + + def __repr__(self) -> str: + return f'LineCol({self.line}, {self.col})' + + +class CommentedBase: + @property + def ca(self): + # type: () -> Any + if not hasattr(self, Comment.attrib): + setattr(self, Comment.attrib, Comment()) + return getattr(self, Comment.attrib) + + def yaml_end_comment_extend(self, comment: Any, clear: bool = False) -> None: + if comment is None: + return + if clear or self.ca.end is None: + self.ca.end = [] + self.ca.end.extend(comment) + + def yaml_key_comment_extend(self, key: Any, comment: Any, clear: bool = False) -> None: + r = self.ca._items.setdefault(key, [None, None, None, None]) + if clear or r[1] is None: + if comment[1] is not None: + assert isinstance(comment[1], list) + r[1] = comment[1] + else: + r[1].extend(comment[0]) + r[0] = comment[0] + + def yaml_value_comment_extend(self, key: Any, comment: Any, clear: bool = False) -> None: + r = self.ca._items.setdefault(key, [None, None, None, None]) + if clear or r[3] is None: + if comment[1] is not None: + assert isinstance(comment[1], list) + r[3] = comment[1] + else: + r[3].extend(comment[0]) + r[2] = comment[0] + + def yaml_set_start_comment(self, comment: Any, indent: Any = 0) -> None: + """overwrites any preceding comment lines on an object + expects comment to be without `#` and possible have multiple lines + """ + from .error import CommentMark + from .tokens import CommentToken + + pre_comments = self._yaml_clear_pre_comment() # type: ignore + if comment[-1] == '\n': + comment = comment[:-1] # strip final newline if there + start_mark = CommentMark(indent) + for com in comment.split('\n'): + c = com.strip() + if len(c) > 0 and c[0] != '#': + com = '# ' + com + pre_comments.append(CommentToken(com + '\n', start_mark)) + + def yaml_set_comment_before_after_key( + self, + key: Any, + before: Any = None, + indent: Any = 0, + after: Any = None, + after_indent: Any = None, + ) -> None: + """ + expects comment (before/after) to be without `#` and possible have multiple lines + """ + from ruamel.yaml.error import CommentMark + from ruamel.yaml.tokens import CommentToken + + def comment_token(s: Any, mark: Any) -> Any: + # handle empty lines as having no comment + return CommentToken(('# ' if s else "") + s + '\n', mark) + + if after_indent is None: + after_indent = indent + 2 + if before and (len(before) > 1) and before[-1] == '\n': + before = before[:-1] # strip final newline if there + if after and after[-1] == '\n': + after = after[:-1] # strip final newline if there + start_mark = CommentMark(indent) + c = self.ca.items.setdefault(key, [None, [], None, None]) + if before is not None: + if c[1] is None: + c[1] = [] + if before == '\n': + c[1].append(comment_token("", start_mark)) # type: ignore + else: + for com in before.split('\n'): + c[1].append(comment_token(com, start_mark)) # type: ignore + if after: + start_mark = CommentMark(after_indent) + if c[3] is None: + c[3] = [] + for com in after.split('\n'): + c[3].append(comment_token(com, start_mark)) # type: ignore + + @property + def fa(self) -> Any: + """format attribute + + set_flow_style()/set_block_style()""" + if not hasattr(self, Format.attrib): + setattr(self, Format.attrib, Format()) + return getattr(self, Format.attrib) + + def yaml_add_eol_comment( + self, comment: Any, key: Optional[Any] = NotNone, column: Optional[Any] = None, + ) -> None: + """ + there is a problem as eol comments should start with ' #' + (but at the beginning of the line the space doesn't have to be before + the #. The column index is for the # mark + """ + from .tokens import CommentToken + from .error import CommentMark + + if column is None: + try: + column = self._yaml_get_column(key) + except AttributeError: + column = 0 + if comment[0] != '#': + comment = '# ' + comment + if column is None: + if comment[0] == '#': + comment = ' ' + comment + column = 0 + start_mark = CommentMark(column) + ct = [CommentToken(comment, start_mark), None] + self._yaml_add_eol_comment(ct, key=key) + + @property + def lc(self) -> Any: + if not hasattr(self, LineCol.attrib): + setattr(self, LineCol.attrib, LineCol()) + return getattr(self, LineCol.attrib) + + def _yaml_set_line_col(self, line: Any, col: Any) -> None: + self.lc.line = line + self.lc.col = col + + def _yaml_set_kv_line_col(self, key: Any, data: Any) -> None: + self.lc.add_kv_line_col(key, data) + + def _yaml_set_idx_line_col(self, key: Any, data: Any) -> None: + self.lc.add_idx_line_col(key, data) + + @property + def anchor(self) -> Any: + if not hasattr(self, Anchor.attrib): + setattr(self, Anchor.attrib, Anchor()) + return getattr(self, Anchor.attrib) + + def yaml_anchor(self) -> Any: + if not hasattr(self, Anchor.attrib): + return None + return self.anchor + + def yaml_set_anchor(self, value: Any, always_dump: bool = False) -> None: + self.anchor.value = value + self.anchor.always_dump = always_dump + + @property + def tag(self) -> Any: + if not hasattr(self, Tag.attrib): + setattr(self, Tag.attrib, Tag()) + return getattr(self, Tag.attrib) + + def yaml_set_ctag(self, value: Tag) -> None: + setattr(self, Tag.attrib, value) + + def copy_attributes(self, t: Any, memo: Any = None) -> Any: + """ + copies the YAML related attributes, not e.g. .values + returns target + """ + # fmt: off + for a in [Comment.attrib, Format.attrib, LineCol.attrib, Anchor.attrib, + Tag.attrib, merge_attrib]: + if hasattr(self, a): + if memo is not None: + setattr(t, a, copy.deepcopy(getattr(self, a, memo))) + else: + setattr(t, a, getattr(self, a)) + return t + # fmt: on + + def _yaml_add_eol_comment(self, comment: Any, key: Any) -> None: + raise NotImplementedError + + def _yaml_get_pre_comment(self) -> Any: + raise NotImplementedError + + def _yaml_get_column(self, key: Any) -> Any: + raise NotImplementedError + + +class CommentedSeq(MutableSliceableSequence, list, CommentedBase): # type: ignore + __slots__ = (Comment.attrib, '_lst') + + def __init__(self, *args: Any, **kw: Any) -> None: + list.__init__(self, *args, **kw) + + def __getsingleitem__(self, idx: Any) -> Any: + return list.__getitem__(self, idx) + + def __setsingleitem__(self, idx: Any, value: Any) -> None: + # try to preserve the scalarstring type if setting an existing key to a new value + if idx < len(self): + if ( + isinstance(value, str) + and not isinstance(value, ScalarString) + and isinstance(self[idx], ScalarString) + ): + value = type(self[idx])(value) + list.__setitem__(self, idx, value) + + def __delsingleitem__(self, idx: Any = None) -> Any: + list.__delitem__(self, idx) + self.ca.items.pop(idx, None) # might not be there -> default value + for list_index in sorted(self.ca.items): + if list_index < idx: + continue + self.ca.items[list_index - 1] = self.ca.items.pop(list_index) + + def __len__(self) -> int: + return list.__len__(self) + + def insert(self, idx: Any, val: Any) -> None: + """the comments after the insertion have to move forward""" + list.insert(self, idx, val) + for list_index in sorted(self.ca.items, reverse=True): + if list_index < idx: + break + self.ca.items[list_index + 1] = self.ca.items.pop(list_index) + + def extend(self, val: Any) -> None: + list.extend(self, val) + + def __eq__(self, other: Any) -> bool: + return list.__eq__(self, other) + + def _yaml_add_comment(self, comment: Any, key: Optional[Any] = NotNone) -> None: + if key is not NotNone: + self.yaml_key_comment_extend(key, comment) + else: + self.ca.comment = comment + + def _yaml_add_eol_comment(self, comment: Any, key: Any) -> None: + self._yaml_add_comment(comment, key=key) + + def _yaml_get_columnX(self, key: Any) -> Any: + return self.ca.items[key][0].start_mark.column + + def _yaml_get_column(self, key: Any) -> Any: + column = None + sel_idx = None + pre, post = key - 1, key + 1 + if pre in self.ca.items: + sel_idx = pre + elif post in self.ca.items: + sel_idx = post + else: + # self.ca.items is not ordered + for row_idx, _k1 in enumerate(self): + if row_idx >= key: + break + if row_idx not in self.ca.items: + continue + sel_idx = row_idx + if sel_idx is not None: + column = self._yaml_get_columnX(sel_idx) + return column + + def _yaml_get_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + pre_comments = self.ca.comment[1] + return pre_comments + + def _yaml_clear_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + self.ca.comment[1] = pre_comments + return pre_comments + + def __deepcopy__(self, memo: Any) -> Any: + res = self.__class__() + memo[id(self)] = res + for k in self: + res.append(copy.deepcopy(k, memo)) + self.copy_attributes(res, memo=memo) + return res + + def __add__(self, other: Any) -> Any: + return list.__add__(self, other) + + def sort(self, key: Any = None, reverse: bool = False) -> None: + if key is None: + tmp_lst = sorted(zip(self, range(len(self))), reverse=reverse) + list.__init__(self, [x[0] for x in tmp_lst]) + else: + tmp_lst = sorted( + zip(map(key, list.__iter__(self)), range(len(self))), reverse=reverse, + ) + list.__init__(self, [list.__getitem__(self, x[1]) for x in tmp_lst]) + itm = self.ca.items + self.ca._items = {} + for idx, x in enumerate(tmp_lst): + old_index = x[1] + if old_index in itm: + self.ca.items[idx] = itm[old_index] + + def __repr__(self) -> Any: + return list.__repr__(self) + + +class CommentedKeySeq(tuple, CommentedBase): # type: ignore + """This primarily exists to be able to roundtrip keys that are sequences""" + + def _yaml_add_comment(self, comment: Any, key: Optional[Any] = NotNone) -> None: + if key is not NotNone: + self.yaml_key_comment_extend(key, comment) + else: + self.ca.comment = comment + + def _yaml_add_eol_comment(self, comment: Any, key: Any) -> None: + self._yaml_add_comment(comment, key=key) + + def _yaml_get_columnX(self, key: Any) -> Any: + return self.ca.items[key][0].start_mark.column + + def _yaml_get_column(self, key: Any) -> Any: + column = None + sel_idx = None + pre, post = key - 1, key + 1 + if pre in self.ca.items: + sel_idx = pre + elif post in self.ca.items: + sel_idx = post + else: + # self.ca.items is not ordered + for row_idx, _k1 in enumerate(self): + if row_idx >= key: + break + if row_idx not in self.ca.items: + continue + sel_idx = row_idx + if sel_idx is not None: + column = self._yaml_get_columnX(sel_idx) + return column + + def _yaml_get_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + pre_comments = self.ca.comment[1] + return pre_comments + + def _yaml_clear_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + self.ca.comment[1] = pre_comments + return pre_comments + + +class CommentedMapView(Sized): + __slots__ = ('_mapping',) + + def __init__(self, mapping: Any) -> None: + self._mapping = mapping + + def __len__(self) -> int: + count = len(self._mapping) + return count + + +class CommentedMapKeysView(CommentedMapView, Set): # type: ignore + __slots__ = () + + @classmethod + def _from_iterable(self, it: Any) -> Any: + return set(it) + + def __contains__(self, key: Any) -> Any: + return key in self._mapping + + def __iter__(self) -> Any: + # yield from self._mapping # not in py27, pypy + # for x in self._mapping._keys(): + for x in self._mapping: + yield x + + +class CommentedMapItemsView(CommentedMapView, Set): # type: ignore + __slots__ = () + + @classmethod + def _from_iterable(self, it: Any) -> Any: + return set(it) + + def __contains__(self, item: Any) -> Any: + key, value = item + try: + v = self._mapping[key] + except KeyError: + return False + else: + return v == value + + def __iter__(self) -> Any: + for key in self._mapping._keys(): + yield (key, self._mapping[key]) + + +class CommentedMapValuesView(CommentedMapView): + __slots__ = () + + def __contains__(self, value: Any) -> Any: + for key in self._mapping: + if value == self._mapping[key]: + return True + return False + + def __iter__(self) -> Any: + for key in self._mapping._keys(): + yield self._mapping[key] + + +class CommentedMap(ordereddict, CommentedBase): + __slots__ = (Comment.attrib, '_ok', '_ref') + + def __init__(self, *args: Any, **kw: Any) -> None: + self._ok: MutableSet[Any] = set() # own keys + self._ref: List[CommentedMap] = [] + ordereddict.__init__(self, *args, **kw) + + def _yaml_add_comment( + self, comment: Any, key: Optional[Any] = NotNone, value: Optional[Any] = NotNone, + ) -> None: + """values is set to key to indicate a value attachment of comment""" + if key is not NotNone: + self.yaml_key_comment_extend(key, comment) + return + if value is not NotNone: + self.yaml_value_comment_extend(value, comment) + else: + self.ca.comment = comment + + def _yaml_add_eol_comment(self, comment: Any, key: Any) -> None: + """add on the value line, with value specified by the key""" + self._yaml_add_comment(comment, value=key) + + def _yaml_get_columnX(self, key: Any) -> Any: + return self.ca.items[key][2].start_mark.column + + def _yaml_get_column(self, key: Any) -> Any: + column = None + sel_idx = None + pre, post, last = None, None, None + for x in self: + if pre is not None and x != key: + post = x + break + if x == key: + pre = last + last = x + if pre in self.ca.items: + sel_idx = pre + elif post in self.ca.items: + sel_idx = post + else: + # self.ca.items is not ordered + for k1 in self: + if k1 >= key: + break + if k1 not in self.ca.items: + continue + sel_idx = k1 + if sel_idx is not None: + column = self._yaml_get_columnX(sel_idx) + return column + + def _yaml_get_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + pre_comments = self.ca.comment[1] + return pre_comments + + def _yaml_clear_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + self.ca.comment[1] = pre_comments + return pre_comments + + def update(self, *vals: Any, **kw: Any) -> None: + try: + ordereddict.update(self, *vals, **kw) + except TypeError: + # probably a dict that is used + for x in vals[0]: + self[x] = vals[0][x] + if vals: + try: + self._ok.update(vals[0].keys()) # type: ignore + except AttributeError: + # assume one argument that is a list/tuple of two element lists/tuples + for x in vals[0]: + self._ok.add(x[0]) + if kw: + self._ok.update(*kw.keys()) # type: ignore + + def insert(self, pos: Any, key: Any, value: Any, comment: Optional[Any] = None) -> None: + """insert key value into given position, as defined by source YAML + attach comment if provided + """ + if key in self._ok: + del self[key] + keys = [k for k in self.keys() if k in self._ok] + try: + ma0 = getattr(self, merge_attrib, [[-1]])[0] + merge_pos = ma0[0] + except IndexError: + merge_pos = -1 + if merge_pos >= 0: + if merge_pos >= pos: + getattr(self, merge_attrib)[0] = (merge_pos + 1, ma0[1]) + idx_min = pos + idx_max = len(self._ok) + else: + idx_min = pos - 1 + idx_max = len(self._ok) + else: + idx_min = pos + idx_max = len(self._ok) + self[key] = value # at the end + # print(f'{idx_min=} {idx_max=}') + for idx in range(idx_min, idx_max): + self.move_to_end(keys[idx]) + self._ok.add(key) + # for referer in self._ref: + # for keytmp in keys: + # referer.update_key_value(keytmp) + if comment is not None: + self.yaml_add_eol_comment(comment, key=key) + + def mlget(self, key: Any, default: Any = None, list_ok: Any = False) -> Any: + """multi-level get that expects dicts within dicts""" + if not isinstance(key, list): + return self.get(key, default) + # assume that the key is a list of recursively accessible dicts + + def get_one_level(key_list: Any, level: Any, d: Any) -> Any: + if not list_ok: + assert isinstance(d, dict) + if level >= len(key_list): + if level > len(key_list): + raise IndexError + return d[key_list[level - 1]] + return get_one_level(key_list, level + 1, d[key_list[level - 1]]) + + try: + return get_one_level(key, 1, self) + except KeyError: + return default + except (TypeError, IndexError): + if not list_ok: + raise + return default + + def __getitem__(self, key: Any) -> Any: + try: + return ordereddict.__getitem__(self, key) + except KeyError: + for merged in getattr(self, merge_attrib, []): + if key in merged[1]: + return merged[1][key] + raise + + def __setitem__(self, key: Any, value: Any) -> None: + # try to preserve the scalarstring type if setting an existing key to a new value + if key in self: + if ( + isinstance(value, str) + and not isinstance(value, ScalarString) + and isinstance(self[key], ScalarString) + ): + value = type(self[key])(value) + ordereddict.__setitem__(self, key, value) + self._ok.add(key) + + def _unmerged_contains(self, key: Any) -> Any: + if key in self._ok: + return True + return None + + def __contains__(self, key: Any) -> bool: + return bool(ordereddict.__contains__(self, key)) + + def get(self, key: Any, default: Any = None) -> Any: + try: + return self.__getitem__(key) + except: # NOQA + return default + + def __repr__(self) -> Any: + res = '{' + sep = '' + for k, v in self.items(): + res += f'{sep}{k!r}: {v!r}' + if not sep: + sep = ', ' + res += '}' + return res + + def non_merged_items(self) -> Any: + for x in ordereddict.__iter__(self): + if x in self._ok: + yield x, ordereddict.__getitem__(self, x) + + def __delitem__(self, key: Any) -> None: + # for merged in getattr(self, merge_attrib, []): + # if key in merged[1]: + # value = merged[1][key] + # break + # else: + # # not found in merged in stuff + # ordereddict.__delitem__(self, key) + # for referer in self._ref: + # referer.update=_key_value(key) + # return + # + # ordereddict.__setitem__(self, key, value) # merge might have different value + # self._ok.discard(key) + self._ok.discard(key) + ordereddict.__delitem__(self, key) + for referer in self._ref: + referer.update_key_value(key) + + def __iter__(self) -> Any: + for x in ordereddict.__iter__(self): + yield x + + def pop(self, key: Any, default: Any = NotNone) -> Any: + try: + result = self[key] + except KeyError: + if default is NotNone: + raise + return default + del self[key] + return result + + def _keys(self) -> Any: + for x in ordereddict.__iter__(self): + yield x + + def __len__(self) -> int: + return int(ordereddict.__len__(self)) + + def __eq__(self, other: Any) -> bool: + return bool(dict(self) == other) + + def keys(self) -> Any: + return CommentedMapKeysView(self) + + def values(self) -> Any: + return CommentedMapValuesView(self) + + def _items(self) -> Any: + for x in ordereddict.__iter__(self): + yield x, ordereddict.__getitem__(self, x) + + def items(self) -> Any: + return CommentedMapItemsView(self) + + @property + def merge(self) -> Any: + if not hasattr(self, merge_attrib): + setattr(self, merge_attrib, []) + return getattr(self, merge_attrib) + + def copy(self) -> Any: + x = type(self)() # update doesn't work + for k, v in self._items(): + x[k] = v + self.copy_attributes(x) + return x + + def add_referent(self, cm: Any) -> None: + if cm not in self._ref: + self._ref.append(cm) + + def add_yaml_merge(self, value: Any) -> None: + for v in value: + v[1].add_referent(self) + for k1, v1 in v[1].items(): + if ordereddict.__contains__(self, k1): + continue + ordereddict.__setitem__(self, k1, v1) + self.merge.extend(value) + + def update_key_value(self, key: Any) -> None: + if key in self._ok: + return + for v in self.merge: + if key in v[1]: + ordereddict.__setitem__(self, key, v[1][key]) + return + ordereddict.__delitem__(self, key) + + def __deepcopy__(self, memo: Any) -> Any: + res = self.__class__() + memo[id(self)] = res + for k in self: + res[k] = copy.deepcopy(self[k], memo) + self.copy_attributes(res, memo=memo) + return res + + +# based on brownie mappings +@classmethod # type: ignore +def raise_immutable(cls: Any, *args: Any, **kwargs: Any) -> None: + raise TypeError(f'{cls.__name__} objects are immutable') + + +class CommentedKeyMap(CommentedBase, Mapping): # type: ignore + __slots__ = Comment.attrib, '_od' + """This primarily exists to be able to roundtrip keys that are mappings""" + + def __init__(self, *args: Any, **kw: Any) -> None: + if hasattr(self, '_od'): + raise_immutable(self) + try: + self._od = ordereddict(*args, **kw) + except TypeError: + raise + + __delitem__ = __setitem__ = clear = pop = popitem = setdefault = update = raise_immutable + + # need to implement __getitem__, __iter__ and __len__ + def __getitem__(self, index: Any) -> Any: + return self._od[index] + + def __iter__(self) -> Iterator[Any]: + for x in self._od.__iter__(): + yield x + + def __len__(self) -> int: + return len(self._od) + + def __hash__(self) -> Any: + return hash(tuple(self.items())) + + def __repr__(self) -> Any: + if not hasattr(self, merge_attrib): + return self._od.__repr__() + return 'ordereddict(' + repr(list(self._od.items())) + ')' + + @classmethod + def fromkeys(keys: Any, v: Any = None) -> Any: + return CommentedKeyMap(dict.fromkeys(keys, v)) + + def _yaml_add_comment(self, comment: Any, key: Optional[Any] = NotNone) -> None: + if key is not NotNone: + self.yaml_key_comment_extend(key, comment) + else: + self.ca.comment = comment + + def _yaml_add_eol_comment(self, comment: Any, key: Any) -> None: + self._yaml_add_comment(comment, key=key) + + def _yaml_get_columnX(self, key: Any) -> Any: + return self.ca.items[key][0].start_mark.column + + def _yaml_get_column(self, key: Any) -> Any: + column = None + sel_idx = None + pre, post = key - 1, key + 1 + if pre in self.ca.items: + sel_idx = pre + elif post in self.ca.items: + sel_idx = post + else: + # self.ca.items is not ordered + for row_idx, _k1 in enumerate(self): + if row_idx >= key: + break + if row_idx not in self.ca.items: + continue + sel_idx = row_idx + if sel_idx is not None: + column = self._yaml_get_columnX(sel_idx) + return column + + def _yaml_get_pre_comment(self) -> Any: + pre_comments: List[Any] = [] + if self.ca.comment is None: + self.ca.comment = [None, pre_comments] + else: + self.ca.comment[1] = pre_comments + return pre_comments + + +class CommentedOrderedMap(CommentedMap): + __slots__ = (Comment.attrib,) + + +class CommentedSet(MutableSet, CommentedBase): # type: ignore # NOQA + __slots__ = Comment.attrib, 'odict' + + def __init__(self, values: Any = None) -> None: + self.odict = ordereddict() + MutableSet.__init__(self) + if values is not None: + self |= values + + def _yaml_add_comment( + self, comment: Any, key: Optional[Any] = NotNone, value: Optional[Any] = NotNone, + ) -> None: + """values is set to key to indicate a value attachment of comment""" + if key is not NotNone: + self.yaml_key_comment_extend(key, comment) + return + if value is not NotNone: + self.yaml_value_comment_extend(value, comment) + else: + self.ca.comment = comment + + def _yaml_add_eol_comment(self, comment: Any, key: Any) -> None: + """add on the value line, with value specified by the key""" + self._yaml_add_comment(comment, value=key) + + def add(self, value: Any) -> None: + """Add an element.""" + self.odict[value] = None + + def discard(self, value: Any) -> None: + """Remove an element. Do not raise an exception if absent.""" + del self.odict[value] + + def __contains__(self, x: Any) -> Any: + return x in self.odict + + def __iter__(self) -> Any: + for x in self.odict: + yield x + + def __len__(self) -> int: + return len(self.odict) + + def __repr__(self) -> str: + return f'set({self.odict.keys()!r})' + + +class TaggedScalar(CommentedBase): + # the value and style attributes are set during roundtrip construction + def __init__(self, value: Any = None, style: Any = None, tag: Any = None) -> None: + self.value = value + self.style = style + if tag is not None: + if isinstance(tag, str): + tag = Tag(suffix=tag) + self.yaml_set_ctag(tag) + + def __str__(self) -> Any: + return self.value + + def count(self, s: str, start: Optional[int] = None, end: Optional[int] = None) -> Any: + return self.value.count(s, start, end) + + def __getitem__(self, pos: int) -> Any: + return self.value[pos] + + def __repr__(self) -> str: + return f'TaggedScalar(value={self.value!r}, style={self.style!r}, tag={self.tag!r})' + + +def dump_comments(d: Any, name: str = "", sep: str = '.', out: Any = sys.stdout) -> None: + """ + recursively dump comments, all but the toplevel preceded by the path + in dotted form x.0.a + """ + if isinstance(d, dict) and hasattr(d, 'ca'): + if name: + out.write(f'{name} {type(d)}\n') + out.write(f'{d.ca!r}\n') + for k in d: + dump_comments(d[k], name=(name + sep + str(k)) if name else k, sep=sep, out=out) + elif isinstance(d, list) and hasattr(d, 'ca'): + if name: + out.write(f'{name} {type(d)}\n') + out.write(f'{d.ca!r}\n') + for idx, k in enumerate(d): + dump_comments( + k, name=(name + sep + str(idx)) if name else str(idx), sep=sep, out=out, + ) |