summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/tools.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/dashboard/tools.py')
-rw-r--r--src/pybind/mgr/dashboard/tools.py841
1 files changed, 841 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/tools.py b/src/pybind/mgr/dashboard/tools.py
new file mode 100644
index 000000000..1ae194219
--- /dev/null
+++ b/src/pybind/mgr/dashboard/tools.py
@@ -0,0 +1,841 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import collections
+import fnmatch
+import inspect
+import json
+import logging
+import threading
+import time
+import urllib
+from datetime import datetime, timedelta
+from distutils.util import strtobool
+
+import cherrypy
+from mgr_util import build_url
+
+from . import mgr
+from .exceptions import ViewCacheNoDataException
+from .services.auth import JwtManager
+from .settings import Settings
+
+try:
+ from typing import Any, AnyStr, Callable, DefaultDict, Deque, Dict, List, \
+ Optional, Set, Tuple, Union
+except ImportError:
+ pass # For typing only
+
+
+class RequestLoggingTool(cherrypy.Tool):
+ def __init__(self):
+ cherrypy.Tool.__init__(self, 'before_handler', self.request_begin,
+ priority=10)
+ self.logger = logging.getLogger('request')
+
+ def _setup(self):
+ cherrypy.Tool._setup(self)
+ cherrypy.request.hooks.attach('on_end_request', self.request_end,
+ priority=5)
+ cherrypy.request.hooks.attach('after_error_response', self.request_error,
+ priority=5)
+
+ def request_begin(self):
+ req = cherrypy.request
+ user = JwtManager.get_username()
+ # Log the request.
+ self.logger.debug('[%s:%s] [%s] [%s] %s', req.remote.ip, req.remote.port,
+ req.method, user, req.path_info)
+ # Audit the request.
+ if Settings.AUDIT_API_ENABLED and req.method not in ['GET']:
+ url = build_url(req.remote.ip, scheme=req.scheme,
+ port=req.remote.port)
+ msg = '[DASHBOARD] from=\'{}\' path=\'{}\' method=\'{}\' ' \
+ 'user=\'{}\''.format(url, req.path_info, req.method, user)
+ if Settings.AUDIT_API_LOG_PAYLOAD:
+ params = dict(req.params or {}, **get_request_body_params(req))
+ # Hide sensitive data like passwords, secret keys, ...
+ # Extend the list of patterns to search for if necessary.
+ # Currently parameters like this are processed:
+ # - secret_key
+ # - user_password
+ # - new_passwd_to_login
+ keys = []
+ for key in ['password', 'passwd', 'secret']:
+ keys.extend([x for x in params.keys() if key in x])
+ for key in keys:
+ params[key] = '***'
+ msg = '{} params=\'{}\''.format(msg, json.dumps(params))
+ mgr.cluster_log('audit', mgr.ClusterLogPrio.INFO, msg)
+
+ def request_error(self):
+ self._request_log(self.logger.error)
+ self.logger.error(cherrypy.response.body)
+
+ def request_end(self):
+ status = cherrypy.response.status[:3]
+ if status in ["401", "403"]:
+ # log unauthorized accesses
+ self._request_log(self.logger.warning)
+ else:
+ self._request_log(self.logger.info)
+
+ def _format_bytes(self, num):
+ units = ['B', 'K', 'M', 'G']
+
+ if isinstance(num, str):
+ try:
+ num = int(num)
+ except ValueError:
+ return "n/a"
+
+ format_str = "{:.0f}{}"
+ for i, unit in enumerate(units):
+ div = 2**(10*i)
+ if num < 2**(10*(i+1)):
+ if num % div == 0:
+ format_str = "{}{}"
+ else:
+ div = float(div)
+ format_str = "{:.1f}{}"
+ return format_str.format(num/div, unit[0])
+
+ # content-length bigger than 1T!! return value in bytes
+ return "{}B".format(num)
+
+ def _request_log(self, logger_fn):
+ req = cherrypy.request
+ res = cherrypy.response
+ lat = time.time() - res.time
+ user = JwtManager.get_username()
+ status = res.status[:3] if isinstance(res.status, str) else res.status
+ if 'Content-Length' in res.headers:
+ length = self._format_bytes(res.headers['Content-Length'])
+ else:
+ length = self._format_bytes(0)
+ if user:
+ logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip,
+ req.remote.port, req.method, status,
+ "{0:.3f}s".format(lat), user, length, req.path_info)
+ else:
+ logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip,
+ req.remote.port, req.method, status,
+ "{0:.3f}s".format(lat), length, getattr(req, 'unique_id', '-'), req.path_info)
+
+
+# pylint: disable=too-many-instance-attributes
+class ViewCache(object):
+ VALUE_OK = 0
+ VALUE_STALE = 1
+ VALUE_NONE = 2
+
+ class GetterThread(threading.Thread):
+ def __init__(self, view, fn, args, kwargs):
+ super(ViewCache.GetterThread, self).__init__()
+ self._view = view
+ self.event = threading.Event()
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+
+ # pylint: disable=broad-except
+ def run(self):
+ t0 = 0.0
+ t1 = 0.0
+ try:
+ t0 = time.time()
+ self._view.logger.debug("starting execution of %s", self.fn)
+ val = self.fn(*self.args, **self.kwargs)
+ t1 = time.time()
+ except Exception as ex:
+ with self._view.lock:
+ self._view.logger.exception("Error while calling fn=%s ex=%s", self.fn,
+ str(ex))
+ self._view.value = None
+ self._view.value_when = None
+ self._view.getter_thread = None
+ self._view.exception = ex
+ else:
+ with self._view.lock:
+ self._view.latency = t1 - t0
+ self._view.value = val
+ self._view.value_when = datetime.now()
+ self._view.getter_thread = None
+ self._view.exception = None
+
+ self._view.logger.debug("execution of %s finished in: %s", self.fn,
+ t1 - t0)
+ self.event.set()
+
+ class RemoteViewCache(object):
+ # Return stale data if
+ STALE_PERIOD = 1.0
+
+ def __init__(self, timeout):
+ self.getter_thread = None
+ # Consider data within 1s old to be sufficiently fresh
+ self.timeout = timeout
+ self.event = threading.Event()
+ self.value_when = None
+ self.value = None
+ self.latency = 0
+ self.exception = None
+ self.lock = threading.Lock()
+ self.logger = logging.getLogger('viewcache')
+
+ def reset(self):
+ with self.lock:
+ self.value_when = None
+ self.value = None
+
+ def run(self, fn, args, kwargs):
+ """
+ If data less than `stale_period` old is available, return it
+ immediately.
+ If an attempt to fetch data does not complete within `timeout`, then
+ return the most recent data available, with a status to indicate that
+ it is stale.
+
+ Initialization does not count towards the timeout, so the first call
+ on one of these objects during the process lifetime may be slower
+ than subsequent calls.
+
+ :return: 2-tuple of value status code, value
+ """
+ with self.lock:
+ now = datetime.now()
+ if self.value_when and now - self.value_when < timedelta(
+ seconds=self.STALE_PERIOD):
+ return ViewCache.VALUE_OK, self.value
+
+ if self.getter_thread is None:
+ self.getter_thread = ViewCache.GetterThread(self, fn, args,
+ kwargs)
+ self.getter_thread.start()
+ else:
+ self.logger.debug("getter_thread still alive for: %s", fn)
+
+ ev = self.getter_thread.event
+
+ success = ev.wait(timeout=self.timeout)
+
+ with self.lock:
+ if success:
+ # We fetched the data within the timeout
+ if self.exception:
+ # execution raised an exception
+ # pylint: disable=raising-bad-type
+ raise self.exception
+ return ViewCache.VALUE_OK, self.value
+ if self.value_when is not None:
+ # We have some data, but it doesn't meet freshness requirements
+ return ViewCache.VALUE_STALE, self.value
+ # We have no data, not even stale data
+ raise ViewCacheNoDataException()
+
+ def __init__(self, timeout=5):
+ self.timeout = timeout
+ self.cache_by_args = {}
+
+ def __call__(self, fn):
+ def wrapper(*args, **kwargs):
+ rvc = self.cache_by_args.get(args, None)
+ if not rvc:
+ rvc = ViewCache.RemoteViewCache(self.timeout)
+ self.cache_by_args[args] = rvc
+ return rvc.run(fn, args, kwargs)
+ wrapper.reset = self.reset # type: ignore
+ return wrapper
+
+ def reset(self):
+ for _, rvc in self.cache_by_args.items():
+ rvc.reset()
+
+
+class NotificationQueue(threading.Thread):
+ _ALL_TYPES_ = '__ALL__'
+ _listeners = collections.defaultdict(set) # type: DefaultDict[str, Set[Tuple[int, Callable]]]
+ _lock = threading.Lock()
+ _cond = threading.Condition()
+ _queue = collections.deque() # type: Deque[Tuple[str, Any]]
+ _running = False
+ _instance = None
+
+ def __init__(self):
+ super(NotificationQueue, self).__init__()
+
+ @classmethod
+ def start_queue(cls):
+ with cls._lock:
+ if cls._instance:
+ # the queue thread is already running
+ return
+ cls._running = True
+ cls._instance = NotificationQueue()
+ cls.logger = logging.getLogger('notification_queue') # type: ignore
+ cls.logger.debug("starting notification queue") # type: ignore
+ cls._instance.start()
+
+ @classmethod
+ def stop(cls):
+ with cls._lock:
+ if not cls._instance:
+ # the queue thread was not started
+ return
+ instance = cls._instance
+ cls._instance = None
+ cls._running = False
+ with cls._cond:
+ cls._cond.notify()
+ cls.logger.debug("waiting for notification queue to finish") # type: ignore
+ instance.join()
+ cls.logger.debug("notification queue stopped") # type: ignore
+
+ @classmethod
+ def _registered_handler(cls, func, n_types):
+ for _, reg_func in cls._listeners[n_types]:
+ if reg_func == func:
+ return True
+ return False
+
+ @classmethod
+ def register(cls, func, n_types=None, priority=1):
+ """Registers function to listen for notifications
+
+ If the second parameter `n_types` is omitted, the function in `func`
+ parameter will be called for any type of notifications.
+
+ Args:
+ func (function): python function ex: def foo(val)
+ n_types (str|list): the single type to listen, or a list of types
+ priority (int): the priority level (1=max, +inf=min)
+ """
+ with cls._lock:
+ if not n_types:
+ n_types = [cls._ALL_TYPES_]
+ elif isinstance(n_types, str):
+ n_types = [n_types]
+ elif not isinstance(n_types, list):
+ raise Exception("n_types param is neither a string nor a list")
+ for ev_type in n_types:
+ if not cls._registered_handler(func, ev_type):
+ cls._listeners[ev_type].add((priority, func))
+ cls.logger.debug( # type: ignore
+ "function %s was registered for events of type %s",
+ func, ev_type
+ )
+
+ @classmethod
+ def deregister(cls, func, n_types=None):
+ # type: (Callable, Union[str, list, None]) -> None
+ """Removes the listener function from this notification queue
+
+ If the second parameter `n_types` is omitted, the function is removed
+ from all event types, otherwise the function is removed only for the
+ specified event types.
+
+ Args:
+ func (function): python function
+ n_types (str|list): the single event type, or a list of event types
+ """
+ with cls._lock:
+ if not n_types:
+ n_types = list(cls._listeners.keys())
+ elif isinstance(n_types, str):
+ n_types = [n_types]
+ elif not isinstance(n_types, list):
+ raise Exception("n_types param is neither a string nor a list")
+ for ev_type in n_types:
+ listeners = cls._listeners[ev_type]
+ to_remove = None
+ for pr, fn in listeners:
+ if fn == func:
+ to_remove = (pr, fn)
+ break
+ if to_remove:
+ listeners.discard(to_remove)
+ cls.logger.debug( # type: ignore
+ "function %s was deregistered for events of type %s",
+ func, ev_type
+ )
+
+ @classmethod
+ def new_notification(cls, notify_type, notify_value):
+ # type: (str, Any) -> None
+ with cls._cond:
+ cls._queue.append((notify_type, notify_value))
+ cls._cond.notify()
+
+ @classmethod
+ def _notify_listeners(cls, events):
+ for ev in events:
+ notify_type, notify_value = ev
+ with cls._lock:
+ listeners = list(cls._listeners[notify_type])
+ listeners.extend(cls._listeners[cls._ALL_TYPES_])
+ listeners.sort(key=lambda lis: lis[0])
+ for listener in listeners:
+ listener[1](notify_value)
+
+ def run(self):
+ self.logger.debug("notification queue started") # type: ignore
+ while self._running:
+ private_buffer = []
+ self.logger.debug("processing queue: %s", len(self._queue)) # type: ignore
+ try:
+ while True:
+ private_buffer.append(self._queue.popleft())
+ except IndexError:
+ pass
+ self._notify_listeners(private_buffer)
+ with self._cond:
+ while self._running and not self._queue:
+ self._cond.wait()
+ # flush remaining events
+ self.logger.debug("flush remaining events: %s", len(self._queue)) # type: ignore
+ self._notify_listeners(self._queue)
+ self._queue.clear()
+ self.logger.debug("notification queue finished") # type: ignore
+
+
+# pylint: disable=too-many-arguments, protected-access
+class TaskManager(object):
+ FINISHED_TASK_SIZE = 10
+ FINISHED_TASK_TTL = 60.0
+
+ VALUE_DONE = "done"
+ VALUE_EXECUTING = "executing"
+
+ _executing_tasks = set() # type: Set[Task]
+ _finished_tasks = [] # type: List[Task]
+ _lock = threading.Lock()
+
+ _task_local_data = threading.local()
+
+ @classmethod
+ def init(cls):
+ cls.logger = logging.getLogger('taskmgr') # type: ignore
+ NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished')
+
+ @classmethod
+ def _handle_finished_task(cls, task):
+ cls.logger.info("finished %s", task) # type: ignore
+ with cls._lock:
+ cls._executing_tasks.remove(task)
+ cls._finished_tasks.append(task)
+
+ @classmethod
+ def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None,
+ exception_handler=None):
+ if not args:
+ args = []
+ if not kwargs:
+ kwargs = {}
+ if not executor:
+ executor = ThreadedExecutor()
+ task = Task(name, metadata, fn, args, kwargs, executor,
+ exception_handler)
+ with cls._lock:
+ if task in cls._executing_tasks:
+ cls.logger.debug("task already executing: %s", task) # type: ignore
+ for t in cls._executing_tasks:
+ if t == task:
+ return t
+ cls.logger.debug("created %s", task) # type: ignore
+ cls._executing_tasks.add(task)
+ cls.logger.info("running %s", task) # type: ignore
+ task._run()
+ return task
+
+ @classmethod
+ def current_task(cls):
+ """
+ Returns the current task object.
+ This method should only be called from a threaded task operation code.
+ """
+ return cls._task_local_data.task
+
+ @classmethod
+ def _cleanup_old_tasks(cls, task_list):
+ """
+ The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent
+ finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL
+ value.
+ """
+ now = datetime.now()
+ for idx, t in enumerate(task_list):
+ if idx < cls.FINISHED_TASK_SIZE:
+ continue
+ if now - datetime.fromtimestamp(t[1].end_time) > \
+ timedelta(seconds=cls.FINISHED_TASK_TTL):
+ del cls._finished_tasks[t[0]]
+
+ @classmethod
+ def list(cls, name_glob=None):
+ executing_tasks = []
+ finished_tasks = []
+ with cls._lock:
+ for task in cls._executing_tasks:
+ if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+ executing_tasks.append(task)
+ for idx, task in enumerate(cls._finished_tasks):
+ if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+ finished_tasks.append((idx, task))
+ finished_tasks.sort(key=lambda t: t[1].end_time, reverse=True)
+ cls._cleanup_old_tasks(finished_tasks)
+ executing_tasks.sort(key=lambda t: t.begin_time, reverse=True)
+ return executing_tasks, [t[1] for t in finished_tasks]
+
+ @classmethod
+ def list_serializable(cls, ns_glob=None):
+ ex_t, fn_t = cls.list(ns_glob)
+ return [{
+ 'name': t.name,
+ 'metadata': t.metadata,
+ 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+ 'progress': t.progress
+ } for t in ex_t if t.begin_time], [{
+ 'name': t.name,
+ 'metadata': t.metadata,
+ 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+ 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()),
+ 'duration': t.duration,
+ 'progress': t.progress,
+ 'success': not t.exception,
+ 'ret_value': t.ret_value if not t.exception else None,
+ 'exception': t.ret_value if t.exception and t.ret_value else (
+ {'detail': str(t.exception)} if t.exception else None)
+ } for t in fn_t]
+
+
+# pylint: disable=protected-access
+class TaskExecutor(object):
+ def __init__(self):
+ self.logger = logging.getLogger('taskexec')
+ self.task = None
+
+ def init(self, task):
+ self.task = task
+
+ # pylint: disable=broad-except
+ def start(self):
+ self.logger.debug("executing task %s", self.task)
+ try:
+ self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore
+ except Exception as ex:
+ self.logger.exception("Error while calling %s", self.task)
+ self.finish(None, ex)
+
+ def finish(self, ret_value, exception):
+ if not exception:
+ self.logger.debug("successfully finished task: %s", self.task)
+ else:
+ self.logger.debug("task finished with exception: %s", self.task)
+ self.task._complete(ret_value, exception) # type: ignore
+
+
+# pylint: disable=protected-access
+class ThreadedExecutor(TaskExecutor):
+ def __init__(self):
+ super(ThreadedExecutor, self).__init__()
+ self._thread = threading.Thread(target=self._run)
+
+ def start(self):
+ self._thread.start()
+
+ # pylint: disable=broad-except
+ def _run(self):
+ TaskManager._task_local_data.task = self.task
+ try:
+ self.logger.debug("executing task %s", self.task)
+ val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore
+ except Exception as ex:
+ self.logger.exception("Error while calling %s", self.task)
+ self.finish(None, ex)
+ else:
+ self.finish(val, None)
+
+
+class Task(object):
+ def __init__(self, name, metadata, fn, args, kwargs, executor,
+ exception_handler=None):
+ self.name = name
+ self.metadata = metadata
+ self.fn = fn
+ self.fn_args = args
+ self.fn_kwargs = kwargs
+ self.executor = executor
+ self.ex_handler = exception_handler
+ self.running = False
+ self.event = threading.Event()
+ self.progress = None
+ self.ret_value = None
+ self._begin_time: Optional[float] = None
+ self._end_time: Optional[float] = None
+ self.duration = 0.0
+ self.exception = None
+ self.logger = logging.getLogger('task')
+ self.lock = threading.Lock()
+
+ def __hash__(self):
+ return hash((self.name, tuple(sorted(self.metadata.items()))))
+
+ def __eq__(self, other):
+ return self.name == other.name and self.metadata == other.metadata
+
+ def __str__(self):
+ return "Task(ns={}, md={})" \
+ .format(self.name, self.metadata)
+
+ def __repr__(self):
+ return str(self)
+
+ def _run(self):
+ NotificationQueue.register(self._handle_task_finished, 'cd_task_finished', 100)
+ with self.lock:
+ assert not self.running
+ self.executor.init(self)
+ self.set_progress(0, in_lock=True)
+ self._begin_time = time.time()
+ self.running = True
+ self.executor.start()
+
+ def _complete(self, ret_value, exception=None):
+ now = time.time()
+ if exception and self.ex_handler:
+ # pylint: disable=broad-except
+ try:
+ ret_value = self.ex_handler(exception, task=self)
+ except Exception as ex:
+ exception = ex
+ with self.lock:
+ assert self.running, "_complete cannot be called before _run"
+ self._end_time = now
+ self.ret_value = ret_value
+ self.exception = exception
+ self.duration = now - self.begin_time
+ if not self.exception:
+ self.set_progress(100, True)
+ NotificationQueue.new_notification('cd_task_finished', self)
+ self.logger.debug("execution of %s finished in: %s s", self,
+ self.duration)
+
+ def _handle_task_finished(self, task):
+ if self == task:
+ NotificationQueue.deregister(self._handle_task_finished)
+ self.event.set()
+
+ def wait(self, timeout=None):
+ with self.lock:
+ assert self.running, "wait cannot be called before _run"
+ ev = self.event
+
+ success = ev.wait(timeout=timeout)
+ with self.lock:
+ if success:
+ # the action executed within the timeout
+ if self.exception:
+ # pylint: disable=raising-bad-type
+ # execution raised an exception
+ raise self.exception
+ return TaskManager.VALUE_DONE, self.ret_value
+ # the action is still executing
+ return TaskManager.VALUE_EXECUTING, None
+
+ def inc_progress(self, delta, in_lock=False):
+ if not isinstance(delta, int) or delta < 0:
+ raise Exception("Progress delta value must be a positive integer")
+ if not in_lock:
+ self.lock.acquire()
+ prog = self.progress + delta # type: ignore
+ self.progress = prog if prog <= 100 else 100
+ if not in_lock:
+ self.lock.release()
+
+ def set_progress(self, percentage, in_lock=False):
+ if not isinstance(percentage, int) or percentage < 0 or percentage > 100:
+ raise Exception("Progress value must be in percentage "
+ "(0 <= percentage <= 100)")
+ if not in_lock:
+ self.lock.acquire()
+ self.progress = percentage
+ if not in_lock:
+ self.lock.release()
+
+ @property
+ def end_time(self) -> float:
+ assert self._end_time is not None
+ return self._end_time
+
+ @property
+ def begin_time(self) -> float:
+ assert self._begin_time is not None
+ return self._begin_time
+
+
+def prepare_url_prefix(url_prefix):
+ """
+ return '' if no prefix, or '/prefix' without slash in the end.
+ """
+ url_prefix = urllib.parse.urljoin('/', url_prefix)
+ return url_prefix.rstrip('/')
+
+
+def dict_contains_path(dct, keys):
+ """
+ Tests whether the keys exist recursively in `dictionary`.
+
+ :type dct: dict
+ :type keys: list
+ :rtype: bool
+ """
+ if keys:
+ if not isinstance(dct, dict):
+ return False
+ key = keys.pop(0)
+ if key in dct:
+ dct = dct[key]
+ return dict_contains_path(dct, keys)
+ return False
+ return True
+
+
+def dict_get(obj, path, default=None):
+ """
+ Get the value at any depth of a nested object based on the path
+ described by `path`. If path doesn't exist, `default` is returned.
+ """
+ current = obj
+ for part in path.split('.'):
+ if not isinstance(current, dict):
+ return default
+ if part not in current.keys():
+ return default
+ current = current.get(part, {})
+ return current
+
+
+def getargspec(func):
+ try:
+ while True:
+ func = func.__wrapped__
+ except AttributeError:
+ pass
+ # pylint: disable=deprecated-method
+ return inspect.getfullargspec(func)
+
+
+def str_to_bool(val):
+ """
+ Convert a string representation of truth to True or False.
+
+ >>> str_to_bool('true') and str_to_bool('yes') and str_to_bool('1') and str_to_bool(True)
+ True
+
+ >>> str_to_bool('false') and str_to_bool('no') and str_to_bool('0') and str_to_bool(False)
+ False
+
+ >>> str_to_bool('xyz')
+ Traceback (most recent call last):
+ ...
+ ValueError: invalid truth value 'xyz'
+
+ :param val: The value to convert.
+ :type val: str|bool
+ :rtype: bool
+ """
+ if isinstance(val, bool):
+ return val
+ return bool(strtobool(val))
+
+
+def json_str_to_object(value): # type: (AnyStr) -> Any
+ """
+ It converts a JSON valid string representation to object.
+
+ >>> result = json_str_to_object('{"a": 1}')
+ >>> result == {'a': 1}
+ True
+ """
+ if value == '':
+ return value
+
+ try:
+ # json.loads accepts binary input from version >=3.6
+ value = value.decode('utf-8') # type: ignore
+ except AttributeError:
+ pass
+
+ return json.loads(value)
+
+
+def partial_dict(orig, keys): # type: (Dict, List[str]) -> Dict
+ """
+ It returns Dict containing only the selected keys of original Dict.
+
+ >>> partial_dict({'a': 1, 'b': 2}, ['b'])
+ {'b': 2}
+ """
+ return {k: orig[k] for k in keys}
+
+
+def get_request_body_params(request):
+ """
+ Helper function to get parameters from the request body.
+ :param request The CherryPy request object.
+ :type request: cherrypy.Request
+ :return: A dictionary containing the parameters.
+ :rtype: dict
+ """
+ params = {} # type: dict
+ if request.method not in request.methods_with_bodies:
+ return params
+
+ content_type = request.headers.get('Content-Type', '')
+ if content_type in ['application/json', 'text/javascript']:
+ if not hasattr(request, 'json'):
+ raise cherrypy.HTTPError(400, 'Expected JSON body')
+ if isinstance(request.json, str):
+ params.update(json.loads(request.json))
+ else:
+ params.update(request.json)
+
+ return params
+
+
+def find_object_in_list(key, value, iterable):
+ """
+ Get the first occurrence of an object within a list with
+ the specified key/value.
+
+ >>> find_object_in_list('name', 'bar', [{'name': 'foo'}, {'name': 'bar'}])
+ {'name': 'bar'}
+
+ >>> find_object_in_list('name', 'xyz', [{'name': 'foo'}, {'name': 'bar'}]) is None
+ True
+
+ >>> find_object_in_list('foo', 'bar', [{'xyz': 4815162342}]) is None
+ True
+
+ >>> find_object_in_list('foo', 'bar', []) is None
+ True
+
+ :param key: The name of the key.
+ :param value: The value to search for.
+ :param iterable: The list to process.
+ :return: Returns the found object or None.
+ """
+ for obj in iterable:
+ if key in obj and obj[key] == value:
+ return obj
+ return None
+
+
+def merge_list_of_dicts_by_key(target_list: list, source_list: list, key: str):
+ target_list = {d[key]: d for d in target_list}
+ for sdict in source_list:
+ if bool(sdict):
+ if sdict[key] in target_list:
+ target_list[sdict[key]].update(sdict)
+ target_list = [value for value in target_list.values()]
+ return target_list