Diffstat (limited to 'lib/ruyaml/composer.py')
-rw-r--r--  lib/ruyaml/composer.py  242
1 file changed, 242 insertions, 0 deletions
diff --git a/lib/ruyaml/composer.py b/lib/ruyaml/composer.py
new file mode 100644
index 0000000..336d142
--- /dev/null
+++ b/lib/ruyaml/composer.py
@@ -0,0 +1,242 @@
+# coding: utf-8
+
+import warnings
+from typing import Any, Dict
+
+from ruyaml.compat import _F, nprint, nprintf # NOQA
+from ruyaml.error import MarkedYAMLError, ReusedAnchorWarning
+from ruyaml.events import (
+ AliasEvent,
+ MappingEndEvent,
+ MappingStartEvent,
+ ScalarEvent,
+ SequenceEndEvent,
+ SequenceStartEvent,
+ StreamEndEvent,
+ StreamStartEvent,
+)
+from ruyaml.nodes import MappingNode, ScalarNode, SequenceNode
+
+__all__ = ['Composer', 'ComposerError']
+
+
+class ComposerError(MarkedYAMLError):
+ pass
+
+
+class Composer:
+ def __init__(self, loader=None):
+ # type: (Any) -> None
+ self.loader = loader
+ if self.loader is not None and getattr(self.loader, '_composer', None) is None:
+ self.loader._composer = self
+ self.anchors = {} # type: Dict[Any, Any]
+
+ @property
+ def parser(self):
+ # type: () -> Any
+ if hasattr(self.loader, 'typ'):
+            self.loader.parser  # type: ignore  # property access lazily sets up loader._parser
+ return self.loader._parser # type: ignore
+
+ @property
+ def resolver(self):
+ # type: () -> Any
+ # assert self.loader._resolver is not None
+ if hasattr(self.loader, 'typ'):
+            self.loader.resolver  # type: ignore  # property access lazily sets up loader._resolver
+ return self.loader._resolver # type: ignore
+
+ def check_node(self):
+ # type: () -> Any
+ # Drop the STREAM-START event.
+ if self.parser.check_event(StreamStartEvent):
+ self.parser.get_event()
+
+        # Are there more documents available?
+ return not self.parser.check_event(StreamEndEvent)
+
+ def get_node(self):
+ # type: () -> Any
+ # Get the root node of the next document.
+ if not self.parser.check_event(StreamEndEvent):
+ return self.compose_document()
+
+ def get_single_node(self):
+ # type: () -> Any
+ # Drop the STREAM-START event.
+ self.parser.get_event()
+
+ # Compose a document if the stream is not empty.
+ document = None # type: Any
+ if not self.parser.check_event(StreamEndEvent):
+ document = self.compose_document()
+
+ # Ensure that the stream contains no more documents.
+ if not self.parser.check_event(StreamEndEvent):
+ event = self.parser.get_event()
+ raise ComposerError(
+ 'expected a single document in the stream',
+ document.start_mark,
+ 'but found another document',
+ event.start_mark,
+ )
+
+ # Drop the STREAM-END event.
+ self.parser.get_event()
+
+ return document
+
+ def compose_document(self):
+        # type: () -> Any
+ # Drop the DOCUMENT-START event.
+ self.parser.get_event()
+
+ # Compose the root node.
+ node = self.compose_node(None, None)
+
+ # Drop the DOCUMENT-END event.
+ self.parser.get_event()
+
+ self.anchors = {}
+ return node
+
+ def return_alias(self, a):
+ # type: (Any) -> Any
+ return a
+
+ def compose_node(self, parent, index):
+ # type: (Any, Any) -> Any
+ if self.parser.check_event(AliasEvent):
+ event = self.parser.get_event()
+ alias = event.anchor
+ if alias not in self.anchors:
+ raise ComposerError(
+ None,
+ None,
+ _F('found undefined alias {alias!r}', alias=alias),
+ event.start_mark,
+ )
+ return self.return_alias(self.anchors[alias])
+ event = self.parser.peek_event()
+ anchor = event.anchor
+ if anchor is not None: # have an anchor
+ if anchor in self.anchors:
+ # raise ComposerError(
+ # "found duplicate anchor %r; first occurrence"
+ # % (anchor), self.anchors[anchor].start_mark,
+ # "second occurrence", event.start_mark)
+ ws = (
+ '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence '
+ '{}'.format(
+                        anchor, self.anchors[anchor].start_mark, event.start_mark
+ )
+ )
+ warnings.warn(ws, ReusedAnchorWarning)
+ self.resolver.descend_resolver(parent, index)
+ if self.parser.check_event(ScalarEvent):
+ node = self.compose_scalar_node(anchor)
+ elif self.parser.check_event(SequenceStartEvent):
+ node = self.compose_sequence_node(anchor)
+ elif self.parser.check_event(MappingStartEvent):
+ node = self.compose_mapping_node(anchor)
+ self.resolver.ascend_resolver()
+ return node
+
+ def compose_scalar_node(self, anchor):
+ # type: (Any) -> Any
+ event = self.parser.get_event()
+ tag = event.tag
+ if tag is None or tag == '!':
+ tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
+ node = ScalarNode(
+ tag,
+ event.value,
+ event.start_mark,
+ event.end_mark,
+ style=event.style,
+ comment=event.comment,
+ anchor=anchor,
+ )
+ if anchor is not None:
+ self.anchors[anchor] = node
+ return node
+
+ def compose_sequence_node(self, anchor):
+ # type: (Any) -> Any
+ start_event = self.parser.get_event()
+ tag = start_event.tag
+ if tag is None or tag == '!':
+ tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
+ node = SequenceNode(
+ tag,
+ [],
+ start_event.start_mark,
+ None,
+ flow_style=start_event.flow_style,
+ comment=start_event.comment,
+ anchor=anchor,
+ )
+ if anchor is not None:
+ self.anchors[anchor] = node
+ index = 0
+ while not self.parser.check_event(SequenceEndEvent):
+ node.value.append(self.compose_node(node, index))
+ index += 1
+ end_event = self.parser.get_event()
+ if node.flow_style is True and end_event.comment is not None:
+ if node.comment is not None:
+ nprint(
+                    'Warning: unexpected end_event comment in sequence '
+ 'node {}'.format(node.flow_style)
+ )
+ node.comment = end_event.comment
+ node.end_mark = end_event.end_mark
+ self.check_end_doc_comment(end_event, node)
+ return node
+
+ def compose_mapping_node(self, anchor):
+ # type: (Any) -> Any
+ start_event = self.parser.get_event()
+ tag = start_event.tag
+ if tag is None or tag == '!':
+ tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
+ node = MappingNode(
+ tag,
+ [],
+ start_event.start_mark,
+ None,
+ flow_style=start_event.flow_style,
+ comment=start_event.comment,
+ anchor=anchor,
+ )
+ if anchor is not None:
+ self.anchors[anchor] = node
+ while not self.parser.check_event(MappingEndEvent):
+ # key_event = self.parser.peek_event()
+ item_key = self.compose_node(node, None)
+ # if item_key in node.value:
+ # raise ComposerError("while composing a mapping",
+ # start_event.start_mark,
+ # "found duplicate key", key_event.start_mark)
+ item_value = self.compose_node(node, item_key)
+ # node.value[item_key] = item_value
+ node.value.append((item_key, item_value))
+ end_event = self.parser.get_event()
+ if node.flow_style is True and end_event.comment is not None:
+ node.comment = end_event.comment
+ node.end_mark = end_event.end_mark
+ self.check_end_doc_comment(end_event, node)
+ return node
+
+ def check_end_doc_comment(self, end_event, node):
+ # type: (Any, Any) -> None
+ if end_event.comment and end_event.comment[1]:
+ # pre comments on an end_event, no following to move to
+ if node.comment is None:
+ node.comment = [None, None]
+            assert not isinstance(node, ScalarNode)
+ # this is a post comment on a mapping node, add as third element
+ # in the list
+ node.comment.append(end_event.comment[1])
+ end_event.comment[1] = None
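
For context, Composer sits between the parser and the constructor: it consumes the event stream, builds the node graph, and resolves anchors and aliases by reusing the node object stored in self.anchors. A minimal sketch of exercising it through the package's high-level API, assuming ruyaml is installed and, like its ruamel.yaml ancestor, exposes a YAML class whose compose() method returns the root node of a single document:

    import ruyaml

    yaml = ruyaml.YAML()  # round-trip mode; wires this Composer into the load pipeline
    # compose() runs the pipeline only up to node composition, no Python objects yet
    root = yaml.compose('base: &b {x: 1}\nderived: *b\n')
    print(root.tag)  # tag:yaml.org,2002:map
    # the alias *b resolves to the very same MappingNode created for the anchor &b
    assert root.value[0][1] is root.value[1][1]

Here root is a MappingNode whose value is a list of (key node, value node) tuples, matching what compose_mapping_node() above appends.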