summaryrefslogtreecommitdiffstats
path: root/lib/ruyaml/serializer.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/ruyaml/serializer.py251
1 files changed, 251 insertions, 0 deletions
diff --git a/lib/ruyaml/serializer.py b/lib/ruyaml/serializer.py
new file mode 100644
index 0000000..c92649d
--- /dev/null
+++ b/lib/ruyaml/serializer.py
@@ -0,0 +1,251 @@
+# coding: utf-8
+
+from ruyaml.compat import DBG_NODE, dbg, nprint, nprintf # NOQA
+from ruyaml.error import YAMLError
+from ruyaml.events import (
+ AliasEvent,
+ DocumentEndEvent,
+ DocumentStartEvent,
+ MappingEndEvent,
+ MappingStartEvent,
+ ScalarEvent,
+ SequenceEndEvent,
+ SequenceStartEvent,
+ StreamEndEvent,
+ StreamStartEvent,
+)
+from ruyaml.nodes import MappingNode, ScalarNode, SequenceNode
+from ruyaml.util import RegExp
+
+if False: # MYPY
+ from typing import Any, Dict, Optional, Text, Union # NOQA
+
+ from ruyaml.compat import VersionType # NOQA
+
+__all__ = ['Serializer', 'SerializerError']
+
+
+class SerializerError(YAMLError):
+ pass
+
+
+class Serializer:
+
+ # 'id' and 3+ numbers, but not 000
+ ANCHOR_TEMPLATE = 'id%03d'
+ ANCHOR_RE = RegExp('id(?!000$)\\d{3,}')
+
+ def __init__(
+ self,
+ encoding=None,
+ explicit_start=None,
+ explicit_end=None,
+ version=None,
+ tags=None,
+ dumper=None,
+ ):
+ # type: (Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any) -> None # NOQA
+ self.dumper = dumper
+ if self.dumper is not None:
+ self.dumper._serializer = self
+ self.use_encoding = encoding
+ self.use_explicit_start = explicit_start
+ self.use_explicit_end = explicit_end
+ if isinstance(version, str):
+ self.use_version = tuple(map(int, version.split('.')))
+ else:
+ self.use_version = version # type: ignore
+ self.use_tags = tags
+ self.serialized_nodes = {} # type: Dict[Any, Any]
+ self.anchors = {} # type: Dict[Any, Any]
+ self.last_anchor_id = 0
+ self.closed = None # type: Optional[bool]
+ self._templated_id = None
+
+ @property
+ def emitter(self):
+ # type: () -> Any
+ if hasattr(self.dumper, 'typ'):
+ return self.dumper.emitter # type: ignore
+ return self.dumper._emitter # type: ignore
+
+ @property
+ def resolver(self):
+ # type: () -> Any
+ if hasattr(self.dumper, 'typ'):
+ self.dumper.resolver # type: ignore
+ return self.dumper._resolver # type: ignore
+
+ def open(self):
+ # type: () -> None
+ if self.closed is None:
+ self.emitter.emit(StreamStartEvent(encoding=self.use_encoding))
+ self.closed = False
+ elif self.closed:
+ raise SerializerError('serializer is closed')
+ else:
+ raise SerializerError('serializer is already opened')
+
+ def close(self):
+ # type: () -> None
+ if self.closed is None:
+ raise SerializerError('serializer is not opened')
+ elif not self.closed:
+ self.emitter.emit(StreamEndEvent())
+ self.closed = True
+
+ # def __del__(self):
+ # self.close()
+
+ def serialize(self, node):
+ # type: (Any) -> None
+ if dbg(DBG_NODE):
+ nprint('Serializing nodes')
+ node.dump()
+ if self.closed is None:
+ raise SerializerError('serializer is not opened')
+ elif self.closed:
+ raise SerializerError('serializer is closed')
+ self.emitter.emit(
+ DocumentStartEvent(
+ explicit=self.use_explicit_start,
+ version=self.use_version,
+ tags=self.use_tags,
+ )
+ )
+ self.anchor_node(node)
+ self.serialize_node(node, None, None)
+ self.emitter.emit(DocumentEndEvent(explicit=self.use_explicit_end))
+ self.serialized_nodes = {}
+ self.anchors = {}
+ self.last_anchor_id = 0
+
+ def anchor_node(self, node):
+ # type: (Any) -> None
+ if node in self.anchors:
+ if self.anchors[node] is None:
+ self.anchors[node] = self.generate_anchor(node)
+ else:
+ anchor = None
+ try:
+ if node.anchor.always_dump:
+ anchor = node.anchor.value
+ except: # NOQA
+ pass
+ self.anchors[node] = anchor
+ if isinstance(node, SequenceNode):
+ for item in node.value:
+ self.anchor_node(item)
+ elif isinstance(node, MappingNode):
+ for key, value in node.value:
+ self.anchor_node(key)
+ self.anchor_node(value)
+
+ def generate_anchor(self, node):
+ # type: (Any) -> Any
+ try:
+ anchor = node.anchor.value
+ except: # NOQA
+ anchor = None
+ if anchor is None:
+ self.last_anchor_id += 1
+ return self.ANCHOR_TEMPLATE % self.last_anchor_id
+ return anchor
+
+ def serialize_node(self, node, parent, index):
+ # type: (Any, Any, Any) -> None
+ alias = self.anchors[node]
+ if node in self.serialized_nodes:
+ node_style = getattr(node, 'style', None)
+ if node_style != '?':
+ node_style = None
+ self.emitter.emit(AliasEvent(alias, style=node_style))
+ else:
+ self.serialized_nodes[node] = True
+ self.resolver.descend_resolver(parent, index)
+ if isinstance(node, ScalarNode):
+ # here check if the node.tag equals the one that would result from parsing
+ # if not equal quoting is necessary for strings
+ detected_tag = self.resolver.resolve(
+ ScalarNode, node.value, (True, False)
+ )
+ default_tag = self.resolver.resolve(
+ ScalarNode, node.value, (False, True)
+ )
+ implicit = (
+ (node.tag == detected_tag),
+ (node.tag == default_tag),
+ node.tag.startswith('tag:yaml.org,2002:'),
+ )
+ self.emitter.emit(
+ ScalarEvent(
+ alias,
+ node.tag,
+ implicit,
+ node.value,
+ style=node.style,
+ comment=node.comment,
+ )
+ )
+ elif isinstance(node, SequenceNode):
+ implicit = node.tag == self.resolver.resolve(
+ SequenceNode, node.value, True
+ )
+ comment = node.comment
+ end_comment = None
+ seq_comment = None
+ if node.flow_style is True:
+ if comment: # eol comment on flow style sequence
+ seq_comment = comment[0]
+ # comment[0] = None
+ if comment and len(comment) > 2:
+ end_comment = comment[2]
+ else:
+ end_comment = None
+ self.emitter.emit(
+ SequenceStartEvent(
+ alias,
+ node.tag,
+ implicit,
+ flow_style=node.flow_style,
+ comment=node.comment,
+ )
+ )
+ index = 0
+ for item in node.value:
+ self.serialize_node(item, node, index)
+ index += 1
+ self.emitter.emit(SequenceEndEvent(comment=[seq_comment, end_comment]))
+ elif isinstance(node, MappingNode):
+ implicit = node.tag == self.resolver.resolve(
+ MappingNode, node.value, True
+ )
+ comment = node.comment
+ end_comment = None
+ map_comment = None
+ if node.flow_style is True:
+ if comment: # eol comment on flow style sequence
+ map_comment = comment[0]
+ # comment[0] = None
+ if comment and len(comment) > 2:
+ end_comment = comment[2]
+ self.emitter.emit(
+ MappingStartEvent(
+ alias,
+ node.tag,
+ implicit,
+ flow_style=node.flow_style,
+ comment=node.comment,
+ nr_items=len(node.value),
+ )
+ )
+ for key, value in node.value:
+ self.serialize_node(key, node, None)
+ self.serialize_node(value, node, key)
+ self.emitter.emit(MappingEndEvent(comment=[map_comment, end_comment]))
+ self.resolver.ascend_resolver()
+
+
+def templated_id(s):
+ # type: (Text) -> Any
+ return Serializer.ANCHOR_RE.match(s)