# -*- coding: utf-8 -*- from __future__ import absolute_import import sys import inspect import json import functools import collections from datetime import datetime, timedelta from distutils.util import strtobool import fnmatch import time import threading import socket import six from six.moves import urllib import cherrypy try: from urlparse import urljoin except ImportError: from urllib.parse import urljoin from . import logger, mgr from .exceptions import ViewCacheNoDataException from .settings import Settings from .services.auth import JwtManager def ensure_str(s, encoding='utf-8', errors='strict'): """Ported from six.""" if not isinstance(s, (six.text_type, six.binary_type)): raise TypeError("not expecting type '%s'" % type(s)) if six.PY2 and isinstance(s, six.text_type): s = s.encode(encoding, errors) elif six.PY3 and isinstance(s, six.binary_type): s = s.decode(encoding, errors) return s class RequestLoggingTool(cherrypy.Tool): def __init__(self): cherrypy.Tool.__init__(self, 'before_handler', self.request_begin, priority=10) def _setup(self): cherrypy.Tool._setup(self) cherrypy.request.hooks.attach('on_end_request', self.request_end, priority=5) cherrypy.request.hooks.attach('after_error_response', self.request_error, priority=5) def request_begin(self): req = cherrypy.request user = JwtManager.get_username() if user is not None: # PY2: Encode user to str to prevent further implicit decoding user = ensure_str(user) # Log the request. logger.debug('[%s:%s] [%s] [%s] %s', req.remote.ip, req.remote.port, req.method, user, req.path_info) # Audit the request. if Settings.AUDIT_API_ENABLED and req.method not in ['GET']: url = build_url(req.remote.ip, scheme=req.scheme, port=req.remote.port) msg = '[DASHBOARD] from=\'{}\' path=\'{}\' method=\'{}\' ' \ 'user=\'{}\''.format(url, req.path_info, req.method, user) if Settings.AUDIT_API_LOG_PAYLOAD: params = dict(req.params or {}, **get_request_body_params(req)) # Hide sensitive data like passwords, secret keys, ... # Extend the list of patterns to search for if necessary. # Currently parameters like this are processed: # - secret_key # - user_password # - new_passwd_to_login keys = [] for key in ['password', 'passwd', 'secret']: keys.extend([x for x in params.keys() if key in x]) for key in keys: params[key] = '***' msg = '{} params=\'{}\''.format(msg, json.dumps(params)) mgr.cluster_log('audit', mgr.CLUSTER_LOG_PRIO_INFO, msg) def request_error(self): self._request_log(logger.error) logger.error(cherrypy.response.body) def request_end(self): status = cherrypy.response.status[:3] if status in ["401"]: # log unauthorized accesses self._request_log(logger.warning) else: self._request_log(logger.info) def _format_bytes(self, num): units = ['B', 'K', 'M', 'G'] if isinstance(num, str): try: num = int(num) except ValueError: return "n/a" format_str = "{:.0f}{}" for i, unit in enumerate(units): div = 2**(10*i) if num < 2**(10*(i+1)): if num % div == 0: format_str = "{}{}" else: div = float(div) format_str = "{:.1f}{}" return format_str.format(num/div, unit[0]) # content-length bigger than 1T!! return value in bytes return "{}B".format(num) def _request_log(self, logger_fn): req = cherrypy.request res = cherrypy.response lat = time.time() - res.time user = JwtManager.get_username() status = res.status[:3] if isinstance(res.status, str) else res.status if 'Content-Length' in res.headers: length = self._format_bytes(res.headers['Content-Length']) else: length = self._format_bytes(0) if user: logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip, req.remote.port, req.method, status, "{0:.3f}s".format(lat), ensure_str(user), length, req.path_info) else: logger_fn("[%s:%s] [%s] [%s] [%s] [%s] %s", req.remote.ip, req.remote.port, req.method, status, "{0:.3f}s".format(lat), length, req.path_info) # pylint: disable=too-many-instance-attributes class ViewCache(object): VALUE_OK = 0 VALUE_STALE = 1 VALUE_NONE = 2 class GetterThread(threading.Thread): def __init__(self, view, fn, args, kwargs): super(ViewCache.GetterThread, self).__init__() self._view = view self.event = threading.Event() self.fn = fn self.args = args self.kwargs = kwargs # pylint: disable=broad-except def run(self): t0 = 0.0 t1 = 0.0 try: t0 = time.time() logger.debug("VC: starting execution of %s", self.fn) val = self.fn(*self.args, **self.kwargs) t1 = time.time() except Exception as ex: with self._view.lock: logger.exception("Error while calling fn=%s ex=%s", self.fn, str(ex)) self._view.value = None self._view.value_when = None self._view.getter_thread = None self._view.exception = ex else: with self._view.lock: self._view.latency = t1 - t0 self._view.value = val self._view.value_when = datetime.now() self._view.getter_thread = None self._view.exception = None logger.debug("VC: execution of %s finished in: %s", self.fn, t1 - t0) self.event.set() class RemoteViewCache(object): # Return stale data if STALE_PERIOD = 1.0 def __init__(self, timeout): self.getter_thread = None # Consider data within 1s old to be sufficiently fresh self.timeout = timeout self.event = threading.Event() self.value_when = None self.value = None self.latency = 0 self.exception = None self.lock = threading.Lock() def reset(self): with self.lock: self.value_when = None self.value = None def run(self, fn, args, kwargs): """ If data less than `stale_period` old is available, return it immediately. If an attempt to fetch data does not complete within `timeout`, then return the most recent data available, with a status to indicate that it is stale. Initialization does not count towards the timeout, so the first call on one of these objects during the process lifetime may be slower than subsequent calls. :return: 2-tuple of value status code, value """ with self.lock: now = datetime.now() if self.value_when and now - self.value_when < timedelta( seconds=self.STALE_PERIOD): return ViewCache.VALUE_OK, self.value if self.getter_thread is None: self.getter_thread = ViewCache.GetterThread(self, fn, args, kwargs) self.getter_thread.start() else: logger.debug("VC: getter_thread still alive for: %s", fn) ev = self.getter_thread.event success = ev.wait(timeout=self.timeout) with self.lock: if success: # We fetched the data within the timeout if self.exception: # execution raised an exception # pylint: disable=raising-bad-type raise self.exception return ViewCache.VALUE_OK, self.value if self.value_when is not None: # We have some data, but it doesn't meet freshness requirements return ViewCache.VALUE_STALE, self.value # We have no data, not even stale data raise ViewCacheNoDataException() def __init__(self, timeout=5): self.timeout = timeout self.cache_by_args = {} def __call__(self, fn): def wrapper(*args, **kwargs): rvc = self.cache_by_args.get(args, None) if not rvc: rvc = ViewCache.RemoteViewCache(self.timeout) self.cache_by_args[args] = rvc return rvc.run(fn, args, kwargs) wrapper.reset = self.reset return wrapper def reset(self): for _, rvc in self.cache_by_args.items(): rvc.reset() class NotificationQueue(threading.Thread): _ALL_TYPES_ = '__ALL__' _listeners = collections.defaultdict(set) _lock = threading.Lock() _cond = threading.Condition() _queue = collections.deque() _running = False _instance = None def __init__(self): super(NotificationQueue, self).__init__() @classmethod def start_queue(cls): with cls._lock: if cls._instance: # the queue thread is already running return cls._running = True cls._instance = NotificationQueue() logger.debug("starting notification queue") cls._instance.start() @classmethod def stop(cls): with cls._lock: if not cls._instance: # the queue thread was not started return instance = cls._instance cls._instance = None cls._running = False with cls._cond: cls._cond.notify() logger.debug("waiting for notification queue to finish") instance.join() logger.debug("notification queue stopped") @classmethod def _registered_handler(cls, func, n_types): for _, reg_func in cls._listeners[n_types]: if reg_func == func: return True return False @classmethod def register(cls, func, n_types=None, priority=1): """Registers function to listen for notifications If the second parameter `n_types` is omitted, the function in `func` parameter will be called for any type of notifications. Args: func (function): python function ex: def foo(val) n_types (str|list): the single type to listen, or a list of types priority (int): the priority level (1=max, +inf=min) """ with cls._lock: if not n_types: n_types = [cls._ALL_TYPES_] elif isinstance(n_types, str): n_types = [n_types] elif not isinstance(n_types, list): raise Exception("n_types param is neither a string nor a list") for ev_type in n_types: if not cls._registered_handler(func, ev_type): cls._listeners[ev_type].add((priority, func)) logger.debug("NQ: function %s was registered for events of" " type %s", func, ev_type) @classmethod def deregister(cls, func, n_types=None): """Removes the listener function from this notification queue If the second parameter `n_types` is omitted, the function is removed from all event types, otherwise the function is removed only for the specified event types. Args: func (function): python function n_types (str|list): the single event type, or a list of event types """ with cls._lock: if not n_types: n_types = list(cls._listeners.keys()) elif isinstance(n_types, str): n_types = [n_types] elif not isinstance(n_types, list): raise Exception("n_types param is neither a string nor a list") for ev_type in n_types: listeners = cls._listeners[ev_type] toRemove = None for pr, fn in listeners: if fn == func: toRemove = (pr, fn) break if toRemove: listeners.discard(toRemove) logger.debug("NQ: function %s was deregistered for events " "of type %s", func, ev_type) @classmethod def new_notification(cls, notify_type, notify_value): with cls._cond: cls._queue.append((notify_type, notify_value)) cls._cond.notify() @classmethod def _notify_listeners(cls, events): for ev in events: notify_type, notify_value = ev with cls._lock: listeners = list(cls._listeners[notify_type]) listeners.extend(cls._listeners[cls._ALL_TYPES_]) listeners.sort(key=lambda lis: lis[0]) for listener in listeners: listener[1](notify_value) def run(self): logger.debug("notification queue started") while self._running: private_buffer = [] logger.debug("NQ: processing queue: %s", len(self._queue)) try: while True: private_buffer.append(self._queue.popleft()) except IndexError: pass self._notify_listeners(private_buffer) with self._cond: while self._running and not self._queue: self._cond.wait() # flush remaining events logger.debug("NQ: flush remaining events: %s", len(self._queue)) self._notify_listeners(self._queue) self._queue.clear() logger.debug("notification queue finished") # pylint: disable=too-many-arguments, protected-access class TaskManager(object): FINISHED_TASK_SIZE = 10 FINISHED_TASK_TTL = 60.0 VALUE_DONE = "done" VALUE_EXECUTING = "executing" _executing_tasks = set() _finished_tasks = [] _lock = threading.Lock() _task_local_data = threading.local() @classmethod def init(cls): NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished') @classmethod def _handle_finished_task(cls, task): logger.info("TM: finished %s", task) with cls._lock: cls._executing_tasks.remove(task) cls._finished_tasks.append(task) @classmethod def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None, exception_handler=None): if not args: args = [] if not kwargs: kwargs = {} if not executor: executor = ThreadedExecutor() task = Task(name, metadata, fn, args, kwargs, executor, exception_handler) with cls._lock: if task in cls._executing_tasks: logger.debug("TM: task already executing: %s", task) for t in cls._executing_tasks: if t == task: return t logger.debug("TM: created %s", task) cls._executing_tasks.add(task) logger.info("TM: running %s", task) task._run() return task @classmethod def current_task(cls): """ Returns the current task object. This method should only be called from a threaded task operation code. """ return cls._task_local_data.task @classmethod def _cleanup_old_tasks(cls, task_list): """ The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL value. """ now = datetime.now() for idx, t in enumerate(task_list): if idx < cls.FINISHED_TASK_SIZE: continue if now - datetime.fromtimestamp(t[1].end_time) > \ timedelta(seconds=cls.FINISHED_TASK_TTL): del cls._finished_tasks[t[0]] @classmethod def list(cls, name_glob=None): executing_tasks = [] finished_tasks = [] with cls._lock: for task in cls._executing_tasks: if not name_glob or fnmatch.fnmatch(task.name, name_glob): executing_tasks.append(task) for idx, task in enumerate(cls._finished_tasks): if not name_glob or fnmatch.fnmatch(task.name, name_glob): finished_tasks.append((idx, task)) finished_tasks.sort(key=lambda t: t[1].end_time, reverse=True) cls._cleanup_old_tasks(finished_tasks) executing_tasks.sort(key=lambda t: t.begin_time, reverse=True) return executing_tasks, [t[1] for t in finished_tasks] @classmethod def list_serializable(cls, ns_glob=None): ex_t, fn_t = cls.list(ns_glob) return [{ 'name': t.name, 'metadata': t.metadata, 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), 'progress': t.progress } for t in ex_t if t.begin_time], [{ 'name': t.name, 'metadata': t.metadata, 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()), 'duration': t.duration, 'progress': t.progress, 'success': not t.exception, 'ret_value': t.ret_value if not t.exception else None, 'exception': t.ret_value if t.exception and t.ret_value else ( {'detail': str(t.exception)} if t.exception else None) } for t in fn_t] # pylint: disable=protected-access class TaskExecutor(object): def __init__(self): self.task = None def init(self, task): self.task = task # pylint: disable=broad-except def start(self): logger.debug("EX: executing task %s", self.task) try: self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) except Exception as ex: logger.exception("Error while calling %s", self.task) self.finish(None, ex) def finish(self, ret_value, exception): if not exception: logger.debug("EX: successfully finished task: %s", self.task) else: logger.debug("EX: task finished with exception: %s", self.task) self.task._complete(ret_value, exception) # pylint: disable=protected-access class ThreadedExecutor(TaskExecutor): def __init__(self): super(ThreadedExecutor, self).__init__() self._thread = threading.Thread(target=self._run) def start(self): self._thread.start() # pylint: disable=broad-except def _run(self): TaskManager._task_local_data.task = self.task try: logger.debug("TEX: executing task %s", self.task) val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) except Exception as ex: logger.exception("Error while calling %s", self.task) self.finish(None, ex) else: self.finish(val, None) class Task(object): def __init__(self, name, metadata, fn, args, kwargs, executor, exception_handler=None): self.name = name self.metadata = metadata self.fn = fn self.fn_args = args self.fn_kwargs = kwargs self.executor = executor self.ex_handler = exception_handler self.running = False self.event = threading.Event() self.progress = None self.ret_value = None self.begin_time = None self.end_time = None self.duration = 0 self.exception = None self.lock = threading.Lock() def __hash__(self): return hash((self.name, tuple(sorted(self.metadata.items())))) def __eq__(self, other): return self.name == other.name and self.metadata == other.metadata def __str__(self): return "Task(ns={}, md={})" \ .format(self.name, self.metadata) def __repr__(self): return str(self) def _run(self): NotificationQueue.register(self._handle_task_finished, 'cd_task_finished', 100) with self.lock: assert not self.running self.executor.init(self) self.set_progress(0, in_lock=True) self.begin_time = time.time() self.running = True self.executor.start() def _complete(self, ret_value, exception=None): now = time.time() if exception and self.ex_handler: # pylint: disable=broad-except try: ret_value = self.ex_handler(exception, task=self) except Exception as ex: exception = ex with self.lock: assert self.running, "_complete cannot be called before _run" self.end_time = now self.ret_value = ret_value self.exception = exception self.duration = now - self.begin_time if not self.exception: self.set_progress(100, True) NotificationQueue.new_notification('cd_task_finished', self) logger.debug("TK: execution of %s finished in: %s s", self, self.duration) def _handle_task_finished(self, task): if self == task: NotificationQueue.deregister(self._handle_task_finished) self.event.set() def wait(self, timeout=None): with self.lock: assert self.running, "wait cannot be called before _run" ev = self.event success = ev.wait(timeout=timeout) with self.lock: if success: # the action executed within the timeout if self.exception: # pylint: disable=raising-bad-type # execution raised an exception raise self.exception return TaskManager.VALUE_DONE, self.ret_value # the action is still executing return TaskManager.VALUE_EXECUTING, None def inc_progress(self, delta, in_lock=False): if not isinstance(delta, int) or delta < 0: raise Exception("Progress delta value must be a positive integer") if not in_lock: self.lock.acquire() prog = self.progress + delta self.progress = prog if prog <= 100 else 100 if not in_lock: self.lock.release() def set_progress(self, percentage, in_lock=False): if not isinstance(percentage, int) or percentage < 0 or percentage > 100: raise Exception("Progress value must be in percentage " "(0 <= percentage <= 100)") if not in_lock: self.lock.acquire() self.progress = percentage if not in_lock: self.lock.release() def is_valid_ip_address(addr): """ Validate the given IPv4 or IPv6 address. >>> is_valid_ip_address('2001:0db8::1234') True >>> is_valid_ip_address('192.168.121.1') True >>> is_valid_ip_address('1:::1') False >>> is_valid_ip_address('8.1.0') False >>> is_valid_ip_address('260.1.0.1') False :param addr: :type addr: str :return: Returns ``True`` if the IP address is valid, otherwise ``False``. :rtype: bool """ return is_valid_ipv4_address(addr) or is_valid_ipv6_address(addr) def is_valid_ipv4_address(addr): """ Validate the given IPv4 address. >>> is_valid_ipv4_address('0.0.0.0') True >>> is_valid_ipv4_address('192.168.121.1') True >>> is_valid_ipv4_address('a.b.c.d') False >>> is_valid_ipv4_address('172.1.0.a') False >>> is_valid_ipv4_address('2001:0db8::1234') False >>> is_valid_ipv4_address(None) False >>> is_valid_ipv4_address(123456) False :param addr: :type addr: str :return: Returns ``True`` if the IPv4 address is valid, otherwise ``False``. :rtype: bool """ try: socket.inet_pton(socket.AF_INET, addr) return True except (socket.error, TypeError): return False def is_valid_ipv6_address(addr): """ Validate the given IPv6 address. >>> is_valid_ipv6_address('2001:0db8::1234') True >>> is_valid_ipv6_address('fe80::bc6c:66b0:5af8:f44') True >>> is_valid_ipv6_address('192.168.121.1') False >>> is_valid_ipv6_address('a:x::1') False >>> is_valid_ipv6_address('1200:0000:AB00:1234:O000:2552:7777:1313') False >>> is_valid_ipv6_address(None) False >>> is_valid_ipv6_address(123456) False :param addr: :type addr: str :return: Returns ``True`` if the IPv6 address is valid, otherwise ``False``. :rtype: bool """ try: socket.inet_pton(socket.AF_INET6, addr) return True except (socket.error, TypeError): return False def build_url(host, scheme=None, port=None): """ Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets automatically. >>> build_url('example.com', 'https', 443) 'https://example.com:443' >>> build_url(host='example.com', port=443) '//example.com:443' >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http') 'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80' :param scheme: The scheme, e.g. http, https or ftp. :type scheme: str :param host: Consisting of either a registered name (including but not limited to a hostname) or an IP address. :type host: str :type port: int :rtype: str """ netloc = host if not is_valid_ipv6_address(host) else '[{}]'.format(host) if port: netloc += ':{}'.format(port) pr = urllib.parse.ParseResult( scheme=scheme if scheme else '', netloc=netloc, path='', params='', query='', fragment='') return pr.geturl() def prepare_url_prefix(url_prefix): """ return '' if no prefix, or '/prefix' without slash in the end. """ url_prefix = urljoin('/', url_prefix) return url_prefix.rstrip('/') def dict_contains_path(dct, keys): """ Tests whether the keys exist recursively in `dictionary`. :type dct: dict :type keys: list :rtype: bool """ if keys: if not isinstance(dct, dict): return False key = keys.pop(0) if key in dct: dct = dct[key] return dict_contains_path(dct, keys) return False return True if sys.version_info > (3, 0): wraps = functools.wraps _getargspec = inspect.getfullargspec else: def wraps(func): def decorator(wrapper): new_wrapper = functools.wraps(func)(wrapper) new_wrapper.__wrapped__ = func # set __wrapped__ even for Python 2 return new_wrapper return decorator _getargspec = inspect.getargspec def getargspec(func): try: while True: func = func.__wrapped__ except AttributeError: pass # pylint: disable=deprecated-method return _getargspec(func) def str_to_bool(val): """ Convert a string representation of truth to True or False. >>> str_to_bool('true') and str_to_bool('yes') and str_to_bool('1') and str_to_bool(True) True >>> str_to_bool('false') and str_to_bool('no') and str_to_bool('0') and str_to_bool(False) False >>> str_to_bool('xyz') Traceback (most recent call last): ... ValueError: invalid truth value 'xyz' :param val: The value to convert. :type val: str|bool :rtype: bool """ if isinstance(val, bool): return val return bool(strtobool(val)) def get_request_body_params(request): """ Helper function to get parameters from the request body. :param request The CherryPy request object. :type request: cherrypy.Request :return: A dictionary containing the parameters. :rtype: dict """ params = {} if request.method not in request.methods_with_bodies: return params content_type = request.headers.get('Content-Type', '') if content_type in ['application/json', 'text/javascript']: if not hasattr(request, 'json'): raise cherrypy.HTTPError(400, 'Expected JSON body') if isinstance(request.json, str): params.update(json.loads(request.json)) else: params.update(request.json) return params def find_object_in_list(key, value, iterable): """ Get the first occurrence of an object within a list with the specified key/value. >>> find_object_in_list('name', 'bar', [{'name': 'foo'}, {'name': 'bar'}]) {'name': 'bar'} >>> find_object_in_list('name', 'xyz', [{'name': 'foo'}, {'name': 'bar'}]) is None True >>> find_object_in_list('foo', 'bar', [{'xyz': 4815162342}]) is None True >>> find_object_in_list('foo', 'bar', []) is None True :param key: The name of the key. :param value: The value to search for. :param iterable: The list to process. :return: Returns the found object or None. """ for obj in iterable: if key in obj and obj[key] == value: return obj return None