diff options
Diffstat (limited to '')
-rw-r--r-- | lib/ruyaml/parser.py | 938 |
1 files changed, 938 insertions, 0 deletions
diff --git a/lib/ruyaml/parser.py b/lib/ruyaml/parser.py new file mode 100644 index 0000000..f17331b --- /dev/null +++ b/lib/ruyaml/parser.py @@ -0,0 +1,938 @@ +# coding: utf-8 + +# The following YAML grammar is LL(1) and is parsed by a recursive descent +# parser. +# +# stream ::= STREAM-START implicit_document? explicit_document* +# STREAM-END +# implicit_document ::= block_node DOCUMENT-END* +# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END* +# block_node_or_indentless_sequence ::= +# ALIAS +# | properties (block_content | +# indentless_block_sequence)? +# | block_content +# | indentless_block_sequence +# block_node ::= ALIAS +# | properties block_content? +# | block_content +# flow_node ::= ALIAS +# | properties flow_content? +# | flow_content +# properties ::= TAG ANCHOR? | ANCHOR TAG? +# block_content ::= block_collection | flow_collection | SCALAR +# flow_content ::= flow_collection | SCALAR +# block_collection ::= block_sequence | block_mapping +# flow_collection ::= flow_sequence | flow_mapping +# block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* +# BLOCK-END +# indentless_sequence ::= (BLOCK-ENTRY block_node?)+ +# block_mapping ::= BLOCK-MAPPING_START +# ((KEY block_node_or_indentless_sequence?)? +# (VALUE block_node_or_indentless_sequence?)?)* +# BLOCK-END +# flow_sequence ::= FLOW-SEQUENCE-START +# (flow_sequence_entry FLOW-ENTRY)* +# flow_sequence_entry? +# FLOW-SEQUENCE-END +# flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? +# flow_mapping ::= FLOW-MAPPING-START +# (flow_mapping_entry FLOW-ENTRY)* +# flow_mapping_entry? +# FLOW-MAPPING-END +# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? +# +# FIRST sets: +# +# stream: { STREAM-START <} +# explicit_document: { DIRECTIVE DOCUMENT-START } +# implicit_document: FIRST(block_node) +# block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START +# BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START } +# flow_node: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START } +# block_content: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START +# FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR } +# flow_content: { FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR } +# block_collection: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START } +# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START } +# block_sequence: { BLOCK-SEQUENCE-START } +# block_mapping: { BLOCK-MAPPING-START } +# block_node_or_indentless_sequence: { ALIAS ANCHOR TAG SCALAR +# BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START +# FLOW-MAPPING-START BLOCK-ENTRY } +# indentless_sequence: { ENTRY } +# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START } +# flow_sequence: { FLOW-SEQUENCE-START } +# flow_mapping: { FLOW-MAPPING-START } +# flow_sequence_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START +# FLOW-MAPPING-START KEY } +# flow_mapping_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START +# FLOW-MAPPING-START KEY } + +# need to have full path with import, as pkg_resources tries to load parser.py in __init__.py +# only to not do anything with the package afterwards +# and for Jython too + + +from ruyaml.comments import C_POST, C_PRE, C_SPLIT_ON_FIRST_BLANK +from ruyaml.compat import _F, nprint, nprintf # NOQA +from ruyaml.error import MarkedYAMLError +from ruyaml.events import * # NOQA +from ruyaml.scanner import ( # NOQA + BlankLineComment, + RoundTripScanner, + Scanner, + ScannerError, +) +from ruyaml.tokens import * # NOQA + +if False: # MYPY + from typing import Any, Dict, List, Optional # NOQA + +__all__ = ['Parser', 'RoundTripParser', 'ParserError'] + + +def xprintf(*args, **kw): + # type: (Any, Any) -> Any + return nprintf(*args, **kw) + pass + + +class ParserError(MarkedYAMLError): + pass + + +class Parser: + # Since writing a recursive-descendant parser is a straightforward task, we + # do not give many comments here. + + DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'} + + def __init__(self, loader): + # type: (Any) -> None + self.loader = loader + if self.loader is not None and getattr(self.loader, '_parser', None) is None: + self.loader._parser = self + self.reset_parser() + + def reset_parser(self): + # type: () -> None + # Reset the state attributes (to clear self-references) + self.current_event = self.last_event = None + self.tag_handles = {} # type: Dict[Any, Any] + self.states = [] # type: List[Any] + self.marks = [] # type: List[Any] + self.state = self.parse_stream_start # type: Any + + def dispose(self): + # type: () -> None + self.reset_parser() + + @property + def scanner(self): + # type: () -> Any + if hasattr(self.loader, 'typ'): + return self.loader.scanner + return self.loader._scanner + + @property + def resolver(self): + # type: () -> Any + if hasattr(self.loader, 'typ'): + return self.loader.resolver + return self.loader._resolver + + def check_event(self, *choices): + # type: (Any) -> bool + # Check the type of the next event. + if self.current_event is None: + if self.state: + self.current_event = self.state() + if self.current_event is not None: + if not choices: + return True + for choice in choices: + if isinstance(self.current_event, choice): + return True + return False + + def peek_event(self): + # type: () -> Any + # Get the next event. + if self.current_event is None: + if self.state: + self.current_event = self.state() + return self.current_event + + def get_event(self): + # type: () -> Any + # Get the next event and proceed further. + if self.current_event is None: + if self.state: + self.current_event = self.state() + # assert self.current_event is not None + # if self.current_event.end_mark.line != self.peek_event().start_mark.line: + xprintf( + 'get_event', repr(self.current_event), self.peek_event().start_mark.line + ) + self.last_event = value = self.current_event + self.current_event = None + return value + + # stream ::= STREAM-START implicit_document? explicit_document* + # STREAM-END + # implicit_document ::= block_node DOCUMENT-END* + # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END* + + def parse_stream_start(self): + # type: () -> Any + # Parse the stream start. + token = self.scanner.get_token() + self.move_token_comment(token) + event = StreamStartEvent( + token.start_mark, token.end_mark, encoding=token.encoding + ) + + # Prepare the next state. + self.state = self.parse_implicit_document_start + + return event + + def parse_implicit_document_start(self): + # type: () -> Any + # Parse an implicit document. + if not self.scanner.check_token( + DirectiveToken, DocumentStartToken, StreamEndToken + ): + self.tag_handles = self.DEFAULT_TAGS + token = self.scanner.peek_token() + start_mark = end_mark = token.start_mark + event = DocumentStartEvent(start_mark, end_mark, explicit=False) + + # Prepare the next state. + self.states.append(self.parse_document_end) + self.state = self.parse_block_node + + return event + + else: + return self.parse_document_start() + + def parse_document_start(self): + # type: () -> Any + # Parse any extra document end indicators. + while self.scanner.check_token(DocumentEndToken): + self.scanner.get_token() + # Parse an explicit document. + if not self.scanner.check_token(StreamEndToken): + version, tags = self.process_directives() + if not self.scanner.check_token(DocumentStartToken): + raise ParserError( + None, + None, + _F( + "expected '<document start>', but found {pt!r}", + pt=self.scanner.peek_token().id, + ), + self.scanner.peek_token().start_mark, + ) + token = self.scanner.get_token() + start_mark = token.start_mark + end_mark = token.end_mark + # if self.loader is not None and \ + # end_mark.line != self.scanner.peek_token().start_mark.line: + # self.loader.scalar_after_indicator = False + event = DocumentStartEvent( + start_mark, end_mark, explicit=True, version=version, tags=tags + ) # type: Any + self.states.append(self.parse_document_end) + self.state = self.parse_document_content + else: + # Parse the end of the stream. + token = self.scanner.get_token() + event = StreamEndEvent( + token.start_mark, token.end_mark, comment=token.comment + ) + assert not self.states + assert not self.marks + self.state = None + return event + + def parse_document_end(self): + # type: () -> Any + # Parse the document end. + token = self.scanner.peek_token() + start_mark = end_mark = token.start_mark + explicit = False + if self.scanner.check_token(DocumentEndToken): + token = self.scanner.get_token() + end_mark = token.end_mark + explicit = True + event = DocumentEndEvent(start_mark, end_mark, explicit=explicit) + + # Prepare the next state. + if self.resolver.processing_version == (1, 1): + self.state = self.parse_document_start + else: + self.state = self.parse_implicit_document_start + + return event + + def parse_document_content(self): + # type: () -> Any + if self.scanner.check_token( + DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken + ): + event = self.process_empty_scalar(self.scanner.peek_token().start_mark) + self.state = self.states.pop() + return event + else: + return self.parse_block_node() + + def process_directives(self): + # type: () -> Any + yaml_version = None + self.tag_handles = {} + while self.scanner.check_token(DirectiveToken): + token = self.scanner.get_token() + if token.name == 'YAML': + if yaml_version is not None: + raise ParserError( + None, None, 'found duplicate YAML directive', token.start_mark + ) + major, minor = token.value + if major != 1: + raise ParserError( + None, + None, + 'found incompatible YAML document (version 1.* is required)', + token.start_mark, + ) + yaml_version = token.value + elif token.name == 'TAG': + handle, prefix = token.value + if handle in self.tag_handles: + raise ParserError( + None, + None, + _F('duplicate tag handle {handle!r}', handle=handle), + token.start_mark, + ) + self.tag_handles[handle] = prefix + if bool(self.tag_handles): + value = yaml_version, self.tag_handles.copy() # type: Any + else: + value = yaml_version, None + if self.loader is not None and hasattr(self.loader, 'tags'): + self.loader.version = yaml_version + if self.loader.tags is None: + self.loader.tags = {} + for k in self.tag_handles: + self.loader.tags[k] = self.tag_handles[k] + for key in self.DEFAULT_TAGS: + if key not in self.tag_handles: + self.tag_handles[key] = self.DEFAULT_TAGS[key] + return value + + # block_node_or_indentless_sequence ::= ALIAS + # | properties (block_content | indentless_block_sequence)? + # | block_content + # | indentless_block_sequence + # block_node ::= ALIAS + # | properties block_content? + # | block_content + # flow_node ::= ALIAS + # | properties flow_content? + # | flow_content + # properties ::= TAG ANCHOR? | ANCHOR TAG? + # block_content ::= block_collection | flow_collection | SCALAR + # flow_content ::= flow_collection | SCALAR + # block_collection ::= block_sequence | block_mapping + # flow_collection ::= flow_sequence | flow_mapping + + def parse_block_node(self): + # type: () -> Any + return self.parse_node(block=True) + + def parse_flow_node(self): + # type: () -> Any + return self.parse_node() + + def parse_block_node_or_indentless_sequence(self): + # type: () -> Any + return self.parse_node(block=True, indentless_sequence=True) + + def transform_tag(self, handle, suffix): + # type: (Any, Any) -> Any + return self.tag_handles[handle] + suffix + + def parse_node(self, block=False, indentless_sequence=False): + # type: (bool, bool) -> Any + if self.scanner.check_token(AliasToken): + token = self.scanner.get_token() + event = AliasEvent( + token.value, token.start_mark, token.end_mark + ) # type: Any + self.state = self.states.pop() + return event + + anchor = None + tag = None + start_mark = end_mark = tag_mark = None + if self.scanner.check_token(AnchorToken): + token = self.scanner.get_token() + self.move_token_comment(token) + start_mark = token.start_mark + end_mark = token.end_mark + anchor = token.value + if self.scanner.check_token(TagToken): + token = self.scanner.get_token() + tag_mark = token.start_mark + end_mark = token.end_mark + tag = token.value + elif self.scanner.check_token(TagToken): + token = self.scanner.get_token() + start_mark = tag_mark = token.start_mark + end_mark = token.end_mark + tag = token.value + if self.scanner.check_token(AnchorToken): + token = self.scanner.get_token() + start_mark = tag_mark = token.start_mark + end_mark = token.end_mark + anchor = token.value + if tag is not None: + handle, suffix = tag + if handle is not None: + if handle not in self.tag_handles: + raise ParserError( + 'while parsing a node', + start_mark, + _F('found undefined tag handle {handle!r}', handle=handle), + tag_mark, + ) + tag = self.transform_tag(handle, suffix) + else: + tag = suffix + # if tag == '!': + # raise ParserError("while parsing a node", start_mark, + # "found non-specific tag '!'", tag_mark, + # "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag' + # and share your opinion.") + if start_mark is None: + start_mark = end_mark = self.scanner.peek_token().start_mark + event = None + implicit = tag is None or tag == '!' + if indentless_sequence and self.scanner.check_token(BlockEntryToken): + comment = None + pt = self.scanner.peek_token() + if self.loader and self.loader.comment_handling is None: + if pt.comment and pt.comment[0]: + comment = [pt.comment[0], []] + pt.comment[0] = None + elif self.loader: + if pt.comment: + comment = pt.comment + end_mark = self.scanner.peek_token().end_mark + event = SequenceStartEvent( + anchor, + tag, + implicit, + start_mark, + end_mark, + flow_style=False, + comment=comment, + ) + self.state = self.parse_indentless_sequence_entry + return event + + if self.scanner.check_token(ScalarToken): + token = self.scanner.get_token() + # self.scanner.peek_token_same_line_comment(token) + end_mark = token.end_mark + if (token.plain and tag is None) or tag == '!': + implicit = (True, False) + elif tag is None: + implicit = (False, True) + else: + implicit = (False, False) + # nprint('se', token.value, token.comment) + event = ScalarEvent( + anchor, + tag, + implicit, + token.value, + start_mark, + end_mark, + style=token.style, + comment=token.comment, + ) + self.state = self.states.pop() + elif self.scanner.check_token(FlowSequenceStartToken): + pt = self.scanner.peek_token() + end_mark = pt.end_mark + event = SequenceStartEvent( + anchor, + tag, + implicit, + start_mark, + end_mark, + flow_style=True, + comment=pt.comment, + ) + self.state = self.parse_flow_sequence_first_entry + elif self.scanner.check_token(FlowMappingStartToken): + pt = self.scanner.peek_token() + end_mark = pt.end_mark + event = MappingStartEvent( + anchor, + tag, + implicit, + start_mark, + end_mark, + flow_style=True, + comment=pt.comment, + ) + self.state = self.parse_flow_mapping_first_key + elif block and self.scanner.check_token(BlockSequenceStartToken): + end_mark = self.scanner.peek_token().start_mark + # should inserting the comment be dependent on the + # indentation? + pt = self.scanner.peek_token() + comment = pt.comment + # nprint('pt0', type(pt)) + if comment is None or comment[1] is None: + comment = pt.split_old_comment() + # nprint('pt1', comment) + event = SequenceStartEvent( + anchor, + tag, + implicit, + start_mark, + end_mark, + flow_style=False, + comment=comment, + ) + self.state = self.parse_block_sequence_first_entry + elif block and self.scanner.check_token(BlockMappingStartToken): + end_mark = self.scanner.peek_token().start_mark + comment = self.scanner.peek_token().comment + event = MappingStartEvent( + anchor, + tag, + implicit, + start_mark, + end_mark, + flow_style=False, + comment=comment, + ) + self.state = self.parse_block_mapping_first_key + elif anchor is not None or tag is not None: + # Empty scalars are allowed even if a tag or an anchor is + # specified. + event = ScalarEvent( + anchor, tag, (implicit, False), "", start_mark, end_mark + ) + self.state = self.states.pop() + else: + if block: + node = 'block' + else: + node = 'flow' + token = self.scanner.peek_token() + raise ParserError( + _F('while parsing a {node!s} node', node=node), + start_mark, + _F( + 'expected the node content, but found {token_id!r}', + token_id=token.id, + ), + token.start_mark, + ) + return event + + # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* + # BLOCK-END + + def parse_block_sequence_first_entry(self): + # type: () -> Any + token = self.scanner.get_token() + # move any comment from start token + # self.move_token_comment(token) + self.marks.append(token.start_mark) + return self.parse_block_sequence_entry() + + def parse_block_sequence_entry(self): + # type: () -> Any + if self.scanner.check_token(BlockEntryToken): + token = self.scanner.get_token() + self.move_token_comment(token) + if not self.scanner.check_token(BlockEntryToken, BlockEndToken): + self.states.append(self.parse_block_sequence_entry) + return self.parse_block_node() + else: + self.state = self.parse_block_sequence_entry + return self.process_empty_scalar(token.end_mark) + if not self.scanner.check_token(BlockEndToken): + token = self.scanner.peek_token() + raise ParserError( + 'while parsing a block collection', + self.marks[-1], + _F('expected <block end>, but found {token_id!r}', token_id=token.id), + token.start_mark, + ) + token = self.scanner.get_token() # BlockEndToken + event = SequenceEndEvent( + token.start_mark, token.end_mark, comment=token.comment + ) + self.state = self.states.pop() + self.marks.pop() + return event + + # indentless_sequence ::= (BLOCK-ENTRY block_node?)+ + + # indentless_sequence? + # sequence: + # - entry + # - nested + + def parse_indentless_sequence_entry(self): + # type: () -> Any + if self.scanner.check_token(BlockEntryToken): + token = self.scanner.get_token() + self.move_token_comment(token) + if not self.scanner.check_token( + BlockEntryToken, KeyToken, ValueToken, BlockEndToken + ): + self.states.append(self.parse_indentless_sequence_entry) + return self.parse_block_node() + else: + self.state = self.parse_indentless_sequence_entry + return self.process_empty_scalar(token.end_mark) + token = self.scanner.peek_token() + c = None + if self.loader and self.loader.comment_handling is None: + c = token.comment + start_mark = token.start_mark + else: + start_mark = self.last_event.end_mark # type: ignore + c = self.distribute_comment(token.comment, start_mark.line) # type: ignore + event = SequenceEndEvent(start_mark, start_mark, comment=c) + self.state = self.states.pop() + return event + + # block_mapping ::= BLOCK-MAPPING_START + # ((KEY block_node_or_indentless_sequence?)? + # (VALUE block_node_or_indentless_sequence?)?)* + # BLOCK-END + + def parse_block_mapping_first_key(self): + # type: () -> Any + token = self.scanner.get_token() + self.marks.append(token.start_mark) + return self.parse_block_mapping_key() + + def parse_block_mapping_key(self): + # type: () -> Any + if self.scanner.check_token(KeyToken): + token = self.scanner.get_token() + self.move_token_comment(token) + if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken): + self.states.append(self.parse_block_mapping_value) + return self.parse_block_node_or_indentless_sequence() + else: + self.state = self.parse_block_mapping_value + return self.process_empty_scalar(token.end_mark) + if self.resolver.processing_version > (1, 1) and self.scanner.check_token( + ValueToken + ): + self.state = self.parse_block_mapping_value + return self.process_empty_scalar(self.scanner.peek_token().start_mark) + if not self.scanner.check_token(BlockEndToken): + token = self.scanner.peek_token() + raise ParserError( + 'while parsing a block mapping', + self.marks[-1], + _F('expected <block end>, but found {token_id!r}', token_id=token.id), + token.start_mark, + ) + token = self.scanner.get_token() + self.move_token_comment(token) + event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) + self.state = self.states.pop() + self.marks.pop() + return event + + def parse_block_mapping_value(self): + # type: () -> Any + if self.scanner.check_token(ValueToken): + token = self.scanner.get_token() + # value token might have post comment move it to e.g. block + if self.scanner.check_token(ValueToken): + self.move_token_comment(token) + else: + if not self.scanner.check_token(KeyToken): + self.move_token_comment(token, empty=True) + # else: empty value for this key cannot move token.comment + if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken): + self.states.append(self.parse_block_mapping_key) + return self.parse_block_node_or_indentless_sequence() + else: + self.state = self.parse_block_mapping_key + comment = token.comment + if comment is None: + token = self.scanner.peek_token() + comment = token.comment + if comment: + token._comment = [None, comment[1]] + comment = [comment[0], None] + return self.process_empty_scalar(token.end_mark, comment=comment) + else: + self.state = self.parse_block_mapping_key + token = self.scanner.peek_token() + return self.process_empty_scalar(token.start_mark) + + # flow_sequence ::= FLOW-SEQUENCE-START + # (flow_sequence_entry FLOW-ENTRY)* + # flow_sequence_entry? + # FLOW-SEQUENCE-END + # flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? + # + # Note that while production rules for both flow_sequence_entry and + # flow_mapping_entry are equal, their interpretations are different. + # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?` + # generate an inline mapping (set syntax). + + def parse_flow_sequence_first_entry(self): + # type: () -> Any + token = self.scanner.get_token() + self.marks.append(token.start_mark) + return self.parse_flow_sequence_entry(first=True) + + def parse_flow_sequence_entry(self, first=False): + # type: (bool) -> Any + if not self.scanner.check_token(FlowSequenceEndToken): + if not first: + if self.scanner.check_token(FlowEntryToken): + self.scanner.get_token() + else: + token = self.scanner.peek_token() + raise ParserError( + 'while parsing a flow sequence', + self.marks[-1], + _F( + "expected ',' or ']', but got {token_id!r}", + token_id=token.id, + ), + token.start_mark, + ) + + if self.scanner.check_token(KeyToken): + token = self.scanner.peek_token() + event = MappingStartEvent( + None, None, True, token.start_mark, token.end_mark, flow_style=True + ) # type: Any + self.state = self.parse_flow_sequence_entry_mapping_key + return event + elif not self.scanner.check_token(FlowSequenceEndToken): + self.states.append(self.parse_flow_sequence_entry) + return self.parse_flow_node() + token = self.scanner.get_token() + event = SequenceEndEvent( + token.start_mark, token.end_mark, comment=token.comment + ) + self.state = self.states.pop() + self.marks.pop() + return event + + def parse_flow_sequence_entry_mapping_key(self): + # type: () -> Any + token = self.scanner.get_token() + if not self.scanner.check_token( + ValueToken, FlowEntryToken, FlowSequenceEndToken + ): + self.states.append(self.parse_flow_sequence_entry_mapping_value) + return self.parse_flow_node() + else: + self.state = self.parse_flow_sequence_entry_mapping_value + return self.process_empty_scalar(token.end_mark) + + def parse_flow_sequence_entry_mapping_value(self): + # type: () -> Any + if self.scanner.check_token(ValueToken): + token = self.scanner.get_token() + if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken): + self.states.append(self.parse_flow_sequence_entry_mapping_end) + return self.parse_flow_node() + else: + self.state = self.parse_flow_sequence_entry_mapping_end + return self.process_empty_scalar(token.end_mark) + else: + self.state = self.parse_flow_sequence_entry_mapping_end + token = self.scanner.peek_token() + return self.process_empty_scalar(token.start_mark) + + def parse_flow_sequence_entry_mapping_end(self): + # type: () -> Any + self.state = self.parse_flow_sequence_entry + token = self.scanner.peek_token() + return MappingEndEvent(token.start_mark, token.start_mark) + + # flow_mapping ::= FLOW-MAPPING-START + # (flow_mapping_entry FLOW-ENTRY)* + # flow_mapping_entry? + # FLOW-MAPPING-END + # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? + + def parse_flow_mapping_first_key(self): + # type: () -> Any + token = self.scanner.get_token() + self.marks.append(token.start_mark) + return self.parse_flow_mapping_key(first=True) + + def parse_flow_mapping_key(self, first=False): + # type: (Any) -> Any + if not self.scanner.check_token(FlowMappingEndToken): + if not first: + if self.scanner.check_token(FlowEntryToken): + self.scanner.get_token() + else: + token = self.scanner.peek_token() + raise ParserError( + 'while parsing a flow mapping', + self.marks[-1], + _F( + "expected ',' or '}}', but got {token_id!r}", + token_id=token.id, + ), + token.start_mark, + ) + if self.scanner.check_token(KeyToken): + token = self.scanner.get_token() + if not self.scanner.check_token( + ValueToken, FlowEntryToken, FlowMappingEndToken + ): + self.states.append(self.parse_flow_mapping_value) + return self.parse_flow_node() + else: + self.state = self.parse_flow_mapping_value + return self.process_empty_scalar(token.end_mark) + elif self.resolver.processing_version > (1, 1) and self.scanner.check_token( + ValueToken + ): + self.state = self.parse_flow_mapping_value + return self.process_empty_scalar(self.scanner.peek_token().end_mark) + elif not self.scanner.check_token(FlowMappingEndToken): + self.states.append(self.parse_flow_mapping_empty_value) + return self.parse_flow_node() + token = self.scanner.get_token() + event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) + self.state = self.states.pop() + self.marks.pop() + return event + + def parse_flow_mapping_value(self): + # type: () -> Any + if self.scanner.check_token(ValueToken): + token = self.scanner.get_token() + if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken): + self.states.append(self.parse_flow_mapping_key) + return self.parse_flow_node() + else: + self.state = self.parse_flow_mapping_key + return self.process_empty_scalar(token.end_mark) + else: + self.state = self.parse_flow_mapping_key + token = self.scanner.peek_token() + return self.process_empty_scalar(token.start_mark) + + def parse_flow_mapping_empty_value(self): + # type: () -> Any + self.state = self.parse_flow_mapping_key + return self.process_empty_scalar(self.scanner.peek_token().start_mark) + + def process_empty_scalar(self, mark, comment=None): + # type: (Any, Any) -> Any + return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment) + + def move_token_comment(self, token, nt=None, empty=False): + # type: (Any, Optional[Any], Optional[bool]) -> Any + pass + + +class RoundTripParser(Parser): + """roundtrip is a safe loader, that wants to see the unmangled tag""" + + def transform_tag(self, handle, suffix): + # type: (Any, Any) -> Any + # return self.tag_handles[handle]+suffix + if handle == '!!' and suffix in ( + 'null', + 'bool', + 'int', + 'float', + 'binary', + 'timestamp', + 'omap', + 'pairs', + 'set', + 'str', + 'seq', + 'map', + ): + return Parser.transform_tag(self, handle, suffix) + return handle + suffix + + def move_token_comment(self, token, nt=None, empty=False): + # type: (Any, Optional[Any], Optional[bool]) -> Any + token.move_old_comment( + self.scanner.peek_token() if nt is None else nt, empty=empty + ) + + +class RoundTripParserSC(RoundTripParser): + """roundtrip is a safe loader, that wants to see the unmangled tag""" + + # some of the differences are based on the superclass testing + # if self.loader.comment_handling is not None + + def move_token_comment(self, token, nt=None, empty=False): + # type: (Any, Any, Any, Optional[bool]) -> None + token.move_new_comment( + self.scanner.peek_token() if nt is None else nt, empty=empty + ) + + def distribute_comment(self, comment, line): + # type: (Any, Any) -> Any + # ToDo, look at indentation of the comment to determine attachment + if comment is None: + return None + if not comment[0]: + return None + if comment[0][0] != line + 1: + nprintf('>>>dcxxx', comment, line) + assert comment[0][0] == line + 1 + # if comment[0] - line > 1: + # return + typ = self.loader.comment_handling & 0b11 + # nprintf('>>>dca', comment, line, typ) + if typ == C_POST: + return None + if typ == C_PRE: + c = [None, None, comment[0]] + comment[0] = None + return c + # nprintf('>>>dcb', comment[0]) + for _idx, cmntidx in enumerate(comment[0]): + # nprintf('>>>dcb', cmntidx) + if isinstance(self.scanner.comments[cmntidx], BlankLineComment): + break + else: + return None # no space found + if _idx == 0: + return None # first line was blank + # nprintf('>>>dcc', idx) + if typ == C_SPLIT_ON_FIRST_BLANK: + c = [None, None, comment[0][:_idx]] + comment[0] = comment[0][_idx:] + return c + raise NotImplementedError # reserved |