diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/dashboard/tools.py | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/dashboard/tools.py')
-rw-r--r-- | src/pybind/mgr/dashboard/tools.py | 841 |
1 files changed, 841 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/tools.py b/src/pybind/mgr/dashboard/tools.py new file mode 100644 index 000000000..1ae194219 --- /dev/null +++ b/src/pybind/mgr/dashboard/tools.py @@ -0,0 +1,841 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import collections +import fnmatch +import inspect +import json +import logging +import threading +import time +import urllib +from datetime import datetime, timedelta +from distutils.util import strtobool + +import cherrypy +from mgr_util import build_url + +from . import mgr +from .exceptions import ViewCacheNoDataException +from .services.auth import JwtManager +from .settings import Settings + +try: + from typing import Any, AnyStr, Callable, DefaultDict, Deque, Dict, List, \ + Optional, Set, Tuple, Union +except ImportError: + pass # For typing only + + +class RequestLoggingTool(cherrypy.Tool): + def __init__(self): + cherrypy.Tool.__init__(self, 'before_handler', self.request_begin, + priority=10) + self.logger = logging.getLogger('request') + + def _setup(self): + cherrypy.Tool._setup(self) + cherrypy.request.hooks.attach('on_end_request', self.request_end, + priority=5) + cherrypy.request.hooks.attach('after_error_response', self.request_error, + priority=5) + + def request_begin(self): + req = cherrypy.request + user = JwtManager.get_username() + # Log the request. + self.logger.debug('[%s:%s] [%s] [%s] %s', req.remote.ip, req.remote.port, + req.method, user, req.path_info) + # Audit the request. + if Settings.AUDIT_API_ENABLED and req.method not in ['GET']: + url = build_url(req.remote.ip, scheme=req.scheme, + port=req.remote.port) + msg = '[DASHBOARD] from=\'{}\' path=\'{}\' method=\'{}\' ' \ + 'user=\'{}\''.format(url, req.path_info, req.method, user) + if Settings.AUDIT_API_LOG_PAYLOAD: + params = dict(req.params or {}, **get_request_body_params(req)) + # Hide sensitive data like passwords, secret keys, ... + # Extend the list of patterns to search for if necessary. + # Currently parameters like this are processed: + # - secret_key + # - user_password + # - new_passwd_to_login + keys = [] + for key in ['password', 'passwd', 'secret']: + keys.extend([x for x in params.keys() if key in x]) + for key in keys: + params[key] = '***' + msg = '{} params=\'{}\''.format(msg, json.dumps(params)) + mgr.cluster_log('audit', mgr.ClusterLogPrio.INFO, msg) + + def request_error(self): + self._request_log(self.logger.error) + self.logger.error(cherrypy.response.body) + + def request_end(self): + status = cherrypy.response.status[:3] + if status in ["401", "403"]: + # log unauthorized accesses + self._request_log(self.logger.warning) + else: + self._request_log(self.logger.info) + + def _format_bytes(self, num): + units = ['B', 'K', 'M', 'G'] + + if isinstance(num, str): + try: + num = int(num) + except ValueError: + return "n/a" + + format_str = "{:.0f}{}" + for i, unit in enumerate(units): + div = 2**(10*i) + if num < 2**(10*(i+1)): + if num % div == 0: + format_str = "{}{}" + else: + div = float(div) + format_str = "{:.1f}{}" + return format_str.format(num/div, unit[0]) + + # content-length bigger than 1T!! return value in bytes + return "{}B".format(num) + + def _request_log(self, logger_fn): + req = cherrypy.request + res = cherrypy.response + lat = time.time() - res.time + user = JwtManager.get_username() + status = res.status[:3] if isinstance(res.status, str) else res.status + if 'Content-Length' in res.headers: + length = self._format_bytes(res.headers['Content-Length']) + else: + length = self._format_bytes(0) + if user: + logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip, + req.remote.port, req.method, status, + "{0:.3f}s".format(lat), user, length, req.path_info) + else: + logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip, + req.remote.port, req.method, status, + "{0:.3f}s".format(lat), length, getattr(req, 'unique_id', '-'), req.path_info) + + +# pylint: disable=too-many-instance-attributes +class ViewCache(object): + VALUE_OK = 0 + VALUE_STALE = 1 + VALUE_NONE = 2 + + class GetterThread(threading.Thread): + def __init__(self, view, fn, args, kwargs): + super(ViewCache.GetterThread, self).__init__() + self._view = view + self.event = threading.Event() + self.fn = fn + self.args = args + self.kwargs = kwargs + + # pylint: disable=broad-except + def run(self): + t0 = 0.0 + t1 = 0.0 + try: + t0 = time.time() + self._view.logger.debug("starting execution of %s", self.fn) + val = self.fn(*self.args, **self.kwargs) + t1 = time.time() + except Exception as ex: + with self._view.lock: + self._view.logger.exception("Error while calling fn=%s ex=%s", self.fn, + str(ex)) + self._view.value = None + self._view.value_when = None + self._view.getter_thread = None + self._view.exception = ex + else: + with self._view.lock: + self._view.latency = t1 - t0 + self._view.value = val + self._view.value_when = datetime.now() + self._view.getter_thread = None + self._view.exception = None + + self._view.logger.debug("execution of %s finished in: %s", self.fn, + t1 - t0) + self.event.set() + + class RemoteViewCache(object): + # Return stale data if + STALE_PERIOD = 1.0 + + def __init__(self, timeout): + self.getter_thread = None + # Consider data within 1s old to be sufficiently fresh + self.timeout = timeout + self.event = threading.Event() + self.value_when = None + self.value = None + self.latency = 0 + self.exception = None + self.lock = threading.Lock() + self.logger = logging.getLogger('viewcache') + + def reset(self): + with self.lock: + self.value_when = None + self.value = None + + def run(self, fn, args, kwargs): + """ + If data less than `stale_period` old is available, return it + immediately. + If an attempt to fetch data does not complete within `timeout`, then + return the most recent data available, with a status to indicate that + it is stale. + + Initialization does not count towards the timeout, so the first call + on one of these objects during the process lifetime may be slower + than subsequent calls. + + :return: 2-tuple of value status code, value + """ + with self.lock: + now = datetime.now() + if self.value_when and now - self.value_when < timedelta( + seconds=self.STALE_PERIOD): + return ViewCache.VALUE_OK, self.value + + if self.getter_thread is None: + self.getter_thread = ViewCache.GetterThread(self, fn, args, + kwargs) + self.getter_thread.start() + else: + self.logger.debug("getter_thread still alive for: %s", fn) + + ev = self.getter_thread.event + + success = ev.wait(timeout=self.timeout) + + with self.lock: + if success: + # We fetched the data within the timeout + if self.exception: + # execution raised an exception + # pylint: disable=raising-bad-type + raise self.exception + return ViewCache.VALUE_OK, self.value + if self.value_when is not None: + # We have some data, but it doesn't meet freshness requirements + return ViewCache.VALUE_STALE, self.value + # We have no data, not even stale data + raise ViewCacheNoDataException() + + def __init__(self, timeout=5): + self.timeout = timeout + self.cache_by_args = {} + + def __call__(self, fn): + def wrapper(*args, **kwargs): + rvc = self.cache_by_args.get(args, None) + if not rvc: + rvc = ViewCache.RemoteViewCache(self.timeout) + self.cache_by_args[args] = rvc + return rvc.run(fn, args, kwargs) + wrapper.reset = self.reset # type: ignore + return wrapper + + def reset(self): + for _, rvc in self.cache_by_args.items(): + rvc.reset() + + +class NotificationQueue(threading.Thread): + _ALL_TYPES_ = '__ALL__' + _listeners = collections.defaultdict(set) # type: DefaultDict[str, Set[Tuple[int, Callable]]] + _lock = threading.Lock() + _cond = threading.Condition() + _queue = collections.deque() # type: Deque[Tuple[str, Any]] + _running = False + _instance = None + + def __init__(self): + super(NotificationQueue, self).__init__() + + @classmethod + def start_queue(cls): + with cls._lock: + if cls._instance: + # the queue thread is already running + return + cls._running = True + cls._instance = NotificationQueue() + cls.logger = logging.getLogger('notification_queue') # type: ignore + cls.logger.debug("starting notification queue") # type: ignore + cls._instance.start() + + @classmethod + def stop(cls): + with cls._lock: + if not cls._instance: + # the queue thread was not started + return + instance = cls._instance + cls._instance = None + cls._running = False + with cls._cond: + cls._cond.notify() + cls.logger.debug("waiting for notification queue to finish") # type: ignore + instance.join() + cls.logger.debug("notification queue stopped") # type: ignore + + @classmethod + def _registered_handler(cls, func, n_types): + for _, reg_func in cls._listeners[n_types]: + if reg_func == func: + return True + return False + + @classmethod + def register(cls, func, n_types=None, priority=1): + """Registers function to listen for notifications + + If the second parameter `n_types` is omitted, the function in `func` + parameter will be called for any type of notifications. + + Args: + func (function): python function ex: def foo(val) + n_types (str|list): the single type to listen, or a list of types + priority (int): the priority level (1=max, +inf=min) + """ + with cls._lock: + if not n_types: + n_types = [cls._ALL_TYPES_] + elif isinstance(n_types, str): + n_types = [n_types] + elif not isinstance(n_types, list): + raise Exception("n_types param is neither a string nor a list") + for ev_type in n_types: + if not cls._registered_handler(func, ev_type): + cls._listeners[ev_type].add((priority, func)) + cls.logger.debug( # type: ignore + "function %s was registered for events of type %s", + func, ev_type + ) + + @classmethod + def deregister(cls, func, n_types=None): + # type: (Callable, Union[str, list, None]) -> None + """Removes the listener function from this notification queue + + If the second parameter `n_types` is omitted, the function is removed + from all event types, otherwise the function is removed only for the + specified event types. + + Args: + func (function): python function + n_types (str|list): the single event type, or a list of event types + """ + with cls._lock: + if not n_types: + n_types = list(cls._listeners.keys()) + elif isinstance(n_types, str): + n_types = [n_types] + elif not isinstance(n_types, list): + raise Exception("n_types param is neither a string nor a list") + for ev_type in n_types: + listeners = cls._listeners[ev_type] + to_remove = None + for pr, fn in listeners: + if fn == func: + to_remove = (pr, fn) + break + if to_remove: + listeners.discard(to_remove) + cls.logger.debug( # type: ignore + "function %s was deregistered for events of type %s", + func, ev_type + ) + + @classmethod + def new_notification(cls, notify_type, notify_value): + # type: (str, Any) -> None + with cls._cond: + cls._queue.append((notify_type, notify_value)) + cls._cond.notify() + + @classmethod + def _notify_listeners(cls, events): + for ev in events: + notify_type, notify_value = ev + with cls._lock: + listeners = list(cls._listeners[notify_type]) + listeners.extend(cls._listeners[cls._ALL_TYPES_]) + listeners.sort(key=lambda lis: lis[0]) + for listener in listeners: + listener[1](notify_value) + + def run(self): + self.logger.debug("notification queue started") # type: ignore + while self._running: + private_buffer = [] + self.logger.debug("processing queue: %s", len(self._queue)) # type: ignore + try: + while True: + private_buffer.append(self._queue.popleft()) + except IndexError: + pass + self._notify_listeners(private_buffer) + with self._cond: + while self._running and not self._queue: + self._cond.wait() + # flush remaining events + self.logger.debug("flush remaining events: %s", len(self._queue)) # type: ignore + self._notify_listeners(self._queue) + self._queue.clear() + self.logger.debug("notification queue finished") # type: ignore + + +# pylint: disable=too-many-arguments, protected-access +class TaskManager(object): + FINISHED_TASK_SIZE = 10 + FINISHED_TASK_TTL = 60.0 + + VALUE_DONE = "done" + VALUE_EXECUTING = "executing" + + _executing_tasks = set() # type: Set[Task] + _finished_tasks = [] # type: List[Task] + _lock = threading.Lock() + + _task_local_data = threading.local() + + @classmethod + def init(cls): + cls.logger = logging.getLogger('taskmgr') # type: ignore + NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished') + + @classmethod + def _handle_finished_task(cls, task): + cls.logger.info("finished %s", task) # type: ignore + with cls._lock: + cls._executing_tasks.remove(task) + cls._finished_tasks.append(task) + + @classmethod + def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None, + exception_handler=None): + if not args: + args = [] + if not kwargs: + kwargs = {} + if not executor: + executor = ThreadedExecutor() + task = Task(name, metadata, fn, args, kwargs, executor, + exception_handler) + with cls._lock: + if task in cls._executing_tasks: + cls.logger.debug("task already executing: %s", task) # type: ignore + for t in cls._executing_tasks: + if t == task: + return t + cls.logger.debug("created %s", task) # type: ignore + cls._executing_tasks.add(task) + cls.logger.info("running %s", task) # type: ignore + task._run() + return task + + @classmethod + def current_task(cls): + """ + Returns the current task object. + This method should only be called from a threaded task operation code. + """ + return cls._task_local_data.task + + @classmethod + def _cleanup_old_tasks(cls, task_list): + """ + The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent + finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL + value. + """ + now = datetime.now() + for idx, t in enumerate(task_list): + if idx < cls.FINISHED_TASK_SIZE: + continue + if now - datetime.fromtimestamp(t[1].end_time) > \ + timedelta(seconds=cls.FINISHED_TASK_TTL): + del cls._finished_tasks[t[0]] + + @classmethod + def list(cls, name_glob=None): + executing_tasks = [] + finished_tasks = [] + with cls._lock: + for task in cls._executing_tasks: + if not name_glob or fnmatch.fnmatch(task.name, name_glob): + executing_tasks.append(task) + for idx, task in enumerate(cls._finished_tasks): + if not name_glob or fnmatch.fnmatch(task.name, name_glob): + finished_tasks.append((idx, task)) + finished_tasks.sort(key=lambda t: t[1].end_time, reverse=True) + cls._cleanup_old_tasks(finished_tasks) + executing_tasks.sort(key=lambda t: t.begin_time, reverse=True) + return executing_tasks, [t[1] for t in finished_tasks] + + @classmethod + def list_serializable(cls, ns_glob=None): + ex_t, fn_t = cls.list(ns_glob) + return [{ + 'name': t.name, + 'metadata': t.metadata, + 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), + 'progress': t.progress + } for t in ex_t if t.begin_time], [{ + 'name': t.name, + 'metadata': t.metadata, + 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), + 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()), + 'duration': t.duration, + 'progress': t.progress, + 'success': not t.exception, + 'ret_value': t.ret_value if not t.exception else None, + 'exception': t.ret_value if t.exception and t.ret_value else ( + {'detail': str(t.exception)} if t.exception else None) + } for t in fn_t] + + +# pylint: disable=protected-access +class TaskExecutor(object): + def __init__(self): + self.logger = logging.getLogger('taskexec') + self.task = None + + def init(self, task): + self.task = task + + # pylint: disable=broad-except + def start(self): + self.logger.debug("executing task %s", self.task) + try: + self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore + except Exception as ex: + self.logger.exception("Error while calling %s", self.task) + self.finish(None, ex) + + def finish(self, ret_value, exception): + if not exception: + self.logger.debug("successfully finished task: %s", self.task) + else: + self.logger.debug("task finished with exception: %s", self.task) + self.task._complete(ret_value, exception) # type: ignore + + +# pylint: disable=protected-access +class ThreadedExecutor(TaskExecutor): + def __init__(self): + super(ThreadedExecutor, self).__init__() + self._thread = threading.Thread(target=self._run) + + def start(self): + self._thread.start() + + # pylint: disable=broad-except + def _run(self): + TaskManager._task_local_data.task = self.task + try: + self.logger.debug("executing task %s", self.task) + val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore + except Exception as ex: + self.logger.exception("Error while calling %s", self.task) + self.finish(None, ex) + else: + self.finish(val, None) + + +class Task(object): + def __init__(self, name, metadata, fn, args, kwargs, executor, + exception_handler=None): + self.name = name + self.metadata = metadata + self.fn = fn + self.fn_args = args + self.fn_kwargs = kwargs + self.executor = executor + self.ex_handler = exception_handler + self.running = False + self.event = threading.Event() + self.progress = None + self.ret_value = None + self._begin_time: Optional[float] = None + self._end_time: Optional[float] = None + self.duration = 0.0 + self.exception = None + self.logger = logging.getLogger('task') + self.lock = threading.Lock() + + def __hash__(self): + return hash((self.name, tuple(sorted(self.metadata.items())))) + + def __eq__(self, other): + return self.name == other.name and self.metadata == other.metadata + + def __str__(self): + return "Task(ns={}, md={})" \ + .format(self.name, self.metadata) + + def __repr__(self): + return str(self) + + def _run(self): + NotificationQueue.register(self._handle_task_finished, 'cd_task_finished', 100) + with self.lock: + assert not self.running + self.executor.init(self) + self.set_progress(0, in_lock=True) + self._begin_time = time.time() + self.running = True + self.executor.start() + + def _complete(self, ret_value, exception=None): + now = time.time() + if exception and self.ex_handler: + # pylint: disable=broad-except + try: + ret_value = self.ex_handler(exception, task=self) + except Exception as ex: + exception = ex + with self.lock: + assert self.running, "_complete cannot be called before _run" + self._end_time = now + self.ret_value = ret_value + self.exception = exception + self.duration = now - self.begin_time + if not self.exception: + self.set_progress(100, True) + NotificationQueue.new_notification('cd_task_finished', self) + self.logger.debug("execution of %s finished in: %s s", self, + self.duration) + + def _handle_task_finished(self, task): + if self == task: + NotificationQueue.deregister(self._handle_task_finished) + self.event.set() + + def wait(self, timeout=None): + with self.lock: + assert self.running, "wait cannot be called before _run" + ev = self.event + + success = ev.wait(timeout=timeout) + with self.lock: + if success: + # the action executed within the timeout + if self.exception: + # pylint: disable=raising-bad-type + # execution raised an exception + raise self.exception + return TaskManager.VALUE_DONE, self.ret_value + # the action is still executing + return TaskManager.VALUE_EXECUTING, None + + def inc_progress(self, delta, in_lock=False): + if not isinstance(delta, int) or delta < 0: + raise Exception("Progress delta value must be a positive integer") + if not in_lock: + self.lock.acquire() + prog = self.progress + delta # type: ignore + self.progress = prog if prog <= 100 else 100 + if not in_lock: + self.lock.release() + + def set_progress(self, percentage, in_lock=False): + if not isinstance(percentage, int) or percentage < 0 or percentage > 100: + raise Exception("Progress value must be in percentage " + "(0 <= percentage <= 100)") + if not in_lock: + self.lock.acquire() + self.progress = percentage + if not in_lock: + self.lock.release() + + @property + def end_time(self) -> float: + assert self._end_time is not None + return self._end_time + + @property + def begin_time(self) -> float: + assert self._begin_time is not None + return self._begin_time + + +def prepare_url_prefix(url_prefix): + """ + return '' if no prefix, or '/prefix' without slash in the end. + """ + url_prefix = urllib.parse.urljoin('/', url_prefix) + return url_prefix.rstrip('/') + + +def dict_contains_path(dct, keys): + """ + Tests whether the keys exist recursively in `dictionary`. + + :type dct: dict + :type keys: list + :rtype: bool + """ + if keys: + if not isinstance(dct, dict): + return False + key = keys.pop(0) + if key in dct: + dct = dct[key] + return dict_contains_path(dct, keys) + return False + return True + + +def dict_get(obj, path, default=None): + """ + Get the value at any depth of a nested object based on the path + described by `path`. If path doesn't exist, `default` is returned. + """ + current = obj + for part in path.split('.'): + if not isinstance(current, dict): + return default + if part not in current.keys(): + return default + current = current.get(part, {}) + return current + + +def getargspec(func): + try: + while True: + func = func.__wrapped__ + except AttributeError: + pass + # pylint: disable=deprecated-method + return inspect.getfullargspec(func) + + +def str_to_bool(val): + """ + Convert a string representation of truth to True or False. + + >>> str_to_bool('true') and str_to_bool('yes') and str_to_bool('1') and str_to_bool(True) + True + + >>> str_to_bool('false') and str_to_bool('no') and str_to_bool('0') and str_to_bool(False) + False + + >>> str_to_bool('xyz') + Traceback (most recent call last): + ... + ValueError: invalid truth value 'xyz' + + :param val: The value to convert. + :type val: str|bool + :rtype: bool + """ + if isinstance(val, bool): + return val + return bool(strtobool(val)) + + +def json_str_to_object(value): # type: (AnyStr) -> Any + """ + It converts a JSON valid string representation to object. + + >>> result = json_str_to_object('{"a": 1}') + >>> result == {'a': 1} + True + """ + if value == '': + return value + + try: + # json.loads accepts binary input from version >=3.6 + value = value.decode('utf-8') # type: ignore + except AttributeError: + pass + + return json.loads(value) + + +def partial_dict(orig, keys): # type: (Dict, List[str]) -> Dict + """ + It returns Dict containing only the selected keys of original Dict. + + >>> partial_dict({'a': 1, 'b': 2}, ['b']) + {'b': 2} + """ + return {k: orig[k] for k in keys} + + +def get_request_body_params(request): + """ + Helper function to get parameters from the request body. + :param request The CherryPy request object. + :type request: cherrypy.Request + :return: A dictionary containing the parameters. + :rtype: dict + """ + params = {} # type: dict + if request.method not in request.methods_with_bodies: + return params + + content_type = request.headers.get('Content-Type', '') + if content_type in ['application/json', 'text/javascript']: + if not hasattr(request, 'json'): + raise cherrypy.HTTPError(400, 'Expected JSON body') + if isinstance(request.json, str): + params.update(json.loads(request.json)) + else: + params.update(request.json) + + return params + + +def find_object_in_list(key, value, iterable): + """ + Get the first occurrence of an object within a list with + the specified key/value. + + >>> find_object_in_list('name', 'bar', [{'name': 'foo'}, {'name': 'bar'}]) + {'name': 'bar'} + + >>> find_object_in_list('name', 'xyz', [{'name': 'foo'}, {'name': 'bar'}]) is None + True + + >>> find_object_in_list('foo', 'bar', [{'xyz': 4815162342}]) is None + True + + >>> find_object_in_list('foo', 'bar', []) is None + True + + :param key: The name of the key. + :param value: The value to search for. + :param iterable: The list to process. + :return: Returns the found object or None. + """ + for obj in iterable: + if key in obj and obj[key] == value: + return obj + return None + + +def merge_list_of_dicts_by_key(target_list: list, source_list: list, key: str): + target_list = {d[key]: d for d in target_list} + for sdict in source_list: + if bool(sdict): + if sdict[key] in target_list: + target_list[sdict[key]].update(sdict) + target_list = [value for value in target_list.values()] + return target_list |