diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 00:30:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 00:30:09 +0000 |
commit | 4dd6e896edac1096089a9cd54296266436ffec03 (patch) | |
tree | a3da8663c704704177ad944ae5a680c302f29f55 /serializer.py | |
parent | Initial commit. (diff) | |
download | ruamel.yaml-4dd6e896edac1096089a9cd54296266436ffec03.tar.xz ruamel.yaml-4dd6e896edac1096089a9cd54296266436ffec03.zip |
Adding upstream version 0.18.5.upstream/0.18.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | serializer.py | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/serializer.py b/serializer.py new file mode 100644 index 0000000..1ac46d2 --- /dev/null +++ b/serializer.py @@ -0,0 +1,231 @@ +# coding: utf-8 + +from ruamel.yaml.error import YAMLError +from ruamel.yaml.compat import nprint, DBG_NODE, dbg, nprintf # NOQA +from ruamel.yaml.util import RegExp + +from ruamel.yaml.events import ( + StreamStartEvent, + StreamEndEvent, + MappingStartEvent, + MappingEndEvent, + SequenceStartEvent, + SequenceEndEvent, + AliasEvent, + ScalarEvent, + DocumentStartEvent, + DocumentEndEvent, +) +from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode + +from typing import Any, Dict, Union, Text, Optional # NOQA +from ruamel.yaml.compat import VersionType # NOQA + +__all__ = ['Serializer', 'SerializerError'] + + +class SerializerError(YAMLError): + pass + + +class Serializer: + + # 'id' and 3+ numbers, but not 000 + ANCHOR_TEMPLATE = 'id{:03d}' + ANCHOR_RE = RegExp('id(?!000$)\\d{3,}') + + def __init__( + self, + encoding: Any = None, + explicit_start: Optional[bool] = None, + explicit_end: Optional[bool] = None, + version: Optional[VersionType] = None, + tags: Any = None, + dumper: Any = None, + ) -> None: + # NOQA + self.dumper = dumper + if self.dumper is not None: + self.dumper._serializer = self + self.use_encoding = encoding + self.use_explicit_start = explicit_start + self.use_explicit_end = explicit_end + if isinstance(version, str): + self.use_version = tuple(map(int, version.split('.'))) + else: + self.use_version = version # type: ignore + self.use_tags = tags + self.serialized_nodes: Dict[Any, Any] = {} + self.anchors: Dict[Any, Any] = {} + self.last_anchor_id = 0 + self.closed: Optional[bool] = None + self._templated_id = None + + @property + def emitter(self) -> Any: + if hasattr(self.dumper, 'typ'): + return self.dumper.emitter + return self.dumper._emitter + + @property + def resolver(self) -> Any: + if hasattr(self.dumper, 'typ'): + self.dumper.resolver + return self.dumper._resolver + + def open(self) -> None: + if self.closed is None: + self.emitter.emit(StreamStartEvent(encoding=self.use_encoding)) + self.closed = False + elif self.closed: + raise SerializerError('serializer is closed') + else: + raise SerializerError('serializer is already opened') + + def close(self) -> None: + if self.closed is None: + raise SerializerError('serializer is not opened') + elif not self.closed: + self.emitter.emit(StreamEndEvent()) + self.closed = True + + # def __del__(self): + # self.close() + + def serialize(self, node: Any) -> None: + if dbg(DBG_NODE): + nprint('Serializing nodes') + node.dump() + if self.closed is None: + raise SerializerError('serializer is not opened') + elif self.closed: + raise SerializerError('serializer is closed') + self.emitter.emit( + DocumentStartEvent( + explicit=self.use_explicit_start, version=self.use_version, tags=self.use_tags, + ), + ) + self.anchor_node(node) + self.serialize_node(node, None, None) + self.emitter.emit(DocumentEndEvent(explicit=self.use_explicit_end)) + self.serialized_nodes = {} + self.anchors = {} + self.last_anchor_id = 0 + + def anchor_node(self, node: Any) -> None: + if node in self.anchors: + if self.anchors[node] is None: + self.anchors[node] = self.generate_anchor(node) + else: + anchor = None + try: + if node.anchor.always_dump: + anchor = node.anchor.value + except: # NOQA + pass + self.anchors[node] = anchor + if isinstance(node, SequenceNode): + for item in node.value: + self.anchor_node(item) + elif isinstance(node, MappingNode): + for key, value in node.value: + self.anchor_node(key) + self.anchor_node(value) + + def generate_anchor(self, node: Any) -> Any: + try: + anchor = node.anchor.value + except: # NOQA + anchor = None + if anchor is None: + self.last_anchor_id += 1 + return self.ANCHOR_TEMPLATE.format(self.last_anchor_id) + return anchor + + def serialize_node(self, node: Any, parent: Any, index: Any) -> None: + alias = self.anchors[node] + if node in self.serialized_nodes: + node_style = getattr(node, 'style', None) + if node_style != '?': + node_style = None + self.emitter.emit(AliasEvent(alias, style=node_style)) + else: + self.serialized_nodes[node] = True + self.resolver.descend_resolver(parent, index) + if isinstance(node, ScalarNode): + # here check if the node.tag equals the one that would result from parsing + # if not equal quoting is necessary for strings + detected_tag = self.resolver.resolve(ScalarNode, node.value, (True, False)) + default_tag = self.resolver.resolve(ScalarNode, node.value, (False, True)) + implicit = ( + (node.ctag == detected_tag), + (node.ctag == default_tag), + node.tag.startswith('tag:yaml.org,2002:'), # type: ignore + ) + self.emitter.emit( + ScalarEvent( + alias, + node.ctag, + implicit, + node.value, + style=node.style, + comment=node.comment, + ), + ) + elif isinstance(node, SequenceNode): + implicit = node.ctag == self.resolver.resolve(SequenceNode, node.value, True) + comment = node.comment + end_comment = None + seq_comment = None + if node.flow_style is True: + if comment: # eol comment on flow style sequence + seq_comment = comment[0] + # comment[0] = None + if comment and len(comment) > 2: + end_comment = comment[2] + else: + end_comment = None + self.emitter.emit( + SequenceStartEvent( + alias, + node.ctag, + implicit, + flow_style=node.flow_style, + comment=node.comment, + ), + ) + index = 0 + for item in node.value: + self.serialize_node(item, node, index) + index += 1 + self.emitter.emit(SequenceEndEvent(comment=[seq_comment, end_comment])) + elif isinstance(node, MappingNode): + implicit = node.ctag == self.resolver.resolve(MappingNode, node.value, True) + comment = node.comment + end_comment = None + map_comment = None + if node.flow_style is True: + if comment: # eol comment on flow style sequence + map_comment = comment[0] + # comment[0] = None + if comment and len(comment) > 2: + end_comment = comment[2] + self.emitter.emit( + MappingStartEvent( + alias, + node.ctag, + implicit, + flow_style=node.flow_style, + comment=node.comment, + nr_items=len(node.value), + ), + ) + for key, value in node.value: + self.serialize_node(key, node, None) + self.serialize_node(value, node, key) + self.emitter.emit(MappingEndEvent(comment=[map_comment, end_comment])) + self.resolver.ascend_resolver() + + +def templated_id(s: Text) -> Any: + return Serializer.ANCHOR_RE.match(s) |