Diffstat (limited to 'third_party/python/diskcache/diskcache/recipes.py')
-rw-r--r-- | third_party/python/diskcache/diskcache/recipes.py | 437
1 file changed, 437 insertions, 0 deletions
diff --git a/third_party/python/diskcache/diskcache/recipes.py b/third_party/python/diskcache/diskcache/recipes.py
new file mode 100644
index 0000000000..fb6425090a
--- /dev/null
+++ b/third_party/python/diskcache/diskcache/recipes.py
@@ -0,0 +1,437 @@
+"""Disk Cache Recipes
+
+"""
+
+import functools
+import math
+import os
+import random
+import sys
+import threading
+import time
+
+from .core import ENOVAL, args_to_key, full_name
+
+############################################################################
+# BEGIN Python 2/3 Shims
+############################################################################
+
+if sys.hexversion < 0x03000000:
+    from thread import get_ident  # pylint: disable=import-error
+else:
+    from threading import get_ident
+
+############################################################################
+# END Python 2/3 Shims
+############################################################################
+
+
+class Averager(object):
+    """Recipe for calculating a running average.
+
+    Sometimes known as "online statistics," the running average maintains the
+    total and count. The average can then be calculated at any time.
+
+    >>> import diskcache
+    >>> cache = diskcache.FanoutCache()
+    >>> ave = Averager(cache, 'latency')
+    >>> ave.add(0.080)
+    >>> ave.add(0.120)
+    >>> ave.get()
+    0.1
+    >>> ave.add(0.160)
+    >>> ave.pop()
+    0.12
+    >>> print(ave.get())
+    None
+
+    """
+    def __init__(self, cache, key, expire=None, tag=None):
+        self._cache = cache
+        self._key = key
+        self._expire = expire
+        self._tag = tag
+
+    def add(self, value):
+        "Add `value` to average."
+        with self._cache.transact(retry=True):
+            total, count = self._cache.get(self._key, default=(0.0, 0))
+            total += value
+            count += 1
+            self._cache.set(
+                self._key, (total, count), expire=self._expire, tag=self._tag,
+            )
+
+    def get(self):
+        "Get current average or return `None` if count equals zero."
+        total, count = self._cache.get(self._key, default=(0.0, 0), retry=True)
+        return None if count == 0 else total / count
+
+    def pop(self):
+        "Return current average and delete key."
+        total, count = self._cache.pop(self._key, default=(0.0, 0), retry=True)
+        return None if count == 0 else total / count
+
+
+class Lock(object):
+    """Recipe for cross-process and cross-thread lock.
+
+    >>> import diskcache
+    >>> cache = diskcache.Cache()
+    >>> lock = Lock(cache, 'report-123')
+    >>> lock.acquire()
+    >>> lock.release()
+    >>> with lock:
+    ...     pass
+
+    """
+    def __init__(self, cache, key, expire=None, tag=None):
+        self._cache = cache
+        self._key = key
+        self._expire = expire
+        self._tag = tag
+
+    def acquire(self):
+        "Acquire lock using spin-lock algorithm."
+        while True:
+            added = self._cache.add(
+                self._key, None, expire=self._expire, tag=self._tag, retry=True,
+            )
+            if added:
+                break
+            time.sleep(0.001)
+
+    def release(self):
+        "Release lock by deleting key."
+        self._cache.delete(self._key, retry=True)
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, *exc_info):
+        self.release()
+
+
+class RLock(object):
+    """Recipe for cross-process and cross-thread re-entrant lock.
+
+    >>> import diskcache
+    >>> cache = diskcache.Cache()
+    >>> rlock = RLock(cache, 'user-123')
+    >>> rlock.acquire()
+    >>> rlock.acquire()
+    >>> rlock.release()
+    >>> with rlock:
+    ...     pass
+    >>> rlock.release()
+    >>> rlock.release()
+    Traceback (most recent call last):
+      ...
+    AssertionError: cannot release un-acquired lock
+
+    """
+    def __init__(self, cache, key, expire=None, tag=None):
+        self._cache = cache
+        self._key = key
+        self._expire = expire
+        self._tag = tag
+
+    def acquire(self):
+        "Acquire lock by incrementing count using spin-lock algorithm."
+        pid = os.getpid()
+        tid = get_ident()
+        pid_tid = '{}-{}'.format(pid, tid)
+
+        while True:
+            with self._cache.transact(retry=True):
+                value, count = self._cache.get(self._key, default=(None, 0))
+                if pid_tid == value or count == 0:
+                    self._cache.set(
+                        self._key, (pid_tid, count + 1),
+                        expire=self._expire, tag=self._tag,
+                    )
+                    return
+            time.sleep(0.001)
+
+    def release(self):
+        "Release lock by decrementing count."
+        pid = os.getpid()
+        tid = get_ident()
+        pid_tid = '{}-{}'.format(pid, tid)
+
+        with self._cache.transact(retry=True):
+            value, count = self._cache.get(self._key, default=(None, 0))
+            is_owned = pid_tid == value and count > 0
+            assert is_owned, 'cannot release un-acquired lock'
+            self._cache.set(
+                self._key, (value, count - 1),
+                expire=self._expire, tag=self._tag,
+            )
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, *exc_info):
+        self.release()
+
+
+class BoundedSemaphore(object):
+    """Recipe for cross-process and cross-thread bounded semaphore.
+
+    >>> import diskcache
+    >>> cache = diskcache.Cache()
+    >>> semaphore = BoundedSemaphore(cache, 'max-cons', value=2)
+    >>> semaphore.acquire()
+    >>> semaphore.acquire()
+    >>> semaphore.release()
+    >>> with semaphore:
+    ...     pass
+    >>> semaphore.release()
+    >>> semaphore.release()
+    Traceback (most recent call last):
+      ...
+    AssertionError: cannot release un-acquired semaphore
+
+    """
+    def __init__(self, cache, key, value=1, expire=None, tag=None):
+        self._cache = cache
+        self._key = key
+        self._value = value
+        self._expire = expire
+        self._tag = tag
+
+    def acquire(self):
+        "Acquire semaphore by decrementing value using spin-lock algorithm."
+        while True:
+            with self._cache.transact(retry=True):
+                value = self._cache.get(self._key, default=self._value)
+                if value > 0:
+                    self._cache.set(
+                        self._key, value - 1,
+                        expire=self._expire, tag=self._tag,
+                    )
+                    return
+            time.sleep(0.001)
+
+    def release(self):
+        "Release semaphore by incrementing value."
+        with self._cache.transact(retry=True):
+            value = self._cache.get(self._key, default=self._value)
+            assert self._value > value, 'cannot release un-acquired semaphore'
+            value += 1
+            self._cache.set(
+                self._key, value, expire=self._expire, tag=self._tag,
+            )
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, *exc_info):
+        self.release()
+
+
+def throttle(cache, count, seconds, name=None, expire=None, tag=None,
+             time_func=time.time, sleep_func=time.sleep):
+    """Decorator to throttle calls to function.
+
+    >>> import diskcache, time
+    >>> cache = diskcache.Cache()
+    >>> count = 0
+    >>> @throttle(cache, 2, 1)  # 2 calls per 1 second
+    ... def increment():
+    ...     global count
+    ...     count += 1
+    >>> start = time.time()
+    >>> while (time.time() - start) <= 2:
+    ...     increment()
+    >>> count in (6, 7)  # 6 or 7 calls depending on CPU load
+    True
+
+    """
+    def decorator(func):
+        rate = count / float(seconds)
+        key = full_name(func) if name is None else name
+        now = time_func()
+        cache.set(key, (now, count), expire=expire, tag=tag, retry=True)
+
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            while True:
+                with cache.transact(retry=True):
+                    last, tally = cache.get(key)
+                    now = time_func()
+                    tally += (now - last) * rate
+                    delay = 0
+
+                    if tally > count:
+                        cache.set(key, (now, count - 1), expire)
+                    elif tally >= 1:
+                        cache.set(key, (now, tally - 1), expire)
+                    else:
+                        delay = (1 - tally) / rate
+
+                if delay:
+                    sleep_func(delay)
+                else:
+                    break
+
+            return func(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
+def barrier(cache, lock_factory, name=None, expire=None, tag=None):
+    """Barrier to calling decorated function.
+
+    Supports different kinds of locks: Lock, RLock, BoundedSemaphore.
+
+    >>> import diskcache, time
+    >>> cache = diskcache.Cache()
+    >>> @barrier(cache, Lock)
+    ... def work(num):
+    ...     print('worker started')
+    ...     time.sleep(1)
+    ...     print('worker finished')
+    >>> import multiprocessing.pool
+    >>> pool = multiprocessing.pool.ThreadPool(2)
+    >>> _ = pool.map(work, range(2))
+    worker started
+    worker finished
+    worker started
+    worker finished
+    >>> pool.terminate()
+
+    """
+    def decorator(func):
+        key = full_name(func) if name is None else name
+        lock = lock_factory(cache, key, expire=expire, tag=tag)
+
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            with lock:
+                return func(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
+def memoize_stampede(cache, expire, name=None, typed=False, tag=None, beta=1):
+    """Memoizing cache decorator with cache stampede protection.
+
+    Cache stampedes are a type of system overload that can occur when parallel
+    computing systems using memoization come under heavy load. This behaviour
+    is sometimes also called dog-piling, cache miss storm, cache choking, or
+    the thundering herd problem.
+
+    The memoization decorator implements cache stampede protection through
+    early recomputation. Early recomputation of function results will occur
+    probabilistically before expiration in a background thread of
+    execution. Early probabilistic recomputation is based on research by
+    Vattani, A.; Chierichetti, F.; Lowenstein, K. (2015), Optimal Probabilistic
+    Cache Stampede Prevention, VLDB, pp. 886-897, ISSN 2150-8097
+
+    If name is set to None (default), the callable name will be determined
+    automatically.
+
+    If typed is set to True, function arguments of different types will be
+    cached separately. For example, f(3) and f(3.0) will be treated as distinct
+    calls with distinct results.
+
+    The original underlying function is accessible through the `__wrapped__`
+    attribute. This is useful for introspection, for bypassing the cache, or
+    for rewrapping the function with a different cache.
+
+    >>> from diskcache import Cache
+    >>> cache = Cache()
+    >>> @memoize_stampede(cache, expire=1)
+    ... def fib(number):
+    ...     if number == 0:
+    ...         return 0
+    ...     elif number == 1:
+    ...         return 1
+    ...     else:
+    ...         return fib(number - 1) + fib(number - 2)
+    >>> print(fib(100))
+    354224848179261915075
+
+    An additional `__cache_key__` attribute can be used to generate the cache
+    key used for the given arguments.
+
+    >>> key = fib.__cache_key__(100)
+    >>> del cache[key]
+
+    Remember to call memoize when decorating a callable. If you forget, then a
+    TypeError will occur.
+
+    :param cache: cache to store callable arguments and return values
+    :param float expire: seconds until arguments expire
+    :param str name: name given for callable (default None, automatic)
+    :param bool typed: cache different types separately (default False)
+    :param str tag: text to associate with arguments (default None)
+    :return: callable decorator
+
+    """
+    # Caution: Nearly identical code exists in Cache.memoize
+    def decorator(func):
+        "Decorator created by memoize call for callable."
+        base = (full_name(func),) if name is None else (name,)
+
+        def timer(*args, **kwargs):
+            "Time execution of `func` and return result and time delta."
+            start = time.time()
+            result = func(*args, **kwargs)
+            delta = time.time() - start
+            return result, delta
+
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            "Wrapper for callable to cache arguments and return values."
+            key = wrapper.__cache_key__(*args, **kwargs)
+            pair, expire_time = cache.get(
+                key, default=ENOVAL, expire_time=True, retry=True,
+            )
+
+            if pair is not ENOVAL:
+                result, delta = pair
+                now = time.time()
+                ttl = expire_time - now
+
+                if (-delta * beta * math.log(random.random())) < ttl:
+                    return result  # Cache hit.
+
+                # Check whether a thread has started for early recomputation.
+
+                thread_key = key + (ENOVAL,)
+                thread_added = cache.add(
+                    thread_key, None, expire=delta, retry=True,
+                )
+
+                if thread_added:
+                    # Start thread for early recomputation.
+                    def recompute():
+                        with cache:
+                            pair = timer(*args, **kwargs)
+                            cache.set(
+                                key, pair, expire=expire, tag=tag, retry=True,
+                            )
+                    thread = threading.Thread(target=recompute)
+                    thread.daemon = True
+                    thread.start()
+
+                return result
+
+            pair = timer(*args, **kwargs)
+            cache.set(key, pair, expire=expire, tag=tag, retry=True)
+            return pair[0]
+
+        def __cache_key__(*args, **kwargs):
+            "Make key for cache given function arguments."
+            return args_to_key(base, args, kwargs, typed)
+
+        wrapper.__cache_key__ = __cache_key__
+        return wrapper
+
+    return decorator
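A practical point the Lock docstring leaves implicit: release() simply deletes the key, so a holder that crashes mid-critical-section would otherwise pin the lock forever. The expire value passed through to cache.add acts as a liveness bound. A minimal usage sketch, not part of the patch, assuming the recipes module is importable as diskcache.recipes (the module path this file defines) and using a hypothetical cache directory:

from diskcache import Cache
from diskcache.recipes import Lock  # module path defined by this patch

cache = Cache('/tmp/demo-cache')  # hypothetical directory, illustration only

# expire=10 bounds how long a crashed holder can block others: the key
# (and with it the lock) lapses ten seconds after a successful acquire().
lock = Lock(cache, 'report-123', expire=10)

with lock:
    pass  # critical section: at most one process/thread at a time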
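RLock tracks ownership with the '{pid}-{tid}' string built in acquire(), so the same process-and-thread pair may re-enter while any other owner spins. A toy reduction of just that transactional step, with hypothetical helper names and no cache involved:

import os
from threading import get_ident

def owner_id():
    "Owner identity exactly as RLock builds it: '{pid}-{tid}'."
    return '{}-{}'.format(os.getpid(), get_ident())

def try_acquire(state, me):
    "RLock.acquire's transactional step on the stored (value, count) pair."
    value, count = state
    if me == value or count == 0:
        return (me, count + 1), True  # free, or re-entered by the owner
    return state, False               # held by someone else: caller spins

me = owner_id()
state = (None, 0)
state, ok1 = try_acquire(state, me)           # acquired: (me, 1)
state, ok2 = try_acquire(state, me)           # re-entrant: (me, 2)
_, ok3 = try_acquire(state, 'other-pid-tid')  # different owner is refused
print(ok1, ok2, ok3)                          # True True False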
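The throttle wrapper is a token bucket: tally accrues tokens at rate = count / seconds and each permitted call spends one. The sketch below, an editorial illustration with hypothetical names, isolates that arithmetic so the three branches of the spin loop are easy to trace:

def next_step(last, tally, count, rate, now):
    """One iteration of throttle's loop; delay == 0 means proceed."""
    tally += (now - last) * rate            # tokens accrued since `last`
    if tally > count:                       # bucket full: clamp, spend one
        return now, count - 1, 0
    if tally >= 1:                          # whole token available: spend it
        return now, tally - 1, 0
    return last, tally, (1 - tally) / rate  # sleep until one token accrues

count, seconds = 2, 1                 # 2 calls per second, as in the doctest
rate = count / float(seconds)
last, tally = 0.0, count              # decorator seeds the cache with (now, count)

for call in range(3):
    last, tally, delay = next_step(last, tally, count, rate, now=0.0)
    print(call, delay)                # 0 0, 1 0, 2 0.5 -- third call must wait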
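The cache-hit test in memoize_stampede, (-delta * beta * math.log(random.random())) < ttl, is the early-recomputation rule from the Vattani et al. paper cited in its docstring. Since random.random() is uniform on (0, 1), the probability of an early refresh works out to exp(-ttl / (delta * beta)): negligible far from expiry, climbing toward one as ttl shrinks or beta grows. A small simulation, illustrative only, checks the rule against that closed form:

import math
import random

def refresh_early(delta, beta, ttl):
    "True when the criterion elects to recompute before expiry."
    return -delta * beta * math.log(random.random()) >= ttl

random.seed(0)        # deterministic illustration
delta, beta = 0.1, 1  # a call that took 0.1 seconds, default beta
trials = 100000
for ttl in (10.0, 1.0, 0.1, 0.01):
    rate = sum(refresh_early(delta, beta, ttl) for _ in range(trials)) / trials
    # observed frequency vs. exp(-ttl / (delta * beta))
    print(ttl, round(rate, 3), round(math.exp(-ttl / (delta * beta)), 3))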