diff options
Diffstat (limited to 'third_party/python/Mako/mako/util.py')
-rw-r--r-- | third_party/python/Mako/mako/util.py | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/third_party/python/Mako/mako/util.py b/third_party/python/Mako/mako/util.py new file mode 100644 index 0000000000..16e3c72658 --- /dev/null +++ b/third_party/python/Mako/mako/util.py @@ -0,0 +1,400 @@ +# mako/util.py +# Copyright 2006-2020 the Mako authors and contributors <see AUTHORS file> +# +# This module is part of Mako and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php +from __future__ import absolute_import + +from ast import parse +import codecs +import collections +import operator +import os +import re +import timeit + +from mako import compat + + +def update_wrapper(decorated, fn): + decorated.__wrapped__ = fn + decorated.__name__ = fn.__name__ + return decorated + + +class PluginLoader(object): + def __init__(self, group): + self.group = group + self.impls = {} + + def load(self, name): + if name in self.impls: + return self.impls[name]() + else: + import pkg_resources + + for impl in pkg_resources.iter_entry_points(self.group, name): + self.impls[name] = impl.load + return impl.load() + else: + from mako import exceptions + + raise exceptions.RuntimeException( + "Can't load plugin %s %s" % (self.group, name) + ) + + def register(self, name, modulepath, objname): + def load(): + mod = __import__(modulepath) + for token in modulepath.split(".")[1:]: + mod = getattr(mod, token) + return getattr(mod, objname) + + self.impls[name] = load + + +def verify_directory(dir_): + """create and/or verify a filesystem directory.""" + + tries = 0 + + while not os.path.exists(dir_): + try: + tries += 1 + os.makedirs(dir_, compat.octal("0775")) + except: + if tries > 5: + raise + + +def to_list(x, default=None): + if x is None: + return default + if not isinstance(x, (list, tuple)): + return [x] + else: + return x + + +class memoized_property(object): + + """A read-only @property that is only evaluated once.""" + + def __init__(self, fget, doc=None): + self.fget = fget + self.__doc__ = doc or fget.__doc__ + self.__name__ = fget.__name__ + + def __get__(self, obj, cls): + if obj is None: + return self + obj.__dict__[self.__name__] = result = self.fget(obj) + return result + + +class memoized_instancemethod(object): + + """Decorate a method memoize its return value. + + Best applied to no-arg methods: memoization is not sensitive to + argument values, and will always return the same value even when + called with different arguments. + + """ + + def __init__(self, fget, doc=None): + self.fget = fget + self.__doc__ = doc or fget.__doc__ + self.__name__ = fget.__name__ + + def __get__(self, obj, cls): + if obj is None: + return self + + def oneshot(*args, **kw): + result = self.fget(obj, *args, **kw) + + def memo(*a, **kw): + return result + + memo.__name__ = self.__name__ + memo.__doc__ = self.__doc__ + obj.__dict__[self.__name__] = memo + return result + + oneshot.__name__ = self.__name__ + oneshot.__doc__ = self.__doc__ + return oneshot + + +class SetLikeDict(dict): + + """a dictionary that has some setlike methods on it""" + + def union(self, other): + """produce a 'union' of this dict and another (at the key level). + + values in the second dict take precedence over that of the first""" + x = SetLikeDict(**self) + x.update(other) + return x + + +class FastEncodingBuffer(object): + + """a very rudimentary buffer that is faster than StringIO, + but doesn't crash on unicode data like cStringIO.""" + + def __init__(self, encoding=None, errors="strict", as_unicode=False): + self.data = collections.deque() + self.encoding = encoding + if as_unicode: + self.delim = compat.u("") + else: + self.delim = "" + self.as_unicode = as_unicode + self.errors = errors + self.write = self.data.append + + def truncate(self): + self.data = collections.deque() + self.write = self.data.append + + def getvalue(self): + if self.encoding: + return self.delim.join(self.data).encode( + self.encoding, self.errors + ) + else: + return self.delim.join(self.data) + + +class LRUCache(dict): + + """A dictionary-like object that stores a limited number of items, + discarding lesser used items periodically. + + this is a rewrite of LRUCache from Myghty to use a periodic timestamp-based + paradigm so that synchronization is not really needed. the size management + is inexact. + """ + + class _Item(object): + def __init__(self, key, value): + self.key = key + self.value = value + self.timestamp = timeit.default_timer() + + def __repr__(self): + return repr(self.value) + + def __init__(self, capacity, threshold=0.5): + self.capacity = capacity + self.threshold = threshold + + def __getitem__(self, key): + item = dict.__getitem__(self, key) + item.timestamp = timeit.default_timer() + return item.value + + def values(self): + return [i.value for i in dict.values(self)] + + def setdefault(self, key, value): + if key in self: + return self[key] + else: + self[key] = value + return value + + def __setitem__(self, key, value): + item = dict.get(self, key) + if item is None: + item = self._Item(key, value) + dict.__setitem__(self, key, item) + else: + item.value = value + self._manage_size() + + def _manage_size(self): + while len(self) > self.capacity + self.capacity * self.threshold: + bytime = sorted( + dict.values(self), + key=operator.attrgetter("timestamp"), + reverse=True, + ) + for item in bytime[self.capacity :]: + try: + del self[item.key] + except KeyError: + # if we couldn't find a key, most likely some other thread + # broke in on us. loop around and try again + break + + +# Regexp to match python magic encoding line +_PYTHON_MAGIC_COMMENT_re = re.compile( + r"[ \t\f]* \# .* coding[=:][ \t]*([-\w.]+)", re.VERBOSE +) + + +def parse_encoding(fp): + """Deduce the encoding of a Python source file (binary mode) from magic + comment. + + It does this in the same way as the `Python interpreter`__ + + .. __: http://docs.python.org/ref/encodings.html + + The ``fp`` argument should be a seekable file object in binary mode. + """ + pos = fp.tell() + fp.seek(0) + try: + line1 = fp.readline() + has_bom = line1.startswith(codecs.BOM_UTF8) + if has_bom: + line1 = line1[len(codecs.BOM_UTF8) :] + + m = _PYTHON_MAGIC_COMMENT_re.match(line1.decode("ascii", "ignore")) + if not m: + try: + parse(line1.decode("ascii", "ignore")) + except (ImportError, SyntaxError): + # Either it's a real syntax error, in which case the source + # is not valid python source, or line2 is a continuation of + # line1, in which case we don't want to scan line2 for a magic + # comment. + pass + else: + line2 = fp.readline() + m = _PYTHON_MAGIC_COMMENT_re.match( + line2.decode("ascii", "ignore") + ) + + if has_bom: + if m: + raise SyntaxError( + "python refuses to compile code with both a UTF8" + " byte-order-mark and a magic encoding comment" + ) + return "utf_8" + elif m: + return m.group(1) + else: + return None + finally: + fp.seek(pos) + + +def sorted_dict_repr(d): + """repr() a dictionary with the keys in order. + + Used by the lexer unit test to compare parse trees based on strings. + + """ + keys = list(d.keys()) + keys.sort() + return "{" + ", ".join(["%r: %r" % (k, d[k]) for k in keys]) + "}" + + +def restore__ast(_ast): + """Attempt to restore the required classes to the _ast module if it + appears to be missing them + """ + if hasattr(_ast, "AST"): + return + _ast.PyCF_ONLY_AST = 2 << 9 + m = compile( + """\ +def foo(): pass +class Bar(object): pass +if False: pass +baz = 'mako' +1 + 2 - 3 * 4 / 5 +6 // 7 % 8 << 9 >> 10 +11 & 12 ^ 13 | 14 +15 and 16 or 17 +-baz + (not +18) - ~17 +baz and 'foo' or 'bar' +(mako is baz == baz) is not baz != mako +mako > baz < mako >= baz <= mako +mako in baz not in mako""", + "<unknown>", + "exec", + _ast.PyCF_ONLY_AST, + ) + _ast.Module = type(m) + + for cls in _ast.Module.__mro__: + if cls.__name__ == "mod": + _ast.mod = cls + elif cls.__name__ == "AST": + _ast.AST = cls + + _ast.FunctionDef = type(m.body[0]) + _ast.ClassDef = type(m.body[1]) + _ast.If = type(m.body[2]) + + _ast.Name = type(m.body[3].targets[0]) + _ast.Store = type(m.body[3].targets[0].ctx) + _ast.Str = type(m.body[3].value) + + _ast.Sub = type(m.body[4].value.op) + _ast.Add = type(m.body[4].value.left.op) + _ast.Div = type(m.body[4].value.right.op) + _ast.Mult = type(m.body[4].value.right.left.op) + + _ast.RShift = type(m.body[5].value.op) + _ast.LShift = type(m.body[5].value.left.op) + _ast.Mod = type(m.body[5].value.left.left.op) + _ast.FloorDiv = type(m.body[5].value.left.left.left.op) + + _ast.BitOr = type(m.body[6].value.op) + _ast.BitXor = type(m.body[6].value.left.op) + _ast.BitAnd = type(m.body[6].value.left.left.op) + + _ast.Or = type(m.body[7].value.op) + _ast.And = type(m.body[7].value.values[0].op) + + _ast.Invert = type(m.body[8].value.right.op) + _ast.Not = type(m.body[8].value.left.right.op) + _ast.UAdd = type(m.body[8].value.left.right.operand.op) + _ast.USub = type(m.body[8].value.left.left.op) + + _ast.Or = type(m.body[9].value.op) + _ast.And = type(m.body[9].value.values[0].op) + + _ast.IsNot = type(m.body[10].value.ops[0]) + _ast.NotEq = type(m.body[10].value.ops[1]) + _ast.Is = type(m.body[10].value.left.ops[0]) + _ast.Eq = type(m.body[10].value.left.ops[1]) + + _ast.Gt = type(m.body[11].value.ops[0]) + _ast.Lt = type(m.body[11].value.ops[1]) + _ast.GtE = type(m.body[11].value.ops[2]) + _ast.LtE = type(m.body[11].value.ops[3]) + + _ast.In = type(m.body[12].value.ops[0]) + _ast.NotIn = type(m.body[12].value.ops[1]) + + +def read_file(path, mode="rb"): + fp = open(path, mode) + try: + data = fp.read() + return data + finally: + fp.close() + + +def read_python_file(path): + fp = open(path, "rb") + try: + encoding = parse_encoding(fp) + data = fp.read() + if encoding: + data = data.decode(encoding) + return data + finally: + fp.close() |