Diffstat (limited to 'lib/ruyaml/parser.py')
-rw-r--r--  lib/ruyaml/parser.py  938
1 file changed, 938 insertions, 0 deletions
diff --git a/lib/ruyaml/parser.py b/lib/ruyaml/parser.py
new file mode 100644
index 0000000..f17331b
--- /dev/null
+++ b/lib/ruyaml/parser.py
@@ -0,0 +1,938 @@
+# coding: utf-8
+
+# The following YAML grammar is LL(1) and is parsed by a recursive descent
+# parser.
+#
+# stream ::= STREAM-START implicit_document? explicit_document*
+# STREAM-END
+# implicit_document ::= block_node DOCUMENT-END*
+# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
+# block_node_or_indentless_sequence ::=
+# ALIAS
+# | properties (block_content |
+# indentless_block_sequence)?
+# | block_content
+# | indentless_block_sequence
+# block_node ::= ALIAS
+# | properties block_content?
+# | block_content
+# flow_node ::= ALIAS
+# | properties flow_content?
+# | flow_content
+# properties ::= TAG ANCHOR? | ANCHOR TAG?
+# block_content ::= block_collection | flow_collection | SCALAR
+# flow_content ::= flow_collection | SCALAR
+# block_collection ::= block_sequence | block_mapping
+# flow_collection ::= flow_sequence | flow_mapping
+# block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
+# BLOCK-END
+# indentless_sequence ::= (BLOCK-ENTRY block_node?)+
+# block_mapping        ::= BLOCK-MAPPING-START
+# ((KEY block_node_or_indentless_sequence?)?
+# (VALUE block_node_or_indentless_sequence?)?)*
+# BLOCK-END
+# flow_sequence ::= FLOW-SEQUENCE-START
+# (flow_sequence_entry FLOW-ENTRY)*
+# flow_sequence_entry?
+# FLOW-SEQUENCE-END
+# flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
+# flow_mapping ::= FLOW-MAPPING-START
+# (flow_mapping_entry FLOW-ENTRY)*
+# flow_mapping_entry?
+# FLOW-MAPPING-END
+# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
+#
+# FIRST sets:
+#
+# stream: { STREAM-START }
+# explicit_document: { DIRECTIVE DOCUMENT-START }
+# implicit_document: FIRST(block_node)
+# block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START
+# BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START }
+# flow_node: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START }
+# block_content: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START
+# FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
+# flow_content: { FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
+# block_collection: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START }
+# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START }
+# block_sequence: { BLOCK-SEQUENCE-START }
+# block_mapping: { BLOCK-MAPPING-START }
+# block_node_or_indentless_sequence: { ALIAS ANCHOR TAG SCALAR
+# BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START
+# FLOW-MAPPING-START BLOCK-ENTRY }
+# indentless_sequence: { BLOCK-ENTRY }
+# flow_sequence: { FLOW-SEQUENCE-START }
+# flow_mapping: { FLOW-MAPPING-START }
+# flow_sequence_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START
+# FLOW-MAPPING-START KEY }
+# flow_mapping_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START
+# FLOW-MAPPING-START KEY }
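+#
+# As a concrete illustration of the event stream this parser produces (a
+# sketch, assuming this fork keeps the ruamel.yaml-style YAML.parse() API):
+#
+#     import io
+#     from ruyaml import YAML
+#
+#     yaml = YAML(typ='safe', pure=True)
+#     for event in yaml.parse(io.StringIO('a: 1')):
+#         print(event)
+#
+# which prints, in order: StreamStartEvent, DocumentStartEvent,
+# MappingStartEvent, ScalarEvent('a'), ScalarEvent('1'), MappingEndEvent,
+# DocumentEndEvent, StreamEndEvent.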
+
+# need to import with the full path: pkg_resources tries to load parser.py
+# from __init__.py without doing anything with the package afterwards,
+# and Jython needs the full path as well
+
+
+from ruyaml.comments import C_POST, C_PRE, C_SPLIT_ON_FIRST_BLANK
+from ruyaml.compat import _F, nprint, nprintf # NOQA
+from ruyaml.error import MarkedYAMLError
+from ruyaml.events import * # NOQA
+from ruyaml.scanner import ( # NOQA
+ BlankLineComment,
+ RoundTripScanner,
+ Scanner,
+ ScannerError,
+)
+from ruyaml.tokens import * # NOQA
+
+if False: # MYPY
+ from typing import Any, Dict, List, Optional # NOQA
+
+__all__ = ['Parser', 'RoundTripParser', 'ParserError']
+
+
+def xprintf(*args, **kw):
+    # type: (Any, Any) -> Any
+    return nprintf(*args, **kw)
+
+
+class ParserError(MarkedYAMLError):
+ pass
+
+
+class Parser:
+    # Since writing a recursive descent parser is a straightforward task, we
+    # do not give many comments here.
+
+ DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'}
+
+ def __init__(self, loader):
+ # type: (Any) -> None
+ self.loader = loader
+ if self.loader is not None and getattr(self.loader, '_parser', None) is None:
+ self.loader._parser = self
+ self.reset_parser()
+
+ def reset_parser(self):
+ # type: () -> None
+ # Reset the state attributes (to clear self-references)
+ self.current_event = self.last_event = None
+ self.tag_handles = {} # type: Dict[Any, Any]
+ self.states = [] # type: List[Any]
+ self.marks = [] # type: List[Any]
+ self.state = self.parse_stream_start # type: Any
+
+ def dispose(self):
+ # type: () -> None
+ self.reset_parser()
+
+ @property
+ def scanner(self):
+ # type: () -> Any
+ if hasattr(self.loader, 'typ'):
+ return self.loader.scanner
+ return self.loader._scanner
+
+ @property
+ def resolver(self):
+ # type: () -> Any
+ if hasattr(self.loader, 'typ'):
+ return self.loader.resolver
+ return self.loader._resolver
+
+ def check_event(self, *choices):
+ # type: (Any) -> bool
+ # Check the type of the next event.
+ if self.current_event is None:
+ if self.state:
+ self.current_event = self.state()
+ if self.current_event is not None:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.current_event, choice):
+ return True
+ return False
+
+ def peek_event(self):
+ # type: () -> Any
+ # Get the next event.
+ if self.current_event is None:
+ if self.state:
+ self.current_event = self.state()
+ return self.current_event
+
+ def get_event(self):
+ # type: () -> Any
+ # Get the next event and proceed further.
+ if self.current_event is None:
+ if self.state:
+ self.current_event = self.state()
+ # assert self.current_event is not None
+ # if self.current_event.end_mark.line != self.peek_event().start_mark.line:
+ xprintf(
+ 'get_event', repr(self.current_event), self.peek_event().start_mark.line
+ )
+ self.last_event = value = self.current_event
+ self.current_event = None
+ return value
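+
+    # A consumer drives the parser through these three methods: check the
+    # class of the upcoming event, optionally peek at it, then consume it.
+    # A minimal sketch of such a driver loop (hypothetical, not part of
+    # this module):
+    #
+    #     while not parser.check_event(StreamEndEvent):
+    #         event = parser.get_event()
+    #         handle(event)   # 'handle' is a placeholder callback
+    #     parser.get_event()  # finally consume the StreamEndEvent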
+
+ # stream ::= STREAM-START implicit_document? explicit_document*
+ # STREAM-END
+ # implicit_document ::= block_node DOCUMENT-END*
+ # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
+
+ def parse_stream_start(self):
+ # type: () -> Any
+ # Parse the stream start.
+ token = self.scanner.get_token()
+ self.move_token_comment(token)
+ event = StreamStartEvent(
+ token.start_mark, token.end_mark, encoding=token.encoding
+ )
+
+ # Prepare the next state.
+ self.state = self.parse_implicit_document_start
+
+ return event
+
+ def parse_implicit_document_start(self):
+ # type: () -> Any
+ # Parse an implicit document.
+ if not self.scanner.check_token(
+ DirectiveToken, DocumentStartToken, StreamEndToken
+ ):
+ self.tag_handles = self.DEFAULT_TAGS
+ token = self.scanner.peek_token()
+ start_mark = end_mark = token.start_mark
+ event = DocumentStartEvent(start_mark, end_mark, explicit=False)
+
+ # Prepare the next state.
+ self.states.append(self.parse_document_end)
+ self.state = self.parse_block_node
+
+ return event
+
+ else:
+ return self.parse_document_start()
+
+ def parse_document_start(self):
+ # type: () -> Any
+ # Parse any extra document end indicators.
+ while self.scanner.check_token(DocumentEndToken):
+ self.scanner.get_token()
+ # Parse an explicit document.
+ if not self.scanner.check_token(StreamEndToken):
+ version, tags = self.process_directives()
+ if not self.scanner.check_token(DocumentStartToken):
+ raise ParserError(
+ None,
+ None,
+ _F(
+ "expected '<document start>', but found {pt!r}",
+ pt=self.scanner.peek_token().id,
+ ),
+ self.scanner.peek_token().start_mark,
+ )
+ token = self.scanner.get_token()
+ start_mark = token.start_mark
+ end_mark = token.end_mark
+ # if self.loader is not None and \
+ # end_mark.line != self.scanner.peek_token().start_mark.line:
+ # self.loader.scalar_after_indicator = False
+ event = DocumentStartEvent(
+ start_mark, end_mark, explicit=True, version=version, tags=tags
+ ) # type: Any
+ self.states.append(self.parse_document_end)
+ self.state = self.parse_document_content
+ else:
+ # Parse the end of the stream.
+ token = self.scanner.get_token()
+ event = StreamEndEvent(
+ token.start_mark, token.end_mark, comment=token.comment
+ )
+ assert not self.states
+ assert not self.marks
+ self.state = None
+ return event
+
+ def parse_document_end(self):
+ # type: () -> Any
+ # Parse the document end.
+ token = self.scanner.peek_token()
+ start_mark = end_mark = token.start_mark
+ explicit = False
+ if self.scanner.check_token(DocumentEndToken):
+ token = self.scanner.get_token()
+ end_mark = token.end_mark
+ explicit = True
+ event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
+
+ # Prepare the next state.
+ if self.resolver.processing_version == (1, 1):
+ self.state = self.parse_document_start
+ else:
+ self.state = self.parse_implicit_document_start
+
+ return event
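+
+    # For example, the two-document stream
+    #
+    #     --- a
+    #     ...
+    #     --- b
+    #
+    # yields DocumentStartEvent(explicit=True) and DocumentEndEvent(explicit=True)
+    # for the first document (it is closed by '...'), whereas a bare 'a: 1'
+    # without markers yields both events with explicit=False.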
+
+ def parse_document_content(self):
+ # type: () -> Any
+ if self.scanner.check_token(
+ DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken
+ ):
+ event = self.process_empty_scalar(self.scanner.peek_token().start_mark)
+ self.state = self.states.pop()
+ return event
+ else:
+ return self.parse_block_node()
+
+ def process_directives(self):
+ # type: () -> Any
+ yaml_version = None
+ self.tag_handles = {}
+ while self.scanner.check_token(DirectiveToken):
+ token = self.scanner.get_token()
+ if token.name == 'YAML':
+ if yaml_version is not None:
+ raise ParserError(
+ None, None, 'found duplicate YAML directive', token.start_mark
+ )
+ major, minor = token.value
+ if major != 1:
+ raise ParserError(
+ None,
+ None,
+ 'found incompatible YAML document (version 1.* is required)',
+ token.start_mark,
+ )
+ yaml_version = token.value
+ elif token.name == 'TAG':
+ handle, prefix = token.value
+ if handle in self.tag_handles:
+ raise ParserError(
+ None,
+ None,
+ _F('duplicate tag handle {handle!r}', handle=handle),
+ token.start_mark,
+ )
+ self.tag_handles[handle] = prefix
+        if self.tag_handles:
+ value = yaml_version, self.tag_handles.copy() # type: Any
+ else:
+ value = yaml_version, None
+ if self.loader is not None and hasattr(self.loader, 'tags'):
+ self.loader.version = yaml_version
+ if self.loader.tags is None:
+ self.loader.tags = {}
+ for k in self.tag_handles:
+ self.loader.tags[k] = self.tag_handles[k]
+ for key in self.DEFAULT_TAGS:
+ if key not in self.tag_handles:
+ self.tag_handles[key] = self.DEFAULT_TAGS[key]
+ return value
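+
+    # For example, given the directives
+    #
+    #     %YAML 1.2
+    #     %TAG !e! tag:example.com,2021:
+    #     --- !e!widget {}
+    #
+    # this method returns ((1, 2), {'!e!': 'tag:example.com,2021:'}), and
+    # parse_node() later expands the node tag to 'tag:example.com,2021:widget'
+    # via transform_tag().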
+
+ # block_node_or_indentless_sequence ::= ALIAS
+ # | properties (block_content | indentless_block_sequence)?
+ # | block_content
+ # | indentless_block_sequence
+ # block_node ::= ALIAS
+ # | properties block_content?
+ # | block_content
+ # flow_node ::= ALIAS
+ # | properties flow_content?
+ # | flow_content
+ # properties ::= TAG ANCHOR? | ANCHOR TAG?
+ # block_content ::= block_collection | flow_collection | SCALAR
+ # flow_content ::= flow_collection | SCALAR
+ # block_collection ::= block_sequence | block_mapping
+ # flow_collection ::= flow_sequence | flow_mapping
+
+ def parse_block_node(self):
+ # type: () -> Any
+ return self.parse_node(block=True)
+
+ def parse_flow_node(self):
+ # type: () -> Any
+ return self.parse_node()
+
+ def parse_block_node_or_indentless_sequence(self):
+ # type: () -> Any
+ return self.parse_node(block=True, indentless_sequence=True)
+
+ def transform_tag(self, handle, suffix):
+ # type: (Any, Any) -> Any
+ return self.tag_handles[handle] + suffix
+
+ def parse_node(self, block=False, indentless_sequence=False):
+ # type: (bool, bool) -> Any
+ if self.scanner.check_token(AliasToken):
+ token = self.scanner.get_token()
+ event = AliasEvent(
+ token.value, token.start_mark, token.end_mark
+ ) # type: Any
+ self.state = self.states.pop()
+ return event
+
+ anchor = None
+ tag = None
+ start_mark = end_mark = tag_mark = None
+ if self.scanner.check_token(AnchorToken):
+ token = self.scanner.get_token()
+ self.move_token_comment(token)
+ start_mark = token.start_mark
+ end_mark = token.end_mark
+ anchor = token.value
+ if self.scanner.check_token(TagToken):
+ token = self.scanner.get_token()
+ tag_mark = token.start_mark
+ end_mark = token.end_mark
+ tag = token.value
+ elif self.scanner.check_token(TagToken):
+ token = self.scanner.get_token()
+ start_mark = tag_mark = token.start_mark
+ end_mark = token.end_mark
+ tag = token.value
+            if self.scanner.check_token(AnchorToken):
+                token = self.scanner.get_token()
+                # the anchor follows the tag: keep the node's start mark and
+                # the tag mark, only extend the end mark over the anchor
+                end_mark = token.end_mark
+                anchor = token.value
+ if tag is not None:
+ handle, suffix = tag
+ if handle is not None:
+ if handle not in self.tag_handles:
+ raise ParserError(
+ 'while parsing a node',
+ start_mark,
+ _F('found undefined tag handle {handle!r}', handle=handle),
+ tag_mark,
+ )
+ tag = self.transform_tag(handle, suffix)
+ else:
+ tag = suffix
+ # if tag == '!':
+ # raise ParserError("while parsing a node", start_mark,
+ # "found non-specific tag '!'", tag_mark,
+ # "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag'
+ # and share your opinion.")
+ if start_mark is None:
+ start_mark = end_mark = self.scanner.peek_token().start_mark
+ event = None
+ implicit = tag is None or tag == '!'
+ if indentless_sequence and self.scanner.check_token(BlockEntryToken):
+ comment = None
+ pt = self.scanner.peek_token()
+ if self.loader and self.loader.comment_handling is None:
+ if pt.comment and pt.comment[0]:
+ comment = [pt.comment[0], []]
+ pt.comment[0] = None
+ elif self.loader:
+ if pt.comment:
+ comment = pt.comment
+ end_mark = self.scanner.peek_token().end_mark
+ event = SequenceStartEvent(
+ anchor,
+ tag,
+ implicit,
+ start_mark,
+ end_mark,
+ flow_style=False,
+ comment=comment,
+ )
+ self.state = self.parse_indentless_sequence_entry
+ return event
+
+ if self.scanner.check_token(ScalarToken):
+ token = self.scanner.get_token()
+ # self.scanner.peek_token_same_line_comment(token)
+ end_mark = token.end_mark
+ if (token.plain and tag is None) or tag == '!':
+ implicit = (True, False)
+ elif tag is None:
+ implicit = (False, True)
+ else:
+ implicit = (False, False)
+ # nprint('se', token.value, token.comment)
+ event = ScalarEvent(
+ anchor,
+ tag,
+ implicit,
+ token.value,
+ start_mark,
+ end_mark,
+ style=token.style,
+ comment=token.comment,
+ )
+ self.state = self.states.pop()
+ elif self.scanner.check_token(FlowSequenceStartToken):
+ pt = self.scanner.peek_token()
+ end_mark = pt.end_mark
+ event = SequenceStartEvent(
+ anchor,
+ tag,
+ implicit,
+ start_mark,
+ end_mark,
+ flow_style=True,
+ comment=pt.comment,
+ )
+ self.state = self.parse_flow_sequence_first_entry
+ elif self.scanner.check_token(FlowMappingStartToken):
+ pt = self.scanner.peek_token()
+ end_mark = pt.end_mark
+ event = MappingStartEvent(
+ anchor,
+ tag,
+ implicit,
+ start_mark,
+ end_mark,
+ flow_style=True,
+ comment=pt.comment,
+ )
+ self.state = self.parse_flow_mapping_first_key
+ elif block and self.scanner.check_token(BlockSequenceStartToken):
+ end_mark = self.scanner.peek_token().start_mark
+ # should inserting the comment be dependent on the
+ # indentation?
+ pt = self.scanner.peek_token()
+ comment = pt.comment
+ # nprint('pt0', type(pt))
+ if comment is None or comment[1] is None:
+ comment = pt.split_old_comment()
+ # nprint('pt1', comment)
+ event = SequenceStartEvent(
+ anchor,
+ tag,
+ implicit,
+ start_mark,
+ end_mark,
+ flow_style=False,
+ comment=comment,
+ )
+ self.state = self.parse_block_sequence_first_entry
+ elif block and self.scanner.check_token(BlockMappingStartToken):
+ end_mark = self.scanner.peek_token().start_mark
+ comment = self.scanner.peek_token().comment
+ event = MappingStartEvent(
+ anchor,
+ tag,
+ implicit,
+ start_mark,
+ end_mark,
+ flow_style=False,
+ comment=comment,
+ )
+ self.state = self.parse_block_mapping_first_key
+ elif anchor is not None or tag is not None:
+ # Empty scalars are allowed even if a tag or an anchor is
+ # specified.
+ event = ScalarEvent(
+ anchor, tag, (implicit, False), "", start_mark, end_mark
+ )
+ self.state = self.states.pop()
+ else:
+ if block:
+ node = 'block'
+ else:
+ node = 'flow'
+ token = self.scanner.peek_token()
+ raise ParserError(
+ _F('while parsing a {node!s} node', node=node),
+ start_mark,
+ _F(
+ 'expected the node content, but found {token_id!r}',
+ token_id=token.id,
+ ),
+ token.start_mark,
+ )
+ return event
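+
+    # For example, the node '!!str &a0 hello' becomes
+    # ScalarEvent(anchor='a0', tag='tag:yaml.org,2002:str',
+    # implicit=(False, False), value='hello'): the '!!' handle is expanded
+    # through tag_handles, and an explicit tag turns both implicit flags off.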
+
+ # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
+ # BLOCK-END
+
+ def parse_block_sequence_first_entry(self):
+ # type: () -> Any
+ token = self.scanner.get_token()
+ # move any comment from start token
+ # self.move_token_comment(token)
+ self.marks.append(token.start_mark)
+ return self.parse_block_sequence_entry()
+
+ def parse_block_sequence_entry(self):
+ # type: () -> Any
+ if self.scanner.check_token(BlockEntryToken):
+ token = self.scanner.get_token()
+ self.move_token_comment(token)
+ if not self.scanner.check_token(BlockEntryToken, BlockEndToken):
+ self.states.append(self.parse_block_sequence_entry)
+ return self.parse_block_node()
+ else:
+ self.state = self.parse_block_sequence_entry
+ return self.process_empty_scalar(token.end_mark)
+ if not self.scanner.check_token(BlockEndToken):
+ token = self.scanner.peek_token()
+ raise ParserError(
+ 'while parsing a block collection',
+ self.marks[-1],
+ _F('expected <block end>, but found {token_id!r}', token_id=token.id),
+ token.start_mark,
+ )
+ token = self.scanner.get_token() # BlockEndToken
+ event = SequenceEndEvent(
+ token.start_mark, token.end_mark, comment=token.comment
+ )
+ self.state = self.states.pop()
+ self.marks.pop()
+ return event
+
+ # indentless_sequence ::= (BLOCK-ENTRY block_node?)+
+
+    # an indentless sequence is a block sequence whose entries are not
+    # indented relative to their mapping key, e.g.:
+    #
+    # sequence:
+    # - entry
+    # - nested
+
+ def parse_indentless_sequence_entry(self):
+ # type: () -> Any
+ if self.scanner.check_token(BlockEntryToken):
+ token = self.scanner.get_token()
+ self.move_token_comment(token)
+ if not self.scanner.check_token(
+ BlockEntryToken, KeyToken, ValueToken, BlockEndToken
+ ):
+ self.states.append(self.parse_indentless_sequence_entry)
+ return self.parse_block_node()
+ else:
+ self.state = self.parse_indentless_sequence_entry
+ return self.process_empty_scalar(token.end_mark)
+ token = self.scanner.peek_token()
+ c = None
+ if self.loader and self.loader.comment_handling is None:
+ c = token.comment
+ start_mark = token.start_mark
+ else:
+ start_mark = self.last_event.end_mark # type: ignore
+ c = self.distribute_comment(token.comment, start_mark.line) # type: ignore
+ event = SequenceEndEvent(start_mark, start_mark, comment=c)
+ self.state = self.states.pop()
+ return event
+
+    # block_mapping     ::= BLOCK-MAPPING-START
+ # ((KEY block_node_or_indentless_sequence?)?
+ # (VALUE block_node_or_indentless_sequence?)?)*
+ # BLOCK-END
+
+ def parse_block_mapping_first_key(self):
+ # type: () -> Any
+ token = self.scanner.get_token()
+ self.marks.append(token.start_mark)
+ return self.parse_block_mapping_key()
+
+ def parse_block_mapping_key(self):
+ # type: () -> Any
+ if self.scanner.check_token(KeyToken):
+ token = self.scanner.get_token()
+ self.move_token_comment(token)
+ if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
+ self.states.append(self.parse_block_mapping_value)
+ return self.parse_block_node_or_indentless_sequence()
+ else:
+ self.state = self.parse_block_mapping_value
+ return self.process_empty_scalar(token.end_mark)
+ if self.resolver.processing_version > (1, 1) and self.scanner.check_token(
+ ValueToken
+ ):
+ self.state = self.parse_block_mapping_value
+ return self.process_empty_scalar(self.scanner.peek_token().start_mark)
+ if not self.scanner.check_token(BlockEndToken):
+ token = self.scanner.peek_token()
+ raise ParserError(
+ 'while parsing a block mapping',
+ self.marks[-1],
+ _F('expected <block end>, but found {token_id!r}', token_id=token.id),
+ token.start_mark,
+ )
+ token = self.scanner.get_token()
+ self.move_token_comment(token)
+ event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
+ self.state = self.states.pop()
+ self.marks.pop()
+ return event
+
+ def parse_block_mapping_value(self):
+ # type: () -> Any
+ if self.scanner.check_token(ValueToken):
+ token = self.scanner.get_token()
+            # the value token might have a post comment; move it to e.g. the block
+ if self.scanner.check_token(ValueToken):
+ self.move_token_comment(token)
+ else:
+ if not self.scanner.check_token(KeyToken):
+ self.move_token_comment(token, empty=True)
+ # else: empty value for this key cannot move token.comment
+ if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
+ self.states.append(self.parse_block_mapping_key)
+ return self.parse_block_node_or_indentless_sequence()
+ else:
+ self.state = self.parse_block_mapping_key
+ comment = token.comment
+ if comment is None:
+ token = self.scanner.peek_token()
+ comment = token.comment
+ if comment:
+ token._comment = [None, comment[1]]
+ comment = [comment[0], None]
+ return self.process_empty_scalar(token.end_mark, comment=comment)
+ else:
+ self.state = self.parse_block_mapping_key
+ token = self.scanner.peek_token()
+ return self.process_empty_scalar(token.start_mark)
+
+ # flow_sequence ::= FLOW-SEQUENCE-START
+ # (flow_sequence_entry FLOW-ENTRY)*
+ # flow_sequence_entry?
+ # FLOW-SEQUENCE-END
+ # flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
+ #
+ # Note that while production rules for both flow_sequence_entry and
+ # flow_mapping_entry are equal, their interpretations are different.
+    # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?`
+    # generates an inline mapping (set syntax).
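+    #
+    # For example, '[a: 1, b]' is a two-entry flow sequence whose first entry
+    # is a single-pair mapping; the parser emits SequenceStartEvent,
+    # MappingStartEvent, ScalarEvent('a'), ScalarEvent('1'), MappingEndEvent,
+    # ScalarEvent('b') and SequenceEndEvent for it.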
+
+ def parse_flow_sequence_first_entry(self):
+ # type: () -> Any
+ token = self.scanner.get_token()
+ self.marks.append(token.start_mark)
+ return self.parse_flow_sequence_entry(first=True)
+
+ def parse_flow_sequence_entry(self, first=False):
+ # type: (bool) -> Any
+ if not self.scanner.check_token(FlowSequenceEndToken):
+ if not first:
+ if self.scanner.check_token(FlowEntryToken):
+ self.scanner.get_token()
+ else:
+ token = self.scanner.peek_token()
+ raise ParserError(
+ 'while parsing a flow sequence',
+ self.marks[-1],
+ _F(
+ "expected ',' or ']', but got {token_id!r}",
+ token_id=token.id,
+ ),
+ token.start_mark,
+ )
+
+ if self.scanner.check_token(KeyToken):
+ token = self.scanner.peek_token()
+ event = MappingStartEvent(
+ None, None, True, token.start_mark, token.end_mark, flow_style=True
+ ) # type: Any
+ self.state = self.parse_flow_sequence_entry_mapping_key
+ return event
+ elif not self.scanner.check_token(FlowSequenceEndToken):
+ self.states.append(self.parse_flow_sequence_entry)
+ return self.parse_flow_node()
+ token = self.scanner.get_token()
+ event = SequenceEndEvent(
+ token.start_mark, token.end_mark, comment=token.comment
+ )
+ self.state = self.states.pop()
+ self.marks.pop()
+ return event
+
+ def parse_flow_sequence_entry_mapping_key(self):
+ # type: () -> Any
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(
+ ValueToken, FlowEntryToken, FlowSequenceEndToken
+ ):
+ self.states.append(self.parse_flow_sequence_entry_mapping_value)
+ return self.parse_flow_node()
+ else:
+ self.state = self.parse_flow_sequence_entry_mapping_value
+ return self.process_empty_scalar(token.end_mark)
+
+ def parse_flow_sequence_entry_mapping_value(self):
+ # type: () -> Any
+ if self.scanner.check_token(ValueToken):
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
+ self.states.append(self.parse_flow_sequence_entry_mapping_end)
+ return self.parse_flow_node()
+ else:
+ self.state = self.parse_flow_sequence_entry_mapping_end
+ return self.process_empty_scalar(token.end_mark)
+ else:
+ self.state = self.parse_flow_sequence_entry_mapping_end
+ token = self.scanner.peek_token()
+ return self.process_empty_scalar(token.start_mark)
+
+ def parse_flow_sequence_entry_mapping_end(self):
+ # type: () -> Any
+ self.state = self.parse_flow_sequence_entry
+ token = self.scanner.peek_token()
+ return MappingEndEvent(token.start_mark, token.start_mark)
+
+ # flow_mapping ::= FLOW-MAPPING-START
+ # (flow_mapping_entry FLOW-ENTRY)*
+ # flow_mapping_entry?
+ # FLOW-MAPPING-END
+ # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
+
+ def parse_flow_mapping_first_key(self):
+ # type: () -> Any
+ token = self.scanner.get_token()
+ self.marks.append(token.start_mark)
+ return self.parse_flow_mapping_key(first=True)
+
+ def parse_flow_mapping_key(self, first=False):
+ # type: (Any) -> Any
+ if not self.scanner.check_token(FlowMappingEndToken):
+ if not first:
+ if self.scanner.check_token(FlowEntryToken):
+ self.scanner.get_token()
+ else:
+ token = self.scanner.peek_token()
+ raise ParserError(
+ 'while parsing a flow mapping',
+ self.marks[-1],
+ _F(
+ "expected ',' or '}}', but got {token_id!r}",
+ token_id=token.id,
+ ),
+ token.start_mark,
+ )
+ if self.scanner.check_token(KeyToken):
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(
+ ValueToken, FlowEntryToken, FlowMappingEndToken
+ ):
+ self.states.append(self.parse_flow_mapping_value)
+ return self.parse_flow_node()
+ else:
+ self.state = self.parse_flow_mapping_value
+ return self.process_empty_scalar(token.end_mark)
+ elif self.resolver.processing_version > (1, 1) and self.scanner.check_token(
+ ValueToken
+ ):
+ self.state = self.parse_flow_mapping_value
+ return self.process_empty_scalar(self.scanner.peek_token().end_mark)
+ elif not self.scanner.check_token(FlowMappingEndToken):
+ self.states.append(self.parse_flow_mapping_empty_value)
+ return self.parse_flow_node()
+ token = self.scanner.get_token()
+ event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
+ self.state = self.states.pop()
+ self.marks.pop()
+ return event
+
+ def parse_flow_mapping_value(self):
+ # type: () -> Any
+ if self.scanner.check_token(ValueToken):
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
+ self.states.append(self.parse_flow_mapping_key)
+ return self.parse_flow_node()
+ else:
+ self.state = self.parse_flow_mapping_key
+ return self.process_empty_scalar(token.end_mark)
+ else:
+ self.state = self.parse_flow_mapping_key
+ token = self.scanner.peek_token()
+ return self.process_empty_scalar(token.start_mark)
+
+ def parse_flow_mapping_empty_value(self):
+ # type: () -> Any
+ self.state = self.parse_flow_mapping_key
+ return self.process_empty_scalar(self.scanner.peek_token().start_mark)
+
+ def process_empty_scalar(self, mark, comment=None):
+ # type: (Any, Any) -> Any
+ return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment)
+
+ def move_token_comment(self, token, nt=None, empty=False):
+ # type: (Any, Optional[Any], Optional[bool]) -> Any
+ pass
+
+
+class RoundTripParser(Parser):
+    """roundtrip is a safe loader that wants to see the unmangled tag"""
+
+ def transform_tag(self, handle, suffix):
+ # type: (Any, Any) -> Any
+ # return self.tag_handles[handle]+suffix
+ if handle == '!!' and suffix in (
+ 'null',
+ 'bool',
+ 'int',
+ 'float',
+ 'binary',
+ 'timestamp',
+ 'omap',
+ 'pairs',
+ 'set',
+ 'str',
+ 'seq',
+ 'map',
+ ):
+ return Parser.transform_tag(self, handle, suffix)
+ return handle + suffix
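+
+    # For example: transform_tag('!!', 'str') still resolves to
+    # 'tag:yaml.org,2002:str', while an unknown suffix such as
+    # transform_tag('!!', 'python/name:builtins.dict') is left unmangled as
+    # '!!python/name:builtins.dict', so round-tripping preserves the
+    # original notation.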
+
+ def move_token_comment(self, token, nt=None, empty=False):
+ # type: (Any, Optional[Any], Optional[bool]) -> Any
+ token.move_old_comment(
+ self.scanner.peek_token() if nt is None else nt, empty=empty
+ )
+
+
+class RoundTripParserSC(RoundTripParser):
+    """roundtrip is a safe loader that wants to see the unmangled tag"""
+
+ # some of the differences are based on the superclass testing
+ # if self.loader.comment_handling is not None
+
+ def move_token_comment(self, token, nt=None, empty=False):
+        # type: (Any, Optional[Any], Optional[bool]) -> None
+ token.move_new_comment(
+ self.scanner.peek_token() if nt is None else nt, empty=empty
+ )
+
+ def distribute_comment(self, comment, line):
+ # type: (Any, Any) -> Any
+        # TODO: look at the indentation of the comment to determine attachment
+ if comment is None:
+ return None
+ if not comment[0]:
+ return None
+ if comment[0][0] != line + 1:
+ nprintf('>>>dcxxx', comment, line)
+ assert comment[0][0] == line + 1
+ # if comment[0] - line > 1:
+ # return
+ typ = self.loader.comment_handling & 0b11
+ # nprintf('>>>dca', comment, line, typ)
+ if typ == C_POST:
+ return None
+ if typ == C_PRE:
+ c = [None, None, comment[0]]
+ comment[0] = None
+ return c
+ # nprintf('>>>dcb', comment[0])
+ for _idx, cmntidx in enumerate(comment[0]):
+ # nprintf('>>>dcb', cmntidx)
+ if isinstance(self.scanner.comments[cmntidx], BlankLineComment):
+ break
+ else:
+ return None # no space found
+ if _idx == 0:
+ return None # first line was blank
+ # nprintf('>>>dcc', idx)
+ if typ == C_SPLIT_ON_FIRST_BLANK:
+ c = [None, None, comment[0][:_idx]]
+ comment[0] = comment[0][_idx:]
+ return c
+ raise NotImplementedError # reserved
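+
+    # Roughly, for C_SPLIT_ON_FIRST_BLANK: if comment[0] indexes the scanner
+    # comments [c0, c1, <blank>, c2], the part before the first blank line
+    # ([c0, c1]) is split off and returned (to attach to the node that just
+    # ended), while the blank line and everything after it stay in comment[0].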