summaryrefslogtreecommitdiffstats
path: root/_test/lib/canonical.py
diff options
context:
space:
mode:
Diffstat (limited to '_test/lib/canonical.py')
-rw-r--r--_test/lib/canonical.py387
1 files changed, 387 insertions, 0 deletions
diff --git a/_test/lib/canonical.py b/_test/lib/canonical.py
new file mode 100644
index 0000000..56fa297
--- /dev/null
+++ b/_test/lib/canonical.py
@@ -0,0 +1,387 @@
+import ruyaml
+from ruyaml.composer import Composer
+from ruyaml.constructor import Constructor
+from ruyaml.resolver import Resolver
+
+
+class CanonicalError(ruyaml.YAMLError):
+ pass
+
+
+class CanonicalScanner:
+ def __init__(self, data):
+ try:
+ if isinstance(data, bytes):
+ data = data.decode('utf-8')
+ except UnicodeDecodeError:
+ raise CanonicalError('utf-8 stream is expected')
+ self.data = data + '\0'
+ self.index = 0
+ self.tokens = []
+ self.scanned = False
+
+ def check_token(self, *choices):
+ if not self.scanned:
+ self.scan()
+ if self.tokens:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.tokens[0], choice):
+ return True
+ return False
+
+ def peek_token(self):
+ if not self.scanned:
+ self.scan()
+ if self.tokens:
+ return self.tokens[0]
+
+ def get_token(self, choice=None):
+ if not self.scanned:
+ self.scan()
+ token = self.tokens.pop(0)
+ if choice and not isinstance(token, choice):
+ raise CanonicalError('unexpected token ' + repr(token))
+ return token
+
+ def get_token_value(self):
+ token = self.get_token()
+ return token.value
+
+ def scan(self):
+ self.tokens.append(ruyaml.StreamStartToken(None, None))
+ while True:
+ self.find_token()
+ ch = self.data[self.index]
+ if ch == '\0':
+ self.tokens.append(ruyaml.StreamEndToken(None, None))
+ break
+ elif ch == '%':
+ self.tokens.append(self.scan_directive())
+ elif ch == '-' and self.data[self.index : self.index + 3] == '---':
+ self.index += 3
+ self.tokens.append(ruyaml.DocumentStartToken(None, None))
+ elif ch == '[':
+ self.index += 1
+ self.tokens.append(ruyaml.FlowSequenceStartToken(None, None))
+ elif ch == '{':
+ self.index += 1
+ self.tokens.append(ruyaml.FlowMappingStartToken(None, None))
+ elif ch == ']':
+ self.index += 1
+ self.tokens.append(ruyaml.FlowSequenceEndToken(None, None))
+ elif ch == '}':
+ self.index += 1
+ self.tokens.append(ruyaml.FlowMappingEndToken(None, None))
+ elif ch == '?':
+ self.index += 1
+ self.tokens.append(ruyaml.KeyToken(None, None))
+ elif ch == ':':
+ self.index += 1
+ self.tokens.append(ruyaml.ValueToken(None, None))
+ elif ch == ',':
+ self.index += 1
+ self.tokens.append(ruyaml.FlowEntryToken(None, None))
+ elif ch == '*' or ch == '&':
+ self.tokens.append(self.scan_alias())
+ elif ch == '!':
+ self.tokens.append(self.scan_tag())
+ elif ch == '"':
+ self.tokens.append(self.scan_scalar())
+ else:
+ raise CanonicalError('invalid token')
+ self.scanned = True
+
+ DIRECTIVE = '%YAML 1.1'
+
+ def scan_directive(self):
+ if (
+ self.data[self.index : self.index + len(self.DIRECTIVE)] == self.DIRECTIVE
+ and self.data[self.index + len(self.DIRECTIVE)] in ' \n\0'
+ ):
+ self.index += len(self.DIRECTIVE)
+ return ruyaml.DirectiveToken('YAML', (1, 1), None, None)
+ else:
+ raise CanonicalError('invalid directive')
+
+ def scan_alias(self):
+ if self.data[self.index] == '*':
+ TokenClass = ruyaml.AliasToken
+ else:
+ TokenClass = ruyaml.AnchorToken
+ self.index += 1
+ start = self.index
+ while self.data[self.index] not in ', \n\0':
+ self.index += 1
+ value = self.data[start : self.index]
+ return TokenClass(value, None, None)
+
+ def scan_tag(self):
+ self.index += 1
+ start = self.index
+ while self.data[self.index] not in ' \n\0':
+ self.index += 1
+ value = self.data[start : self.index]
+ if not value:
+ value = '!'
+ elif value[0] == '!':
+ value = 'tag:yaml.org,2002:' + value[1:]
+ elif value[0] == '<' and value[-1] == '>':
+ value = value[1:-1]
+ else:
+ value = '!' + value
+ return ruyaml.TagToken(value, None, None)
+
+ QUOTE_CODES = {'x': 2, 'u': 4, 'U': 8}
+
+ QUOTE_REPLACES = {
+ '\\': '\\',
+ '"': '"',
+ ' ': ' ',
+ 'a': '\x07',
+ 'b': '\x08',
+ 'e': '\x1B',
+ 'f': '\x0C',
+ 'n': '\x0A',
+ 'r': '\x0D',
+ 't': '\x09',
+ 'v': '\x0B',
+ 'N': '\u0085',
+ 'L': '\u2028',
+ 'P': '\u2029',
+ '_': '_',
+ '0': '\x00',
+ }
+
+ def scan_scalar(self):
+ self.index += 1
+ chunks = []
+ start = self.index
+ ignore_spaces = False
+ while self.data[self.index] != '"':
+ if self.data[self.index] == '\\':
+ ignore_spaces = False
+ chunks.append(self.data[start : self.index])
+ self.index += 1
+ ch = self.data[self.index]
+ self.index += 1
+ if ch == '\n':
+ ignore_spaces = True
+ elif ch in self.QUOTE_CODES:
+ length = self.QUOTE_CODES[ch]
+ code = int(self.data[self.index : self.index + length], 16)
+ chunks.append(chr(code))
+ self.index += length
+ else:
+ if ch not in self.QUOTE_REPLACES:
+ raise CanonicalError('invalid escape code')
+ chunks.append(self.QUOTE_REPLACES[ch])
+ start = self.index
+ elif self.data[self.index] == '\n':
+ chunks.append(self.data[start : self.index])
+ chunks.append(' ')
+ self.index += 1
+ start = self.index
+ ignore_spaces = True
+ elif ignore_spaces and self.data[self.index] == ' ':
+ self.index += 1
+ start = self.index
+ else:
+ ignore_spaces = False
+ self.index += 1
+ chunks.append(self.data[start : self.index])
+ self.index += 1
+ return ruyaml.ScalarToken("".join(chunks), False, None, None)
+
+ def find_token(self):
+ found = False
+ while not found:
+ while self.data[self.index] in ' \t':
+ self.index += 1
+ if self.data[self.index] == '#':
+ while self.data[self.index] != '\n':
+ self.index += 1
+ if self.data[self.index] == '\n':
+ self.index += 1
+ else:
+ found = True
+
+
+class CanonicalParser:
+ def __init__(self):
+ self.events = []
+ self.parsed = False
+
+ def dispose(self):
+ pass
+
+ # stream: STREAM-START document* STREAM-END
+ def parse_stream(self):
+ self.get_token(ruyaml.StreamStartToken)
+ self.events.append(ruyaml.StreamStartEvent(None, None))
+ while not self.check_token(ruyaml.StreamEndToken):
+ if self.check_token(ruyaml.DirectiveToken, ruyaml.DocumentStartToken):
+ self.parse_document()
+ else:
+ raise CanonicalError(
+ 'document is expected, got ' + repr(self.tokens[0])
+ )
+ self.get_token(ruyaml.StreamEndToken)
+ self.events.append(ruyaml.StreamEndEvent(None, None))
+
+ # document: DIRECTIVE? DOCUMENT-START node
+ def parse_document(self):
+ # node = None
+ if self.check_token(ruyaml.DirectiveToken):
+ self.get_token(ruyaml.DirectiveToken)
+ self.get_token(ruyaml.DocumentStartToken)
+ self.events.append(ruyaml.DocumentStartEvent(None, None))
+ self.parse_node()
+ self.events.append(ruyaml.DocumentEndEvent(None, None))
+
+ # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
+ def parse_node(self):
+ if self.check_token(ruyaml.AliasToken):
+ self.events.append(ruyaml.AliasEvent(self.get_token_value(), None, None))
+ else:
+ anchor = None
+ if self.check_token(ruyaml.AnchorToken):
+ anchor = self.get_token_value()
+ tag = None
+ if self.check_token(ruyaml.TagToken):
+ tag = self.get_token_value()
+ if self.check_token(ruyaml.ScalarToken):
+ self.events.append(
+ ruyaml.ScalarEvent(
+ anchor, tag, (False, False), self.get_token_value(), None, None
+ )
+ )
+ elif self.check_token(ruyaml.FlowSequenceStartToken):
+ self.events.append(ruyaml.SequenceStartEvent(anchor, tag, None, None))
+ self.parse_sequence()
+ elif self.check_token(ruyaml.FlowMappingStartToken):
+ self.events.append(ruyaml.MappingStartEvent(anchor, tag, None, None))
+ self.parse_mapping()
+ else:
+ raise CanonicalError(
+ "SCALAR, '[', or '{' is expected, got " + repr(self.tokens[0])
+ )
+
+ # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
+ def parse_sequence(self):
+ self.get_token(ruyaml.FlowSequenceStartToken)
+ if not self.check_token(ruyaml.FlowSequenceEndToken):
+ self.parse_node()
+ while not self.check_token(ruyaml.FlowSequenceEndToken):
+ self.get_token(ruyaml.FlowEntryToken)
+ if not self.check_token(ruyaml.FlowSequenceEndToken):
+ self.parse_node()
+ self.get_token(ruyaml.FlowSequenceEndToken)
+ self.events.append(ruyaml.SequenceEndEvent(None, None))
+
+ # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
+ def parse_mapping(self):
+ self.get_token(ruyaml.FlowMappingStartToken)
+ if not self.check_token(ruyaml.FlowMappingEndToken):
+ self.parse_map_entry()
+ while not self.check_token(ruyaml.FlowMappingEndToken):
+ self.get_token(ruyaml.FlowEntryToken)
+ if not self.check_token(ruyaml.FlowMappingEndToken):
+ self.parse_map_entry()
+ self.get_token(ruyaml.FlowMappingEndToken)
+ self.events.append(ruyaml.MappingEndEvent(None, None))
+
+ # map_entry: KEY node VALUE node
+ def parse_map_entry(self):
+ self.get_token(ruyaml.KeyToken)
+ self.parse_node()
+ self.get_token(ruyaml.ValueToken)
+ self.parse_node()
+
+ def parse(self):
+ self.parse_stream()
+ self.parsed = True
+
+ def get_event(self):
+ if not self.parsed:
+ self.parse()
+ return self.events.pop(0)
+
+ def check_event(self, *choices):
+ if not self.parsed:
+ self.parse()
+ if self.events:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.events[0], choice):
+ return True
+ return False
+
+ def peek_event(self):
+ if not self.parsed:
+ self.parse()
+ return self.events[0]
+
+
+class CanonicalLoader(
+ CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver
+):
+ def __init__(self, stream):
+ if hasattr(stream, 'read'):
+ stream = stream.read()
+ CanonicalScanner.__init__(self, stream)
+ CanonicalParser.__init__(self)
+ Composer.__init__(self)
+ Constructor.__init__(self)
+ Resolver.__init__(self)
+
+
+ruyaml.CanonicalLoader = CanonicalLoader
+
+
+def canonical_scan(stream):
+ yaml = ruyaml.YAML()
+ yaml.scanner = CanonicalScanner
+ return yaml.scan(stream)
+
+
+ruyaml.canonical_scan = canonical_scan
+
+
+def canonical_parse(stream):
+ return ruyaml.parse(stream, Loader=CanonicalLoader)
+
+
+ruyaml.canonical_parse = canonical_parse
+
+
+def canonical_compose(stream):
+ return ruyaml.compose(stream, Loader=CanonicalLoader)
+
+
+ruyaml.canonical_compose = canonical_compose
+
+
+def canonical_compose_all(stream):
+ return ruyaml.compose_all(stream, Loader=CanonicalLoader)
+
+
+ruyaml.canonical_compose_all = canonical_compose_all
+
+
+def canonical_load(stream):
+ return ruyaml.load(stream, Loader=CanonicalLoader)
+
+
+ruyaml.canonical_load = canonical_load
+
+
+def canonical_load_all(stream):
+ yaml = ruyaml.YAML(typ='safe', pure=True)
+ yaml.Loader = CanonicalLoader
+ return yaml.load_all(stream)
+
+
+ruyaml.canonical_load_all = canonical_load_all