From cbbc936ed9811bdb5dd480bc2c5e10c3062532be Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 02:33:55 +0200 Subject: Merging upstream version 0.18.6. Signed-off-by: Daniel Baumann --- _test/lib/canonical.py | 388 +++++++++++++++++++++++++++++++++++++++ _test/lib/test_all.py | 20 ++ _test/lib/test_appliance.py | 209 +++++++++++++++++++++ _test/lib/test_build.py | 16 ++ _test/lib/test_build_ext.py | 17 ++ _test/lib/test_canonical.py | 51 ++++++ _test/lib/test_constructor.py | 372 +++++++++++++++++++++++++++++++++++++ _test/lib/test_emitter.py | 132 ++++++++++++++ _test/lib/test_errors.py | 92 ++++++++++ _test/lib/test_input_output.py | 180 ++++++++++++++++++ _test/lib/test_mark.py | 35 ++++ _test/lib/test_reader.py | 44 +++++ _test/lib/test_recursive.py | 58 ++++++ _test/lib/test_representer.py | 51 ++++++ _test/lib/test_resolver.py | 111 ++++++++++++ _test/lib/test_structure.py | 225 +++++++++++++++++++++++ _test/lib/test_tokens.py | 86 +++++++++ _test/lib/test_yaml.py | 20 ++ _test/lib/test_yaml_ext.py | 403 +++++++++++++++++++++++++++++++++++++++++ 19 files changed, 2510 insertions(+) create mode 100644 _test/lib/canonical.py create mode 100644 _test/lib/test_all.py create mode 100644 _test/lib/test_appliance.py create mode 100644 _test/lib/test_build.py create mode 100644 _test/lib/test_build_ext.py create mode 100644 _test/lib/test_canonical.py create mode 100644 _test/lib/test_constructor.py create mode 100644 _test/lib/test_emitter.py create mode 100644 _test/lib/test_errors.py create mode 100644 _test/lib/test_input_output.py create mode 100644 _test/lib/test_mark.py create mode 100644 _test/lib/test_reader.py create mode 100644 _test/lib/test_recursive.py create mode 100644 _test/lib/test_representer.py create mode 100644 _test/lib/test_resolver.py create mode 100644 _test/lib/test_structure.py create mode 100644 _test/lib/test_tokens.py create mode 100644 _test/lib/test_yaml.py create mode 100644 _test/lib/test_yaml_ext.py (limited to '_test/lib') diff --git a/_test/lib/canonical.py b/_test/lib/canonical.py new file mode 100644 index 0000000..31c9728 --- /dev/null +++ b/_test/lib/canonical.py @@ -0,0 +1,388 @@ + +import ruamel.yaml +from ruamel.yaml.composer import Composer +from ruamel.yaml.constructor import Constructor +from ruamel.yaml.resolver import Resolver + + +class CanonicalError(ruamel.yaml.YAMLError): + pass + + +class CanonicalScanner: + def __init__(self, data): + try: + if isinstance(data, bytes): + data = data.decode('utf-8') + except UnicodeDecodeError: + raise CanonicalError('utf-8 stream is expected') + self.data = data + '\0' + self.index = 0 + self.tokens = [] + self.scanned = False + + def check_token(self, *choices): + if not self.scanned: + self.scan() + if self.tokens: + if not choices: + return True + for choice in choices: + if isinstance(self.tokens[0], choice): + return True + return False + + def peek_token(self): + if not self.scanned: + self.scan() + if self.tokens: + return self.tokens[0] + + def get_token(self, choice=None): + if not self.scanned: + self.scan() + token = self.tokens.pop(0) + if choice and not isinstance(token, choice): + raise CanonicalError('unexpected token ' + repr(token)) + return token + + def get_token_value(self): + token = self.get_token() + return token.value + + def scan(self): + self.tokens.append(ruamel.yaml.StreamStartToken(None, None)) + while True: + self.find_token() + ch = self.data[self.index] + if ch == '\0': + self.tokens.append(ruamel.yaml.StreamEndToken(None, None)) + break + elif ch == '%': + self.tokens.append(self.scan_directive()) + elif ch == '-' and self.data[self.index : self.index + 3] == '---': + self.index += 3 + self.tokens.append(ruamel.yaml.DocumentStartToken(None, None)) + elif ch == '[': + self.index += 1 + self.tokens.append(ruamel.yaml.FlowSequenceStartToken(None, None)) + elif ch == '{': + self.index += 1 + self.tokens.append(ruamel.yaml.FlowMappingStartToken(None, None)) + elif ch == ']': + self.index += 1 + self.tokens.append(ruamel.yaml.FlowSequenceEndToken(None, None)) + elif ch == '}': + self.index += 1 + self.tokens.append(ruamel.yaml.FlowMappingEndToken(None, None)) + elif ch == '?': + self.index += 1 + self.tokens.append(ruamel.yaml.KeyToken(None, None)) + elif ch == ':': + self.index += 1 + self.tokens.append(ruamel.yaml.ValueToken(None, None)) + elif ch == ',': + self.index += 1 + self.tokens.append(ruamel.yaml.FlowEntryToken(None, None)) + elif ch == '*' or ch == '&': + self.tokens.append(self.scan_alias()) + elif ch == '!': + self.tokens.append(self.scan_tag()) + elif ch == '"': + self.tokens.append(self.scan_scalar()) + else: + raise CanonicalError('invalid token') + self.scanned = True + + DIRECTIVE = '%YAML 1.1' + + def scan_directive(self): + if ( + self.data[self.index : self.index + len(self.DIRECTIVE)] == self.DIRECTIVE + and self.data[self.index + len(self.DIRECTIVE)] in ' \n\0' + ): + self.index += len(self.DIRECTIVE) + return ruamel.yaml.DirectiveToken('YAML', (1, 1), None, None) + else: + raise CanonicalError('invalid directive') + + def scan_alias(self): + if self.data[self.index] == '*': + TokenClass = ruamel.yaml.AliasToken + else: + TokenClass = ruamel.yaml.AnchorToken + self.index += 1 + start = self.index + while self.data[self.index] not in ', \n\0': + self.index += 1 + value = self.data[start : self.index] + return TokenClass(value, None, None) + + def scan_tag(self): + self.index += 1 + start = self.index + while self.data[self.index] not in ' \n\0': + self.index += 1 + value = self.data[start : self.index] + if not value: + value = '!' + elif value[0] == '!': + value = 'tag:yaml.org,2002:' + value[1:] + elif value[0] == '<' and value[-1] == '>': + value = value[1:-1] + else: + value = '!' + value + return ruamel.yaml.TagToken(value, None, None) + + QUOTE_CODES = {'x': 2, 'u': 4, 'U': 8} + + QUOTE_REPLACES = { + '\\': '\\', + '"': '"', + ' ': ' ', + 'a': '\x07', + 'b': '\x08', + 'e': '\x1B', + 'f': '\x0C', + 'n': '\x0A', + 'r': '\x0D', + 't': '\x09', + 'v': '\x0B', + 'N': '\u0085', + 'L': '\u2028', + 'P': '\u2029', + '_': '_', + '0': '\x00', + } + + def scan_scalar(self): + self.index += 1 + chunks = [] + start = self.index + ignore_spaces = False + while self.data[self.index] != '"': + if self.data[self.index] == '\\': + ignore_spaces = False + chunks.append(self.data[start : self.index]) + self.index += 1 + ch = self.data[self.index] + self.index += 1 + if ch == '\n': + ignore_spaces = True + elif ch in self.QUOTE_CODES: + length = self.QUOTE_CODES[ch] + code = int(self.data[self.index : self.index + length], 16) + chunks.append(chr(code)) + self.index += length + else: + if ch not in self.QUOTE_REPLACES: + raise CanonicalError('invalid escape code') + chunks.append(self.QUOTE_REPLACES[ch]) + start = self.index + elif self.data[self.index] == '\n': + chunks.append(self.data[start : self.index]) + chunks.append(' ') + self.index += 1 + start = self.index + ignore_spaces = True + elif ignore_spaces and self.data[self.index] == ' ': + self.index += 1 + start = self.index + else: + ignore_spaces = False + self.index += 1 + chunks.append(self.data[start : self.index]) + self.index += 1 + return ruamel.yaml.ScalarToken("".join(chunks), False, None, None) + + def find_token(self): + found = False + while not found: + while self.data[self.index] in ' \t': + self.index += 1 + if self.data[self.index] == '#': + while self.data[self.index] != '\n': + self.index += 1 + if self.data[self.index] == '\n': + self.index += 1 + else: + found = True + + +class CanonicalParser: + def __init__(self): + self.events = [] + self.parsed = False + + def dispose(self): + pass + + # stream: STREAM-START document* STREAM-END + def parse_stream(self): + self.get_token(ruamel.yaml.StreamStartToken) + self.events.append(ruamel.yaml.StreamStartEvent(None, None)) + while not self.check_token(ruamel.yaml.StreamEndToken): + if self.check_token(ruamel.yaml.DirectiveToken, ruamel.yaml.DocumentStartToken): + self.parse_document() + else: + raise CanonicalError('document is expected, got ' + repr(self.tokens[0])) + self.get_token(ruamel.yaml.StreamEndToken) + self.events.append(ruamel.yaml.StreamEndEvent(None, None)) + + # document: DIRECTIVE? DOCUMENT-START node + def parse_document(self): + # node = None + if self.check_token(ruamel.yaml.DirectiveToken): + self.get_token(ruamel.yaml.DirectiveToken) + self.get_token(ruamel.yaml.DocumentStartToken) + self.events.append(ruamel.yaml.DocumentStartEvent(None, None)) + self.parse_node() + self.events.append(ruamel.yaml.DocumentEndEvent(None, None)) + + # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping) + def parse_node(self): + if self.check_token(ruamel.yaml.AliasToken): + self.events.append(ruamel.yaml.AliasEvent(self.get_token_value(), None, None)) + else: + anchor = None + if self.check_token(ruamel.yaml.AnchorToken): + anchor = self.get_token_value() + tag = None + if self.check_token(ruamel.yaml.TagToken): + tag = self.get_token_value() + if self.check_token(ruamel.yaml.ScalarToken): + self.events.append( + ruamel.yaml.ScalarEvent( + anchor, tag, (False, False), self.get_token_value(), None, None + ) + ) + elif self.check_token(ruamel.yaml.FlowSequenceStartToken): + self.events.append(ruamel.yaml.SequenceStartEvent(anchor, tag, None, None)) + self.parse_sequence() + elif self.check_token(ruamel.yaml.FlowMappingStartToken): + self.events.append(ruamel.yaml.MappingStartEvent(anchor, tag, None, None)) + self.parse_mapping() + else: + raise CanonicalError( + "SCALAR, '[', or '{' is expected, got " + repr(self.tokens[0]) + ) + + # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END + def parse_sequence(self): + self.get_token(ruamel.yaml.FlowSequenceStartToken) + if not self.check_token(ruamel.yaml.FlowSequenceEndToken): + self.parse_node() + while not self.check_token(ruamel.yaml.FlowSequenceEndToken): + self.get_token(ruamel.yaml.FlowEntryToken) + if not self.check_token(ruamel.yaml.FlowSequenceEndToken): + self.parse_node() + self.get_token(ruamel.yaml.FlowSequenceEndToken) + self.events.append(ruamel.yaml.SequenceEndEvent(None, None)) + + # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END + def parse_mapping(self): + self.get_token(ruamel.yaml.FlowMappingStartToken) + if not self.check_token(ruamel.yaml.FlowMappingEndToken): + self.parse_map_entry() + while not self.check_token(ruamel.yaml.FlowMappingEndToken): + self.get_token(ruamel.yaml.FlowEntryToken) + if not self.check_token(ruamel.yaml.FlowMappingEndToken): + self.parse_map_entry() + self.get_token(ruamel.yaml.FlowMappingEndToken) + self.events.append(ruamel.yaml.MappingEndEvent(None, None)) + + # map_entry: KEY node VALUE node + def parse_map_entry(self): + self.get_token(ruamel.yaml.KeyToken) + self.parse_node() + self.get_token(ruamel.yaml.ValueToken) + self.parse_node() + + def parse(self): + self.parse_stream() + self.parsed = True + + def get_event(self): + if not self.parsed: + self.parse() + return self.events.pop(0) + + def check_event(self, *choices): + if not self.parsed: + self.parse() + if self.events: + if not choices: + return True + for choice in choices: + if isinstance(self.events[0], choice): + return True + return False + + def peek_event(self): + if not self.parsed: + self.parse() + return self.events[0] + + +class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver): + def __init__(self, stream): + if hasattr(stream, 'read'): + stream = stream.read() + CanonicalScanner.__init__(self, stream) + CanonicalParser.__init__(self) + Composer.__init__(self) + Constructor.__init__(self) + Resolver.__init__(self) + + +ruamel.yaml.CanonicalLoader = CanonicalLoader + + +def canonical_scan(stream): + yaml = ruamel.yaml.YAML() + yaml.scanner = CanonicalScanner + return yaml.scan(stream) + + +ruamel.yaml.canonical_scan = canonical_scan + + +def canonical_parse(stream): + yaml = ruamel.yaml.YAML() + return yaml.parse(stream, Loader=CanonicalLoader) + + +ruamel.yaml.canonical_parse = canonical_parse + + +def canonical_compose(stream): + yaml = ruamel.yaml.YAML() + return yaml.compose(stream, Loader=CanonicalLoader) + + +ruamel.yaml.canonical_compose = canonical_compose + + +def canonical_compose_all(stream): + yaml = ruamel.yaml.YAML() + return yaml.compose_all(stream, Loader=CanonicalLoader) + + +ruamel.yaml.canonical_compose_all = canonical_compose_all + + +def canonical_load(stream): + yaml = ruamel.yaml.YAML() + return yaml.load(stream, Loader=CanonicalLoader) + + +ruamel.yaml.canonical_load = canonical_load + + +def canonical_load_all(stream): + yaml = ruamel.yaml.YAML(typ='safe', pure=True) + yaml.Loader = CanonicalLoader + return yaml.load_all(stream) + + +ruamel.yaml.canonical_load_all = canonical_load_all diff --git a/_test/lib/test_all.py b/_test/lib/test_all.py new file mode 100644 index 0000000..8099ec8 --- /dev/null +++ b/_test/lib/test_all.py @@ -0,0 +1,20 @@ + +import sys # NOQA +import ruamel.yaml +import test_appliance + + +def main(args=None): + collections = [] + import test_yaml + + collections.append(test_yaml) + if ruamel.yaml.__with_libyaml__: + import test_yaml_ext + + collections.append(test_yaml_ext) + test_appliance.run(collections, args) + + +if __name__ == '__main__': + main() diff --git a/_test/lib/test_appliance.py b/_test/lib/test_appliance.py new file mode 100644 index 0000000..d624ebe --- /dev/null +++ b/_test/lib/test_appliance.py @@ -0,0 +1,209 @@ + +import sys +import os +import types +import traceback +import pprint +import argparse + +# DATA = 'tests/data' +# determine the position of data dynamically relative to program +# this allows running test while the current path is not the top of the +# repository, e.g. from the tests/data directory: python ../test_yaml.py +DATA = __file__.rsplit(os.sep, 2)[0] + '/data' + + +def find_test_functions(collections): + if not isinstance(collections, list): + collections = [collections] + functions = [] + for collection in collections: + if not isinstance(collection, dict): + collection = vars(collection) + for key in sorted(collection): + value = collection[key] + if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): + functions.append(value) + return functions + + +def find_test_filenames(directory): + filenames = {} + for filename in os.listdir(directory): + if os.path.isfile(os.path.join(directory, filename)): + base, ext = os.path.splitext(filename) + # ToDo: remove + if base.endswith('-py2'): + continue + filenames.setdefault(base, []).append(ext) + filenames = sorted(filenames.items()) + return filenames + + +def parse_arguments(args): + """""" + parser = argparse.ArgumentParser( + usage=""" run the yaml tests. By default + all functions on all appropriate test_files are run. Functions have + unittest attributes that determine the required extensions to filenames + that need to be available in order to run that test. E.g.\n\n + python test_yaml.py test_constructor_types\n + python test_yaml.py --verbose test_tokens spec-02-05\n\n + The presence of an extension in the .skip attribute of a function + disables the test for that function.""" + ) + # ToDo: make into int and test > 0 in functions + parser.add_argument( + '--verbose', + '-v', + action='store_true', + default='YAML_TEST_VERBOSE' in os.environ, + help='set verbosity output', + ) + parser.add_argument( + '--list-functions', + action='store_true', + help="""list all functions with required file extensions for test files + """, + ) + parser.add_argument('function', nargs='?', help="""restrict function to run""") + parser.add_argument( + 'filenames', + nargs='*', + help="""basename of filename set, extensions (.code, .data) have to + be a superset of those in the unittest attribute of the selected + function""", + ) + args = parser.parse_args(args) + # print('args', args) + verbose = args.verbose + include_functions = [args.function] if args.function else [] + include_filenames = args.filenames + # if args is None: + # args = sys.argv[1:] + # verbose = False + # if '-v' in args: + # verbose = True + # args.remove('-v') + # if '--verbose' in args: + # verbose = True + # args.remove('--verbose') # never worked without this + # if 'YAML_TEST_VERBOSE' in os.environ: + # verbose = True + # include_functions = [] + # if args: + # include_functions.append(args.pop(0)) + if 'YAML_TEST_FUNCTIONS' in os.environ: + include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split()) + # include_filenames = [] + # include_filenames.extend(args) + if 'YAML_TEST_FILENAMES' in os.environ: + include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split()) + return include_functions, include_filenames, verbose, args + + +def execute(function, filenames, verbose): + name = function.__name__ + if verbose: + sys.stdout.write('=' * 75 + '\n') + sys.stdout.write('%s(%s)...\n' % (name, ', '.join(filenames))) + try: + function(verbose=verbose, *filenames) + except Exception as exc: + info = sys.exc_info() + if isinstance(exc, AssertionError): + kind = 'FAILURE' + else: + kind = 'ERROR' + if verbose: + traceback.print_exc(limit=1, file=sys.stdout) + else: + sys.stdout.write(kind[0]) + sys.stdout.flush() + else: + kind = 'SUCCESS' + info = None + if not verbose: + sys.stdout.write('.') + sys.stdout.flush() + return (name, filenames, kind, info) + + +def display(results, verbose): + if results and not verbose: + sys.stdout.write('\n') + total = len(results) + failures = 0 + errors = 0 + for name, filenames, kind, info in results: + if kind == 'SUCCESS': + continue + if kind == 'FAILURE': + failures += 1 + if kind == 'ERROR': + errors += 1 + sys.stdout.write('=' * 75 + '\n') + sys.stdout.write('%s(%s): %s\n' % (name, ', '.join(filenames), kind)) + if kind == 'ERROR': + traceback.print_exception(file=sys.stdout, *info) + else: + sys.stdout.write('Traceback (most recent call last):\n') + traceback.print_tb(info[2], file=sys.stdout) + sys.stdout.write('%s: see below\n' % info[0].__name__) + sys.stdout.write('~' * 75 + '\n') + for arg in info[1].args: + pprint.pprint(arg, stream=sys.stdout) + for filename in filenames: + sys.stdout.write('-' * 75 + '\n') + sys.stdout.write('%s:\n' % filename) + with open(filename, 'r', errors='replace') as fp: + data = fp.read() + sys.stdout.write(data) + if data and data[-1] != '\n': + sys.stdout.write('\n') + sys.stdout.write('=' * 75 + '\n') + sys.stdout.write('TESTS: %s\n' % total) + ret_val = 0 + if failures: + sys.stdout.write('FAILURES: %s\n' % failures) + ret_val = 1 + if errors: + sys.stdout.write('ERRORS: %s\n' % errors) + ret_val = 2 + return ret_val + + +def run(collections, args=None): + test_functions = find_test_functions(collections) + test_filenames = find_test_filenames(DATA) + include_functions, include_filenames, verbose, a = parse_arguments(args) + if a.list_functions: + print('test functions:') + for f in test_functions: + print(' {:30s} {}'.format(f.__name__, f.unittest)) + return + results = [] + for function in test_functions: + if include_functions and function.__name__ not in include_functions: + continue + if function.unittest: + for base, exts in test_filenames: + if include_filenames and base not in include_filenames: + continue + filenames = [] + for ext in function.unittest: + if ext not in exts: + break + filenames.append(os.path.join(DATA, base + ext)) + else: + skip_exts = getattr(function, 'skip', []) + for skip_ext in skip_exts: + if skip_ext in exts: + break + else: + result = execute(function, filenames, verbose) + results.append(result) + else: + result = execute(function, [], verbose) + results.append(result) + return display(results, verbose=verbose) diff --git a/_test/lib/test_build.py b/_test/lib/test_build.py new file mode 100644 index 0000000..f7837eb --- /dev/null +++ b/_test/lib/test_build.py @@ -0,0 +1,16 @@ + +if __name__ == '__main__': + import sys + import os + import distutils.util + + build_lib = 'build/lib' + build_lib_ext = os.path.join( + 'build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]) + ) + sys.path.insert(0, build_lib) + sys.path.insert(0, build_lib_ext) + import test_yaml + import test_appliance + + test_appliance.run(test_yaml) diff --git a/_test/lib/test_build_ext.py b/_test/lib/test_build_ext.py new file mode 100644 index 0000000..1a58fd2 --- /dev/null +++ b/_test/lib/test_build_ext.py @@ -0,0 +1,17 @@ + + +if __name__ == '__main__': + import sys + import os + import distutils.util + + build_lib = 'build/lib' + build_lib_ext = os.path.join( + 'build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]) + ) + sys.path.insert(0, build_lib) + sys.path.insert(0, build_lib_ext) + import test_yaml_ext + import test_appliance + + test_appliance.run(test_yaml_ext) diff --git a/_test/lib/test_canonical.py b/_test/lib/test_canonical.py new file mode 100644 index 0000000..b5cd14d --- /dev/null +++ b/_test/lib/test_canonical.py @@ -0,0 +1,51 @@ + + +import ruamel.yaml +import canonical # NOQA + + +def test_canonical_scanner(canonical_filename, verbose=False): + with open(canonical_filename, 'rb') as fp0: + data = fp0.read() + tokens = list(ruamel.yaml.canonical_scan(data)) + assert tokens, tokens + if verbose: + for token in tokens: + print(token) + + +test_canonical_scanner.unittest = ['.canonical'] + + +def test_canonical_parser(canonical_filename, verbose=False): + with open(canonical_filename, 'rb') as fp0: + data = fp0.read() + events = list(ruamel.yaml.canonical_parse(data)) + assert events, events + if verbose: + for event in events: + print(event) + + +test_canonical_parser.unittest = ['.canonical'] + + +def test_canonical_error(data_filename, canonical_filename, verbose=False): + with open(data_filename, 'rb') as fp0: + data = fp0.read() + try: + output = list(ruamel.yaml.canonical_load_all(data)) # NOQA + except ruamel.yaml.YAMLError as exc: + if verbose: + print(exc) + else: + raise AssertionError('expected an exception') + + +test_canonical_error.unittest = ['.data', '.canonical'] +test_canonical_error.skip = ['.empty'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_constructor.py b/_test/lib/test_constructor.py new file mode 100644 index 0000000..b38bf2f --- /dev/null +++ b/_test/lib/test_constructor.py @@ -0,0 +1,372 @@ + +import ruamel.yaml +import pprint + +import datetime + +try: + set +except NameError: + from sets import Set as set # NOQA +import ruamel.yaml.tokens + + +def execute(code): + global value + exec(code) + return value + + +def _make_objects(): + global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLobject1, YAMLobject2, AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState, NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict, FixedOffset, today, execute + + class MyLoader(ruamel.yaml.Loader): + pass + + class MyDumper(ruamel.yaml.Dumper): + pass + + class MyTestClass1: + def __init__(self, x, y=0, z=0): + self.x = x + self.y = y + self.z = z + + def __eq__(self, other): + if isinstance(other, MyTestClass1): + return self.__class__, self.__dict__ == other.__class__, other.__dict__ + else: + return False + + def construct1(constructor, node): + mapping = constructor.construct_mapping(node) + return MyTestClass1(**mapping) + + def represent1(representer, native): + return representer.represent_mapping('!tag1', native.__dict__) + + ruamel.yaml.add_constructor('!tag1', construct1, Loader=MyLoader) + ruamel.yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper) + + class MyTestClass2(MyTestClass1, ruamel.yaml.YAMLObject): + ruamel.yaml.loader = MyLoader + ruamel.yaml.dumper = MyDumper + ruamel.yaml.tag = '!tag2' + + def from_yaml(cls, constructor, node): + x = constructor.construct_yaml_int(node) + return cls(x=x) + + from_yaml = classmethod(from_yaml) + + def to_yaml(cls, representer, native): + return representer.represent_scalar(cls.yaml_tag, str(native.x)) + + to_yaml = classmethod(to_yaml) + + class MyTestClass3(MyTestClass2): + ruamel.yaml.tag = '!tag3' + + def from_yaml(cls, constructor, node): + mapping = constructor.construct_mapping(node) + if '=' in mapping: + x = mapping['='] + del mapping['='] + mapping['x'] = x + return cls(**mapping) + + from_yaml = classmethod(from_yaml) + + def to_yaml(cls, representer, native): + return representer.represent_mapping(cls.yaml_tag, native.__dict__) + + to_yaml = classmethod(to_yaml) + + class YAMLobject1(ruamel.yaml.YAMLObject): + ruamel.yaml.loader = MyLoader + ruamel.yaml.dumper = MyDumper + ruamel.yaml.tag = '!foo' + + def __init__(self, my_parameter=None, my_another_parameter=None): + self.my_parameter = my_parameter + self.my_another_parameter = my_another_parameter + + def __eq__(self, other): + if isinstance(other, YAMLobject1): + return self.__class__, self.__dict__ == other.__class__, other.__dict__ + else: + return False + + class YAMLobject2(ruamel.yaml.YAMLObject): + ruamel.yaml.loader = MyLoader + ruamel.yaml.dumper = MyDumper + ruamel.yaml.tag = '!bar' + + def __init__(self, foo=1, bar=2, baz=3): + self.foo = foo + self.bar = bar + self.baz = baz + + def __getstate__(self): + return {1: self.foo, 2: self.bar, 3: self.baz} + + def __setstate__(self, state): + self.foo = state[1] + self.bar = state[2] + self.baz = state[3] + + def __eq__(self, other): + if isinstance(other, YAMLobject2): + return self.__class__, self.__dict__ == other.__class__, other.__dict__ + else: + return False + + class AnObject: + def __new__(cls, foo=None, bar=None, baz=None): + self = object.__new__(cls) + self.foo = foo + self.bar = bar + self.baz = baz + return self + + def __cmp__(self, other): + return cmp( + (type(self), self.foo, self.bar, self.baz), # NOQA + (type(other), other.foo, other.bar, other.baz), + ) + + def __eq__(self, other): + return type(self) is type(other) and (self.foo, self.bar, self.baz) == ( + other.foo, + other.bar, + other.baz, + ) + + class AnInstance: + def __init__(self, foo=None, bar=None, baz=None): + self.foo = foo + self.bar = bar + self.baz = baz + + def __cmp__(self, other): + return cmp( + (type(self), self.foo, self.bar, self.baz), # NOQA + (type(other), other.foo, other.bar, other.baz), + ) + + def __eq__(self, other): + return type(self) is type(other) and (self.foo, self.bar, self.baz) == ( + other.foo, + other.bar, + other.baz, + ) + + class AState(AnInstance): + def __getstate__(self): + return {'_foo': self.foo, '_bar': self.bar, '_baz': self.baz} + + def __setstate__(self, state): + self.foo = state['_foo'] + self.bar = state['_bar'] + self.baz = state['_baz'] + + class ACustomState(AnInstance): + def __getstate__(self): + return (self.foo, self.bar, self.baz) + + def __setstate__(self, state): + self.foo, self.bar, self.baz = state + + # class InitArgs(AnInstance): + # def __getinitargs__(self): + # return (self.foo, self.bar, self.baz) + # def __getstate__(self): + # return {} + + # class InitArgsWithState(AnInstance): + # def __getinitargs__(self): + # return (self.foo, self.bar) + # def __getstate__(self): + # return self.baz + # def __setstate__(self, state): + # self.baz = state + + class NewArgs(AnObject): + def __getnewargs__(self): + return (self.foo, self.bar, self.baz) + + def __getstate__(self): + return {} + + class NewArgsWithState(AnObject): + def __getnewargs__(self): + return (self.foo, self.bar) + + def __getstate__(self): + return self.baz + + def __setstate__(self, state): + self.baz = state + + InitArgs = NewArgs + + InitArgsWithState = NewArgsWithState + + class Reduce(AnObject): + def __reduce__(self): + return self.__class__, (self.foo, self.bar, self.baz) + + class ReduceWithState(AnObject): + def __reduce__(self): + return self.__class__, (self.foo, self.bar), self.baz + + def __setstate__(self, state): + self.baz = state + + class MyInt(int): + def __eq__(self, other): + return type(self) is type(other) and int(self) == int(other) + + class MyList(list): + def __init__(self, n=1): + self.extend([None] * n) + + def __eq__(self, other): + return type(self) is type(other) and list(self) == list(other) + + class MyDict(dict): + def __init__(self, n=1): + for k in range(n): + self[k] = None + + def __eq__(self, other): + return type(self) is type(other) and dict(self) == dict(other) + + class FixedOffset(datetime.tzinfo): + def __init__(self, offset, name): + self.__offset = datetime.timedelta(minutes=offset) + self.__name = name + + def utcoffset(self, dt): + return self.__offset + + def tzname(self, dt): + return self.__name + + def dst(self, dt): + return datetime.timedelta(0) + + today = datetime.date.today() + + +try: + from ruamel.ordereddict import ordereddict +except ImportError: + from collections import OrderedDict + + # to get the right name import ... as ordereddict doesn't do that + + class ordereddict(OrderedDict): + pass + + +def _load_code(expression): + return eval(expression, globals()) + + +def _serialize_value(data): + if isinstance(data, list): + return '[%s]' % ', '.join(map(_serialize_value, data)) + elif isinstance(data, dict): + items = [] + for key, value in data.items(): + key = _serialize_value(key) + value = _serialize_value(value) + items.append('%s: %s' % (key, value)) + items.sort() + return '{%s}' % ', '.join(items) + elif isinstance(data, datetime.datetime): + return repr(data.utctimetuple()) + elif isinstance(data, float) and data != data: + return '?' + else: + return str(data) + + +def test_constructor_types(data_filename, code_filename, verbose=False): + _make_objects() + native1 = None + native2 = None + yaml = ruamel.yaml.YAML(typ='safe', pure=True) + yaml.loader = MyLoader + try: + with open(data_filename, 'rb') as fp0: + native1 = list(yaml.load_all(fp0)) + if len(native1) == 1: + native1 = native1[0] + with open(code_filename, 'rb') as fp0: + native2 = _load_code(fp0.read()) + try: + if native1 == native2: + return + except TypeError: + pass + # print('native1', native1) + if verbose: + print('SERIALIZED NATIVE1:') + print(_serialize_value(native1)) + print('SERIALIZED NATIVE2:') + print(_serialize_value(native2)) + assert _serialize_value(native1) == _serialize_value(native2), (native1, native2) + finally: + if verbose: + print('NATIVE1:') + pprint.pprint(native1) + print('NATIVE2:') + pprint.pprint(native2) + + +test_constructor_types.unittest = ['.data', '.code'] + + +def test_roundtrip_data(code_filename, roundtrip_filename, verbose=False): + _make_objects() + with open(code_filename, 'rb') as fp0: + value1 = fp0.read() + yaml = YAML(typ='safe', pure=True) + yaml.Loader = MyLoader + native2 = list(yaml.load_all(value1)) + if len(native2) == 1: + native2 = native2[0] + try: + value2 = ruamel.yaml.dump( + native2, + Dumper=MyDumper, + default_flow_style=False, + allow_unicode=True, + encoding='utf-8', + ) + # value2 += x + if verbose: + print('SERIALIZED NATIVE1:') + print(value1) + print('SERIALIZED NATIVE2:') + print(value2) + assert value1 == value2, (value1, value2) + finally: + if verbose: + print('NATIVE2:') + pprint.pprint(native2) + + +test_roundtrip_data.unittest = ['.data', '.roundtrip'] + + +if __name__ == '__main__': + import sys + import test_constructor # NOQA + + sys.modules['test_constructor'] = sys.modules['__main__'] + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_emitter.py b/_test/lib/test_emitter.py new file mode 100644 index 0000000..b1991e3 --- /dev/null +++ b/_test/lib/test_emitter.py @@ -0,0 +1,132 @@ +from __future__ import absolute_import +from __future__ import print_function + +import ruamel.yaml +from ruamel.yaml import YAML + + +def _compare_events(events1, events2): + assert len(events1) == len(events2), (events1, events2) + for event1, event2 in zip(events1, events2): + assert event1.__class__ == event2.__class__, (event1, event2) + if isinstance(event1, yaml.NodeEvent): + assert event1.anchor == event2.anchor, (event1, event2) + if isinstance(event1, yaml.CollectionStartEvent): + assert event1.tag == event2.tag, (event1, event2) + if isinstance(event1, yaml.ScalarEvent): + if True not in event1.implicit + event2.implicit: + assert event1.tag == event2.tag, (event1, event2) + assert event1.value == event2.value, (event1, event2) + + +def test_emitter_on_data(data_filename, canonical_filename, verbose=False): + with open(data_filename, 'rb') as fp0: + events = list(YAML().parse(fp0)) + output = YAML().emit(events) + if verbose: + print('OUTPUT:') + print(output) + new_events = list(yaml.parse(output)) + _compare_events(events, new_events) + + +test_emitter_on_data.unittest = ['.data', '.canonical'] + + +def test_emitter_on_canonical(canonical_filename, verbose=False): + with open(canonical_filename, 'rb') as fp0: + events = list(YAML().parse(fp0)) + for canonical in [False, True]: + output = YAML().emit(events, canonical=canonical) + if verbose: + print('OUTPUT (canonical=%s):' % canonical) + print(output) + new_events = list(yaml.parse(output)) + _compare_events(events, new_events) + + +test_emitter_on_canonical.unittest = ['.canonical'] + + +def test_emitter_styles(data_filename, canonical_filename, verbose=False): + for filename in [data_filename, canonical_filename]: + with open(filename, 'rb') as fp0: + events = list(YAML().parse(fp0)) + for flow_style in [False, True]: + for style in ['|', '>', '"', "'", ""]: + styled_events = [] + for event in events: + if isinstance(event, yaml.ScalarEvent): + event = yaml.ScalarEvent( + event.anchor, event.tag, event.implicit, event.value, style=style + ) + elif isinstance(event, yaml.SequenceStartEvent): + event = yaml.SequenceStartEvent( + event.anchor, event.tag, event.implicit, flow_style=flow_style + ) + elif isinstance(event, yaml.MappingStartEvent): + event = yaml.MappingStartEvent( + event.anchor, event.tag, event.implicit, flow_style=flow_style + ) + styled_events.append(event) + output = YAML().emit(styled_events) + if verbose: + print( + 'OUTPUT (filename=%r, flow_style=%r, style=%r)' + % (filename, flow_style, style) + ) + print(output) + new_events = list(YAML().parse(output)) + _compare_events(events, new_events) + + +test_emitter_styles.unittest = ['.data', '.canonical'] + + +class EventsLoader(ruamel.yaml.Loader): + def construct_event(self, node): + if isinstance(node, ruamel.yaml.ScalarNode): + mapping = {} + else: + mapping = self.construct_mapping(node) + class_name = str(node.tag[1:]) + 'Event' + if class_name in [ + 'AliasEvent', + 'ScalarEvent', + 'SequenceStartEvent', + 'MappingStartEvent', + ]: + mapping.setdefault('anchor', None) + if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']: + mapping.setdefault('tag', None) + if class_name in ['SequenceStartEvent', 'MappingStartEvent']: + mapping.setdefault('implicit', True) + if class_name == 'ScalarEvent': + mapping.setdefault('implicit', (False, True)) + mapping.setdefault('value', "") + value = getattr(yaml, class_name)(**mapping) + return value + + +# if Loader is not a composite, add this function +# EventsLoader.add_constructor = yaml.constructor.Constructor.add_constructor + + +EventsLoader.add_constructor(None, EventsLoader.construct_event) + + +def test_emitter_events(events_filename, verbose=False): + with open(events_filename, 'rb') as fp0: + events = list(YAML().load(fp0, Loader=EventsLoader)) + output = YAML().emit(events) + if verbose: + print('OUTPUT:') + print(output) + new_events = list(YAML().parse(output)) + _compare_events(events, new_events) + + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_errors.py b/_test/lib/test_errors.py new file mode 100644 index 0000000..c0fd3df --- /dev/null +++ b/_test/lib/test_errors.py @@ -0,0 +1,92 @@ + +import ruamel.yaml +YAML = ruamel.yaml.YAML + +import test_emitter +import warnings + +warnings.simplefilter('ignore', ruamel.yaml.error.UnsafeLoaderWarning) + + +def test_loader_error(error_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + try: + with open(error_filename, 'rb') as fp0: + list(yaml.load_all(fp0)) + except yaml.YAMLError as exc: + if verbose: + print('%s:' % exc.__class__.__name__, exc) + else: + raise AssertionError('expected an exception') + + +test_loader_error.unittest = ['.loader-error'] + + +def test_loader_error_string(error_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + try: + with open(error_filename, 'rb') as fp0: + list(yaml.load_all(fp0.read())) + except yaml.YAMLError as exc: + if verbose: + print('%s:' % exc.__class__.__name__, exc) + else: + raise AssertionError('expected an exception') + + +test_loader_error_string.unittest = ['.loader-error'] + + +def test_loader_error_single(error_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + try: + with open(error_filename, 'rb') as fp0: + yaml.load(fp0.read()) + except yaml.YAMLError as exc: + if verbose: + print('%s:' % exc.__class__.__name__, exc) + else: + raise AssertionError('expected an exception') + + +test_loader_error_single.unittest = ['.single-loader-error'] + + +def test_emitter_error(error_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(error_filename, 'rb') as fp0: + events = list(yaml.load(fp0, Loader=test_emitter.EventsLoader)) + try: + ruamel.yaml.emit(events) + except yaml.YAMLError as exc: + if verbose: + print('%s:' % exc.__class__.__name__, exc) + else: + raise AssertionError('expected an exception') + + +test_emitter_error.unittest = ['.emitter-error'] + + +def test_dumper_error(error_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(error_filename, 'rb') as fp0: + code = fp0.read() + try: + import yaml + + exec(code) + except yaml.YAMLError as exc: + if verbose: + print('%s:' % exc.__class__.__name__, exc) + else: + raise AssertionError('expected an exception') + + +test_dumper_error.unittest = ['.dumper-error'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_input_output.py b/_test/lib/test_input_output.py new file mode 100644 index 0000000..37bda3d --- /dev/null +++ b/_test/lib/test_input_output.py @@ -0,0 +1,180 @@ + +from ruamel.yaml import YAML +import codecs +import tempfile +import os +import os.path +from ruamel.yaml.compat import StringIO, BytesIO + +def test_unicode_input(unicode_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(unicode_filename, 'rb') as fp: + data = fp.read().decode('utf-8') + value = ' '.join(data.split()) + output = yaml.load(data) + assert output == value, (output, value) + output = yaml.load(StringIO(data)) + assert output == value, (output, value) + for input in [ + data.encode('utf-8'), + codecs.BOM_UTF8 + data.encode('utf-8'), + codecs.BOM_UTF16_BE + data.encode('utf-16-be'), + codecs.BOM_UTF16_LE + data.encode('utf-16-le'), + ]: + if verbose: + print('INPUT:', repr(input[:10]), '...') + output = yaml.load(input) + assert output == value, (output, value) + output = yaml.load(BytesIO(input)) + assert output == value, (output, value) + + +test_unicode_input.unittest = ['.unicode'] + + +def test_unicode_input_errors(unicode_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(unicode_filename, 'rb') as fp: + data = fp.read().decode('utf-8') + for input in [ + data.encode('latin1', 'ignore'), + data.encode('utf-16-be'), + data.encode('utf-16-le'), + codecs.BOM_UTF8 + data.encode('utf-16-be'), + codecs.BOM_UTF16_BE + data.encode('utf-16-le'), + codecs.BOM_UTF16_LE + data.encode('utf-8') + b'!', + ]: + try: + yaml.load(input) + except yaml.YAMLError as exc: + if verbose: + print(exc) + else: + raise AssertionError('expected an exception') + try: + yaml.load(BytesIO(input)) + except yaml.YAMLError as exc: + if verbose: + print(exc) + else: + raise AssertionError('expected an exception') + + +test_unicode_input_errors.unittest = ['.unicode'] + + +def test_unicode_output(unicode_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(unicode_filename, 'rb') as fp: + data = fp.read().decode('utf-8') + value = ' '.join(data.split()) + for allow_unicode in [False, True]: + data1 = yaml.dump(value, allow_unicode=allow_unicode) + for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']: + stream = StringIO() + yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode) + data2 = stream.getvalue() + data3 = yaml.dump(value, encoding=encoding, allow_unicode=allow_unicode) + if encoding is not None: + assert isinstance(data3, bytes) + data3 = data3.decode(encoding) + stream = BytesIO() + if encoding is None: + try: + yaml.dump( + value, stream, encoding=encoding, allow_unicode=allow_unicode + ) + except TypeError as exc: + if verbose: + print(exc) + data4 = None + else: + raise AssertionError('expected an exception') + else: + yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode) + data4 = stream.getvalue() + if verbose: + print('BYTES:', data4[:50]) + data4 = data4.decode(encoding) + for copy in [data1, data2, data3, data4]: + if copy is None: + continue + assert isinstance(copy, str) + if allow_unicode: + try: + copy[4:].encode('ascii') + except UnicodeEncodeError as exc: + if verbose: + print(exc) + else: + raise AssertionError('expected an exception') + else: + copy[4:].encode('ascii') + assert isinstance(data1, str), (type(data1), encoding) + assert isinstance(data2, str), (type(data2), encoding) + + +test_unicode_output.unittest = ['.unicode'] + + +def test_file_output(unicode_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(unicode_filename, 'rb') as fp: + data = fp.read().decode('utf-8') + handle, filename = tempfile.mkstemp() + os.close(handle) + try: + stream = StringIO() + yaml.dump(data, stream, allow_unicode=True) + data1 = stream.getvalue() + stream = BytesIO() + yaml.dump(data, stream, encoding='utf-16-le', allow_unicode=True) + data2 = stream.getvalue().decode('utf-16-le')[1:] + with open(filename, 'w', encoding='utf-16-le') as stream: + yaml.dump(data, stream, allow_unicode=True) + with open(filename, 'r', encoding='utf-16-le') as fp0: + data3 = fp0.read() + with open(filename, 'wb') as stream: + yaml.dump(data, stream, encoding='utf-8', allow_unicode=True) + with open(filename, 'r', encoding='utf-8') as fp0: + data4 = fp0.read() + assert data1 == data2, (data1, data2) + assert data1 == data3, (data1, data3) + assert data1 == data4, (data1, data4) + finally: + if os.path.exists(filename): + os.unlink(filename) + + +test_file_output.unittest = ['.unicode'] + + +def test_unicode_transfer(unicode_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + with open(unicode_filename, 'rb') as fp: + data = fp.read().decode('utf-8') + for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']: + input = data + if encoding is not None: + input = ('\ufeff' + input).encode(encoding) + output1 = yaml.emit(yaml.parse(input), allow_unicode=True) + if encoding is None: + stream = StringIO() + else: + stream = BytesIO() + yaml.emit(yaml.parse(input), stream, allow_unicode=True) + output2 = stream.getvalue() + assert isinstance(output1, str), (type(output1), encoding) + if encoding is None: + assert isinstance(output2, str), (type(output1), encoding) + else: + assert isinstance(output2, bytes), (type(output1), encoding) + output2.decode(encoding) + + +test_unicode_transfer.unittest = ['.unicode'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_mark.py b/_test/lib/test_mark.py new file mode 100644 index 0000000..2644a79 --- /dev/null +++ b/_test/lib/test_mark.py @@ -0,0 +1,35 @@ + +import ruamel.yaml as yaml + + +def test_marks(marks_filename, verbose=False): + with open(marks_filename, 'r') as fp0: + inputs = fp0.read().split('---\n')[1:] + for input in inputs: + index = 0 + line = 0 + column = 0 + while input[index] != '*': + if input[index] == '\n': + line += 1 + column = 0 + else: + column += 1 + index += 1 + mark = yaml.Mark(marks_filename, index, line, column, str(input), index) + snippet = mark.get_snippet(indent=2, max_length=79) + if verbose: + print(snippet) + assert isinstance(snippet, str), type(snippet) + assert snippet.count('\n') == 1, snippet.count('\n') + data, pointer = snippet.split('\n') + assert len(data) < 82, len(data) + assert data[len(pointer) - 1] == '*', data[len(pointer) - 1] + + +test_marks.unittest = ['.marks'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_reader.py b/_test/lib/test_reader.py new file mode 100644 index 0000000..16b9cd7 --- /dev/null +++ b/_test/lib/test_reader.py @@ -0,0 +1,44 @@ + +import codecs # NOQA +import io + +import ruamel.yaml.reader + + +def _run_reader(data, verbose): + try: + stream = ruamel.yaml.py.reader.Reader(data) + while stream.peek() != '\0': + stream.forward() + except ruamel.yaml.py.reader.ReaderError as exc: + if verbose: + print(exc) + else: + raise AssertionError('expected an exception') + + +def test_stream_error(error_filename, verbose=False): + with open(error_filename, 'rb') as fp0: + _run_reader(fp0, verbose) + with open(error_filename, 'rb') as fp0: + _run_reader(fp0.read(), verbose) + for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']: + try: + with open(error_filename, 'rb') as fp0: + data = fp0.read().decode(encoding) + break + except UnicodeDecodeError: + pass + else: + return + _run_reader(data, verbose) + with io.open(error_filename, encoding=encoding) as fp: + _run_reader(fp, verbose) + + +test_stream_error.unittest = ['.stream-error'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_recursive.py b/_test/lib/test_recursive.py new file mode 100644 index 0000000..88858e4 --- /dev/null +++ b/_test/lib/test_recursive.py @@ -0,0 +1,58 @@ + +import ruamel.yaml + + +class AnInstance: + def __init__(self, foo, bar): + self.foo = foo + self.bar = bar + + def __repr__(self): + try: + return '%s(foo=%r, bar=%r)' % (self.__class__.__name__, self.foo, self.bar) + except RuntimeError: + return '%s(foo=..., bar=...)' % self.__class__.__name__ + + +class AnInstanceWithState(AnInstance): + def __getstate__(self): + return {'attributes': [self.foo, self.bar]} + + def __setstate__(self, state): + self.foo, self.bar = state['attributes'] + + +def test_recursive(recursive_filename, verbose=False): + yaml = ruamel.yaml.YAML(typ='safe', pure=True) + context = globals().copy() + with open(recursive_filename, 'rb') as fp0: + exec(fp0.read(), context) + value1 = context['value'] + output1 = None + value2 = None + output2 = None + try: + buf = ruamel.yaml.compat.StringIO() + output1 = yaml.dump(value1, buf) + yaml.load(output1) + value2 = buf.getvalue() + buf = ruamel.yaml.compat.StringIO() + yaml.dump(value2, buf) + output2 = buf.getvalue() + assert output1 == output2, (output1, output2) + finally: + if verbose: + print('VALUE1:', value1) + print('VALUE2:', value2) + print('OUTPUT1:') + print(output1) + print('OUTPUT2:') + print(output2) + + +test_recursive.unittest = ['.recursive'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_representer.py b/_test/lib/test_representer.py new file mode 100644 index 0000000..5b2415d --- /dev/null +++ b/_test/lib/test_representer.py @@ -0,0 +1,51 @@ + +from ruamel.yaml import YAML +import test_constructor +import pprint + + +def test_representer_types(code_filename, verbose=False): + yaml = YAML(typ='safe', pure=True) + test_constructor._make_objects() + for allow_unicode in [False, True]: + for encoding in ['utf-8', 'utf-16-be', 'utf-16-le']: + with open(code_filename, 'rb') as fp0: + native1 = test_constructor._load_code(fp0.read()) + native2 = None + try: + output = yaml.dump( + native1, + Dumper=test_constructor.MyDumper, + allow_unicode=allow_unicode, + encoding=encoding, + ) + native2 = yaml.load(output, Loader=test_constructor.MyLoader) + try: + if native1 == native2: + continue + except TypeError: + pass + value1 = test_constructor._serialize_value(native1) + value2 = test_constructor._serialize_value(native2) + if verbose: + print('SERIALIZED NATIVE1:') + print(value1) + print('SERIALIZED NATIVE2:') + print(value2) + assert value1 == value2, (native1, native2) + finally: + if verbose: + print('NATIVE1:') + pprint.pprint(native1) + print('NATIVE2:') + pprint.pprint(native2) + print('OUTPUT:') + print(output) + + +test_representer_types.unittest = ['.code'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_resolver.py b/_test/lib/test_resolver.py new file mode 100644 index 0000000..b2b0839 --- /dev/null +++ b/_test/lib/test_resolver.py @@ -0,0 +1,111 @@ + +import ruamel.yaml +yaml = ruamel.yaml.YAML() +import pprint + + +def test_implicit_resolver(data_filename, detect_filename, verbose=False): + correct_tag = None + node = None + try: + with open(detect_filename, 'r') as fp0: + correct_tag = fp0.read().strip() + with open(data_filename, 'rb') as fp0: + node = yaml.compose(fp0) + assert isinstance(node, yaml.SequenceNode), node + for scalar in node.value: + assert isinstance(scalar, yaml.ScalarNode), scalar + assert scalar.tag == correct_tag, (scalar.tag, correct_tag) + finally: + if verbose: + print('CORRECT TAG:', correct_tag) + if hasattr(node, 'value'): + print('CHILDREN:') + pprint.pprint(node.value) + + +test_implicit_resolver.unittest = ['.data', '.detect'] + + +def _make_path_loader_and_dumper(): + global MyLoader, MyDumper + + class MyLoader(yaml.Loader): + pass + + class MyDumper(yaml.Dumper): + pass + + yaml.add_path_resolver('!root', [], Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver('!root/scalar', [], str, Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver( + '!root/key11/key12/*', ['key11', 'key12'], Loader=MyLoader, Dumper=MyDumper + ) + yaml.add_path_resolver('!root/key21/1/*', ['key21', 1], Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver( + '!root/key31/*/*/key14/map', + ['key31', None, None, 'key14'], + dict, + Loader=MyLoader, + Dumper=MyDumper, + ) + + return MyLoader, MyDumper + + +def _convert_node(node): + if isinstance(node, yaml.ScalarNode): + return (node.tag, node.value) + elif isinstance(node, yaml.SequenceNode): + value = [] + for item in node.value: + value.append(_convert_node(item)) + return (node.tag, value) + elif isinstance(node, yaml.MappingNode): + value = [] + for key, item in node.value: + value.append((_convert_node(key), _convert_node(item))) + return (node.tag, value) + + +def test_path_resolver_loader(data_filename, path_filename, verbose=False): + _make_path_loader_and_dumper() + with open(data_filename, 'rb') as fp0: + nodes1 = list(yaml.compose_all(fp0.read(), Loader=MyLoader)) + with open(path_filename, 'rb') as fp0: + nodes2 = list(yaml.compose_all(fp0.read())) + try: + for node1, node2 in zip(nodes1, nodes2): + data1 = _convert_node(node1) + data2 = _convert_node(node2) + assert data1 == data2, (data1, data2) + finally: + if verbose: + print(yaml.serialize_all(nodes1)) + + +test_path_resolver_loader.unittest = ['.data', '.path'] + + +def test_path_resolver_dumper(data_filename, path_filename, verbose=False): + _make_path_loader_and_dumper() + for filename in [data_filename, path_filename]: + with open(filename, 'rb') as fp0: + output = yaml.serialize_all(yaml.compose_all(fp0), Dumper=MyDumper) + if verbose: + print(output) + nodes1 = yaml.compose_all(output) + with open(data_filename, 'rb') as fp0: + nodes2 = yaml.compose_all(fp0) + for node1, node2 in zip(nodes1, nodes2): + data1 = _convert_node(node1) + data2 = _convert_node(node2) + assert data1 == data2, (data1, data2) + + +test_path_resolver_dumper.unittest = ['.data', '.path'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_structure.py b/_test/lib/test_structure.py new file mode 100644 index 0000000..8de24a3 --- /dev/null +++ b/_test/lib/test_structure.py @@ -0,0 +1,225 @@ + +import ruamel.yaml +import canonical # NOQA +import pprint + + +def _convert_structure(loader): + if loader.check_event(ruamel.yaml.ScalarEvent): + event = loader.get_event() + if event.tag or event.anchor or event.value: + return True + else: + return None + elif loader.check_event(ruamel.yaml.SequenceStartEvent): + loader.get_event() + sequence = [] + while not loader.check_event(ruamel.yaml.SequenceEndEvent): + sequence.append(_convert_structure(loader)) + loader.get_event() + return sequence + elif loader.check_event(ruamel.yaml.MappingStartEvent): + loader.get_event() + mapping = [] + while not loader.check_event(ruamel.yaml.MappingEndEvent): + key = _convert_structure(loader) + value = _convert_structure(loader) + mapping.append((key, value)) + loader.get_event() + return mapping + elif loader.check_event(ruamel.yaml.AliasEvent): + loader.get_event() + return '*' + else: + loader.get_event() + return '?' + + +def test_structure(data_filename, structure_filename, verbose=False): + nodes1 = [] + with open(structure_filename, 'r') as fp: + nodes2 = eval(fp.read()) + try: + with open(data_filename, 'rb') as fp: + loader = ruamel.yaml.Loader(fp) + while loader.check_event(): + if loader.check_event( + ruamel.yaml.StreamStartEvent, + ruamel.yaml.StreamEndEvent, + ruamel.yaml.DocumentStartEvent, + ruamel.yaml.DocumentEndEvent, + ): + loader.get_event() + continue + nodes1.append(_convert_structure(loader)) + if len(nodes1) == 1: + nodes1 = nodes1[0] + assert nodes1 == nodes2, (nodes1, nodes2) + finally: + if verbose: + print('NODES1:') + pprint.pprint(nodes1) + print('NODES2:') + pprint.pprint(nodes2) + + +test_structure.unittest = ['.data', '.structure'] + + +def _compare_events(events1, events2, full=False): + assert len(events1) == len(events2), (len(events1), len(events2)) + for event1, event2 in zip(events1, events2): + assert event1.__class__ == event2.__class__, (event1, event2) + if isinstance(event1, ruamel.yaml.AliasEvent) and full: + assert event1.anchor == event2.anchor, (event1, event2) + if isinstance(event1, (ruamel.yaml.ScalarEvent, ruamel.yaml.CollectionStartEvent)): + if (event1.tag not in [None, '!'] and event2.tag not in [None, '!']) or full: + assert event1.tag == event2.tag, (event1, event2) + if isinstance(event1, ruamel.yaml.ScalarEvent): + assert event1.value == event2.value, (event1, event2) + + +def test_parser(data_filename, canonical_filename, verbose=False): + events1 = None + events2 = None + try: + with open(data_filename, 'rb') as fp0: + events1 = list(ruamel.yaml.YAML().parse(fp0)) + with open(canonical_filename, 'rb') as fp0: + events2 = list(ruamel.yaml.YAML().canonical_parse(fp0)) + _compare_events(events1, events2) + finally: + if verbose: + print('EVENTS1:') + pprint.pprint(events1) + print('EVENTS2:') + pprint.pprint(events2) + + +test_parser.unittest = ['.data', '.canonical'] + + +def test_parser_on_canonical(canonical_filename, verbose=False): + events1 = None + events2 = None + try: + with open(canonical_filename, 'rb') as fp0: + events1 = list(ruamel.yaml.YAML().parse(fp0)) + with open(canonical_filename, 'rb') as fp0: + events2 = list(ruamel.yaml.YAML().canonical_parse(fp0)) + _compare_events(events1, events2, full=True) + finally: + if verbose: + print('EVENTS1:') + pprint.pprint(events1) + print('EVENTS2:') + pprint.pprint(events2) + + +test_parser_on_canonical.unittest = ['.canonical'] + + +def _compare_nodes(node1, node2): + assert node1.__class__ == node2.__class__, (node1, node2) + assert node1.tag == node2.tag, (node1, node2) + if isinstance(node1, ruamel.yaml.ScalarNode): + assert node1.value == node2.value, (node1, node2) + else: + assert len(node1.value) == len(node2.value), (node1, node2) + for item1, item2 in zip(node1.value, node2.value): + if not isinstance(item1, tuple): + item1 = (item1,) + item2 = (item2,) + for subnode1, subnode2 in zip(item1, item2): + _compare_nodes(subnode1, subnode2) + + +def test_composer(data_filename, canonical_filename, verbose=False): + nodes1 = None + nodes2 = None + try: + yaml = ruamel.yaml.YAML() + with open(data_filename, 'rb') as fp0: + nodes1 = list(yaml.compose_all(fp0)) + with open(canonical_filename, 'rb') as fp0: + nodes2 = list(yaml.canonical_compose_all(fp0)) + assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) + for node1, node2 in zip(nodes1, nodes2): + _compare_nodes(node1, node2) + finally: + if verbose: + print('NODES1:') + pprint.pprint(nodes1) + print('NODES2:') + pprint.pprint(nodes2) + + +test_composer.unittest = ['.data', '.canonical'] + + +def _make_loader(): + global MyLoader + + class MyLoader(ruamel.yaml.Loader): + def construct_sequence(self, node): + return tuple(ruamel.yaml.Loader.construct_sequence(self, node)) + + def construct_mapping(self, node): + pairs = self.construct_pairs(node) + pairs.sort(key=(lambda i: str(i))) + return pairs + + def construct_undefined(self, node): + return self.construct_scalar(node) + + MyLoader.add_constructor('tag:yaml.org,2002:map', MyLoader.construct_mapping) + MyLoader.add_constructor(None, MyLoader.construct_undefined) + + +def _make_canonical_loader(): + global MyCanonicalLoader + + class MyCanonicalLoader(ruamel.yaml.CanonicalLoader): + def construct_sequence(self, node): + return tuple(ruamel.yaml.CanonicalLoader.construct_sequence(self, node)) + + def construct_mapping(self, node): + pairs = self.construct_pairs(node) + pairs.sort(key=(lambda i: str(i))) + return pairs + + def construct_undefined(self, node): + return self.construct_scalar(node) + + MyCanonicalLoader.add_constructor( + 'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping + ) + MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) + + +def test_constructor(data_filename, canonical_filename, verbose=False): + _make_loader() + _make_canonical_loader() + native1 = None + native2 = None + yaml = YAML(typ='safe') + try: + with open(data_filename, 'rb') as fp0: + native1 = list(yaml.load(fp0, Loader=MyLoader)) + with open(canonical_filename, 'rb') as fp0: + native2 = list(yaml.load(fp0, Loader=MyCanonicalLoader)) + assert native1 == native2, (native1, native2) + finally: + if verbose: + print('NATIVE1:') + pprint.pprint(native1) + print('NATIVE2:') + pprint.pprint(native2) + + +test_constructor.unittest = ['.data', '.canonical'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_tokens.py b/_test/lib/test_tokens.py new file mode 100644 index 0000000..575e95c --- /dev/null +++ b/_test/lib/test_tokens.py @@ -0,0 +1,86 @@ + +import ruamel.yaml +import pprint + +# Tokens mnemonic: +# directive: % +# document_start: --- +# document_end: ... +# alias: * +# anchor: & +# tag: ! +# scalar _ +# block_sequence_start: [[ +# block_mapping_start: {{ +# block_end: ]} +# flow_sequence_start: [ +# flow_sequence_end: ] +# flow_mapping_start: { +# flow_mapping_end: } +# entry: , +# key: ? +# value: : + +_replaces = { + ruamel.yaml.DirectiveToken: '%', + ruamel.yaml.DocumentStartToken: '---', + ruamel.yaml.DocumentEndToken: '...', + ruamel.yaml.AliasToken: '*', + ruamel.yaml.AnchorToken: '&', + ruamel.yaml.TagToken: '!', + ruamel.yaml.ScalarToken: '_', + ruamel.yaml.BlockSequenceStartToken: '[[', + ruamel.yaml.BlockMappingStartToken: '{{', + ruamel.yaml.BlockEndToken: ']}', + ruamel.yaml.FlowSequenceStartToken: '[', + ruamel.yaml.FlowSequenceEndToken: ']', + ruamel.yaml.FlowMappingStartToken: '{', + ruamel.yaml.FlowMappingEndToken: '}', + ruamel.yaml.BlockEntryToken: ',', + ruamel.yaml.FlowEntryToken: ',', + ruamel.yaml.KeyToken: '?', + ruamel.yaml.ValueToken: ':', +} + + +def test_tokens(data_filename, tokens_filename, verbose=False): + tokens1 = [] + with open(tokens_filename, 'r') as fp: + tokens2 = fp.read().split() + try: + yaml = ruamel.yaml.YAML(typ='unsafe', pure=True) + with open(data_filename, 'rb') as fp1: + for token in yaml.scan(fp1): + if not isinstance(token, (ruamel.yaml.StreamStartToken, ruamel.yaml.StreamEndToken)): + tokens1.append(_replaces[token.__class__]) + finally: + if verbose: + print('TOKENS1:', ' '.join(tokens1)) + print('TOKENS2:', ' '.join(tokens2)) + assert len(tokens1) == len(tokens2), (tokens1, tokens2) + for token1, token2 in zip(tokens1, tokens2): + assert token1 == token2, (token1, token2) + + +test_tokens.unittest = ['.data', '.tokens'] + + +def test_scanner(data_filename, canonical_filename, verbose=False): + for filename in [data_filename, canonical_filename]: + tokens = [] + try: + yaml = ruamel.yaml.YAML(typ='unsafe', pure=False) + with open(filename, 'rb') as fp: + for token in yaml.scan(fp): + tokens.append(token.__class__.__name__) + finally: + if verbose: + pprint.pprint(tokens) + + +test_scanner.unittest = ['.data', '.canonical'] + +if __name__ == '__main__': + import test_appliance + + test_appliance.run(globals()) diff --git a/_test/lib/test_yaml.py b/_test/lib/test_yaml.py new file mode 100644 index 0000000..cf64a73 --- /dev/null +++ b/_test/lib/test_yaml.py @@ -0,0 +1,20 @@ +# coding: utf-8 + +from test_mark import * # NOQA +from test_reader import * # NOQA +from test_canonical import * # NOQA +from test_tokens import * # NOQA +from test_structure import * # NOQA +from test_errors import * # NOQA +from test_resolver import * # NOQA +from test_constructor import * # NOQA +from test_emitter import * # NOQA +from test_representer import * # NOQA +from test_recursive import * # NOQA +from test_input_output import * # NOQA + +if __name__ == '__main__': + import sys + import test_appliance + + sys.exit(test_appliance.run(globals())) diff --git a/_test/lib/test_yaml_ext.py b/_test/lib/test_yaml_ext.py new file mode 100644 index 0000000..a6fa287 --- /dev/null +++ b/_test/lib/test_yaml_ext.py @@ -0,0 +1,403 @@ +# coding: utf-8 + +import _ruamel_yaml +import ruamel.yaml +import types +import pprint + +ruamel.yaml.PyBaseLoader = ruamel.yaml.BaseLoader +ruamel.yaml.PySafeLoader = ruamel.yaml.SafeLoader +ruamel.yaml.PyLoader = ruamel.yaml.Loader +ruamel.yaml.PyBaseDumper = ruamel.yaml.BaseDumper +ruamel.yaml.PySafeDumper = ruamel.yaml.SafeDumper +ruamel.yaml.PyDumper = ruamel.yaml.Dumper + +old_scan = ruamel.yaml.scan + + +def new_scan(stream, Loader=ruamel.yaml.CLoader): + return old_scan(stream, Loader) + + +old_parse = ruamel.yaml.parse + + +def new_parse(stream, Loader=ruamel.yaml.CLoader): + return old_parse(stream, Loader) + + +old_compose = ruamel.yaml.compose + + +def new_compose(stream, Loader=ruamel.yaml.CLoader): + return old_compose(stream, Loader) + + +old_compose_all = ruamel.yaml.compose_all + + +def new_compose_all(stream, Loader=ruamel.yaml.CLoader): + return old_compose_all(stream, Loader) + + +old_load = ruamel.yaml.load + + +def new_load(stream, Loader=ruamel.yaml.CLoader): + return old_load(stream, Loader) + + +old_load_all = ruamel.yaml.load_all + + +def new_load_all(stream, Loader=ruamel.yaml.CLoader): + return old_load_all(stream, Loader) + + +old_safe_load = ruamel.yaml.safe_load + + +def new_safe_load(stream): + return old_load(stream, ruamel.yaml.CSafeLoader) + + +old_safe_load_all = ruamel.yaml.safe_load_all + + +def new_safe_load_all(stream): + return old_load_all(stream, ruamel.yaml.CSafeLoader) + + +old_emit = ruamel.yaml.emit + + +def new_emit(events, stream=None, Dumper=ruamel.yaml.CDumper, **kwds): + return old_emit(events, stream, Dumper, **kwds) + + +old_serialize = ruamel.yaml.serialize + + +def new_serialize(node, stream, Dumper=ruamel.yaml.CDumper, **kwds): + return old_serialize(node, stream, Dumper, **kwds) + + +old_serialize_all = ruamel.yaml.serialize_all + + +def new_serialize_all(nodes, stream=None, Dumper=ruamel.yaml.CDumper, **kwds): + return old_serialize_all(nodes, stream, Dumper, **kwds) + + +old_dump = ruamel.yaml.dump + + +def new_dump(data, stream=None, Dumper=ruamel.yaml.CDumper, **kwds): + return old_dump(data, stream, Dumper, **kwds) + + +old_dump_all = ruamel.yaml.dump_all + + +def new_dump_all(documents, stream=None, Dumper=ruamel.yaml.CDumper, **kwds): + return old_dump_all(documents, stream, Dumper, **kwds) + + +old_safe_dump = ruamel.yaml.safe_dump + + +def new_safe_dump(data, stream=None, **kwds): + return old_dump(data, stream, ruamel.yaml.CSafeDumper, **kwds) + + +# old_safe_dump_all = ruamel.yaml.safe_dump_all + + +def new_safe_dump_all(documents, stream=None, **kwds): + return old_dump_all(documents, stream, ruamel.yaml.CSafeDumper, **kwds) + + +def _set_up(): + ruamel.yaml.BaseLoader = ruamel.yaml.CBaseLoader + ruamel.yaml.SafeLoader = ruamel.yaml.CSafeLoader + ruamel.yaml.Loader = ruamel.yaml.CLoader + ruamel.yaml.BaseDumper = ruamel.yaml.CBaseDumper + ruamel.yaml.SafeDumper = ruamel.yaml.CSafeDumper + ruamel.yaml.Dumper = ruamel.yaml.CDumper + ruamel.yaml.scan = new_scan + ruamel.yaml.parse = new_parse + ruamel.yaml.compose = new_compose + ruamel.yaml.compose_all = new_compose_all + ruamel.yaml.load = new_load + ruamel.yaml.load_all = new_load_all + ruamel.yaml.safe_load = new_safe_load + ruamel.yaml.safe_load_all = new_safe_load_all + ruamel.yaml.emit = new_emit + ruamel.yaml.serialize = new_serialize + ruamel.yaml.serialize_all = new_serialize_all + ruamel.yaml.dump = new_dump + ruamel.yaml.dump_all = new_dump_all + ruamel.yaml.safe_dump = new_safe_dump + ruamel.yaml.safe_dump_all = new_safe_dump_all + + +def _tear_down(): + ruamel.yaml.BaseLoader = ruamel.yaml.PyBaseLoader + ruamel.yaml.SafeLoader = ruamel.yaml.PySafeLoader + ruamel.yaml.Loader = ruamel.yaml.PyLoader + ruamel.yaml.BaseDumper = ruamel.yaml.PyBaseDumper + ruamel.yaml.SafeDumper = ruamel.yaml.PySafeDumper + ruamel.yaml.Dumper = ruamel.yaml.PyDumper + ruamel.yaml.scan = old_scan + ruamel.yaml.parse = old_parse + ruamel.yaml.compose = old_compose + ruamel.yaml.compose_all = old_compose_all + ruamel.yaml.load = old_load + ruamel.yaml.load_all = old_load_all + ruamel.yaml.safe_load = old_safe_load + ruamel.yaml.safe_load_all = old_safe_load_all + ruamel.yaml.emit = old_emit + ruamel.yaml.serialize = old_serialize + ruamel.yaml.serialize_all = old_serialize_all + ruamel.yaml.dump = old_dump + ruamel.yaml.dump_all = old_dump_all + ruamel.yaml.safe_dump = old_safe_dump + ruamel.yaml.safe_dump_all = old_safe_dump_all + + +def test_c_version(verbose=False): + if verbose: + print(_ruamel_yaml.get_version()) + print(_ruamel_yaml.get_version_string()) + assert ('%s.%s.%s' % _ruamel_yaml.get_version()) == _ruamel_yaml.get_version_string(), ( + _ruamel_yaml.get_version(), + _ruamel_yaml.get_version_string(), + ) + + +def _compare_scanners(py_data, c_data, verbose): + yaml = ruamel.yaml.YAML(typ='unsafe', pure=True) + py_tokens = list(yaml.scan(py_data, Loader=ruamel.yaml.PyLoader)) + c_tokens = [] + try: + yaml = ruamel.yaml.YAML(typ='unsafe', pure=False) + for token in yaml.scan(c_data, Loader=ruamel.yaml.CLoader): + c_tokens.append(token) + assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens)) + for py_token, c_token in zip(py_tokens, c_tokens): + assert py_token.__class__ == c_token.__class__, (py_token, c_token) + if hasattr(py_token, 'value'): + assert py_token.value == c_token.value, (py_token, c_token) + if isinstance(py_token, ruamel.yaml.StreamEndToken): + continue + py_start = ( + py_token.start_mark.index, + py_token.start_mark.line, + py_token.start_mark.column, + ) + py_end = ( + py_token.end_mark.index, + py_token.end_mark.line, + py_token.end_mark.column, + ) + c_start = ( + c_token.start_mark.index, + c_token.start_mark.line, + c_token.start_mark.column, + ) + c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column) + assert py_start == c_start, (py_start, c_start) + assert py_end == c_end, (py_end, c_end) + finally: + if verbose: + print('PY_TOKENS:') + pprint.pprint(py_tokens) + print('C_TOKENS:') + pprint.pprint(c_tokens) + + +def test_c_scanner(data_filename, canonical_filename, verbose=False): + with open(data_filename, 'rb') as fp0: + with open(data_filename, 'rb') as fp1: + _compare_scanners(fp0, fp1, verbose) + with open(data_filename, 'rb') as fp0: + with open(data_filename, 'rb') as fp1: + _compare_scanners(fp0.read(), fp1.read(), verbose) + with open(canonical_filename, 'rb') as fp0: + with open(canonical_filename, 'rb') as fp1: + _compare_scanners(fp0, fp1, verbose) + with open(canonical_filename, 'rb') as fp0: + with open(canonical_filename, 'rb') as fp1: + _compare_scanners(fp0.read(), fp1.read(), verbose) + + +test_c_scanner.unittest = ['.data', '.canonical'] +test_c_scanner.skip = ['.skip-ext'] + + +def _compare_parsers(py_data, c_data, verbose): + yaml = ruamel.yaml.YAML(typ='unsafe', pure=True) + py_events = list(yaml.parse(py_data, Loader=ruamel.yaml.PyLoader)) + c_events = [] + try: + yaml = ruamel.yaml.YAML(typ='unsafe', pure=False) + for event in yaml.parse(c_data, Loader=ruamel.yaml.CLoader): + c_events.append(event) + assert len(py_events) == len(c_events), (len(py_events), len(c_events)) + for py_event, c_event in zip(py_events, c_events): + for attribute in [ + '__class__', + 'anchor', + 'tag', + 'implicit', + 'value', + 'explicit', + 'version', + 'tags', + ]: + py_value = getattr(py_event, attribute, None) + c_value = getattr(c_event, attribute, None) + assert py_value == c_value, (py_event, c_event, attribute) + finally: + if verbose: + print('PY_EVENTS:') + pprint.pprint(py_events) + print('C_EVENTS:') + pprint.pprint(c_events) + + +def test_c_parser(data_filename, canonical_filename, verbose=False): + with open(data_filename, 'rb') as fp0: + with open(data_filename, 'rb') as fp1: + _compare_parsers(fp0, fp1, verbose) + with open(data_filename, 'rb') as fp0: + with open(data_filename, 'rb') as fp1: + _compare_parsers(fp0.read(), fp1.read(), verbose) + with open(canonical_filename, 'rb') as fp0: + with open(canonical_filename, 'rb') as fp1: + _compare_parsers(fp0, fp1, verbose) + with open(canonical_filename, 'rb') as fp0: + with open(canonical_filename, 'rb') as fp1: + _compare_parsers(fp0.read(), fp1.read(), verbose) + + +test_c_parser.unittest = ['.data', '.canonical'] +test_c_parser.skip = ['.skip-ext'] + + +def _compare_emitters(data, verbose): + yaml = ruamel.yaml.YAML(typ='unsafe', pure=True) + events = list(yaml.parse(py_data, Loader=ruamel.yaml.PyLoader)) + c_data = yaml.emit(events, Dumper=ruamel.yaml.CDumper) + if verbose: + print(c_data) + py_events = list(yaml.parse(c_data, Loader=ruamel.yaml.PyLoader)) + c_events = list(yaml.parse(c_data, Loader=ruamel.yaml.CLoader)) + try: + assert len(events) == len(py_events), (len(events), len(py_events)) + assert len(events) == len(c_events), (len(events), len(c_events)) + for event, py_event, c_event in zip(events, py_events, c_events): + for attribute in [ + '__class__', + 'anchor', + 'tag', + 'implicit', + 'value', + 'explicit', + 'version', + 'tags', + ]: + value = getattr(event, attribute, None) + py_value = getattr(py_event, attribute, None) + c_value = getattr(c_event, attribute, None) + if ( + attribute == 'tag' + and value in [None, '!'] + and py_value in [None, '!'] + and c_value in [None, '!'] + ): + continue + if attribute == 'explicit' and (py_value or c_value): + continue + assert value == py_value, (event, py_event, attribute) + assert value == c_value, (event, c_event, attribute) + finally: + if verbose: + print('EVENTS:') + pprint.pprint(events) + print('PY_EVENTS:') + pprint.pprint(py_events) + print('C_EVENTS:') + pprint.pprint(c_events) + + +def test_c_emitter(data_filename, canonical_filename, verbose=False): + with open(data_filename, 'rb') as fp0: + _compare_emitters(fp0.read(), verbose) + with open(canonical_filename, 'rb') as fp0: + _compare_emitters(fp0.read(), verbose) + + +test_c_emitter.unittest = ['.data', '.canonical'] +test_c_emitter.skip = ['.skip-ext'] + + +def wrap_ext_function(function): + def wrapper(*args, **kwds): + _set_up() + try: + function(*args, **kwds) + finally: + _tear_down() + + wrapper.__name__ = '%s_ext' % function.__name__ + wrapper.unittest = function.unittest + wrapper.skip = getattr(function, 'skip', []) + ['.skip-ext'] + return wrapper + + +def wrap_ext(collections): + functions = [] + if not isinstance(collections, list): + collections = [collections] + for collection in collections: + if not isinstance(collection, dict): + collection = vars(collection) + for key in sorted(collection): + value = collection[key] + if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): + functions.append(wrap_ext_function(value)) + for function in functions: + assert function.__name__ not in globals() + globals()[function.__name__] = function + + +import test_tokens # NOQA +import test_structure # NOQA +import test_errors # NOQA +import test_resolver # NOQA +import test_constructor # NOQA +import test_emitter # NOQA +import test_representer # NOQA +import test_recursive # NOQA +import test_input_output # NOQA + +wrap_ext( + [ + test_tokens, + test_structure, + test_errors, + test_resolver, + test_constructor, + test_emitter, + test_representer, + test_recursive, + test_input_output, + ] +) + +if __name__ == '__main__': + import sys + import test_appliance + + sys.exit(test_appliance.run(globals())) -- cgit v1.2.3