summaryrefslogtreecommitdiffstats
path: root/lib/ruyaml/constructor.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ruyaml/constructor.py')
-rw-r--r--lib/ruyaml/constructor.py1920
1 files changed, 1920 insertions, 0 deletions
diff --git a/lib/ruyaml/constructor.py b/lib/ruyaml/constructor.py
new file mode 100644
index 0000000..3931f39
--- /dev/null
+++ b/lib/ruyaml/constructor.py
@@ -0,0 +1,1920 @@
+# coding: utf-8
+
+import base64
+import binascii
+import datetime
+import sys
+import types
+import warnings
+from collections.abc import Hashable, MutableMapping, MutableSequence # type: ignore
+
+from ruyaml.comments import * # NOQA
+from ruyaml.comments import (
+ C_KEY_EOL,
+ C_KEY_POST,
+ C_KEY_PRE,
+ C_VALUE_EOL,
+ C_VALUE_POST,
+ C_VALUE_PRE,
+ CommentedKeyMap,
+ CommentedKeySeq,
+ CommentedMap,
+ CommentedOrderedMap,
+ CommentedSeq,
+ CommentedSet,
+ TaggedScalar,
+)
+from ruyaml.compat import builtins_module # NOQA
+from ruyaml.compat import ordereddict # type: ignore
+from ruyaml.compat import _F, nprintf
+
+# fmt: off
+from ruyaml.error import (
+ MantissaNoDotYAML1_1Warning,
+ MarkedYAMLError,
+ MarkedYAMLFutureWarning,
+)
+from ruyaml.nodes import * # NOQA
+from ruyaml.nodes import MappingNode, ScalarNode, SequenceNode
+from ruyaml.scalarbool import ScalarBoolean
+from ruyaml.scalarfloat import ScalarFloat
+from ruyaml.scalarint import BinaryInt, HexCapsInt, HexInt, OctalInt, ScalarInt
+from ruyaml.scalarstring import (
+ DoubleQuotedScalarString,
+ FoldedScalarString,
+ LiteralScalarString,
+ PlainScalarString,
+ ScalarString,
+ SingleQuotedScalarString,
+)
+from ruyaml.timestamp import TimeStamp
+from ruyaml.util import create_timestamp, timestamp_regexp
+
+if False: # MYPY
+ from typing import Any, Dict, Generator, List, Optional, Set, Union # NOQA
+
+
+__all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor',
+ 'ConstructorError', 'RoundTripConstructor']
+# fmt: on
+
+
+class ConstructorError(MarkedYAMLError):
+ pass
+
+
+class DuplicateKeyFutureWarning(MarkedYAMLFutureWarning):
+ pass
+
+
+class DuplicateKeyError(MarkedYAMLError):
+ pass
+
+
+class BaseConstructor:
+
+ yaml_constructors = {} # type: Dict[Any, Any]
+ yaml_multi_constructors = {} # type: Dict[Any, Any]
+
+ def __init__(self, preserve_quotes=None, loader=None):
+ # type: (Optional[bool], Any) -> None
+ self.loader = loader
+ if (
+ self.loader is not None
+ and getattr(self.loader, '_constructor', None) is None
+ ):
+ self.loader._constructor = self
+ self.loader = loader
+ self.yaml_base_dict_type = dict
+ self.yaml_base_list_type = list
+ self.constructed_objects = {} # type: Dict[Any, Any]
+ self.recursive_objects = {} # type: Dict[Any, Any]
+ self.state_generators = [] # type: List[Any]
+ self.deep_construct = False
+ self._preserve_quotes = preserve_quotes
+ self.allow_duplicate_keys = False
+
+ @property
+ def composer(self):
+ # type: () -> Any
+ if hasattr(self.loader, 'typ'):
+ return self.loader.composer # type: ignore
+ try:
+ return self.loader._composer # type: ignore
+ except AttributeError:
+ sys.stdout.write('slt {}\n'.format(type(self)))
+ sys.stdout.write('slc {}\n'.format(self.loader._composer)) # type: ignore
+ sys.stdout.write('{}\n'.format(dir(self)))
+ raise
+
+ @property
+ def resolver(self):
+ # type: () -> Any
+ if hasattr(self.loader, 'typ'):
+ return self.loader.resolver # type: ignore
+ return self.loader._resolver # type: ignore
+
+ @property
+ def scanner(self):
+ # type: () -> Any
+ # needed to get to the expanded comments
+ if hasattr(self.loader, 'typ'):
+ return self.loader.scanner # type: ignore
+ return self.loader._scanner # type: ignore
+
+ def check_data(self):
+ # type: () -> Any
+ # If there are more documents available?
+ return self.composer.check_node()
+
+ def get_data(self):
+ # type: () -> Any
+ # Construct and return the next document.
+ if self.composer.check_node():
+ return self.construct_document(self.composer.get_node())
+
+ def get_single_data(self):
+ # type: () -> Any
+ # Ensure that the stream contains a single document and construct it.
+ node = self.composer.get_single_node()
+ if node is not None:
+ return self.construct_document(node)
+ return None
+
+ def construct_document(self, node):
+ # type: (Any) -> Any
+ data = self.construct_object(node)
+ while bool(self.state_generators):
+ state_generators = self.state_generators
+ self.state_generators = []
+ for generator in state_generators:
+ for _dummy in generator:
+ pass
+ self.constructed_objects = {}
+ self.recursive_objects = {}
+ self.deep_construct = False
+ return data
+
+ def construct_object(self, node, deep=False):
+ # type: (Any, bool) -> Any
+ """deep is True when creating an object/mapping recursively,
+ in that case want the underlying elements available during construction
+ """
+ if node in self.constructed_objects:
+ return self.constructed_objects[node]
+ if deep:
+ old_deep = self.deep_construct
+ self.deep_construct = True
+ if node in self.recursive_objects:
+ return self.recursive_objects[node]
+ # raise ConstructorError(
+ # None, None, 'found unconstructable recursive node', node.start_mark
+ # )
+ self.recursive_objects[node] = None
+ data = self.construct_non_recursive_object(node)
+
+ self.constructed_objects[node] = data
+ del self.recursive_objects[node]
+ if deep:
+ self.deep_construct = old_deep
+ return data
+
+ def construct_non_recursive_object(self, node, tag=None):
+ # type: (Any, Optional[str]) -> Any
+ constructor = None # type: Any
+ tag_suffix = None
+ if tag is None:
+ tag = node.tag
+ if tag in self.yaml_constructors:
+ constructor = self.yaml_constructors[tag]
+ else:
+ for tag_prefix in self.yaml_multi_constructors:
+ if tag.startswith(tag_prefix):
+ tag_suffix = tag[len(tag_prefix) :]
+ constructor = self.yaml_multi_constructors[tag_prefix]
+ break
+ else:
+ if None in self.yaml_multi_constructors:
+ tag_suffix = tag
+ constructor = self.yaml_multi_constructors[None]
+ elif None in self.yaml_constructors:
+ constructor = self.yaml_constructors[None]
+ elif isinstance(node, ScalarNode):
+ constructor = self.__class__.construct_scalar
+ elif isinstance(node, SequenceNode):
+ constructor = self.__class__.construct_sequence
+ elif isinstance(node, MappingNode):
+ constructor = self.__class__.construct_mapping
+ if tag_suffix is None:
+ data = constructor(self, node)
+ else:
+ data = constructor(self, tag_suffix, node)
+ if isinstance(data, types.GeneratorType):
+ generator = data
+ data = next(generator)
+ if self.deep_construct:
+ for _dummy in generator:
+ pass
+ else:
+ self.state_generators.append(generator)
+ return data
+
+ def construct_scalar(self, node):
+ # type: (Any) -> Any
+ if not isinstance(node, ScalarNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a scalar node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ return node.value
+
+ def construct_sequence(self, node, deep=False):
+ # type: (Any, bool) -> Any
+ """deep is True when creating an object/mapping recursively,
+ in that case want the underlying elements available during construction
+ """
+ if not isinstance(node, SequenceNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a sequence node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ return [self.construct_object(child, deep=deep) for child in node.value]
+
+ def construct_mapping(self, node, deep=False):
+ # type: (Any, bool) -> Any
+ """deep is True when creating an object/mapping recursively,
+ in that case want the underlying elements available during construction
+ """
+ if not isinstance(node, MappingNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ total_mapping = self.yaml_base_dict_type()
+ if getattr(node, 'merge', None) is not None:
+ todo = [(node.merge, False), (node.value, False)]
+ else:
+ todo = [(node.value, True)]
+ for values, check in todo:
+ mapping = self.yaml_base_dict_type() # type: Dict[Any, Any]
+ for key_node, value_node in values:
+ # keys can be list -> deep
+ key = self.construct_object(key_node, deep=True)
+ # lists are not hashable, but tuples are
+ if not isinstance(key, Hashable):
+ if isinstance(key, list):
+ key = tuple(key)
+ if not isinstance(key, Hashable):
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ 'found unhashable key',
+ key_node.start_mark,
+ )
+
+ value = self.construct_object(value_node, deep=deep)
+ if check:
+ if self.check_mapping_key(node, key_node, mapping, key, value):
+ mapping[key] = value
+ else:
+ mapping[key] = value
+ total_mapping.update(mapping)
+ return total_mapping
+
+ def check_mapping_key(self, node, key_node, mapping, key, value):
+ # type: (Any, Any, Any, Any, Any) -> bool
+ """return True if key is unique"""
+ if key in mapping:
+ if not self.allow_duplicate_keys:
+ mk = mapping.get(key)
+ args = [
+ 'while constructing a mapping',
+ node.start_mark,
+ 'found duplicate key "{}" with value "{}" '
+ '(original value: "{}")'.format(key, value, mk),
+ key_node.start_mark,
+ """
+ To suppress this check see:
+ http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
+ """,
+ """\
+ Duplicate keys will become an error in future releases, and are errors
+ by default when using the new API.
+ """,
+ ]
+ if self.allow_duplicate_keys is None:
+ warnings.warn(DuplicateKeyFutureWarning(*args))
+ else:
+ raise DuplicateKeyError(*args)
+ return False
+ return True
+
+ def check_set_key(self, node, key_node, setting, key):
+ # type: (Any, Any, Any, Any, Any) -> None
+ if key in setting:
+ if not self.allow_duplicate_keys:
+ args = [
+ 'while constructing a set',
+ node.start_mark,
+ 'found duplicate key "{}"'.format(key),
+ key_node.start_mark,
+ """
+ To suppress this check see:
+ http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
+ """,
+ """\
+ Duplicate keys will become an error in future releases, and are errors
+ by default when using the new API.
+ """,
+ ]
+ if self.allow_duplicate_keys is None:
+ warnings.warn(DuplicateKeyFutureWarning(*args))
+ else:
+ raise DuplicateKeyError(*args)
+
+ def construct_pairs(self, node, deep=False):
+ # type: (Any, bool) -> Any
+ if not isinstance(node, MappingNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ pairs = []
+ for key_node, value_node in node.value:
+ key = self.construct_object(key_node, deep=deep)
+ value = self.construct_object(value_node, deep=deep)
+ pairs.append((key, value))
+ return pairs
+
+ @classmethod
+ def add_constructor(cls, tag, constructor):
+ # type: (Any, Any) -> None
+ if 'yaml_constructors' not in cls.__dict__:
+ cls.yaml_constructors = cls.yaml_constructors.copy()
+ cls.yaml_constructors[tag] = constructor
+
+ @classmethod
+ def add_multi_constructor(cls, tag_prefix, multi_constructor):
+ # type: (Any, Any) -> None
+ if 'yaml_multi_constructors' not in cls.__dict__:
+ cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy()
+ cls.yaml_multi_constructors[tag_prefix] = multi_constructor
+
+
+class SafeConstructor(BaseConstructor):
+ def construct_scalar(self, node):
+ # type: (Any) -> Any
+ if isinstance(node, MappingNode):
+ for key_node, value_node in node.value:
+ if key_node.tag == 'tag:yaml.org,2002:value':
+ return self.construct_scalar(value_node)
+ return BaseConstructor.construct_scalar(self, node)
+
+ def flatten_mapping(self, node):
+ # type: (Any) -> Any
+ """
+ This implements the merge key feature http://yaml.org/type/merge.html
+ by inserting keys from the merge dict/list of dicts if not yet
+ available in this node
+ """
+ merge = [] # type: List[Any]
+ index = 0
+ while index < len(node.value):
+ key_node, value_node = node.value[index]
+ if key_node.tag == 'tag:yaml.org,2002:merge':
+ if merge: # double << key
+ if self.allow_duplicate_keys:
+ del node.value[index]
+ index += 1
+ continue
+ args = [
+ 'while constructing a mapping',
+ node.start_mark,
+ 'found duplicate key "{}"'.format(key_node.value),
+ key_node.start_mark,
+ """
+ To suppress this check see:
+ http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
+ """,
+ """\
+ Duplicate keys will become an error in future releases, and are errors
+ by default when using the new API.
+ """,
+ ]
+ if self.allow_duplicate_keys is None:
+ warnings.warn(DuplicateKeyFutureWarning(*args))
+ else:
+ raise DuplicateKeyError(*args)
+ del node.value[index]
+ if isinstance(value_node, MappingNode):
+ self.flatten_mapping(value_node)
+ merge.extend(value_node.value)
+ elif isinstance(value_node, SequenceNode):
+ submerge = []
+ for subnode in value_node.value:
+ if not isinstance(subnode, MappingNode):
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ _F(
+ 'expected a mapping for merging, but found {subnode_id!s}',
+ subnode_id=subnode.id,
+ ),
+ subnode.start_mark,
+ )
+ self.flatten_mapping(subnode)
+ submerge.append(subnode.value)
+ submerge.reverse()
+ for value in submerge:
+ merge.extend(value)
+ else:
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ _F(
+ 'expected a mapping or list of mappings for merging, '
+ 'but found {value_node_id!s}',
+ value_node_id=value_node.id,
+ ),
+ value_node.start_mark,
+ )
+ elif key_node.tag == 'tag:yaml.org,2002:value':
+ key_node.tag = 'tag:yaml.org,2002:str'
+ index += 1
+ else:
+ index += 1
+ if bool(merge):
+ node.merge = (
+ merge # separate merge keys to be able to update without duplicate
+ )
+ node.value = merge + node.value
+
+ def construct_mapping(self, node, deep=False):
+ # type: (Any, bool) -> Any
+ """deep is True when creating an object/mapping recursively,
+ in that case want the underlying elements available during construction
+ """
+ if isinstance(node, MappingNode):
+ self.flatten_mapping(node)
+ return BaseConstructor.construct_mapping(self, node, deep=deep)
+
+ def construct_yaml_null(self, node):
+ # type: (Any) -> Any
+ self.construct_scalar(node)
+ return None
+
+ # YAML 1.2 spec doesn't mention yes/no etc any more, 1.1 does
+ bool_values = {
+ 'yes': True,
+ 'no': False,
+ 'y': True,
+ 'n': False,
+ 'true': True,
+ 'false': False,
+ 'on': True,
+ 'off': False,
+ }
+
+ def construct_yaml_bool(self, node):
+ # type: (Any) -> bool
+ value = self.construct_scalar(node)
+ return self.bool_values[value.lower()]
+
+ def construct_yaml_int(self, node):
+ # type: (Any) -> int
+ value_s = self.construct_scalar(node)
+ value_s = value_s.replace('_', "")
+ sign = +1
+ if value_s[0] == '-':
+ sign = -1
+ if value_s[0] in '+-':
+ value_s = value_s[1:]
+ if value_s == '0':
+ return 0
+ elif value_s.startswith('0b'):
+ return sign * int(value_s[2:], 2)
+ elif value_s.startswith('0x'):
+ return sign * int(value_s[2:], 16)
+ elif value_s.startswith('0o'):
+ return sign * int(value_s[2:], 8)
+ elif self.resolver.processing_version == (1, 1) and value_s[0] == '0':
+ return sign * int(value_s, 8)
+ elif self.resolver.processing_version == (1, 1) and ':' in value_s:
+ digits = [int(part) for part in value_s.split(':')]
+ digits.reverse()
+ base = 1
+ value = 0
+ for digit in digits:
+ value += digit * base
+ base *= 60
+ return sign * value
+ else:
+ return sign * int(value_s)
+
+ inf_value = 1e300
+ while inf_value != inf_value * inf_value:
+ inf_value *= inf_value
+ nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99).
+
+ def construct_yaml_float(self, node):
+ # type: (Any) -> float
+ value_so = self.construct_scalar(node)
+ value_s = value_so.replace('_', "").lower()
+ sign = +1
+ if value_s[0] == '-':
+ sign = -1
+ if value_s[0] in '+-':
+ value_s = value_s[1:]
+ if value_s == '.inf':
+ return sign * self.inf_value
+ elif value_s == '.nan':
+ return self.nan_value
+ elif self.resolver.processing_version != (1, 2) and ':' in value_s:
+ digits = [float(part) for part in value_s.split(':')]
+ digits.reverse()
+ base = 1
+ value = 0.0
+ for digit in digits:
+ value += digit * base
+ base *= 60
+ return sign * value
+ else:
+ if self.resolver.processing_version != (1, 2) and 'e' in value_s:
+ # value_s is lower case independent of input
+ mantissa, exponent = value_s.split('e')
+ if '.' not in mantissa:
+ warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so))
+ return sign * float(value_s)
+
+ def construct_yaml_binary(self, node):
+ # type: (Any) -> Any
+ try:
+ value = self.construct_scalar(node).encode('ascii')
+ except UnicodeEncodeError as exc:
+ raise ConstructorError(
+ None,
+ None,
+ _F('failed to convert base64 data into ascii: {exc!s}', exc=exc),
+ node.start_mark,
+ )
+ try:
+ return base64.decodebytes(value)
+ except binascii.Error as exc:
+ raise ConstructorError(
+ None,
+ None,
+ _F('failed to decode base64 data: {exc!s}', exc=exc),
+ node.start_mark,
+ )
+
+ timestamp_regexp = timestamp_regexp # moved to util 0.17.17
+
+ def construct_yaml_timestamp(self, node, values=None):
+ # type: (Any, Any) -> Any
+ if values is None:
+ try:
+ match = self.timestamp_regexp.match(node.value)
+ except TypeError:
+ match = None
+ if match is None:
+ raise ConstructorError(
+ None,
+ None,
+ 'failed to construct timestamp from "{}"'.format(node.value),
+ node.start_mark,
+ )
+ values = match.groupdict()
+ return create_timestamp(**values)
+
+ def construct_yaml_omap(self, node):
+ # type: (Any) -> Any
+ # Note: we do now check for duplicate keys
+ omap = ordereddict()
+ yield omap
+ if not isinstance(node, SequenceNode):
+ raise ConstructorError(
+ 'while constructing an ordered map',
+ node.start_mark,
+ _F('expected a sequence, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ for subnode in node.value:
+ if not isinstance(subnode, MappingNode):
+ raise ConstructorError(
+ 'while constructing an ordered map',
+ node.start_mark,
+ _F(
+ 'expected a mapping of length 1, but found {subnode_id!s}',
+ subnode_id=subnode.id,
+ ),
+ subnode.start_mark,
+ )
+ if len(subnode.value) != 1:
+ raise ConstructorError(
+ 'while constructing an ordered map',
+ node.start_mark,
+ _F(
+ 'expected a single mapping item, but found {len_subnode_val:d} items',
+ len_subnode_val=len(subnode.value),
+ ),
+ subnode.start_mark,
+ )
+ key_node, value_node = subnode.value[0]
+ key = self.construct_object(key_node)
+ assert key not in omap
+ value = self.construct_object(value_node)
+ omap[key] = value
+
+ def construct_yaml_pairs(self, node):
+ # type: (Any) -> Any
+ # Note: the same code as `construct_yaml_omap`.
+ pairs = [] # type: List[Any]
+ yield pairs
+ if not isinstance(node, SequenceNode):
+ raise ConstructorError(
+ 'while constructing pairs',
+ node.start_mark,
+ _F('expected a sequence, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ for subnode in node.value:
+ if not isinstance(subnode, MappingNode):
+ raise ConstructorError(
+ 'while constructing pairs',
+ node.start_mark,
+ _F(
+ 'expected a mapping of length 1, but found {subnode_id!s}',
+ subnode_id=subnode.id,
+ ),
+ subnode.start_mark,
+ )
+ if len(subnode.value) != 1:
+ raise ConstructorError(
+ 'while constructing pairs',
+ node.start_mark,
+ _F(
+ 'expected a single mapping item, but found {len_subnode_val:d} items',
+ len_subnode_val=len(subnode.value),
+ ),
+ subnode.start_mark,
+ )
+ key_node, value_node = subnode.value[0]
+ key = self.construct_object(key_node)
+ value = self.construct_object(value_node)
+ pairs.append((key, value))
+
+ def construct_yaml_set(self, node):
+ # type: (Any) -> Any
+ data = set() # type: Set[Any]
+ yield data
+ value = self.construct_mapping(node)
+ data.update(value)
+
+ def construct_yaml_str(self, node):
+ # type: (Any) -> Any
+ value = self.construct_scalar(node)
+ return value
+
+ def construct_yaml_seq(self, node):
+ # type: (Any) -> Any
+ data = self.yaml_base_list_type() # type: List[Any]
+ yield data
+ data.extend(self.construct_sequence(node))
+
+ def construct_yaml_map(self, node):
+ # type: (Any) -> Any
+ data = self.yaml_base_dict_type() # type: Dict[Any, Any]
+ yield data
+ value = self.construct_mapping(node)
+ data.update(value)
+
+ def construct_yaml_object(self, node, cls):
+ # type: (Any, Any) -> Any
+ data = cls.__new__(cls)
+ yield data
+ if hasattr(data, '__setstate__'):
+ state = self.construct_mapping(node, deep=True)
+ data.__setstate__(state)
+ else:
+ state = self.construct_mapping(node)
+ data.__dict__.update(state)
+
+ def construct_undefined(self, node):
+ # type: (Any) -> None
+ raise ConstructorError(
+ None,
+ None,
+ _F(
+ 'could not determine a constructor for the tag {node_tag!r}',
+ node_tag=node.tag,
+ ),
+ node.start_mark,
+ )
+
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:null', SafeConstructor.construct_yaml_null
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:bool', SafeConstructor.construct_yaml_bool
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:int', SafeConstructor.construct_yaml_int
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:float', SafeConstructor.construct_yaml_float
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:binary', SafeConstructor.construct_yaml_binary
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:timestamp', SafeConstructor.construct_yaml_timestamp
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:omap', SafeConstructor.construct_yaml_omap
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:pairs', SafeConstructor.construct_yaml_pairs
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:set', SafeConstructor.construct_yaml_set
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:str', SafeConstructor.construct_yaml_str
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:seq', SafeConstructor.construct_yaml_seq
+)
+
+SafeConstructor.add_constructor(
+ 'tag:yaml.org,2002:map', SafeConstructor.construct_yaml_map
+)
+
+SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined)
+
+
+class Constructor(SafeConstructor):
+ def construct_python_str(self, node):
+ # type: (Any) -> Any
+ return self.construct_scalar(node)
+
+ def construct_python_unicode(self, node):
+ # type: (Any) -> Any
+ return self.construct_scalar(node)
+
+ def construct_python_bytes(self, node):
+ # type: (Any) -> Any
+ try:
+ value = self.construct_scalar(node).encode('ascii')
+ except UnicodeEncodeError as exc:
+ raise ConstructorError(
+ None,
+ None,
+ _F('failed to convert base64 data into ascii: {exc!s}', exc=exc),
+ node.start_mark,
+ )
+ try:
+ return base64.decodebytes(value)
+ except binascii.Error as exc:
+ raise ConstructorError(
+ None,
+ None,
+ _F('failed to decode base64 data: {exc!s}', exc=exc),
+ node.start_mark,
+ )
+
+ def construct_python_long(self, node):
+ # type: (Any) -> int
+ val = self.construct_yaml_int(node)
+ return val
+
+ def construct_python_complex(self, node):
+ # type: (Any) -> Any
+ return complex(self.construct_scalar(node))
+
+ def construct_python_tuple(self, node):
+ # type: (Any) -> Any
+ return tuple(self.construct_sequence(node))
+
+ def find_python_module(self, name, mark):
+ # type: (Any, Any) -> Any
+ if not name:
+ raise ConstructorError(
+ 'while constructing a Python module',
+ mark,
+ 'expected non-empty name appended to the tag',
+ mark,
+ )
+ try:
+ __import__(name)
+ except ImportError as exc:
+ raise ConstructorError(
+ 'while constructing a Python module',
+ mark,
+ _F('cannot find module {name!r} ({exc!s})', name=name, exc=exc),
+ mark,
+ )
+ return sys.modules[name]
+
+ def find_python_name(self, name, mark):
+ # type: (Any, Any) -> Any
+ if not name:
+ raise ConstructorError(
+ 'while constructing a Python object',
+ mark,
+ 'expected non-empty name appended to the tag',
+ mark,
+ )
+ if '.' in name:
+ lname = name.split('.')
+ lmodule_name = lname
+ lobject_name = [] # type: List[Any]
+ while len(lmodule_name) > 1:
+ lobject_name.insert(0, lmodule_name.pop())
+ module_name = '.'.join(lmodule_name)
+ try:
+ __import__(module_name)
+ # object_name = '.'.join(object_name)
+ break
+ except ImportError:
+ continue
+ else:
+ module_name = builtins_module
+ lobject_name = [name]
+ try:
+ __import__(module_name)
+ except ImportError as exc:
+ raise ConstructorError(
+ 'while constructing a Python object',
+ mark,
+ _F(
+ 'cannot find module {module_name!r} ({exc!s})',
+ module_name=module_name,
+ exc=exc,
+ ),
+ mark,
+ )
+ module = sys.modules[module_name]
+ object_name = '.'.join(lobject_name)
+ obj = module
+ while lobject_name:
+ if not hasattr(obj, lobject_name[0]):
+
+ raise ConstructorError(
+ 'while constructing a Python object',
+ mark,
+ _F(
+ 'cannot find {object_name!r} in the module {module_name!r}',
+ object_name=object_name,
+ module_name=module.__name__,
+ ),
+ mark,
+ )
+ obj = getattr(obj, lobject_name.pop(0))
+ return obj
+
+ def construct_python_name(self, suffix, node):
+ # type: (Any, Any) -> Any
+ value = self.construct_scalar(node)
+ if value:
+ raise ConstructorError(
+ 'while constructing a Python name',
+ node.start_mark,
+ _F('expected the empty value, but found {value!r}', value=value),
+ node.start_mark,
+ )
+ return self.find_python_name(suffix, node.start_mark)
+
+ def construct_python_module(self, suffix, node):
+ # type: (Any, Any) -> Any
+ value = self.construct_scalar(node)
+ if value:
+ raise ConstructorError(
+ 'while constructing a Python module',
+ node.start_mark,
+ _F('expected the empty value, but found {value!r}', value=value),
+ node.start_mark,
+ )
+ return self.find_python_module(suffix, node.start_mark)
+
+ def make_python_instance(self, suffix, node, args=None, kwds=None, newobj=False):
+ # type: (Any, Any, Any, Any, bool) -> Any
+ if not args:
+ args = []
+ if not kwds:
+ kwds = {}
+ cls = self.find_python_name(suffix, node.start_mark)
+ if newobj and isinstance(cls, type):
+ return cls.__new__(cls, *args, **kwds)
+ else:
+ return cls(*args, **kwds)
+
+ def set_python_instance_state(self, instance, state):
+ # type: (Any, Any) -> None
+ if hasattr(instance, '__setstate__'):
+ instance.__setstate__(state)
+ else:
+ slotstate = {} # type: Dict[Any, Any]
+ if isinstance(state, tuple) and len(state) == 2:
+ state, slotstate = state
+ if hasattr(instance, '__dict__'):
+ instance.__dict__.update(state)
+ elif state:
+ slotstate.update(state)
+ for key, value in slotstate.items():
+ setattr(instance, key, value)
+
+ def construct_python_object(self, suffix, node):
+ # type: (Any, Any) -> Any
+ # Format:
+ # !!python/object:module.name { ... state ... }
+ instance = self.make_python_instance(suffix, node, newobj=True)
+ self.recursive_objects[node] = instance
+ yield instance
+ deep = hasattr(instance, '__setstate__')
+ state = self.construct_mapping(node, deep=deep)
+ self.set_python_instance_state(instance, state)
+
+ def construct_python_object_apply(self, suffix, node, newobj=False):
+ # type: (Any, Any, bool) -> Any
+ # Format:
+ # !!python/object/apply # (or !!python/object/new)
+ # args: [ ... arguments ... ]
+ # kwds: { ... keywords ... }
+ # state: ... state ...
+ # listitems: [ ... listitems ... ]
+ # dictitems: { ... dictitems ... }
+ # or short format:
+ # !!python/object/apply [ ... arguments ... ]
+ # The difference between !!python/object/apply and !!python/object/new
+ # is how an object is created, check make_python_instance for details.
+ if isinstance(node, SequenceNode):
+ args = self.construct_sequence(node, deep=True)
+ kwds = {} # type: Dict[Any, Any]
+ state = {} # type: Dict[Any, Any]
+ listitems = [] # type: List[Any]
+ dictitems = {} # type: Dict[Any, Any]
+ else:
+ value = self.construct_mapping(node, deep=True)
+ args = value.get('args', [])
+ kwds = value.get('kwds', {})
+ state = value.get('state', {})
+ listitems = value.get('listitems', [])
+ dictitems = value.get('dictitems', {})
+ instance = self.make_python_instance(suffix, node, args, kwds, newobj)
+ if bool(state):
+ self.set_python_instance_state(instance, state)
+ if bool(listitems):
+ instance.extend(listitems)
+ if bool(dictitems):
+ for key in dictitems:
+ instance[key] = dictitems[key]
+ return instance
+
+ def construct_python_object_new(self, suffix, node):
+ # type: (Any, Any) -> Any
+ return self.construct_python_object_apply(suffix, node, newobj=True)
+
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/none', Constructor.construct_yaml_null
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/bool', Constructor.construct_yaml_bool
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/str', Constructor.construct_python_str
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/bytes', Constructor.construct_python_bytes
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/int', Constructor.construct_yaml_int
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/long', Constructor.construct_python_long
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/float', Constructor.construct_yaml_float
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/complex', Constructor.construct_python_complex
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple
+)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:python/dict', Constructor.construct_yaml_map
+)
+
+Constructor.add_multi_constructor(
+ 'tag:yaml.org,2002:python/name:', Constructor.construct_python_name
+)
+
+Constructor.add_multi_constructor(
+ 'tag:yaml.org,2002:python/module:', Constructor.construct_python_module
+)
+
+Constructor.add_multi_constructor(
+ 'tag:yaml.org,2002:python/object:', Constructor.construct_python_object
+)
+
+Constructor.add_multi_constructor(
+ 'tag:yaml.org,2002:python/object/apply:', Constructor.construct_python_object_apply
+)
+
+Constructor.add_multi_constructor(
+ 'tag:yaml.org,2002:python/object/new:', Constructor.construct_python_object_new
+)
+
+
+class RoundTripConstructor(SafeConstructor):
+ """need to store the comments on the node itself,
+ as well as on the items
+ """
+
+ def comment(self, idx):
+ # type: (Any) -> Any
+ assert self.loader.comment_handling is not None # type: ignore
+ x = self.scanner.comments[idx]
+ x.set_assigned()
+ return x
+
+ def comments(self, list_of_comments, idx=None):
+ # type: (Any, Optional[Any]) -> Any
+ # hand in the comment and optional pre, eol, post segment
+ if list_of_comments is None:
+ return []
+ if idx is not None:
+ if list_of_comments[idx] is None:
+ return []
+ list_of_comments = list_of_comments[idx]
+ for x in list_of_comments:
+ yield self.comment(x)
+
+ def construct_scalar(self, node):
+ # type: (Any) -> Any
+ if not isinstance(node, ScalarNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a scalar node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+
+ if node.style == '|' and isinstance(node.value, str):
+ lss = LiteralScalarString(node.value, anchor=node.anchor)
+ if self.loader and self.loader.comment_handling is None:
+ if node.comment and node.comment[1]:
+ lss.comment = node.comment[1][0] # type: ignore
+ else:
+ # NEWCMNT
+ if node.comment is not None and node.comment[1]:
+ # nprintf('>>>>nc1', node.comment)
+ # EOL comment after |
+ lss.comment = self.comment(node.comment[1][0]) # type: ignore
+ return lss
+ if node.style == '>' and isinstance(node.value, str):
+ fold_positions = [] # type: List[int]
+ idx = -1
+ while True:
+ idx = node.value.find('\a', idx + 1)
+ if idx < 0:
+ break
+ fold_positions.append(idx - len(fold_positions))
+ fss = FoldedScalarString(node.value.replace('\a', ''), anchor=node.anchor)
+ if self.loader and self.loader.comment_handling is None:
+ if node.comment and node.comment[1]:
+ fss.comment = node.comment[1][0] # type: ignore
+ else:
+ # NEWCMNT
+ if node.comment is not None and node.comment[1]:
+ # nprintf('>>>>nc2', node.comment)
+ # EOL comment after >
+ lss.comment = self.comment(node.comment[1][0]) # type: ignore
+ if fold_positions:
+ fss.fold_pos = fold_positions # type: ignore
+ return fss
+ elif bool(self._preserve_quotes) and isinstance(node.value, str):
+ if node.style == "'":
+ return SingleQuotedScalarString(node.value, anchor=node.anchor)
+ if node.style == '"':
+ return DoubleQuotedScalarString(node.value, anchor=node.anchor)
+ if node.anchor:
+ return PlainScalarString(node.value, anchor=node.anchor)
+ return node.value
+
+ def construct_yaml_int(self, node):
+ # type: (Any) -> Any
+ width = None # type: Any
+ value_su = self.construct_scalar(node)
+ try:
+ sx = value_su.rstrip('_')
+ underscore = [len(sx) - sx.rindex('_') - 1, False, False] # type: Any
+ except ValueError:
+ underscore = None
+ except IndexError:
+ underscore = None
+ value_s = value_su.replace('_', "")
+ sign = +1
+ if value_s[0] == '-':
+ sign = -1
+ if value_s[0] in '+-':
+ value_s = value_s[1:]
+ if value_s == '0':
+ return 0
+ elif value_s.startswith('0b'):
+ if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
+ width = len(value_s[2:])
+ if underscore is not None:
+ underscore[1] = value_su[2] == '_'
+ underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
+ return BinaryInt(
+ sign * int(value_s[2:], 2),
+ width=width,
+ underscore=underscore,
+ anchor=node.anchor,
+ )
+ elif value_s.startswith('0x'):
+ # default to lower-case if no a-fA-F in string
+ if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
+ width = len(value_s[2:])
+ hex_fun = HexInt # type: Any
+ for ch in value_s[2:]:
+ if ch in 'ABCDEF': # first non-digit is capital
+ hex_fun = HexCapsInt
+ break
+ if ch in 'abcdef':
+ break
+ if underscore is not None:
+ underscore[1] = value_su[2] == '_'
+ underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
+ return hex_fun(
+ sign * int(value_s[2:], 16),
+ width=width,
+ underscore=underscore,
+ anchor=node.anchor,
+ )
+ elif value_s.startswith('0o'):
+ if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
+ width = len(value_s[2:])
+ if underscore is not None:
+ underscore[1] = value_su[2] == '_'
+ underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
+ return OctalInt(
+ sign * int(value_s[2:], 8),
+ width=width,
+ underscore=underscore,
+ anchor=node.anchor,
+ )
+ elif self.resolver.processing_version != (1, 2) and value_s[0] == '0':
+ return sign * int(value_s, 8)
+ elif self.resolver.processing_version != (1, 2) and ':' in value_s:
+ digits = [int(part) for part in value_s.split(':')]
+ digits.reverse()
+ base = 1
+ value = 0
+ for digit in digits:
+ value += digit * base
+ base *= 60
+ return sign * value
+ elif self.resolver.processing_version > (1, 1) and value_s[0] == '0':
+ # not an octal, an integer with leading zero(s)
+ if underscore is not None:
+ # cannot have a leading underscore
+ underscore[2] = len(value_su) > 1 and value_su[-1] == '_'
+ return ScalarInt(
+ sign * int(value_s), width=len(value_s), underscore=underscore
+ )
+ elif underscore:
+ # cannot have a leading underscore
+ underscore[2] = len(value_su) > 1 and value_su[-1] == '_'
+ return ScalarInt(
+ sign * int(value_s),
+ width=None,
+ underscore=underscore,
+ anchor=node.anchor,
+ )
+ elif node.anchor:
+ return ScalarInt(sign * int(value_s), width=None, anchor=node.anchor)
+ else:
+ return sign * int(value_s)
+
+ def construct_yaml_float(self, node):
+ # type: (Any) -> Any
+ def leading_zeros(v):
+ # type: (Any) -> int
+ lead0 = 0
+ idx = 0
+ while idx < len(v) and v[idx] in '0.':
+ if v[idx] == '0':
+ lead0 += 1
+ idx += 1
+ return lead0
+
+ # underscore = None
+ m_sign = False # type: Any
+ value_so = self.construct_scalar(node)
+ value_s = value_so.replace('_', "").lower()
+ sign = +1
+ if value_s[0] == '-':
+ sign = -1
+ if value_s[0] in '+-':
+ m_sign = value_s[0]
+ value_s = value_s[1:]
+ if value_s == '.inf':
+ return sign * self.inf_value
+ if value_s == '.nan':
+ return self.nan_value
+ if self.resolver.processing_version != (1, 2) and ':' in value_s:
+ digits = [float(part) for part in value_s.split(':')]
+ digits.reverse()
+ base = 1
+ value = 0.0
+ for digit in digits:
+ value += digit * base
+ base *= 60
+ return sign * value
+ if 'e' in value_s:
+ try:
+ mantissa, exponent = value_so.split('e')
+ exp = 'e'
+ except ValueError:
+ mantissa, exponent = value_so.split('E')
+ exp = 'E'
+ if self.resolver.processing_version != (1, 2):
+ # value_s is lower case independent of input
+ if '.' not in mantissa:
+ warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so))
+ lead0 = leading_zeros(mantissa)
+ width = len(mantissa)
+ prec = mantissa.find('.')
+ if m_sign:
+ width -= 1
+ e_width = len(exponent)
+ e_sign = exponent[0] in '+-'
+ # nprint('sf', width, prec, m_sign, exp, e_width, e_sign)
+ return ScalarFloat(
+ sign * float(value_s),
+ width=width,
+ prec=prec,
+ m_sign=m_sign,
+ m_lead0=lead0,
+ exp=exp,
+ e_width=e_width,
+ e_sign=e_sign,
+ anchor=node.anchor,
+ )
+ width = len(value_so)
+ prec = value_so.index(
+ '.'
+ ) # you can use index, this would not be float without dot
+ lead0 = leading_zeros(value_so)
+ return ScalarFloat(
+ sign * float(value_s),
+ width=width,
+ prec=prec,
+ m_sign=m_sign,
+ m_lead0=lead0,
+ anchor=node.anchor,
+ )
+
+ def construct_yaml_str(self, node):
+ # type: (Any) -> Any
+ value = self.construct_scalar(node)
+ if isinstance(value, ScalarString):
+ return value
+ return value
+
+ def construct_rt_sequence(self, node, seqtyp, deep=False):
+ # type: (Any, Any, bool) -> Any
+ if not isinstance(node, SequenceNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a sequence node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ ret_val = []
+ if self.loader and self.loader.comment_handling is None:
+ if node.comment:
+ seqtyp._yaml_add_comment(node.comment[:2])
+ if len(node.comment) > 2:
+ # this happens e.g. if you have a sequence element that is a flow-style
+ # mapping and that has no EOL comment but a following commentline or
+ # empty line
+ seqtyp.yaml_end_comment_extend(node.comment[2], clear=True)
+ else:
+ # NEWCMNT
+ if node.comment:
+ nprintf('nc3', node.comment)
+ if node.anchor:
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ seqtyp.yaml_set_anchor(node.anchor)
+ for idx, child in enumerate(node.value):
+ if child.comment:
+ seqtyp._yaml_add_comment(child.comment, key=idx)
+ child.comment = None # if moved to sequence remove from child
+ ret_val.append(self.construct_object(child, deep=deep))
+ seqtyp._yaml_set_idx_line_col(
+ idx, [child.start_mark.line, child.start_mark.column]
+ )
+ return ret_val
+
+ def flatten_mapping(self, node):
+ # type: (Any) -> Any
+ """
+ This implements the merge key feature http://yaml.org/type/merge.html
+ by inserting keys from the merge dict/list of dicts if not yet
+ available in this node
+ """
+
+ def constructed(value_node):
+ # type: (Any) -> Any
+ # If the contents of a merge are defined within the
+ # merge marker, then they won't have been constructed
+ # yet. But if they were already constructed, we need to use
+ # the existing object.
+ if value_node in self.constructed_objects:
+ value = self.constructed_objects[value_node]
+ else:
+ value = self.construct_object(value_node, deep=False)
+ return value
+
+ # merge = []
+ merge_map_list = [] # type: List[Any]
+ index = 0
+ while index < len(node.value):
+ key_node, value_node = node.value[index]
+ if key_node.tag == u'tag:yaml.org,2002:merge':
+ if merge_map_list and not self.allow_duplicate_keys:
+ args = [
+ 'while constructing a mapping',
+ node.start_mark,
+ 'found duplicate key "{}"'.format(key_node.value),
+ key_node.start_mark,
+ """
+ To suppress this check see:
+ http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
+ """,
+ """\
+ Duplicate keys will become an error in future releases, and are errors
+ by default when using the new API.
+ """,
+ ]
+ if self.allow_duplicate_keys is None:
+ warnings.warn(DuplicateKeyFutureWarning(*args))
+ else:
+ raise DuplicateKeyError(*args)
+ del node.value[index]
+ if isinstance(value_node, MappingNode):
+ merge_map_list.append((index, constructed(value_node)))
+ # self.flatten_mapping(value_node)
+ # merge.extend(value_node.value)
+ elif isinstance(value_node, SequenceNode):
+ # submerge = []
+ for subnode in value_node.value:
+ if not isinstance(subnode, MappingNode):
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ _F(
+ 'expected a mapping for merging, but found {subnode_id!s}',
+ subnode_id=subnode.id,
+ ),
+ subnode.start_mark,
+ )
+ merge_map_list.append((index, constructed(subnode)))
+ # self.flatten_mapping(subnode)
+ # submerge.append(subnode.value)
+ # submerge.reverse()
+ # for value in submerge:
+ # merge.extend(value)
+ else:
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ _F(
+ 'expected a mapping or list of mappings for merging, '
+ 'but found {value_node_id!s}',
+ value_node_id=value_node.id,
+ ),
+ value_node.start_mark,
+ )
+ elif key_node.tag == 'tag:yaml.org,2002:value':
+ key_node.tag = 'tag:yaml.org,2002:str'
+ index += 1
+ else:
+ index += 1
+ return merge_map_list
+ # if merge:
+ # node.value = merge + node.value
+
+ def _sentinel(self):
+ # type: () -> None
+ pass
+
+ def construct_mapping(self, node, maptyp, deep=False): # type: ignore
+ # type: (Any, Any, bool) -> Any
+ if not isinstance(node, MappingNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ merge_map = self.flatten_mapping(node)
+ # mapping = {}
+ if self.loader and self.loader.comment_handling is None:
+ if node.comment:
+ maptyp._yaml_add_comment(node.comment[:2])
+ if len(node.comment) > 2:
+ maptyp.yaml_end_comment_extend(node.comment[2], clear=True)
+ else:
+ # NEWCMNT
+ if node.comment:
+ # nprintf('nc4', node.comment, node.start_mark)
+ if maptyp.ca.pre is None:
+ maptyp.ca.pre = []
+ for cmnt in self.comments(node.comment, 0):
+ maptyp.ca.pre.append(cmnt)
+ if node.anchor:
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ maptyp.yaml_set_anchor(node.anchor)
+ last_key, last_value = None, self._sentinel
+ for key_node, value_node in node.value:
+ # keys can be list -> deep
+ key = self.construct_object(key_node, deep=True)
+ # lists are not hashable, but tuples are
+ if not isinstance(key, Hashable):
+ if isinstance(key, MutableSequence):
+ key_s = CommentedKeySeq(key)
+ if key_node.flow_style is True:
+ key_s.fa.set_flow_style()
+ elif key_node.flow_style is False:
+ key_s.fa.set_block_style()
+ key = key_s
+ elif isinstance(key, MutableMapping):
+ key_m = CommentedKeyMap(key)
+ if key_node.flow_style is True:
+ key_m.fa.set_flow_style()
+ elif key_node.flow_style is False:
+ key_m.fa.set_block_style()
+ key = key_m
+ if not isinstance(key, Hashable):
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ 'found unhashable key',
+ key_node.start_mark,
+ )
+ value = self.construct_object(value_node, deep=deep)
+ if self.check_mapping_key(node, key_node, maptyp, key, value):
+ if self.loader and self.loader.comment_handling is None:
+ if (
+ key_node.comment
+ and len(key_node.comment) > 4
+ and key_node.comment[4]
+ ):
+ if last_value is None:
+ key_node.comment[0] = key_node.comment.pop(4)
+ maptyp._yaml_add_comment(key_node.comment, value=last_key)
+ else:
+ key_node.comment[2] = key_node.comment.pop(4)
+ maptyp._yaml_add_comment(key_node.comment, key=key)
+ key_node.comment = None
+ if key_node.comment:
+ maptyp._yaml_add_comment(key_node.comment, key=key)
+ if value_node.comment:
+ maptyp._yaml_add_comment(value_node.comment, value=key)
+ else:
+ # NEWCMNT
+ if key_node.comment:
+ nprintf('nc5a', key, key_node.comment)
+ if key_node.comment[0]:
+ maptyp.ca.set(key, C_KEY_PRE, key_node.comment[0])
+ if key_node.comment[1]:
+ maptyp.ca.set(key, C_KEY_EOL, key_node.comment[1])
+ if key_node.comment[2]:
+ maptyp.ca.set(key, C_KEY_POST, key_node.comment[2])
+ if value_node.comment:
+ nprintf('nc5b', key, value_node.comment)
+ if value_node.comment[0]:
+ maptyp.ca.set(key, C_VALUE_PRE, value_node.comment[0])
+ if value_node.comment[1]:
+ maptyp.ca.set(key, C_VALUE_EOL, value_node.comment[1])
+ if value_node.comment[2]:
+ maptyp.ca.set(key, C_VALUE_POST, value_node.comment[2])
+ maptyp._yaml_set_kv_line_col(
+ key,
+ [
+ key_node.start_mark.line,
+ key_node.start_mark.column,
+ value_node.start_mark.line,
+ value_node.start_mark.column,
+ ],
+ )
+ maptyp[key] = value
+ last_key, last_value = key, value # could use indexing
+ # do this last, or <<: before a key will prevent insertion in instances
+ # of collections.OrderedDict (as they have no __contains__
+ if merge_map:
+ maptyp.add_yaml_merge(merge_map)
+
+ def construct_setting(self, node, typ, deep=False):
+ # type: (Any, Any, bool) -> Any
+ if not isinstance(node, MappingNode):
+ raise ConstructorError(
+ None,
+ None,
+ _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ if self.loader and self.loader.comment_handling is None:
+ if node.comment:
+ typ._yaml_add_comment(node.comment[:2])
+ if len(node.comment) > 2:
+ typ.yaml_end_comment_extend(node.comment[2], clear=True)
+ else:
+ # NEWCMNT
+ if node.comment:
+ nprintf('nc6', node.comment)
+ if node.anchor:
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ typ.yaml_set_anchor(node.anchor)
+ for key_node, value_node in node.value:
+ # keys can be list -> deep
+ key = self.construct_object(key_node, deep=True)
+ # lists are not hashable, but tuples are
+ if not isinstance(key, Hashable):
+ if isinstance(key, list):
+ key = tuple(key)
+ if not isinstance(key, Hashable):
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ 'found unhashable key',
+ key_node.start_mark,
+ )
+ # construct but should be null
+ value = self.construct_object(value_node, deep=deep) # NOQA
+ self.check_set_key(node, key_node, typ, key)
+ if self.loader and self.loader.comment_handling is None:
+ if key_node.comment:
+ typ._yaml_add_comment(key_node.comment, key=key)
+ if value_node.comment:
+ typ._yaml_add_comment(value_node.comment, value=key)
+ else:
+ # NEWCMNT
+ if key_node.comment:
+ nprintf('nc7a', key_node.comment)
+ if value_node.comment:
+ nprintf('nc7b', value_node.comment)
+ typ.add(key)
+
+ def construct_yaml_seq(self, node):
+ # type: (Any) -> Any
+ data = CommentedSeq()
+ data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
+ # if node.comment:
+ # data._yaml_add_comment(node.comment)
+ yield data
+ data.extend(self.construct_rt_sequence(node, data))
+ self.set_collection_style(data, node)
+
+ def construct_yaml_map(self, node):
+ # type: (Any) -> Any
+ data = CommentedMap()
+ data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
+ yield data
+ self.construct_mapping(node, data, deep=True)
+ self.set_collection_style(data, node)
+
+ def set_collection_style(self, data, node):
+ # type: (Any, Any) -> None
+ if len(data) == 0:
+ return
+ if node.flow_style is True:
+ data.fa.set_flow_style()
+ elif node.flow_style is False:
+ data.fa.set_block_style()
+
+ def construct_yaml_object(self, node, cls):
+ # type: (Any, Any) -> Any
+ data = cls.__new__(cls)
+ yield data
+ if hasattr(data, '__setstate__'):
+ state = SafeConstructor.construct_mapping(self, node, deep=True)
+ data.__setstate__(state)
+ else:
+ state = SafeConstructor.construct_mapping(self, node)
+ if hasattr(data, '__attrs_attrs__'): # issue 394
+ data.__init__(**state)
+ else:
+ data.__dict__.update(state)
+ if node.anchor:
+ from ruyaml.anchor import Anchor
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ if not hasattr(data, Anchor.attrib):
+ a = Anchor()
+ setattr(data, Anchor.attrib, a)
+ else:
+ a = getattr(data, Anchor.attrib)
+ a.value = node.anchor
+
+ def construct_yaml_omap(self, node):
+ # type: (Any) -> Any
+ # Note: we do now check for duplicate keys
+ omap = CommentedOrderedMap()
+ omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
+ if node.flow_style is True:
+ omap.fa.set_flow_style()
+ elif node.flow_style is False:
+ omap.fa.set_block_style()
+ yield omap
+ if self.loader and self.loader.comment_handling is None:
+ if node.comment:
+ omap._yaml_add_comment(node.comment[:2])
+ if len(node.comment) > 2:
+ omap.yaml_end_comment_extend(node.comment[2], clear=True)
+ else:
+ # NEWCMNT
+ if node.comment:
+ nprintf('nc8', node.comment)
+ if not isinstance(node, SequenceNode):
+ raise ConstructorError(
+ 'while constructing an ordered map',
+ node.start_mark,
+ _F('expected a sequence, but found {node_id!s}', node_id=node.id),
+ node.start_mark,
+ )
+ for subnode in node.value:
+ if not isinstance(subnode, MappingNode):
+ raise ConstructorError(
+ 'while constructing an ordered map',
+ node.start_mark,
+ _F(
+ 'expected a mapping of length 1, but found {subnode_id!s}',
+ subnode_id=subnode.id,
+ ),
+ subnode.start_mark,
+ )
+ if len(subnode.value) != 1:
+ raise ConstructorError(
+ 'while constructing an ordered map',
+ node.start_mark,
+ _F(
+ 'expected a single mapping item, but found {len_subnode_val:d} items',
+ len_subnode_val=len(subnode.value),
+ ),
+ subnode.start_mark,
+ )
+ key_node, value_node = subnode.value[0]
+ key = self.construct_object(key_node)
+ assert key not in omap
+ value = self.construct_object(value_node)
+ if self.loader and self.loader.comment_handling is None:
+ if key_node.comment:
+ omap._yaml_add_comment(key_node.comment, key=key)
+ if subnode.comment:
+ omap._yaml_add_comment(subnode.comment, key=key)
+ if value_node.comment:
+ omap._yaml_add_comment(value_node.comment, value=key)
+ else:
+ # NEWCMNT
+ if key_node.comment:
+ nprintf('nc9a', key_node.comment)
+ if subnode.comment:
+ nprintf('nc9b', subnode.comment)
+ if value_node.comment:
+ nprintf('nc9c', value_node.comment)
+ omap[key] = value
+
+ def construct_yaml_set(self, node):
+ # type: (Any) -> Any
+ data = CommentedSet()
+ data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
+ yield data
+ self.construct_setting(node, data)
+
+ def construct_undefined(self, node):
+ # type: (Any) -> Any
+ try:
+ if isinstance(node, MappingNode):
+ data = CommentedMap()
+ data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
+ if node.flow_style is True:
+ data.fa.set_flow_style()
+ elif node.flow_style is False:
+ data.fa.set_block_style()
+ data.yaml_set_tag(node.tag)
+ yield data
+ if node.anchor:
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ data.yaml_set_anchor(node.anchor)
+ self.construct_mapping(node, data)
+ return
+ elif isinstance(node, ScalarNode):
+ data2 = TaggedScalar()
+ data2.value = self.construct_scalar(node)
+ data2.style = node.style
+ data2.yaml_set_tag(node.tag)
+ yield data2
+ if node.anchor:
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ data2.yaml_set_anchor(node.anchor, always_dump=True)
+ return
+ elif isinstance(node, SequenceNode):
+ data3 = CommentedSeq()
+ data3._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
+ if node.flow_style is True:
+ data3.fa.set_flow_style()
+ elif node.flow_style is False:
+ data3.fa.set_block_style()
+ data3.yaml_set_tag(node.tag)
+ yield data3
+ if node.anchor:
+ from ruyaml.serializer import templated_id
+
+ if not templated_id(node.anchor):
+ data3.yaml_set_anchor(node.anchor)
+ data3.extend(self.construct_sequence(node))
+ return
+ except: # NOQA
+ pass
+ raise ConstructorError(
+ None,
+ None,
+ _F(
+ 'could not determine a constructor for the tag {node_tag!r}',
+ node_tag=node.tag,
+ ),
+ node.start_mark,
+ )
+
+ def construct_yaml_timestamp(self, node, values=None):
+ # type: (Any, Any) -> Any
+ try:
+ match = self.timestamp_regexp.match(node.value)
+ except TypeError:
+ match = None
+ if match is None:
+ raise ConstructorError(
+ None,
+ None,
+ 'failed to construct timestamp from "{}"'.format(node.value),
+ node.start_mark,
+ )
+ values = match.groupdict()
+ if not values['hour']:
+ return create_timestamp(**values)
+ # return SafeConstructor.construct_yaml_timestamp(self, node, values)
+ for part in ['t', 'tz_sign', 'tz_hour', 'tz_minute']:
+ if values[part]:
+ break
+ else:
+ return create_timestamp(**values)
+ # return SafeConstructor.construct_yaml_timestamp(self, node, values)
+ year = int(values['year'])
+ month = int(values['month'])
+ day = int(values['day'])
+ hour = int(values['hour'])
+ minute = int(values['minute'])
+ second = int(values['second'])
+ fraction = 0
+ if values['fraction']:
+ fraction_s = values['fraction'][:6]
+ while len(fraction_s) < 6:
+ fraction_s += '0'
+ fraction = int(fraction_s)
+ if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4:
+ fraction += 1
+ delta = None
+ if values['tz_sign']:
+ tz_hour = int(values['tz_hour'])
+ minutes = values['tz_minute']
+ tz_minute = int(minutes) if minutes else 0
+ delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute)
+ if values['tz_sign'] == '-':
+ delta = -delta
+ # should check for None and solve issue 366 should be tzinfo=delta)
+ if delta:
+ dt = datetime.datetime(year, month, day, hour, minute)
+ dt -= delta
+ data = TimeStamp(
+ dt.year, dt.month, dt.day, dt.hour, dt.minute, second, fraction
+ )
+ data._yaml['delta'] = delta
+ tz = values['tz_sign'] + values['tz_hour']
+ if values['tz_minute']:
+ tz += ':' + values['tz_minute']
+ data._yaml['tz'] = tz
+ else:
+ data = TimeStamp(year, month, day, hour, minute, second, fraction)
+ if values['tz']: # no delta
+ data._yaml['tz'] = values['tz']
+
+ if values['t']:
+ data._yaml['t'] = True
+ return data
+
+ def construct_yaml_bool(self, node):
+ # type: (Any) -> Any
+ b = SafeConstructor.construct_yaml_bool(self, node)
+ if node.anchor:
+ return ScalarBoolean(b, anchor=node.anchor)
+ return b
+
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:null', RoundTripConstructor.construct_yaml_null
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_bool
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:int', RoundTripConstructor.construct_yaml_int
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:float', RoundTripConstructor.construct_yaml_float
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:binary', RoundTripConstructor.construct_yaml_binary
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:timestamp', RoundTripConstructor.construct_yaml_timestamp
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:omap', RoundTripConstructor.construct_yaml_omap
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:pairs', RoundTripConstructor.construct_yaml_pairs
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:set', RoundTripConstructor.construct_yaml_set
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:str', RoundTripConstructor.construct_yaml_str
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:seq', RoundTripConstructor.construct_yaml_seq
+)
+
+RoundTripConstructor.add_constructor(
+ 'tag:yaml.org,2002:map', RoundTripConstructor.construct_yaml_map
+)
+
+RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_undefined)