summaryrefslogtreecommitdiffstats
path: root/lib/ruyaml/compat.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ruyaml/compat.py')
-rw-r--r--lib/ruyaml/compat.py263
1 files changed, 263 insertions, 0 deletions
diff --git a/lib/ruyaml/compat.py b/lib/ruyaml/compat.py
new file mode 100644
index 0000000..91f4c81
--- /dev/null
+++ b/lib/ruyaml/compat.py
@@ -0,0 +1,263 @@
+# coding: utf-8
+
+# partially from package six by Benjamin Peterson
+
+import collections.abc
+import io
+import os
+import sys
+import traceback
+from abc import abstractmethod
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+# partially from package six by Benjamin Peterson
+
+
+_DEFAULT_YAML_VERSION = (1, 2)
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from ordereddict import OrderedDict # type: ignore
+
+ # to get the right name import ... as ordereddict doesn't do that
+
+
+class ordereddict(OrderedDict): # type: ignore
+ if not hasattr(OrderedDict, 'insert'):
+
+ def insert(self, pos, key, value):
+ # type: (int, Any, Any) -> None
+ if pos >= len(self):
+ self[key] = value
+ return
+ od = ordereddict()
+ od.update(self)
+ for k in od:
+ del self[k]
+ for index, old_key in enumerate(od):
+ if pos == index:
+ self[key] = value
+ self[old_key] = od[old_key]
+
+
+PY2 = sys.version_info[0] == 2
+PY3 = sys.version_info[0] == 3
+
+
+# replace with f-strings when 3.5 support is dropped
+# ft = '42'
+# assert _F('abc {ft!r}', ft=ft) == 'abc %r' % ft
+# 'abc %r' % ft -> _F('abc {ft!r}' -> f'abc {ft!r}'
+def _F(s, *superfluous, **kw):
+ # type: (Any, Any, Any) -> Any
+ if superfluous:
+ raise TypeError
+ return s.format(**kw)
+
+
+StringIO = io.StringIO
+BytesIO = io.BytesIO
+
+# StreamType = Union[BinaryIO, IO[str], IO[unicode], StringIO]
+# StreamType = Union[BinaryIO, IO[str], StringIO] # type: ignore
+StreamType = Any
+
+StreamTextType = StreamType # Union[Text, StreamType]
+VersionType = Union[List[int], str, Tuple[int, int]]
+
+builtins_module = 'builtins'
+
+UNICODE_SIZE = 4 if sys.maxunicode > 65535 else 2
+
+DBG_TOKEN = 1
+DBG_EVENT = 2
+DBG_NODE = 4
+
+
+_debug = None # type: Optional[int]
+if 'RUAMELDEBUG' in os.environ:
+ _debugx = os.environ.get('RUAMELDEBUG')
+ if _debugx is None:
+ _debug = 0
+ else:
+ _debug = int(_debugx)
+
+
+if bool(_debug):
+
+ class ObjectCounter:
+ def __init__(self):
+ # type: () -> None
+ self.map = {} # type: Dict[Any, Any]
+
+ def __call__(self, k):
+ # type: (Any) -> None
+ self.map[k] = self.map.get(k, 0) + 1
+
+ def dump(self):
+ # type: () -> None
+ for k in sorted(self.map):
+ sys.stdout.write('{} -> {}'.format(k, self.map[k]))
+
+ object_counter = ObjectCounter()
+
+
+# used from yaml util when testing
+def dbg(val=None):
+ # type: (Any) -> Any
+ global _debug
+ if _debug is None:
+ # set to true or false
+ _debugx = os.environ.get('YAMLDEBUG')
+ if _debugx is None:
+ _debug = 0
+ else:
+ _debug = int(_debugx)
+ if val is None:
+ return _debug
+ return _debug & val
+
+
+class Nprint:
+ def __init__(self, file_name=None):
+ # type: (Any) -> None
+ self._max_print = None # type: Any
+ self._count = None # type: Any
+ self._file_name = file_name
+
+ def __call__(self, *args, **kw):
+ # type: (Any, Any) -> None
+ if not bool(_debug):
+ return
+ out = sys.stdout if self._file_name is None else open(self._file_name, 'a')
+ dbgprint = print # to fool checking for print statements by dv utility
+ kw1 = kw.copy()
+ kw1['file'] = out
+ dbgprint(*args, **kw1)
+ out.flush()
+ if self._max_print is not None:
+ if self._count is None:
+ self._count = self._max_print
+ self._count -= 1
+ if self._count == 0:
+ dbgprint('forced exit\n')
+ traceback.print_stack()
+ out.flush()
+ sys.exit(0)
+ if self._file_name:
+ out.close()
+
+ def set_max_print(self, i):
+ # type: (int) -> None
+ self._max_print = i
+ self._count = None
+
+ def fp(self, mode='a'):
+ # type: (str) -> Any
+ out = sys.stdout if self._file_name is None else open(self._file_name, mode)
+ return out
+
+
+nprint = Nprint()
+nprintf = Nprint('/var/tmp/ruyaml.log')
+
+# char checkers following production rules
+
+
+def check_namespace_char(ch):
+ # type: (Any) -> bool
+ if '\x21' <= ch <= '\x7E': # ! to ~
+ return True
+ if '\xA0' <= ch <= '\uD7FF':
+ return True
+ if ('\uE000' <= ch <= '\uFFFD') and ch != '\uFEFF': # excl. byte order mark
+ return True
+ if '\U00010000' <= ch <= '\U0010FFFF':
+ return True
+ return False
+
+
+def check_anchorname_char(ch):
+ # type: (Any) -> bool
+ if ch in ',[]{}':
+ return False
+ return check_namespace_char(ch)
+
+
+def version_tnf(t1, t2=None):
+ # type: (Any, Any) -> Any
+ """
+ return True if ruyaml version_info < t1, None if t2 is specified and bigger else False
+ """
+ from ruyaml import version_info # NOQA
+
+ if version_info < t1:
+ return True
+ if t2 is not None and version_info < t2:
+ return None
+ return False
+
+
+class MutableSliceableSequence(collections.abc.MutableSequence): # type: ignore
+ __slots__ = ()
+
+ def __getitem__(self, index):
+ # type: (Any) -> Any
+ if not isinstance(index, slice):
+ return self.__getsingleitem__(index)
+ return type(self)([self[i] for i in range(*index.indices(len(self)))]) # type: ignore
+
+ def __setitem__(self, index, value):
+ # type: (Any, Any) -> None
+ if not isinstance(index, slice):
+ return self.__setsingleitem__(index, value)
+ assert iter(value)
+ # nprint(index.start, index.stop, index.step, index.indices(len(self)))
+ if index.step is None:
+ del self[index.start : index.stop]
+ for elem in reversed(value):
+ self.insert(0 if index.start is None else index.start, elem)
+ else:
+ range_parms = index.indices(len(self))
+ nr_assigned_items = (range_parms[1] - range_parms[0] - 1) // range_parms[
+ 2
+ ] + 1
+ # need to test before changing, in case TypeError is caught
+ if nr_assigned_items < len(value):
+ raise TypeError(
+ 'too many elements in value {} < {}'.format(
+ nr_assigned_items, len(value)
+ )
+ )
+ elif nr_assigned_items > len(value):
+ raise TypeError(
+ 'not enough elements in value {} > {}'.format(
+ nr_assigned_items, len(value)
+ )
+ )
+ for idx, i in enumerate(range(*range_parms)):
+ self[i] = value[idx]
+
+ def __delitem__(self, index):
+ # type: (Any) -> None
+ if not isinstance(index, slice):
+ return self.__delsingleitem__(index)
+ # nprint(index.start, index.stop, index.step, index.indices(len(self)))
+ for i in reversed(range(*index.indices(len(self)))):
+ del self[i]
+
+ @abstractmethod
+ def __getsingleitem__(self, index):
+ # type: (Any) -> Any
+ raise IndexError
+
+ @abstractmethod
+ def __setsingleitem__(self, index, value):
+ # type: (Any, Any) -> None
+ raise IndexError
+
+ @abstractmethod
+ def __delsingleitem__(self, index):
+ # type: (Any) -> None
+ raise IndexError