author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 00:33:55 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 00:33:55 +0000
commit    cbbc936ed9811bdb5dd480bc2c5e10c3062532be (patch)
tree      ec1783c0aaa2ee6eaa6d6362f2bed4392943de8e /_test/lib
parent    Releasing progress-linux version 0.18.5-1~exp1~progress7.99u1. (diff)
Merging upstream version 0.18.6.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '_test/lib')
-rw-r--r-- _test/lib/canonical.py 388
-rw-r--r-- _test/lib/test_all.py 20
-rw-r--r-- _test/lib/test_appliance.py 209
-rw-r--r-- _test/lib/test_build.py 16
-rw-r--r-- _test/lib/test_build_ext.py 17
-rw-r--r-- _test/lib/test_canonical.py 51
-rw-r--r-- _test/lib/test_constructor.py 372
-rw-r--r-- _test/lib/test_emitter.py 132
-rw-r--r-- _test/lib/test_errors.py 92
-rw-r--r-- _test/lib/test_input_output.py 180
-rw-r--r-- _test/lib/test_mark.py 35
-rw-r--r-- _test/lib/test_reader.py 44
-rw-r--r-- _test/lib/test_recursive.py 58
-rw-r--r-- _test/lib/test_representer.py 51
-rw-r--r-- _test/lib/test_resolver.py 111
-rw-r--r-- _test/lib/test_structure.py 225
-rw-r--r-- _test/lib/test_tokens.py 86
-rw-r--r-- _test/lib/test_yaml.py 20
-rw-r--r-- _test/lib/test_yaml_ext.py 403
19 files changed, 2510 insertions, 0 deletions
diff --git a/_test/lib/canonical.py b/_test/lib/canonical.py
new file mode 100644
index 0000000..31c9728
--- /dev/null
+++ b/_test/lib/canonical.py
@@ -0,0 +1,388 @@
+
+import ruamel.yaml
+from ruamel.yaml.composer import Composer
+from ruamel.yaml.constructor import Constructor
+from ruamel.yaml.resolver import Resolver
+
+
+class CanonicalError(ruamel.yaml.YAMLError):
+ pass
+
+
+class CanonicalScanner:
+ def __init__(self, data):
+ try:
+ if isinstance(data, bytes):
+ data = data.decode('utf-8')
+ except UnicodeDecodeError:
+ raise CanonicalError('utf-8 stream is expected')
+ self.data = data + '\0'
+ self.index = 0
+ self.tokens = []
+ self.scanned = False
+
+ def check_token(self, *choices):
+ if not self.scanned:
+ self.scan()
+ if self.tokens:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.tokens[0], choice):
+ return True
+ return False
+
+ def peek_token(self):
+ if not self.scanned:
+ self.scan()
+ if self.tokens:
+ return self.tokens[0]
+
+ def get_token(self, choice=None):
+ if not self.scanned:
+ self.scan()
+ token = self.tokens.pop(0)
+ if choice and not isinstance(token, choice):
+ raise CanonicalError('unexpected token ' + repr(token))
+ return token
+
+ def get_token_value(self):
+ token = self.get_token()
+ return token.value
+
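+ # tokenize the whole stream in one pass; anything outside the canonical subset
+ # (the %YAML directive, '---', flow collections, explicit keys/values, tags,
+ # anchors/aliases and double-quoted scalars) raises CanonicalError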
+ def scan(self):
+ self.tokens.append(ruamel.yaml.StreamStartToken(None, None))
+ while True:
+ self.find_token()
+ ch = self.data[self.index]
+ if ch == '\0':
+ self.tokens.append(ruamel.yaml.StreamEndToken(None, None))
+ break
+ elif ch == '%':
+ self.tokens.append(self.scan_directive())
+ elif ch == '-' and self.data[self.index : self.index + 3] == '---':
+ self.index += 3
+ self.tokens.append(ruamel.yaml.DocumentStartToken(None, None))
+ elif ch == '[':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.FlowSequenceStartToken(None, None))
+ elif ch == '{':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.FlowMappingStartToken(None, None))
+ elif ch == ']':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.FlowSequenceEndToken(None, None))
+ elif ch == '}':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.FlowMappingEndToken(None, None))
+ elif ch == '?':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.KeyToken(None, None))
+ elif ch == ':':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.ValueToken(None, None))
+ elif ch == ',':
+ self.index += 1
+ self.tokens.append(ruamel.yaml.FlowEntryToken(None, None))
+ elif ch == '*' or ch == '&':
+ self.tokens.append(self.scan_alias())
+ elif ch == '!':
+ self.tokens.append(self.scan_tag())
+ elif ch == '"':
+ self.tokens.append(self.scan_scalar())
+ else:
+ raise CanonicalError('invalid token')
+ self.scanned = True
+
+ DIRECTIVE = '%YAML 1.1'
+
+ def scan_directive(self):
+ if (
+ self.data[self.index : self.index + len(self.DIRECTIVE)] == self.DIRECTIVE
+ and self.data[self.index + len(self.DIRECTIVE)] in ' \n\0'
+ ):
+ self.index += len(self.DIRECTIVE)
+ return ruamel.yaml.DirectiveToken('YAML', (1, 1), None, None)
+ else:
+ raise CanonicalError('invalid directive')
+
+ def scan_alias(self):
+ if self.data[self.index] == '*':
+ TokenClass = ruamel.yaml.AliasToken
+ else:
+ TokenClass = ruamel.yaml.AnchorToken
+ self.index += 1
+ start = self.index
+ while self.data[self.index] not in ', \n\0':
+ self.index += 1
+ value = self.data[start : self.index]
+ return TokenClass(value, None, None)
+
+ def scan_tag(self):
+ self.index += 1
+ start = self.index
+ while self.data[self.index] not in ' \n\0':
+ self.index += 1
+ value = self.data[start : self.index]
+ if not value:
+ value = '!'
+ elif value[0] == '!':
+ value = 'tag:yaml.org,2002:' + value[1:]
+ elif value[0] == '<' and value[-1] == '>':
+ value = value[1:-1]
+ else:
+ value = '!' + value
+ return ruamel.yaml.TagToken(value, None, None)
+
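+ # QUOTE_CODES gives the number of hex digits following \x, \u and \U escapes;
+ # QUOTE_REPLACES maps single-character escapes to their replacement characters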
+ QUOTE_CODES = {'x': 2, 'u': 4, 'U': 8}
+
+ QUOTE_REPLACES = {
+ '\\': '\\',
+ '"': '"',
+ ' ': ' ',
+ 'a': '\x07',
+ 'b': '\x08',
+ 'e': '\x1B',
+ 'f': '\x0C',
+ 'n': '\x0A',
+ 'r': '\x0D',
+ 't': '\x09',
+ 'v': '\x0B',
+ 'N': '\u0085',
+ 'L': '\u2028',
+ 'P': '\u2029',
+ '_': '_',
+ '0': '\x00',
+ }
+
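+ # scan a double-quoted scalar: resolve escape sequences and fold line breaks into spaces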
+ def scan_scalar(self):
+ self.index += 1
+ chunks = []
+ start = self.index
+ ignore_spaces = False
+ while self.data[self.index] != '"':
+ if self.data[self.index] == '\\':
+ ignore_spaces = False
+ chunks.append(self.data[start : self.index])
+ self.index += 1
+ ch = self.data[self.index]
+ self.index += 1
+ if ch == '\n':
+ ignore_spaces = True
+ elif ch in self.QUOTE_CODES:
+ length = self.QUOTE_CODES[ch]
+ code = int(self.data[self.index : self.index + length], 16)
+ chunks.append(chr(code))
+ self.index += length
+ else:
+ if ch not in self.QUOTE_REPLACES:
+ raise CanonicalError('invalid escape code')
+ chunks.append(self.QUOTE_REPLACES[ch])
+ start = self.index
+ elif self.data[self.index] == '\n':
+ chunks.append(self.data[start : self.index])
+ chunks.append(' ')
+ self.index += 1
+ start = self.index
+ ignore_spaces = True
+ elif ignore_spaces and self.data[self.index] == ' ':
+ self.index += 1
+ start = self.index
+ else:
+ ignore_spaces = False
+ self.index += 1
+ chunks.append(self.data[start : self.index])
+ self.index += 1
+ return ruamel.yaml.ScalarToken("".join(chunks), False, None, None)
+
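+ # skip spaces, tabs, comments and line breaks preceding the next token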
+ def find_token(self):
+ found = False
+ while not found:
+ while self.data[self.index] in ' \t':
+ self.index += 1
+ if self.data[self.index] == '#':
+ while self.data[self.index] != '\n':
+ self.index += 1
+ if self.data[self.index] == '\n':
+ self.index += 1
+ else:
+ found = True
+
+
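+# CanonicalParser builds the event list from the scanned tokens, following the
+# grammar shown in the comment above each parse_* method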
+class CanonicalParser:
+ def __init__(self):
+ self.events = []
+ self.parsed = False
+
+ def dispose(self):
+ pass
+
+ # stream: STREAM-START document* STREAM-END
+ def parse_stream(self):
+ self.get_token(ruamel.yaml.StreamStartToken)
+ self.events.append(ruamel.yaml.StreamStartEvent(None, None))
+ while not self.check_token(ruamel.yaml.StreamEndToken):
+ if self.check_token(ruamel.yaml.DirectiveToken, ruamel.yaml.DocumentStartToken):
+ self.parse_document()
+ else:
+ raise CanonicalError('document is expected, got ' + repr(self.tokens[0]))
+ self.get_token(ruamel.yaml.StreamEndToken)
+ self.events.append(ruamel.yaml.StreamEndEvent(None, None))
+
+ # document: DIRECTIVE? DOCUMENT-START node
+ def parse_document(self):
+ # node = None
+ if self.check_token(ruamel.yaml.DirectiveToken):
+ self.get_token(ruamel.yaml.DirectiveToken)
+ self.get_token(ruamel.yaml.DocumentStartToken)
+ self.events.append(ruamel.yaml.DocumentStartEvent(None, None))
+ self.parse_node()
+ self.events.append(ruamel.yaml.DocumentEndEvent(None, None))
+
+ # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
+ def parse_node(self):
+ if self.check_token(ruamel.yaml.AliasToken):
+ self.events.append(ruamel.yaml.AliasEvent(self.get_token_value(), None, None))
+ else:
+ anchor = None
+ if self.check_token(ruamel.yaml.AnchorToken):
+ anchor = self.get_token_value()
+ tag = None
+ if self.check_token(ruamel.yaml.TagToken):
+ tag = self.get_token_value()
+ if self.check_token(ruamel.yaml.ScalarToken):
+ self.events.append(
+ ruamel.yaml.ScalarEvent(
+ anchor, tag, (False, False), self.get_token_value(), None, None
+ )
+ )
+ elif self.check_token(ruamel.yaml.FlowSequenceStartToken):
+ self.events.append(ruamel.yaml.SequenceStartEvent(anchor, tag, None, None))
+ self.parse_sequence()
+ elif self.check_token(ruamel.yaml.FlowMappingStartToken):
+ self.events.append(ruamel.yaml.MappingStartEvent(anchor, tag, None, None))
+ self.parse_mapping()
+ else:
+ raise CanonicalError(
+ "SCALAR, '[', or '{' is expected, got " + repr(self.tokens[0])
+ )
+
+ # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
+ def parse_sequence(self):
+ self.get_token(ruamel.yaml.FlowSequenceStartToken)
+ if not self.check_token(ruamel.yaml.FlowSequenceEndToken):
+ self.parse_node()
+ while not self.check_token(ruamel.yaml.FlowSequenceEndToken):
+ self.get_token(ruamel.yaml.FlowEntryToken)
+ if not self.check_token(ruamel.yaml.FlowSequenceEndToken):
+ self.parse_node()
+ self.get_token(ruamel.yaml.FlowSequenceEndToken)
+ self.events.append(ruamel.yaml.SequenceEndEvent(None, None))
+
+ # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
+ def parse_mapping(self):
+ self.get_token(ruamel.yaml.FlowMappingStartToken)
+ if not self.check_token(ruamel.yaml.FlowMappingEndToken):
+ self.parse_map_entry()
+ while not self.check_token(ruamel.yaml.FlowMappingEndToken):
+ self.get_token(ruamel.yaml.FlowEntryToken)
+ if not self.check_token(ruamel.yaml.FlowMappingEndToken):
+ self.parse_map_entry()
+ self.get_token(ruamel.yaml.FlowMappingEndToken)
+ self.events.append(ruamel.yaml.MappingEndEvent(None, None))
+
+ # map_entry: KEY node VALUE node
+ def parse_map_entry(self):
+ self.get_token(ruamel.yaml.KeyToken)
+ self.parse_node()
+ self.get_token(ruamel.yaml.ValueToken)
+ self.parse_node()
+
+ def parse(self):
+ self.parse_stream()
+ self.parsed = True
+
+ def get_event(self):
+ if not self.parsed:
+ self.parse()
+ return self.events.pop(0)
+
+ def check_event(self, *choices):
+ if not self.parsed:
+ self.parse()
+ if self.events:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.events[0], choice):
+ return True
+ return False
+
+ def peek_event(self):
+ if not self.parsed:
+ self.parse()
+ return self.events[0]
+
+
+class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
+ def __init__(self, stream):
+ if hasattr(stream, 'read'):
+ stream = stream.read()
+ CanonicalScanner.__init__(self, stream)
+ CanonicalParser.__init__(self)
+ Composer.__init__(self)
+ Constructor.__init__(self)
+ Resolver.__init__(self)
+
+
+ruamel.yaml.CanonicalLoader = CanonicalLoader
+
+
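+# the canonical_* helpers below are attached to the ruamel.yaml module so tests
+# can call e.g. ruamel.yaml.canonical_parse(data) directly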
+def canonical_scan(stream):
+ yaml = ruamel.yaml.YAML()
+ yaml.scanner = CanonicalScanner
+ return yaml.scan(stream)
+
+
+ruamel.yaml.canonical_scan = canonical_scan
+
+
+def canonical_parse(stream):
+ yaml = ruamel.yaml.YAML()
+ return yaml.parse(stream, Loader=CanonicalLoader)
+
+
+ruamel.yaml.canonical_parse = canonical_parse
+
+
+def canonical_compose(stream):
+ yaml = ruamel.yaml.YAML()
+ return yaml.compose(stream, Loader=CanonicalLoader)
+
+
+ruamel.yaml.canonical_compose = canonical_compose
+
+
+def canonical_compose_all(stream):
+ yaml = ruamel.yaml.YAML()
+ return yaml.compose_all(stream, Loader=CanonicalLoader)
+
+
+ruamel.yaml.canonical_compose_all = canonical_compose_all
+
+
+def canonical_load(stream):
+ yaml = ruamel.yaml.YAML()
+ return yaml.load(stream, Loader=CanonicalLoader)
+
+
+ruamel.yaml.canonical_load = canonical_load
+
+
+def canonical_load_all(stream):
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ yaml.Loader = CanonicalLoader
+ return yaml.load_all(stream)
+
+
+ruamel.yaml.canonical_load_all = canonical_load_all
diff --git a/_test/lib/test_all.py b/_test/lib/test_all.py
new file mode 100644
index 0000000..8099ec8
--- /dev/null
+++ b/_test/lib/test_all.py
@@ -0,0 +1,20 @@
+
+import sys # NOQA
+import ruamel.yaml
+import test_appliance
+
+
+def main(args=None):
+ collections = []
+ import test_yaml
+
+ collections.append(test_yaml)
+ if ruamel.yaml.__with_libyaml__:
+ import test_yaml_ext
+
+ collections.append(test_yaml_ext)
+ test_appliance.run(collections, args)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/_test/lib/test_appliance.py b/_test/lib/test_appliance.py
new file mode 100644
index 0000000..d624ebe
--- /dev/null
+++ b/_test/lib/test_appliance.py
@@ -0,0 +1,209 @@
+
+import sys
+import os
+import types
+import traceback
+import pprint
+import argparse
+
+# DATA = 'tests/data'
+# determine the position of data dynamically relative to program
+# this allows running test while the current path is not the top of the
+# repository, e.g. from the tests/data directory: python ../test_yaml.py
+DATA = __file__.rsplit(os.sep, 2)[0] + '/data'
+
+
+def find_test_functions(collections):
+ if not isinstance(collections, list):
+ collections = [collections]
+ functions = []
+ for collection in collections:
+ if not isinstance(collection, dict):
+ collection = vars(collection)
+ for key in sorted(collection):
+ value = collection[key]
+ if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+ functions.append(value)
+ return functions
+
+
+def find_test_filenames(directory):
+ filenames = {}
+ for filename in os.listdir(directory):
+ if os.path.isfile(os.path.join(directory, filename)):
+ base, ext = os.path.splitext(filename)
+ # ToDo: remove
+ if base.endswith('-py2'):
+ continue
+ filenames.setdefault(base, []).append(ext)
+ filenames = sorted(filenames.items())
+ return filenames
+
+
+def parse_arguments(args):
+ """"""
+ parser = argparse.ArgumentParser(
+ usage=""" run the yaml tests. By default
+ all functions on all appropriate test_files are run. Functions have
+ unittest attributes that determine the required extensions to filenames
+ that need to be available in order to run that test. E.g.\n\n
+ python test_yaml.py test_constructor_types\n
+ python test_yaml.py --verbose test_tokens spec-02-05\n\n
+ The presence of an extension in the .skip attribute of a function
+ disables the test for that function."""
+ )
+ # ToDo: make into int and test > 0 in functions
+ parser.add_argument(
+ '--verbose',
+ '-v',
+ action='store_true',
+ default='YAML_TEST_VERBOSE' in os.environ,
+ help='set verbosity output',
+ )
+ parser.add_argument(
+ '--list-functions',
+ action='store_true',
+ help="""list all functions with required file extensions for test files
+ """,
+ )
+ parser.add_argument('function', nargs='?', help="""restrict function to run""")
+ parser.add_argument(
+ 'filenames',
+ nargs='*',
+ help="""basename of filename set, extensions (.code, .data) have to
+ be a superset of those in the unittest attribute of the selected
+ function""",
+ )
+ args = parser.parse_args(args)
+ # print('args', args)
+ verbose = args.verbose
+ include_functions = [args.function] if args.function else []
+ include_filenames = args.filenames
+ # if args is None:
+ # args = sys.argv[1:]
+ # verbose = False
+ # if '-v' in args:
+ # verbose = True
+ # args.remove('-v')
+ # if '--verbose' in args:
+ # verbose = True
+ # args.remove('--verbose') # never worked without this
+ # if 'YAML_TEST_VERBOSE' in os.environ:
+ # verbose = True
+ # include_functions = []
+ # if args:
+ # include_functions.append(args.pop(0))
+ if 'YAML_TEST_FUNCTIONS' in os.environ:
+ include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())
+ # include_filenames = []
+ # include_filenames.extend(args)
+ if 'YAML_TEST_FILENAMES' in os.environ:
+ include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())
+ return include_functions, include_filenames, verbose, args
+
+
+def execute(function, filenames, verbose):
+ name = function.__name__
+ if verbose:
+ sys.stdout.write('=' * 75 + '\n')
+ sys.stdout.write('%s(%s)...\n' % (name, ', '.join(filenames)))
+ try:
+ function(verbose=verbose, *filenames)
+ except Exception as exc:
+ info = sys.exc_info()
+ if isinstance(exc, AssertionError):
+ kind = 'FAILURE'
+ else:
+ kind = 'ERROR'
+ if verbose:
+ traceback.print_exc(limit=1, file=sys.stdout)
+ else:
+ sys.stdout.write(kind[0])
+ sys.stdout.flush()
+ else:
+ kind = 'SUCCESS'
+ info = None
+ if not verbose:
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ return (name, filenames, kind, info)
+
+
+def display(results, verbose):
+ if results and not verbose:
+ sys.stdout.write('\n')
+ total = len(results)
+ failures = 0
+ errors = 0
+ for name, filenames, kind, info in results:
+ if kind == 'SUCCESS':
+ continue
+ if kind == 'FAILURE':
+ failures += 1
+ if kind == 'ERROR':
+ errors += 1
+ sys.stdout.write('=' * 75 + '\n')
+ sys.stdout.write('%s(%s): %s\n' % (name, ', '.join(filenames), kind))
+ if kind == 'ERROR':
+ traceback.print_exception(file=sys.stdout, *info)
+ else:
+ sys.stdout.write('Traceback (most recent call last):\n')
+ traceback.print_tb(info[2], file=sys.stdout)
+ sys.stdout.write('%s: see below\n' % info[0].__name__)
+ sys.stdout.write('~' * 75 + '\n')
+ for arg in info[1].args:
+ pprint.pprint(arg, stream=sys.stdout)
+ for filename in filenames:
+ sys.stdout.write('-' * 75 + '\n')
+ sys.stdout.write('%s:\n' % filename)
+ with open(filename, 'r', errors='replace') as fp:
+ data = fp.read()
+ sys.stdout.write(data)
+ if data and data[-1] != '\n':
+ sys.stdout.write('\n')
+ sys.stdout.write('=' * 75 + '\n')
+ sys.stdout.write('TESTS: %s\n' % total)
+ ret_val = 0
+ if failures:
+ sys.stdout.write('FAILURES: %s\n' % failures)
+ ret_val = 1
+ if errors:
+ sys.stdout.write('ERRORS: %s\n' % errors)
+ ret_val = 2
+ return ret_val
+
+
+def run(collections, args=None):
+ test_functions = find_test_functions(collections)
+ test_filenames = find_test_filenames(DATA)
+ include_functions, include_filenames, verbose, a = parse_arguments(args)
+ if a.list_functions:
+ print('test functions:')
+ for f in test_functions:
+ print(' {:30s} {}'.format(f.__name__, f.unittest))
+ return
+ results = []
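+ # run each selected function once per matching set of data files; the for/else
+ # clauses skip sets that lack a required extension or match one in the skip list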
+ for function in test_functions:
+ if include_functions and function.__name__ not in include_functions:
+ continue
+ if function.unittest:
+ for base, exts in test_filenames:
+ if include_filenames and base not in include_filenames:
+ continue
+ filenames = []
+ for ext in function.unittest:
+ if ext not in exts:
+ break
+ filenames.append(os.path.join(DATA, base + ext))
+ else:
+ skip_exts = getattr(function, 'skip', [])
+ for skip_ext in skip_exts:
+ if skip_ext in exts:
+ break
+ else:
+ result = execute(function, filenames, verbose)
+ results.append(result)
+ else:
+ result = execute(function, [], verbose)
+ results.append(result)
+ return display(results, verbose=verbose)
diff --git a/_test/lib/test_build.py b/_test/lib/test_build.py
new file mode 100644
index 0000000..f7837eb
--- /dev/null
+++ b/_test/lib/test_build.py
@@ -0,0 +1,16 @@
+
+if __name__ == '__main__':
+ import sys
+ import os
+ import distutils.util
+
+ build_lib = 'build/lib'
+ build_lib_ext = os.path.join(
+ 'build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3])
+ )
+ sys.path.insert(0, build_lib)
+ sys.path.insert(0, build_lib_ext)
+ import test_yaml
+ import test_appliance
+
+ test_appliance.run(test_yaml)
diff --git a/_test/lib/test_build_ext.py b/_test/lib/test_build_ext.py
new file mode 100644
index 0000000..1a58fd2
--- /dev/null
+++ b/_test/lib/test_build_ext.py
@@ -0,0 +1,17 @@
+
+
+if __name__ == '__main__':
+ import sys
+ import os
+ import distutils.util
+
+ build_lib = 'build/lib'
+ build_lib_ext = os.path.join(
+ 'build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3])
+ )
+ sys.path.insert(0, build_lib)
+ sys.path.insert(0, build_lib_ext)
+ import test_yaml_ext
+ import test_appliance
+
+ test_appliance.run(test_yaml_ext)
diff --git a/_test/lib/test_canonical.py b/_test/lib/test_canonical.py
new file mode 100644
index 0000000..b5cd14d
--- /dev/null
+++ b/_test/lib/test_canonical.py
@@ -0,0 +1,51 @@
+
+
+import ruamel.yaml
+import canonical # NOQA
+
+
+def test_canonical_scanner(canonical_filename, verbose=False):
+ with open(canonical_filename, 'rb') as fp0:
+ data = fp0.read()
+ tokens = list(ruamel.yaml.canonical_scan(data))
+ assert tokens, tokens
+ if verbose:
+ for token in tokens:
+ print(token)
+
+
+test_canonical_scanner.unittest = ['.canonical']
+
+
+def test_canonical_parser(canonical_filename, verbose=False):
+ with open(canonical_filename, 'rb') as fp0:
+ data = fp0.read()
+ events = list(ruamel.yaml.canonical_parse(data))
+ assert events, events
+ if verbose:
+ for event in events:
+ print(event)
+
+
+test_canonical_parser.unittest = ['.canonical']
+
+
+def test_canonical_error(data_filename, canonical_filename, verbose=False):
+ with open(data_filename, 'rb') as fp0:
+ data = fp0.read()
+ try:
+ output = list(ruamel.yaml.canonical_load_all(data)) # NOQA
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print(exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_canonical_error.unittest = ['.data', '.canonical']
+test_canonical_error.skip = ['.empty']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_constructor.py b/_test/lib/test_constructor.py
new file mode 100644
index 0000000..b38bf2f
--- /dev/null
+++ b/_test/lib/test_constructor.py
@@ -0,0 +1,372 @@
+
+import ruamel.yaml
+import pprint
+
+import datetime
+
+try:
+ set
+except NameError:
+ from sets import Set as set # NOQA
+import ruamel.yaml.tokens
+
+
+def execute(code):
+ global value
+ exec(code)
+ return value
+
+
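+# the custom loader/dumper and helper classes are declared global so the .code
+# test files evaluated via _load_code can reference them by name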
+def _make_objects():
+ global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLobject1, YAMLobject2, AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState, NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict, FixedOffset, today, execute
+
+ class MyLoader(ruamel.yaml.Loader):
+ pass
+
+ class MyDumper(ruamel.yaml.Dumper):
+ pass
+
+ class MyTestClass1:
+ def __init__(self, x, y=0, z=0):
+ self.x = x
+ self.y = y
+ self.z = z
+
+ def __eq__(self, other):
+ if isinstance(other, MyTestClass1):
+ return (self.__class__, self.__dict__) == (other.__class__, other.__dict__)
+ else:
+ return False
+
+ def construct1(constructor, node):
+ mapping = constructor.construct_mapping(node)
+ return MyTestClass1(**mapping)
+
+ def represent1(representer, native):
+ return representer.represent_mapping('!tag1', native.__dict__)
+
+ ruamel.yaml.add_constructor('!tag1', construct1, Loader=MyLoader)
+ ruamel.yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper)
+
+ class MyTestClass2(MyTestClass1, ruamel.yaml.YAMLObject):
+ ruamel.yaml.loader = MyLoader
+ ruamel.yaml.dumper = MyDumper
+ ruamel.yaml.tag = '!tag2'
+
+ def from_yaml(cls, constructor, node):
+ x = constructor.construct_yaml_int(node)
+ return cls(x=x)
+
+ from_yaml = classmethod(from_yaml)
+
+ def to_yaml(cls, representer, native):
+ return representer.represent_scalar(cls.yaml_tag, str(native.x))
+
+ to_yaml = classmethod(to_yaml)
+
+ class MyTestClass3(MyTestClass2):
+ ruamel.yaml.tag = '!tag3'
+
+ def from_yaml(cls, constructor, node):
+ mapping = constructor.construct_mapping(node)
+ if '=' in mapping:
+ x = mapping['=']
+ del mapping['=']
+ mapping['x'] = x
+ return cls(**mapping)
+
+ from_yaml = classmethod(from_yaml)
+
+ def to_yaml(cls, representer, native):
+ return representer.represent_mapping(cls.yaml_tag, native.__dict__)
+
+ to_yaml = classmethod(to_yaml)
+
+ class YAMLobject1(ruamel.yaml.YAMLObject):
+ ruamel.yaml.loader = MyLoader
+ ruamel.yaml.dumper = MyDumper
+ ruamel.yaml.tag = '!foo'
+
+ def __init__(self, my_parameter=None, my_another_parameter=None):
+ self.my_parameter = my_parameter
+ self.my_another_parameter = my_another_parameter
+
+ def __eq__(self, other):
+ if isinstance(other, YAMLobject1):
+ return (self.__class__, self.__dict__) == (other.__class__, other.__dict__)
+ else:
+ return False
+
+ class YAMLobject2(ruamel.yaml.YAMLObject):
+ ruamel.yaml.loader = MyLoader
+ ruamel.yaml.dumper = MyDumper
+ ruamel.yaml.tag = '!bar'
+
+ def __init__(self, foo=1, bar=2, baz=3):
+ self.foo = foo
+ self.bar = bar
+ self.baz = baz
+
+ def __getstate__(self):
+ return {1: self.foo, 2: self.bar, 3: self.baz}
+
+ def __setstate__(self, state):
+ self.foo = state[1]
+ self.bar = state[2]
+ self.baz = state[3]
+
+ def __eq__(self, other):
+ if isinstance(other, YAMLobject2):
+ return (self.__class__, self.__dict__) == (other.__class__, other.__dict__)
+ else:
+ return False
+
+ class AnObject:
+ def __new__(cls, foo=None, bar=None, baz=None):
+ self = object.__new__(cls)
+ self.foo = foo
+ self.bar = bar
+ self.baz = baz
+ return self
+
+ def __cmp__(self, other):
+ return cmp(
+ (type(self), self.foo, self.bar, self.baz), # NOQA
+ (type(other), other.foo, other.bar, other.baz),
+ )
+
+ def __eq__(self, other):
+ return type(self) is type(other) and (self.foo, self.bar, self.baz) == (
+ other.foo,
+ other.bar,
+ other.baz,
+ )
+
+ class AnInstance:
+ def __init__(self, foo=None, bar=None, baz=None):
+ self.foo = foo
+ self.bar = bar
+ self.baz = baz
+
+ def __cmp__(self, other):
+ return cmp(
+ (type(self), self.foo, self.bar, self.baz), # NOQA
+ (type(other), other.foo, other.bar, other.baz),
+ )
+
+ def __eq__(self, other):
+ return type(self) is type(other) and (self.foo, self.bar, self.baz) == (
+ other.foo,
+ other.bar,
+ other.baz,
+ )
+
+ class AState(AnInstance):
+ def __getstate__(self):
+ return {'_foo': self.foo, '_bar': self.bar, '_baz': self.baz}
+
+ def __setstate__(self, state):
+ self.foo = state['_foo']
+ self.bar = state['_bar']
+ self.baz = state['_baz']
+
+ class ACustomState(AnInstance):
+ def __getstate__(self):
+ return (self.foo, self.bar, self.baz)
+
+ def __setstate__(self, state):
+ self.foo, self.bar, self.baz = state
+
+ # class InitArgs(AnInstance):
+ # def __getinitargs__(self):
+ # return (self.foo, self.bar, self.baz)
+ # def __getstate__(self):
+ # return {}
+
+ # class InitArgsWithState(AnInstance):
+ # def __getinitargs__(self):
+ # return (self.foo, self.bar)
+ # def __getstate__(self):
+ # return self.baz
+ # def __setstate__(self, state):
+ # self.baz = state
+
+ class NewArgs(AnObject):
+ def __getnewargs__(self):
+ return (self.foo, self.bar, self.baz)
+
+ def __getstate__(self):
+ return {}
+
+ class NewArgsWithState(AnObject):
+ def __getnewargs__(self):
+ return (self.foo, self.bar)
+
+ def __getstate__(self):
+ return self.baz
+
+ def __setstate__(self, state):
+ self.baz = state
+
+ InitArgs = NewArgs
+
+ InitArgsWithState = NewArgsWithState
+
+ class Reduce(AnObject):
+ def __reduce__(self):
+ return self.__class__, (self.foo, self.bar, self.baz)
+
+ class ReduceWithState(AnObject):
+ def __reduce__(self):
+ return self.__class__, (self.foo, self.bar), self.baz
+
+ def __setstate__(self, state):
+ self.baz = state
+
+ class MyInt(int):
+ def __eq__(self, other):
+ return type(self) is type(other) and int(self) == int(other)
+
+ class MyList(list):
+ def __init__(self, n=1):
+ self.extend([None] * n)
+
+ def __eq__(self, other):
+ return type(self) is type(other) and list(self) == list(other)
+
+ class MyDict(dict):
+ def __init__(self, n=1):
+ for k in range(n):
+ self[k] = None
+
+ def __eq__(self, other):
+ return type(self) is type(other) and dict(self) == dict(other)
+
+ class FixedOffset(datetime.tzinfo):
+ def __init__(self, offset, name):
+ self.__offset = datetime.timedelta(minutes=offset)
+ self.__name = name
+
+ def utcoffset(self, dt):
+ return self.__offset
+
+ def tzname(self, dt):
+ return self.__name
+
+ def dst(self, dt):
+ return datetime.timedelta(0)
+
+ today = datetime.date.today()
+
+
+try:
+ from ruamel.ordereddict import ordereddict
+except ImportError:
+ from collections import OrderedDict
+
+ # subclass so the class gets the right name; 'import ... as ordereddict' would not rename it
+
+ class ordereddict(OrderedDict):
+ pass
+
+
+def _load_code(expression):
+ return eval(expression, globals())
+
+
+def _serialize_value(data):
+ if isinstance(data, list):
+ return '[%s]' % ', '.join(map(_serialize_value, data))
+ elif isinstance(data, dict):
+ items = []
+ for key, value in data.items():
+ key = _serialize_value(key)
+ value = _serialize_value(value)
+ items.append('%s: %s' % (key, value))
+ items.sort()
+ return '{%s}' % ', '.join(items)
+ elif isinstance(data, datetime.datetime):
+ return repr(data.utctimetuple())
+ elif isinstance(data, float) and data != data:
+ return '?'
+ else:
+ return str(data)
+
+
+def test_constructor_types(data_filename, code_filename, verbose=False):
+ _make_objects()
+ native1 = None
+ native2 = None
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ yaml.loader = MyLoader
+ try:
+ with open(data_filename, 'rb') as fp0:
+ native1 = list(yaml.load_all(fp0))
+ if len(native1) == 1:
+ native1 = native1[0]
+ with open(code_filename, 'rb') as fp0:
+ native2 = _load_code(fp0.read())
+ try:
+ if native1 == native2:
+ return
+ except TypeError:
+ pass
+ # print('native1', native1)
+ if verbose:
+ print('SERIALIZED NATIVE1:')
+ print(_serialize_value(native1))
+ print('SERIALIZED NATIVE2:')
+ print(_serialize_value(native2))
+ assert _serialize_value(native1) == _serialize_value(native2), (native1, native2)
+ finally:
+ if verbose:
+ print('NATIVE1:')
+ pprint.pprint(native1)
+ print('NATIVE2:')
+ pprint.pprint(native2)
+
+
+test_constructor_types.unittest = ['.data', '.code']
+
+
+def test_roundtrip_data(code_filename, roundtrip_filename, verbose=False):
+ _make_objects()
+ with open(code_filename, 'rb') as fp0:
+ value1 = fp0.read()
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ yaml.Loader = MyLoader
+ native2 = list(yaml.load_all(value1))
+ if len(native2) == 1:
+ native2 = native2[0]
+ try:
+ value2 = ruamel.yaml.dump(
+ native2,
+ Dumper=MyDumper,
+ default_flow_style=False,
+ allow_unicode=True,
+ encoding='utf-8',
+ )
+ # value2 += x
+ if verbose:
+ print('SERIALIZED NATIVE1:')
+ print(value1)
+ print('SERIALIZED NATIVE2:')
+ print(value2)
+ assert value1 == value2, (value1, value2)
+ finally:
+ if verbose:
+ print('NATIVE2:')
+ pprint.pprint(native2)
+
+
+test_roundtrip_data.unittest = ['.data', '.roundtrip']
+
+
+if __name__ == '__main__':
+ import sys
+ import test_constructor # NOQA
+
+ sys.modules['test_constructor'] = sys.modules['__main__']
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_emitter.py b/_test/lib/test_emitter.py
new file mode 100644
index 0000000..b1991e3
--- /dev/null
+++ b/_test/lib/test_emitter.py
@@ -0,0 +1,132 @@
+from __future__ import absolute_import
+from __future__ import print_function
+
+import ruamel.yaml
+from ruamel.yaml import YAML
+
+
+def _compare_events(events1, events2):
+ assert len(events1) == len(events2), (events1, events2)
+ for event1, event2 in zip(events1, events2):
+ assert event1.__class__ == event2.__class__, (event1, event2)
+ if isinstance(event1, ruamel.yaml.NodeEvent):
+ assert event1.anchor == event2.anchor, (event1, event2)
+ if isinstance(event1, ruamel.yaml.CollectionStartEvent):
+ assert event1.tag == event2.tag, (event1, event2)
+ if isinstance(event1, ruamel.yaml.ScalarEvent):
+ if True not in event1.implicit + event2.implicit:
+ assert event1.tag == event2.tag, (event1, event2)
+ assert event1.value == event2.value, (event1, event2)
+
+
+def test_emitter_on_data(data_filename, canonical_filename, verbose=False):
+ with open(data_filename, 'rb') as fp0:
+ events = list(YAML().parse(fp0))
+ output = YAML().emit(events)
+ if verbose:
+ print('OUTPUT:')
+ print(output)
+ new_events = list(YAML().parse(output))
+ _compare_events(events, new_events)
+
+
+test_emitter_on_data.unittest = ['.data', '.canonical']
+
+
+def test_emitter_on_canonical(canonical_filename, verbose=False):
+ with open(canonical_filename, 'rb') as fp0:
+ events = list(YAML().parse(fp0))
+ for canonical in [False, True]:
+ output = YAML().emit(events, canonical=canonical)
+ if verbose:
+ print('OUTPUT (canonical=%s):' % canonical)
+ print(output)
+ new_events = list(YAML().parse(output))
+ _compare_events(events, new_events)
+
+
+test_emitter_on_canonical.unittest = ['.canonical']
+
+
+def test_emitter_styles(data_filename, canonical_filename, verbose=False):
+ for filename in [data_filename, canonical_filename]:
+ with open(filename, 'rb') as fp0:
+ events = list(YAML().parse(fp0))
+ for flow_style in [False, True]:
+ for style in ['|', '>', '"', "'", ""]:
+ styled_events = []
+ for event in events:
+ if isinstance(event, ruamel.yaml.ScalarEvent):
+ event = ruamel.yaml.ScalarEvent(
+ event.anchor, event.tag, event.implicit, event.value, style=style
+ )
+ elif isinstance(event, ruamel.yaml.SequenceStartEvent):
+ event = ruamel.yaml.SequenceStartEvent(
+ event.anchor, event.tag, event.implicit, flow_style=flow_style
+ )
+ elif isinstance(event, ruamel.yaml.MappingStartEvent):
+ event = ruamel.yaml.MappingStartEvent(
+ event.anchor, event.tag, event.implicit, flow_style=flow_style
+ )
+ styled_events.append(event)
+ output = YAML().emit(styled_events)
+ if verbose:
+ print(
+ 'OUTPUT (filename=%r, flow_style=%r, style=%r)'
+ % (filename, flow_style, style)
+ )
+ print(output)
+ new_events = list(YAML().parse(output))
+ _compare_events(events, new_events)
+
+
+test_emitter_styles.unittest = ['.data', '.canonical']
+
+
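+# EventsLoader reconstructs ruamel.yaml event objects from a YAML document whose
+# nodes are tagged with event names (e.g. !Scalar becomes a ScalarEvent), filling
+# in default attributes for anchor, tag, implicit and value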
+class EventsLoader(ruamel.yaml.Loader):
+ def construct_event(self, node):
+ if isinstance(node, ruamel.yaml.ScalarNode):
+ mapping = {}
+ else:
+ mapping = self.construct_mapping(node)
+ class_name = str(node.tag[1:]) + 'Event'
+ if class_name in [
+ 'AliasEvent',
+ 'ScalarEvent',
+ 'SequenceStartEvent',
+ 'MappingStartEvent',
+ ]:
+ mapping.setdefault('anchor', None)
+ if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
+ mapping.setdefault('tag', None)
+ if class_name in ['SequenceStartEvent', 'MappingStartEvent']:
+ mapping.setdefault('implicit', True)
+ if class_name == 'ScalarEvent':
+ mapping.setdefault('implicit', (False, True))
+ mapping.setdefault('value', "")
+ value = getattr(ruamel.yaml, class_name)(**mapping)
+ return value
+
+
+# if Loader is not a composite, add this function
+# EventsLoader.add_constructor = yaml.constructor.Constructor.add_constructor
+
+
+EventsLoader.add_constructor(None, EventsLoader.construct_event)
+
+
+def test_emitter_events(events_filename, verbose=False):
+ with open(events_filename, 'rb') as fp0:
+ events = list(YAML().load(fp0, Loader=EventsLoader))
+ output = YAML().emit(events)
+ if verbose:
+ print('OUTPUT:')
+ print(output)
+ new_events = list(YAML().parse(output))
+ _compare_events(events, new_events)
+
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_errors.py b/_test/lib/test_errors.py
new file mode 100644
index 0000000..c0fd3df
--- /dev/null
+++ b/_test/lib/test_errors.py
@@ -0,0 +1,92 @@
+
+import ruamel.yaml
+YAML = ruamel.yaml.YAML
+
+import test_emitter
+import warnings
+
+warnings.simplefilter('ignore', ruamel.yaml.error.UnsafeLoaderWarning)
+
+
+def test_loader_error(error_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ try:
+ with open(error_filename, 'rb') as fp0:
+ list(yaml.load_all(fp0))
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print('%s:' % exc.__class__.__name__, exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_loader_error.unittest = ['.loader-error']
+
+
+def test_loader_error_string(error_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ try:
+ with open(error_filename, 'rb') as fp0:
+ list(yaml.load_all(fp0.read()))
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print('%s:' % exc.__class__.__name__, exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_loader_error_string.unittest = ['.loader-error']
+
+
+def test_loader_error_single(error_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ try:
+ with open(error_filename, 'rb') as fp0:
+ yaml.load(fp0.read())
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print('%s:' % exc.__class__.__name__, exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_loader_error_single.unittest = ['.single-loader-error']
+
+
+def test_emitter_error(error_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(error_filename, 'rb') as fp0:
+ events = list(yaml.load(fp0, Loader=test_emitter.EventsLoader))
+ try:
+ ruamel.yaml.emit(events)
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print('%s:' % exc.__class__.__name__, exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_emitter_error.unittest = ['.emitter-error']
+
+
+def test_dumper_error(error_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(error_filename, 'rb') as fp0:
+ code = fp0.read()
+ try:
+ import yaml
+
+ exec(code)
+ except yaml.YAMLError as exc:
+ if verbose:
+ print('%s:' % exc.__class__.__name__, exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_dumper_error.unittest = ['.dumper-error']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_input_output.py b/_test/lib/test_input_output.py
new file mode 100644
index 0000000..37bda3d
--- /dev/null
+++ b/_test/lib/test_input_output.py
@@ -0,0 +1,180 @@
+
+import ruamel.yaml
+from ruamel.yaml import YAML
+import codecs
+import tempfile
+import os
+import os.path
+from ruamel.yaml.compat import StringIO, BytesIO
+
+def test_unicode_input(unicode_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(unicode_filename, 'rb') as fp:
+ data = fp.read().decode('utf-8')
+ value = ' '.join(data.split())
+ output = yaml.load(data)
+ assert output == value, (output, value)
+ output = yaml.load(StringIO(data))
+ assert output == value, (output, value)
+ for input in [
+ data.encode('utf-8'),
+ codecs.BOM_UTF8 + data.encode('utf-8'),
+ codecs.BOM_UTF16_BE + data.encode('utf-16-be'),
+ codecs.BOM_UTF16_LE + data.encode('utf-16-le'),
+ ]:
+ if verbose:
+ print('INPUT:', repr(input[:10]), '...')
+ output = yaml.load(input)
+ assert output == value, (output, value)
+ output = yaml.load(BytesIO(input))
+ assert output == value, (output, value)
+
+
+test_unicode_input.unittest = ['.unicode']
+
+
+def test_unicode_input_errors(unicode_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(unicode_filename, 'rb') as fp:
+ data = fp.read().decode('utf-8')
+ for input in [
+ data.encode('latin1', 'ignore'),
+ data.encode('utf-16-be'),
+ data.encode('utf-16-le'),
+ codecs.BOM_UTF8 + data.encode('utf-16-be'),
+ codecs.BOM_UTF16_BE + data.encode('utf-16-le'),
+ codecs.BOM_UTF16_LE + data.encode('utf-8') + b'!',
+ ]:
+ try:
+ yaml.load(input)
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print(exc)
+ else:
+ raise AssertionError('expected an exception')
+ try:
+ yaml.load(BytesIO(input))
+ except ruamel.yaml.YAMLError as exc:
+ if verbose:
+ print(exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+test_unicode_input_errors.unittest = ['.unicode']
+
+
+def test_unicode_output(unicode_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(unicode_filename, 'rb') as fp:
+ data = fp.read().decode('utf-8')
+ value = ' '.join(data.split())
+ for allow_unicode in [False, True]:
+ data1 = yaml.dump(value, allow_unicode=allow_unicode)
+ for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
+ stream = StringIO()
+ yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode)
+ data2 = stream.getvalue()
+ data3 = yaml.dump(value, encoding=encoding, allow_unicode=allow_unicode)
+ if encoding is not None:
+ assert isinstance(data3, bytes)
+ data3 = data3.decode(encoding)
+ stream = BytesIO()
+ if encoding is None:
+ try:
+ yaml.dump(
+ value, stream, encoding=encoding, allow_unicode=allow_unicode
+ )
+ except TypeError as exc:
+ if verbose:
+ print(exc)
+ data4 = None
+ else:
+ raise AssertionError('expected an exception')
+ else:
+ yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode)
+ data4 = stream.getvalue()
+ if verbose:
+ print('BYTES:', data4[:50])
+ data4 = data4.decode(encoding)
+ for copy in [data1, data2, data3, data4]:
+ if copy is None:
+ continue
+ assert isinstance(copy, str)
+ if allow_unicode:
+ try:
+ copy[4:].encode('ascii')
+ except UnicodeEncodeError as exc:
+ if verbose:
+ print(exc)
+ else:
+ raise AssertionError('expected an exception')
+ else:
+ copy[4:].encode('ascii')
+ assert isinstance(data1, str), (type(data1), encoding)
+ assert isinstance(data2, str), (type(data2), encoding)
+
+
+test_unicode_output.unittest = ['.unicode']
+
+
+def test_file_output(unicode_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(unicode_filename, 'rb') as fp:
+ data = fp.read().decode('utf-8')
+ handle, filename = tempfile.mkstemp()
+ os.close(handle)
+ try:
+ stream = StringIO()
+ yaml.dump(data, stream, allow_unicode=True)
+ data1 = stream.getvalue()
+ stream = BytesIO()
+ yaml.dump(data, stream, encoding='utf-16-le', allow_unicode=True)
+ data2 = stream.getvalue().decode('utf-16-le')[1:]
+ with open(filename, 'w', encoding='utf-16-le') as stream:
+ yaml.dump(data, stream, allow_unicode=True)
+ with open(filename, 'r', encoding='utf-16-le') as fp0:
+ data3 = fp0.read()
+ with open(filename, 'wb') as stream:
+ yaml.dump(data, stream, encoding='utf-8', allow_unicode=True)
+ with open(filename, 'r', encoding='utf-8') as fp0:
+ data4 = fp0.read()
+ assert data1 == data2, (data1, data2)
+ assert data1 == data3, (data1, data3)
+ assert data1 == data4, (data1, data4)
+ finally:
+ if os.path.exists(filename):
+ os.unlink(filename)
+
+
+test_file_output.unittest = ['.unicode']
+
+
+def test_unicode_transfer(unicode_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ with open(unicode_filename, 'rb') as fp:
+ data = fp.read().decode('utf-8')
+ for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
+ input = data
+ if encoding is not None:
+ input = ('\ufeff' + input).encode(encoding)
+ output1 = yaml.emit(yaml.parse(input), allow_unicode=True)
+ if encoding is None:
+ stream = StringIO()
+ else:
+ stream = BytesIO()
+ yaml.emit(yaml.parse(input), stream, allow_unicode=True)
+ output2 = stream.getvalue()
+ assert isinstance(output1, str), (type(output1), encoding)
+ if encoding is None:
+ assert isinstance(output2, str), (type(output1), encoding)
+ else:
+ assert isinstance(output2, bytes), (type(output1), encoding)
+ output2.decode(encoding)
+
+
+test_unicode_transfer.unittest = ['.unicode']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_mark.py b/_test/lib/test_mark.py
new file mode 100644
index 0000000..2644a79
--- /dev/null
+++ b/_test/lib/test_mark.py
@@ -0,0 +1,35 @@
+
+import ruamel.yaml as yaml
+
+
+def test_marks(marks_filename, verbose=False):
+ with open(marks_filename, 'r') as fp0:
+ inputs = fp0.read().split('---\n')[1:]
+ for input in inputs:
+ index = 0
+ line = 0
+ column = 0
+ while input[index] != '*':
+ if input[index] == '\n':
+ line += 1
+ column = 0
+ else:
+ column += 1
+ index += 1
+ mark = yaml.Mark(marks_filename, index, line, column, str(input), index)
+ snippet = mark.get_snippet(indent=2, max_length=79)
+ if verbose:
+ print(snippet)
+ assert isinstance(snippet, str), type(snippet)
+ assert snippet.count('\n') == 1, snippet.count('\n')
+ data, pointer = snippet.split('\n')
+ assert len(data) < 82, len(data)
+ assert data[len(pointer) - 1] == '*', data[len(pointer) - 1]
+
+
+test_marks.unittest = ['.marks']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_reader.py b/_test/lib/test_reader.py
new file mode 100644
index 0000000..16b9cd7
--- /dev/null
+++ b/_test/lib/test_reader.py
@@ -0,0 +1,44 @@
+
+import codecs # NOQA
+import io
+
+import ruamel.yaml.reader
+
+
+def _run_reader(data, verbose):
+ try:
+ stream = ruamel.yaml.reader.Reader(data)
+ while stream.peek() != '\0':
+ stream.forward()
+ except ruamel.yaml.reader.ReaderError as exc:
+ if verbose:
+ print(exc)
+ else:
+ raise AssertionError('expected an exception')
+
+
+def test_stream_error(error_filename, verbose=False):
+ with open(error_filename, 'rb') as fp0:
+ _run_reader(fp0, verbose)
+ with open(error_filename, 'rb') as fp0:
+ _run_reader(fp0.read(), verbose)
+ for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
+ try:
+ with open(error_filename, 'rb') as fp0:
+ data = fp0.read().decode(encoding)
+ break
+ except UnicodeDecodeError:
+ pass
+ else:
+ return
+ _run_reader(data, verbose)
+ with io.open(error_filename, encoding=encoding) as fp:
+ _run_reader(fp, verbose)
+
+
+test_stream_error.unittest = ['.stream-error']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_recursive.py b/_test/lib/test_recursive.py
new file mode 100644
index 0000000..88858e4
--- /dev/null
+++ b/_test/lib/test_recursive.py
@@ -0,0 +1,58 @@
+
+import ruamel.yaml
+
+
+class AnInstance:
+ def __init__(self, foo, bar):
+ self.foo = foo
+ self.bar = bar
+
+ def __repr__(self):
+ try:
+ return '%s(foo=%r, bar=%r)' % (self.__class__.__name__, self.foo, self.bar)
+ except RuntimeError:
+ return '%s(foo=..., bar=...)' % self.__class__.__name__
+
+
+class AnInstanceWithState(AnInstance):
+ def __getstate__(self):
+ return {'attributes': [self.foo, self.bar]}
+
+ def __setstate__(self, state):
+ self.foo, self.bar = state['attributes']
+
+
+def test_recursive(recursive_filename, verbose=False):
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ context = globals().copy()
+ with open(recursive_filename, 'rb') as fp0:
+ exec(fp0.read(), context)
+ value1 = context['value']
+ output1 = None
+ value2 = None
+ output2 = None
+ try:
+ buf = ruamel.yaml.compat.StringIO()
+ yaml.dump(value1, buf)
+ output1 = buf.getvalue()
+ value2 = yaml.load(output1)
+ buf = ruamel.yaml.compat.StringIO()
+ yaml.dump(value2, buf)
+ output2 = buf.getvalue()
+ assert output1 == output2, (output1, output2)
+ finally:
+ if verbose:
+ print('VALUE1:', value1)
+ print('VALUE2:', value2)
+ print('OUTPUT1:')
+ print(output1)
+ print('OUTPUT2:')
+ print(output2)
+
+
+test_recursive.unittest = ['.recursive']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_representer.py b/_test/lib/test_representer.py
new file mode 100644
index 0000000..5b2415d
--- /dev/null
+++ b/_test/lib/test_representer.py
@@ -0,0 +1,51 @@
+
+from ruamel.yaml import YAML
+import test_constructor
+import pprint
+
+
+def test_representer_types(code_filename, verbose=False):
+ yaml = YAML(typ='safe', pure=True)
+ test_constructor._make_objects()
+ for allow_unicode in [False, True]:
+ for encoding in ['utf-8', 'utf-16-be', 'utf-16-le']:
+ with open(code_filename, 'rb') as fp0:
+ native1 = test_constructor._load_code(fp0.read())
+ native2 = None
+ try:
+ output = yaml.dump(
+ native1,
+ Dumper=test_constructor.MyDumper,
+ allow_unicode=allow_unicode,
+ encoding=encoding,
+ )
+ native2 = yaml.load(output, Loader=test_constructor.MyLoader)
+ try:
+ if native1 == native2:
+ continue
+ except TypeError:
+ pass
+ value1 = test_constructor._serialize_value(native1)
+ value2 = test_constructor._serialize_value(native2)
+ if verbose:
+ print('SERIALIZED NATIVE1:')
+ print(value1)
+ print('SERIALIZED NATIVE2:')
+ print(value2)
+ assert value1 == value2, (native1, native2)
+ finally:
+ if verbose:
+ print('NATIVE1:')
+ pprint.pprint(native1)
+ print('NATIVE2:')
+ pprint.pprint(native2)
+ print('OUTPUT:')
+ print(output)
+
+
+test_representer_types.unittest = ['.code']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_resolver.py b/_test/lib/test_resolver.py
new file mode 100644
index 0000000..b2b0839
--- /dev/null
+++ b/_test/lib/test_resolver.py
@@ -0,0 +1,111 @@
+
+import ruamel.yaml
+yaml = ruamel.yaml.YAML()
+import pprint
+
+
+def test_implicit_resolver(data_filename, detect_filename, verbose=False):
+ correct_tag = None
+ node = None
+ try:
+ with open(detect_filename, 'r') as fp0:
+ correct_tag = fp0.read().strip()
+ with open(data_filename, 'rb') as fp0:
+ node = yaml.compose(fp0)
+ assert isinstance(node, ruamel.yaml.SequenceNode), node
+ for scalar in node.value:
+ assert isinstance(scalar, ruamel.yaml.ScalarNode), scalar
+ assert scalar.tag == correct_tag, (scalar.tag, correct_tag)
+ finally:
+ if verbose:
+ print('CORRECT TAG:', correct_tag)
+ if hasattr(node, 'value'):
+ print('CHILDREN:')
+ pprint.pprint(node.value)
+
+
+test_implicit_resolver.unittest = ['.data', '.detect']
+
+
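+# the path resolvers registered below assign tags based on a node's position in
+# the document tree rather than its content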
+def _make_path_loader_and_dumper():
+ global MyLoader, MyDumper
+
+ class MyLoader(ruamel.yaml.Loader):
+ pass
+
+ class MyDumper(ruamel.yaml.Dumper):
+ pass
+
+ ruamel.yaml.add_path_resolver('!root', [], Loader=MyLoader, Dumper=MyDumper)
+ ruamel.yaml.add_path_resolver('!root/scalar', [], str, Loader=MyLoader, Dumper=MyDumper)
+ ruamel.yaml.add_path_resolver(
+ '!root/key11/key12/*', ['key11', 'key12'], Loader=MyLoader, Dumper=MyDumper
+ )
+ ruamel.yaml.add_path_resolver('!root/key21/1/*', ['key21', 1], Loader=MyLoader, Dumper=MyDumper)
+ ruamel.yaml.add_path_resolver(
+ '!root/key31/*/*/key14/map',
+ ['key31', None, None, 'key14'],
+ dict,
+ Loader=MyLoader,
+ Dumper=MyDumper,
+ )
+
+ return MyLoader, MyDumper
+
+
+def _convert_node(node):
+ if isinstance(node, ruamel.yaml.ScalarNode):
+ return (node.tag, node.value)
+ elif isinstance(node, ruamel.yaml.SequenceNode):
+ value = []
+ for item in node.value:
+ value.append(_convert_node(item))
+ return (node.tag, value)
+ elif isinstance(node, ruamel.yaml.MappingNode):
+ value = []
+ for key, item in node.value:
+ value.append((_convert_node(key), _convert_node(item)))
+ return (node.tag, value)
+
+
+def test_path_resolver_loader(data_filename, path_filename, verbose=False):
+ _make_path_loader_and_dumper()
+ with open(data_filename, 'rb') as fp0:
+ nodes1 = list(yaml.compose_all(fp0.read(), Loader=MyLoader))
+ with open(path_filename, 'rb') as fp0:
+ nodes2 = list(yaml.compose_all(fp0.read()))
+ try:
+ for node1, node2 in zip(nodes1, nodes2):
+ data1 = _convert_node(node1)
+ data2 = _convert_node(node2)
+ assert data1 == data2, (data1, data2)
+ finally:
+ if verbose:
+ print(yaml.serialize_all(nodes1))
+
+
+test_path_resolver_loader.unittest = ['.data', '.path']
+
+
+def test_path_resolver_dumper(data_filename, path_filename, verbose=False):
+ _make_path_loader_and_dumper()
+ for filename in [data_filename, path_filename]:
+ with open(filename, 'rb') as fp0:
+ output = yaml.serialize_all(yaml.compose_all(fp0), Dumper=MyDumper)
+ if verbose:
+ print(output)
+ nodes1 = yaml.compose_all(output)
+ with open(data_filename, 'rb') as fp0:
+ nodes2 = yaml.compose_all(fp0)
+ for node1, node2 in zip(nodes1, nodes2):
+ data1 = _convert_node(node1)
+ data2 = _convert_node(node2)
+ assert data1 == data2, (data1, data2)
+
+
+test_path_resolver_dumper.unittest = ['.data', '.path']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_structure.py b/_test/lib/test_structure.py
new file mode 100644
index 0000000..8de24a3
--- /dev/null
+++ b/_test/lib/test_structure.py
@@ -0,0 +1,225 @@
+
+import ruamel.yaml
+import canonical # NOQA
+import pprint
+
+
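+# reduce an event stream to a plain nested structure: scalars become True or None,
+# aliases '*', anything unexpected '?'; sequences and mappings recurse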
+def _convert_structure(loader):
+ if loader.check_event(ruamel.yaml.ScalarEvent):
+ event = loader.get_event()
+ if event.tag or event.anchor or event.value:
+ return True
+ else:
+ return None
+ elif loader.check_event(ruamel.yaml.SequenceStartEvent):
+ loader.get_event()
+ sequence = []
+ while not loader.check_event(ruamel.yaml.SequenceEndEvent):
+ sequence.append(_convert_structure(loader))
+ loader.get_event()
+ return sequence
+ elif loader.check_event(ruamel.yaml.MappingStartEvent):
+ loader.get_event()
+ mapping = []
+ while not loader.check_event(ruamel.yaml.MappingEndEvent):
+ key = _convert_structure(loader)
+ value = _convert_structure(loader)
+ mapping.append((key, value))
+ loader.get_event()
+ return mapping
+ elif loader.check_event(ruamel.yaml.AliasEvent):
+ loader.get_event()
+ return '*'
+ else:
+ loader.get_event()
+ return '?'
+
+
+def test_structure(data_filename, structure_filename, verbose=False):
+ nodes1 = []
+ with open(structure_filename, 'r') as fp:
+ nodes2 = eval(fp.read())
+ try:
+ with open(data_filename, 'rb') as fp:
+ loader = ruamel.yaml.Loader(fp)
+ while loader.check_event():
+ if loader.check_event(
+ ruamel.yaml.StreamStartEvent,
+ ruamel.yaml.StreamEndEvent,
+ ruamel.yaml.DocumentStartEvent,
+ ruamel.yaml.DocumentEndEvent,
+ ):
+ loader.get_event()
+ continue
+ nodes1.append(_convert_structure(loader))
+ if len(nodes1) == 1:
+ nodes1 = nodes1[0]
+ assert nodes1 == nodes2, (nodes1, nodes2)
+ finally:
+ if verbose:
+ print('NODES1:')
+ pprint.pprint(nodes1)
+ print('NODES2:')
+ pprint.pprint(nodes2)
+
+
+test_structure.unittest = ['.data', '.structure']
+
+
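+# compare two event streams; tags are only checked when both sides carry an
+# explicit tag (or when full=True)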
+def _compare_events(events1, events2, full=False):
+ assert len(events1) == len(events2), (len(events1), len(events2))
+ for event1, event2 in zip(events1, events2):
+ assert event1.__class__ == event2.__class__, (event1, event2)
+ if isinstance(event1, ruamel.yaml.AliasEvent) and full:
+ assert event1.anchor == event2.anchor, (event1, event2)
+ if isinstance(event1, (ruamel.yaml.ScalarEvent, ruamel.yaml.CollectionStartEvent)):
+ if (event1.tag not in [None, '!'] and event2.tag not in [None, '!']) or full:
+ assert event1.tag == event2.tag, (event1, event2)
+ if isinstance(event1, ruamel.yaml.ScalarEvent):
+ assert event1.value == event2.value, (event1, event2)
+
+
+def test_parser(data_filename, canonical_filename, verbose=False):
+ events1 = None
+ events2 = None
+ try:
+ with open(data_filename, 'rb') as fp0:
+ events1 = list(ruamel.yaml.YAML().parse(fp0))
+ with open(canonical_filename, 'rb') as fp0:
+ events2 = list(ruamel.yaml.YAML().canonical_parse(fp0))
+ _compare_events(events1, events2)
+ finally:
+ if verbose:
+ print('EVENTS1:')
+ pprint.pprint(events1)
+ print('EVENTS2:')
+ pprint.pprint(events2)
+
+
+test_parser.unittest = ['.data', '.canonical']
+
+
+def test_parser_on_canonical(canonical_filename, verbose=False):
+ events1 = None
+ events2 = None
+ try:
+ with open(canonical_filename, 'rb') as fp0:
+ events1 = list(ruamel.yaml.YAML().parse(fp0))
+ with open(canonical_filename, 'rb') as fp0:
+ events2 = list(ruamel.yaml.YAML().canonical_parse(fp0))
+ _compare_events(events1, events2, full=True)
+ finally:
+ if verbose:
+ print('EVENTS1:')
+ pprint.pprint(events1)
+ print('EVENTS2:')
+ pprint.pprint(events2)
+
+
+test_parser_on_canonical.unittest = ['.canonical']
+
+
+def _compare_nodes(node1, node2):
+ assert node1.__class__ == node2.__class__, (node1, node2)
+ assert node1.tag == node2.tag, (node1, node2)
+ if isinstance(node1, ruamel.yaml.ScalarNode):
+ assert node1.value == node2.value, (node1, node2)
+ else:
+ assert len(node1.value) == len(node2.value), (node1, node2)
+ for item1, item2 in zip(node1.value, node2.value):
+ if not isinstance(item1, tuple):
+ item1 = (item1,)
+ item2 = (item2,)
+ for subnode1, subnode2 in zip(item1, item2):
+ _compare_nodes(subnode1, subnode2)
+
+
+def test_composer(data_filename, canonical_filename, verbose=False):
+ nodes1 = None
+ nodes2 = None
+ try:
+ yaml = ruamel.yaml.YAML()
+ with open(data_filename, 'rb') as fp0:
+ nodes1 = list(yaml.compose_all(fp0))
+ with open(canonical_filename, 'rb') as fp0:
+ nodes2 = list(yaml.canonical_compose_all(fp0))
+ assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2))
+ for node1, node2 in zip(nodes1, nodes2):
+ _compare_nodes(node1, node2)
+ finally:
+ if verbose:
+ print('NODES1:')
+ pprint.pprint(nodes1)
+ print('NODES2:')
+ pprint.pprint(nodes2)
+
+
+test_composer.unittest = ['.data', '.canonical']
+
+
+def _make_loader():
+ global MyLoader
+
+ class MyLoader(ruamel.yaml.Loader):
+ def construct_sequence(self, node):
+ return tuple(ruamel.yaml.Loader.construct_sequence(self, node))
+
+ def construct_mapping(self, node):
+ pairs = self.construct_pairs(node)
+ pairs.sort(key=(lambda i: str(i)))
+ return pairs
+
+ def construct_undefined(self, node):
+ return self.construct_scalar(node)
+
+ MyLoader.add_constructor('tag:yaml.org,2002:map', MyLoader.construct_mapping)
+ MyLoader.add_constructor(None, MyLoader.construct_undefined)
+
+
+def _make_canonical_loader():
+ global MyCanonicalLoader
+
+ class MyCanonicalLoader(ruamel.yaml.CanonicalLoader):
+ def construct_sequence(self, node):
+ return tuple(ruamel.yaml.CanonicalLoader.construct_sequence(self, node))
+
+ def construct_mapping(self, node):
+ pairs = self.construct_pairs(node)
+ pairs.sort(key=(lambda i: str(i)))
+ return pairs
+
+ def construct_undefined(self, node):
+ return self.construct_scalar(node)
+
+ MyCanonicalLoader.add_constructor(
+ 'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping
+ )
+ MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)
+
+
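+# Load the .data and .canonical files with the normalizing loaders above and
+# require the resulting native structures to be equal.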
+def test_constructor(data_filename, canonical_filename, verbose=False):
+ _make_loader()
+ _make_canonical_loader()
+ native1 = None
+ native2 = None
+    yaml = ruamel.yaml.YAML(typ='safe')
+ try:
+ with open(data_filename, 'rb') as fp0:
+ native1 = list(yaml.load(fp0, Loader=MyLoader))
+ with open(canonical_filename, 'rb') as fp0:
+ native2 = list(yaml.load(fp0, Loader=MyCanonicalLoader))
+ assert native1 == native2, (native1, native2)
+ finally:
+ if verbose:
+ print('NATIVE1:')
+ pprint.pprint(native1)
+ print('NATIVE2:')
+ pprint.pprint(native2)
+
+
+test_constructor.unittest = ['.data', '.canonical']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_tokens.py b/_test/lib/test_tokens.py
new file mode 100644
index 0000000..575e95c
--- /dev/null
+++ b/_test/lib/test_tokens.py
@@ -0,0 +1,86 @@
+
+import ruamel.yaml
+import pprint
+
+# Tokens mnemonic:
+# directive: %
+# document_start: ---
+# document_end: ...
+# alias: *
+# anchor: &
+# tag: !
+# scalar: _
+# block_sequence_start: [[
+# block_mapping_start: {{
+# block_end: ]}
+# flow_sequence_start: [
+# flow_sequence_end: ]
+# flow_mapping_start: {
+# flow_mapping_end: }
+# entry: ,
+# key: ?
+# value: :
+
+_replaces = {
+ ruamel.yaml.DirectiveToken: '%',
+ ruamel.yaml.DocumentStartToken: '---',
+ ruamel.yaml.DocumentEndToken: '...',
+ ruamel.yaml.AliasToken: '*',
+ ruamel.yaml.AnchorToken: '&',
+ ruamel.yaml.TagToken: '!',
+ ruamel.yaml.ScalarToken: '_',
+ ruamel.yaml.BlockSequenceStartToken: '[[',
+ ruamel.yaml.BlockMappingStartToken: '{{',
+ ruamel.yaml.BlockEndToken: ']}',
+ ruamel.yaml.FlowSequenceStartToken: '[',
+ ruamel.yaml.FlowSequenceEndToken: ']',
+ ruamel.yaml.FlowMappingStartToken: '{',
+ ruamel.yaml.FlowMappingEndToken: '}',
+ ruamel.yaml.BlockEntryToken: ',',
+ ruamel.yaml.FlowEntryToken: ',',
+ ruamel.yaml.KeyToken: '?',
+ ruamel.yaml.ValueToken: ':',
+}
+
+
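+# Scan the .data file, map every token (other than stream start/end) to its
+# mnemonic and compare against the whitespace-separated mnemonics in the
+# .tokens file.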
+def test_tokens(data_filename, tokens_filename, verbose=False):
+ tokens1 = []
+ with open(tokens_filename, 'r') as fp:
+ tokens2 = fp.read().split()
+ try:
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=True)
+ with open(data_filename, 'rb') as fp1:
+ for token in yaml.scan(fp1):
+ if not isinstance(token, (ruamel.yaml.StreamStartToken, ruamel.yaml.StreamEndToken)):
+ tokens1.append(_replaces[token.__class__])
+ finally:
+ if verbose:
+ print('TOKENS1:', ' '.join(tokens1))
+ print('TOKENS2:', ' '.join(tokens2))
+ assert len(tokens1) == len(tokens2), (tokens1, tokens2)
+ for token1, token2 in zip(tokens1, tokens2):
+ assert token1 == token2, (token1, token2)
+
+
+test_tokens.unittest = ['.data', '.tokens']
+
+
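+# Smoke test: scanning the .data and the .canonical file merely has to
+# succeed; the token class names are only printed in verbose mode.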
+def test_scanner(data_filename, canonical_filename, verbose=False):
+ for filename in [data_filename, canonical_filename]:
+ tokens = []
+ try:
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=False)
+ with open(filename, 'rb') as fp:
+ for token in yaml.scan(fp):
+ tokens.append(token.__class__.__name__)
+ finally:
+ if verbose:
+ pprint.pprint(tokens)
+
+
+test_scanner.unittest = ['.data', '.canonical']
+
+if __name__ == '__main__':
+ import test_appliance
+
+ test_appliance.run(globals())
diff --git a/_test/lib/test_yaml.py b/_test/lib/test_yaml.py
new file mode 100644
index 0000000..cf64a73
--- /dev/null
+++ b/_test/lib/test_yaml.py
@@ -0,0 +1,20 @@
+# coding: utf-8
+
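+# Aggregate the pure-Python library tests so test_appliance can collect them
+# from a single module.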
+from test_mark import * # NOQA
+from test_reader import * # NOQA
+from test_canonical import * # NOQA
+from test_tokens import * # NOQA
+from test_structure import * # NOQA
+from test_errors import * # NOQA
+from test_resolver import * # NOQA
+from test_constructor import * # NOQA
+from test_emitter import * # NOQA
+from test_representer import * # NOQA
+from test_recursive import * # NOQA
+from test_input_output import * # NOQA
+
+if __name__ == '__main__':
+ import sys
+ import test_appliance
+
+ sys.exit(test_appliance.run(globals()))
diff --git a/_test/lib/test_yaml_ext.py b/_test/lib/test_yaml_ext.py
new file mode 100644
index 0000000..a6fa287
--- /dev/null
+++ b/_test/lib/test_yaml_ext.py
@@ -0,0 +1,403 @@
+# coding: utf-8
+
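+# Run the library tests against the _ruamel_yaml C extension by temporarily
+# patching it into the ruamel.yaml module.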
+import _ruamel_yaml
+import ruamel.yaml
+import types
+import pprint
+
+ruamel.yaml.PyBaseLoader = ruamel.yaml.BaseLoader
+ruamel.yaml.PySafeLoader = ruamel.yaml.SafeLoader
+ruamel.yaml.PyLoader = ruamel.yaml.Loader
+ruamel.yaml.PyBaseDumper = ruamel.yaml.BaseDumper
+ruamel.yaml.PySafeDumper = ruamel.yaml.SafeDumper
+ruamel.yaml.PyDumper = ruamel.yaml.Dumper
+
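+# Keep references to the pure-Python module-level functions; the new_*
+# variants below default to the C-extension Loader/Dumper classes.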
+old_scan = ruamel.yaml.scan
+
+
+def new_scan(stream, Loader=ruamel.yaml.CLoader):
+ return old_scan(stream, Loader)
+
+
+old_parse = ruamel.yaml.parse
+
+
+def new_parse(stream, Loader=ruamel.yaml.CLoader):
+ return old_parse(stream, Loader)
+
+
+old_compose = ruamel.yaml.compose
+
+
+def new_compose(stream, Loader=ruamel.yaml.CLoader):
+ return old_compose(stream, Loader)
+
+
+old_compose_all = ruamel.yaml.compose_all
+
+
+def new_compose_all(stream, Loader=ruamel.yaml.CLoader):
+ return old_compose_all(stream, Loader)
+
+
+old_load = ruamel.yaml.load
+
+
+def new_load(stream, Loader=ruamel.yaml.CLoader):
+ return old_load(stream, Loader)
+
+
+old_load_all = ruamel.yaml.load_all
+
+
+def new_load_all(stream, Loader=ruamel.yaml.CLoader):
+ return old_load_all(stream, Loader)
+
+
+old_safe_load = ruamel.yaml.safe_load
+
+
+def new_safe_load(stream):
+ return old_load(stream, ruamel.yaml.CSafeLoader)
+
+
+old_safe_load_all = ruamel.yaml.safe_load_all
+
+
+def new_safe_load_all(stream):
+ return old_load_all(stream, ruamel.yaml.CSafeLoader)
+
+
+old_emit = ruamel.yaml.emit
+
+
+def new_emit(events, stream=None, Dumper=ruamel.yaml.CDumper, **kwds):
+ return old_emit(events, stream, Dumper, **kwds)
+
+
+old_serialize = ruamel.yaml.serialize
+
+
+def new_serialize(node, stream, Dumper=ruamel.yaml.CDumper, **kwds):
+ return old_serialize(node, stream, Dumper, **kwds)
+
+
+old_serialize_all = ruamel.yaml.serialize_all
+
+
+def new_serialize_all(nodes, stream=None, Dumper=ruamel.yaml.CDumper, **kwds):
+ return old_serialize_all(nodes, stream, Dumper, **kwds)
+
+
+old_dump = ruamel.yaml.dump
+
+
+def new_dump(data, stream=None, Dumper=ruamel.yaml.CDumper, **kwds):
+ return old_dump(data, stream, Dumper, **kwds)
+
+
+old_dump_all = ruamel.yaml.dump_all
+
+
+def new_dump_all(documents, stream=None, Dumper=ruamel.yaml.CDumper, **kwds):
+ return old_dump_all(documents, stream, Dumper, **kwds)
+
+
+old_safe_dump = ruamel.yaml.safe_dump
+
+
+def new_safe_dump(data, stream=None, **kwds):
+ return old_dump(data, stream, ruamel.yaml.CSafeDumper, **kwds)
+
+
+# safe_dump_all may not be exposed at module level; fall back to None so that
+# _tear_down below does not fail with a NameError
+old_safe_dump_all = getattr(ruamel.yaml, 'safe_dump_all', None)
+
+
+def new_safe_dump_all(documents, stream=None, **kwds):
+ return old_dump_all(documents, stream, ruamel.yaml.CSafeDumper, **kwds)
+
+
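+# _set_up/_tear_down swap the module-level Loader/Dumper classes and
+# convenience functions between the C-extension and pure-Python
+# implementations.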
+def _set_up():
+ ruamel.yaml.BaseLoader = ruamel.yaml.CBaseLoader
+ ruamel.yaml.SafeLoader = ruamel.yaml.CSafeLoader
+ ruamel.yaml.Loader = ruamel.yaml.CLoader
+ ruamel.yaml.BaseDumper = ruamel.yaml.CBaseDumper
+ ruamel.yaml.SafeDumper = ruamel.yaml.CSafeDumper
+ ruamel.yaml.Dumper = ruamel.yaml.CDumper
+ ruamel.yaml.scan = new_scan
+ ruamel.yaml.parse = new_parse
+ ruamel.yaml.compose = new_compose
+ ruamel.yaml.compose_all = new_compose_all
+ ruamel.yaml.load = new_load
+ ruamel.yaml.load_all = new_load_all
+ ruamel.yaml.safe_load = new_safe_load
+ ruamel.yaml.safe_load_all = new_safe_load_all
+ ruamel.yaml.emit = new_emit
+ ruamel.yaml.serialize = new_serialize
+ ruamel.yaml.serialize_all = new_serialize_all
+ ruamel.yaml.dump = new_dump
+ ruamel.yaml.dump_all = new_dump_all
+ ruamel.yaml.safe_dump = new_safe_dump
+ ruamel.yaml.safe_dump_all = new_safe_dump_all
+
+
+def _tear_down():
+ ruamel.yaml.BaseLoader = ruamel.yaml.PyBaseLoader
+ ruamel.yaml.SafeLoader = ruamel.yaml.PySafeLoader
+ ruamel.yaml.Loader = ruamel.yaml.PyLoader
+ ruamel.yaml.BaseDumper = ruamel.yaml.PyBaseDumper
+ ruamel.yaml.SafeDumper = ruamel.yaml.PySafeDumper
+ ruamel.yaml.Dumper = ruamel.yaml.PyDumper
+ ruamel.yaml.scan = old_scan
+ ruamel.yaml.parse = old_parse
+ ruamel.yaml.compose = old_compose
+ ruamel.yaml.compose_all = old_compose_all
+ ruamel.yaml.load = old_load
+ ruamel.yaml.load_all = old_load_all
+ ruamel.yaml.safe_load = old_safe_load
+ ruamel.yaml.safe_load_all = old_safe_load_all
+ ruamel.yaml.emit = old_emit
+ ruamel.yaml.serialize = old_serialize
+ ruamel.yaml.serialize_all = old_serialize_all
+ ruamel.yaml.dump = old_dump
+ ruamel.yaml.dump_all = old_dump_all
+ ruamel.yaml.safe_dump = old_safe_dump
+ ruamel.yaml.safe_dump_all = old_safe_dump_all
+
+
+def test_c_version(verbose=False):
+ if verbose:
+ print(_ruamel_yaml.get_version())
+ print(_ruamel_yaml.get_version_string())
+ assert ('%s.%s.%s' % _ruamel_yaml.get_version()) == _ruamel_yaml.get_version_string(), (
+ _ruamel_yaml.get_version(),
+ _ruamel_yaml.get_version_string(),
+ )
+
+
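+# Compare the token streams of the pure-Python and the C scanner: classes,
+# values and (except for the stream end token) start/end marks must match.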
+def _compare_scanners(py_data, c_data, verbose):
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=True)
+ py_tokens = list(yaml.scan(py_data, Loader=ruamel.yaml.PyLoader))
+ c_tokens = []
+ try:
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=False)
+ for token in yaml.scan(c_data, Loader=ruamel.yaml.CLoader):
+ c_tokens.append(token)
+ assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens))
+ for py_token, c_token in zip(py_tokens, c_tokens):
+ assert py_token.__class__ == c_token.__class__, (py_token, c_token)
+ if hasattr(py_token, 'value'):
+ assert py_token.value == c_token.value, (py_token, c_token)
+ if isinstance(py_token, ruamel.yaml.StreamEndToken):
+ continue
+ py_start = (
+ py_token.start_mark.index,
+ py_token.start_mark.line,
+ py_token.start_mark.column,
+ )
+ py_end = (
+ py_token.end_mark.index,
+ py_token.end_mark.line,
+ py_token.end_mark.column,
+ )
+ c_start = (
+ c_token.start_mark.index,
+ c_token.start_mark.line,
+ c_token.start_mark.column,
+ )
+ c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column)
+ assert py_start == c_start, (py_start, c_start)
+ assert py_end == c_end, (py_end, c_end)
+ finally:
+ if verbose:
+ print('PY_TOKENS:')
+ pprint.pprint(py_tokens)
+ print('C_TOKENS:')
+ pprint.pprint(c_tokens)
+
+
+def test_c_scanner(data_filename, canonical_filename, verbose=False):
+ with open(data_filename, 'rb') as fp0:
+ with open(data_filename, 'rb') as fp1:
+ _compare_scanners(fp0, fp1, verbose)
+ with open(data_filename, 'rb') as fp0:
+ with open(data_filename, 'rb') as fp1:
+ _compare_scanners(fp0.read(), fp1.read(), verbose)
+ with open(canonical_filename, 'rb') as fp0:
+ with open(canonical_filename, 'rb') as fp1:
+ _compare_scanners(fp0, fp1, verbose)
+ with open(canonical_filename, 'rb') as fp0:
+ with open(canonical_filename, 'rb') as fp1:
+ _compare_scanners(fp0.read(), fp1.read(), verbose)
+
+
+test_c_scanner.unittest = ['.data', '.canonical']
+test_c_scanner.skip = ['.skip-ext']
+
+
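+# Compare the event streams of the pure-Python and the C parser attribute by
+# attribute.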
+def _compare_parsers(py_data, c_data, verbose):
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=True)
+ py_events = list(yaml.parse(py_data, Loader=ruamel.yaml.PyLoader))
+ c_events = []
+ try:
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=False)
+ for event in yaml.parse(c_data, Loader=ruamel.yaml.CLoader):
+ c_events.append(event)
+ assert len(py_events) == len(c_events), (len(py_events), len(c_events))
+ for py_event, c_event in zip(py_events, c_events):
+ for attribute in [
+ '__class__',
+ 'anchor',
+ 'tag',
+ 'implicit',
+ 'value',
+ 'explicit',
+ 'version',
+ 'tags',
+ ]:
+ py_value = getattr(py_event, attribute, None)
+ c_value = getattr(c_event, attribute, None)
+ assert py_value == c_value, (py_event, c_event, attribute)
+ finally:
+ if verbose:
+ print('PY_EVENTS:')
+ pprint.pprint(py_events)
+ print('C_EVENTS:')
+ pprint.pprint(c_events)
+
+
+def test_c_parser(data_filename, canonical_filename, verbose=False):
+ with open(data_filename, 'rb') as fp0:
+ with open(data_filename, 'rb') as fp1:
+ _compare_parsers(fp0, fp1, verbose)
+ with open(data_filename, 'rb') as fp0:
+ with open(data_filename, 'rb') as fp1:
+ _compare_parsers(fp0.read(), fp1.read(), verbose)
+ with open(canonical_filename, 'rb') as fp0:
+ with open(canonical_filename, 'rb') as fp1:
+ _compare_parsers(fp0, fp1, verbose)
+ with open(canonical_filename, 'rb') as fp0:
+ with open(canonical_filename, 'rb') as fp1:
+ _compare_parsers(fp0.read(), fp1.read(), verbose)
+
+
+test_c_parser.unittest = ['.data', '.canonical']
+test_c_parser.skip = ['.skip-ext']
+
+
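+# Parse the input with the pure-Python parser, emit the events through the C
+# emitter, then re-parse the output with both parsers; the three event
+# streams must agree apart from implicit '!' tags and document explicitness.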
+def _compare_emitters(data, verbose):
+ yaml = ruamel.yaml.YAML(typ='unsafe', pure=True)
+    events = list(yaml.parse(data, Loader=ruamel.yaml.PyLoader))
+ c_data = yaml.emit(events, Dumper=ruamel.yaml.CDumper)
+ if verbose:
+ print(c_data)
+ py_events = list(yaml.parse(c_data, Loader=ruamel.yaml.PyLoader))
+ c_events = list(yaml.parse(c_data, Loader=ruamel.yaml.CLoader))
+ try:
+ assert len(events) == len(py_events), (len(events), len(py_events))
+ assert len(events) == len(c_events), (len(events), len(c_events))
+ for event, py_event, c_event in zip(events, py_events, c_events):
+ for attribute in [
+ '__class__',
+ 'anchor',
+ 'tag',
+ 'implicit',
+ 'value',
+ 'explicit',
+ 'version',
+ 'tags',
+ ]:
+ value = getattr(event, attribute, None)
+ py_value = getattr(py_event, attribute, None)
+ c_value = getattr(c_event, attribute, None)
+ if (
+ attribute == 'tag'
+ and value in [None, '!']
+ and py_value in [None, '!']
+ and c_value in [None, '!']
+ ):
+ continue
+ if attribute == 'explicit' and (py_value or c_value):
+ continue
+ assert value == py_value, (event, py_event, attribute)
+ assert value == c_value, (event, c_event, attribute)
+ finally:
+ if verbose:
+ print('EVENTS:')
+ pprint.pprint(events)
+ print('PY_EVENTS:')
+ pprint.pprint(py_events)
+ print('C_EVENTS:')
+ pprint.pprint(c_events)
+
+
+def test_c_emitter(data_filename, canonical_filename, verbose=False):
+ with open(data_filename, 'rb') as fp0:
+ _compare_emitters(fp0.read(), verbose)
+ with open(canonical_filename, 'rb') as fp0:
+ _compare_emitters(fp0.read(), verbose)
+
+
+test_c_emitter.unittest = ['.data', '.canonical']
+test_c_emitter.skip = ['.skip-ext']
+
+
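+# Wrap a test function so the C extension is patched in before the call and
+# removed again afterwards; the wrapper gets an '_ext' name suffix and an
+# additional '.skip-ext' skip marker.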
+def wrap_ext_function(function):
+ def wrapper(*args, **kwds):
+ _set_up()
+ try:
+ function(*args, **kwds)
+ finally:
+ _tear_down()
+
+ wrapper.__name__ = '%s_ext' % function.__name__
+ wrapper.unittest = function.unittest
+ wrapper.skip = getattr(function, 'skip', []) + ['.skip-ext']
+ return wrapper
+
+
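+# Collect every unittest-marked function from the given modules (or mappings)
+# and install its '_ext' wrapper in this module's globals.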
+def wrap_ext(collections):
+ functions = []
+ if not isinstance(collections, list):
+ collections = [collections]
+ for collection in collections:
+ if not isinstance(collection, dict):
+ collection = vars(collection)
+ for key in sorted(collection):
+ value = collection[key]
+ if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+ functions.append(wrap_ext_function(value))
+ for function in functions:
+ assert function.__name__ not in globals()
+ globals()[function.__name__] = function
+
+
+import test_tokens # NOQA
+import test_structure # NOQA
+import test_errors # NOQA
+import test_resolver # NOQA
+import test_constructor # NOQA
+import test_emitter # NOQA
+import test_representer # NOQA
+import test_recursive # NOQA
+import test_input_output # NOQA
+
+wrap_ext(
+ [
+ test_tokens,
+ test_structure,
+ test_errors,
+ test_resolver,
+ test_constructor,
+ test_emitter,
+ test_representer,
+ test_recursive,
+ test_input_output,
+ ]
+)
+
+if __name__ == '__main__':
+ import sys
+ import test_appliance
+
+ sys.exit(test_appliance.run(globals()))