summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/tools.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/dashboard/tools.py932
1 files changed, 932 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/tools.py b/src/pybind/mgr/dashboard/tools.py
new file mode 100644
index 00000000..82a82194
--- /dev/null
+++ b/src/pybind/mgr/dashboard/tools.py
@@ -0,0 +1,932 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import sys
+import inspect
+import json
+import functools
+
+import collections
+from datetime import datetime, timedelta
+from distutils.util import strtobool
+import fnmatch
+import time
+import threading
+import socket
+import six
+from six.moves import urllib
+import cherrypy
+
+try:
+ from urlparse import urljoin
+except ImportError:
+ from urllib.parse import urljoin
+
+from . import logger, mgr
+from .exceptions import ViewCacheNoDataException
+from .settings import Settings
+from .services.auth import JwtManager
+
+
+def ensure_str(s, encoding='utf-8', errors='strict'):
+ """Ported from six."""
+ if not isinstance(s, (six.text_type, six.binary_type)):
+ raise TypeError("not expecting type '%s'" % type(s))
+ if six.PY2 and isinstance(s, six.text_type):
+ s = s.encode(encoding, errors)
+ elif six.PY3 and isinstance(s, six.binary_type):
+ s = s.decode(encoding, errors)
+ return s
+
+
+class RequestLoggingTool(cherrypy.Tool):
+ def __init__(self):
+ cherrypy.Tool.__init__(self, 'before_handler', self.request_begin,
+ priority=10)
+
+ def _setup(self):
+ cherrypy.Tool._setup(self)
+ cherrypy.request.hooks.attach('on_end_request', self.request_end,
+ priority=5)
+ cherrypy.request.hooks.attach('after_error_response', self.request_error,
+ priority=5)
+
+ def request_begin(self):
+ req = cherrypy.request
+ user = JwtManager.get_username()
+ if user is not None:
+ # PY2: Encode user to str to prevent further implicit decoding
+ user = ensure_str(user)
+ # Log the request.
+ logger.debug('[%s:%s] [%s] [%s] %s', req.remote.ip, req.remote.port,
+ req.method, user, req.path_info)
+ # Audit the request.
+ if Settings.AUDIT_API_ENABLED and req.method not in ['GET']:
+ url = build_url(req.remote.ip, scheme=req.scheme,
+ port=req.remote.port)
+ msg = '[DASHBOARD] from=\'{}\' path=\'{}\' method=\'{}\' ' \
+ 'user=\'{}\''.format(url, req.path_info, req.method, user)
+ if Settings.AUDIT_API_LOG_PAYLOAD:
+ params = dict(req.params or {}, **get_request_body_params(req))
+ # Hide sensitive data like passwords, secret keys, ...
+ # Extend the list of patterns to search for if necessary.
+ # Currently parameters like this are processed:
+ # - secret_key
+ # - user_password
+ # - new_passwd_to_login
+ keys = []
+ for key in ['password', 'passwd', 'secret']:
+ keys.extend([x for x in params.keys() if key in x])
+ for key in keys:
+ params[key] = '***'
+ msg = '{} params=\'{}\''.format(msg, json.dumps(params))
+ mgr.cluster_log('audit', mgr.CLUSTER_LOG_PRIO_INFO, msg)
+
+ def request_error(self):
+ self._request_log(logger.error)
+ logger.error(cherrypy.response.body)
+
+ def request_end(self):
+ status = cherrypy.response.status[:3]
+ if status in ["401"]:
+ # log unauthorized accesses
+ self._request_log(logger.warning)
+ else:
+ self._request_log(logger.info)
+
+ def _format_bytes(self, num):
+ units = ['B', 'K', 'M', 'G']
+
+ if isinstance(num, str):
+ try:
+ num = int(num)
+ except ValueError:
+ return "n/a"
+
+ format_str = "{:.0f}{}"
+ for i, unit in enumerate(units):
+ div = 2**(10*i)
+ if num < 2**(10*(i+1)):
+ if num % div == 0:
+ format_str = "{}{}"
+ else:
+ div = float(div)
+ format_str = "{:.1f}{}"
+ return format_str.format(num/div, unit[0])
+
+ # content-length bigger than 1T!! return value in bytes
+ return "{}B".format(num)
+
+ def _request_log(self, logger_fn):
+ req = cherrypy.request
+ res = cherrypy.response
+ lat = time.time() - res.time
+ user = JwtManager.get_username()
+ status = res.status[:3] if isinstance(res.status, str) else res.status
+ if 'Content-Length' in res.headers:
+ length = self._format_bytes(res.headers['Content-Length'])
+ else:
+ length = self._format_bytes(0)
+ if user:
+ logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip,
+ req.remote.port, req.method, status,
+ "{0:.3f}s".format(lat), ensure_str(user), length, req.path_info)
+ else:
+ logger_fn("[%s:%s] [%s] [%s] [%s] [%s] %s", req.remote.ip,
+ req.remote.port, req.method, status,
+ "{0:.3f}s".format(lat), length, req.path_info)
+
+
+# pylint: disable=too-many-instance-attributes
+class ViewCache(object):
+ VALUE_OK = 0
+ VALUE_STALE = 1
+ VALUE_NONE = 2
+
+ class GetterThread(threading.Thread):
+ def __init__(self, view, fn, args, kwargs):
+ super(ViewCache.GetterThread, self).__init__()
+ self._view = view
+ self.event = threading.Event()
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+
+ # pylint: disable=broad-except
+ def run(self):
+ t0 = 0.0
+ t1 = 0.0
+ try:
+ t0 = time.time()
+ logger.debug("VC: starting execution of %s", self.fn)
+ val = self.fn(*self.args, **self.kwargs)
+ t1 = time.time()
+ except Exception as ex:
+ with self._view.lock:
+ logger.exception("Error while calling fn=%s ex=%s", self.fn,
+ str(ex))
+ self._view.value = None
+ self._view.value_when = None
+ self._view.getter_thread = None
+ self._view.exception = ex
+ else:
+ with self._view.lock:
+ self._view.latency = t1 - t0
+ self._view.value = val
+ self._view.value_when = datetime.now()
+ self._view.getter_thread = None
+ self._view.exception = None
+
+ logger.debug("VC: execution of %s finished in: %s", self.fn,
+ t1 - t0)
+ self.event.set()
+
+ class RemoteViewCache(object):
+ # Return stale data if
+ STALE_PERIOD = 1.0
+
+ def __init__(self, timeout):
+ self.getter_thread = None
+ # Consider data within 1s old to be sufficiently fresh
+ self.timeout = timeout
+ self.event = threading.Event()
+ self.value_when = None
+ self.value = None
+ self.latency = 0
+ self.exception = None
+ self.lock = threading.Lock()
+
+ def reset(self):
+ with self.lock:
+ self.value_when = None
+ self.value = None
+
+ def run(self, fn, args, kwargs):
+ """
+ If data less than `stale_period` old is available, return it
+ immediately.
+ If an attempt to fetch data does not complete within `timeout`, then
+ return the most recent data available, with a status to indicate that
+ it is stale.
+
+ Initialization does not count towards the timeout, so the first call
+ on one of these objects during the process lifetime may be slower
+ than subsequent calls.
+
+ :return: 2-tuple of value status code, value
+ """
+ with self.lock:
+ now = datetime.now()
+ if self.value_when and now - self.value_when < timedelta(
+ seconds=self.STALE_PERIOD):
+ return ViewCache.VALUE_OK, self.value
+
+ if self.getter_thread is None:
+ self.getter_thread = ViewCache.GetterThread(self, fn, args,
+ kwargs)
+ self.getter_thread.start()
+ else:
+ logger.debug("VC: getter_thread still alive for: %s", fn)
+
+ ev = self.getter_thread.event
+
+ success = ev.wait(timeout=self.timeout)
+
+ with self.lock:
+ if success:
+ # We fetched the data within the timeout
+ if self.exception:
+ # execution raised an exception
+ # pylint: disable=raising-bad-type
+ raise self.exception
+ return ViewCache.VALUE_OK, self.value
+ if self.value_when is not None:
+ # We have some data, but it doesn't meet freshness requirements
+ return ViewCache.VALUE_STALE, self.value
+ # We have no data, not even stale data
+ raise ViewCacheNoDataException()
+
+ def __init__(self, timeout=5):
+ self.timeout = timeout
+ self.cache_by_args = {}
+
+ def __call__(self, fn):
+ def wrapper(*args, **kwargs):
+ rvc = self.cache_by_args.get(args, None)
+ if not rvc:
+ rvc = ViewCache.RemoteViewCache(self.timeout)
+ self.cache_by_args[args] = rvc
+ return rvc.run(fn, args, kwargs)
+ wrapper.reset = self.reset
+ return wrapper
+
+ def reset(self):
+ for _, rvc in self.cache_by_args.items():
+ rvc.reset()
+
+
+class NotificationQueue(threading.Thread):
+ _ALL_TYPES_ = '__ALL__'
+ _listeners = collections.defaultdict(set)
+ _lock = threading.Lock()
+ _cond = threading.Condition()
+ _queue = collections.deque()
+ _running = False
+ _instance = None
+
+ def __init__(self):
+ super(NotificationQueue, self).__init__()
+
+ @classmethod
+ def start_queue(cls):
+ with cls._lock:
+ if cls._instance:
+ # the queue thread is already running
+ return
+ cls._running = True
+ cls._instance = NotificationQueue()
+ logger.debug("starting notification queue")
+ cls._instance.start()
+
+ @classmethod
+ def stop(cls):
+ with cls._lock:
+ if not cls._instance:
+ # the queue thread was not started
+ return
+ instance = cls._instance
+ cls._instance = None
+ cls._running = False
+ with cls._cond:
+ cls._cond.notify()
+ logger.debug("waiting for notification queue to finish")
+ instance.join()
+ logger.debug("notification queue stopped")
+
+ @classmethod
+ def _registered_handler(cls, func, n_types):
+ for _, reg_func in cls._listeners[n_types]:
+ if reg_func == func:
+ return True
+ return False
+
+ @classmethod
+ def register(cls, func, n_types=None, priority=1):
+ """Registers function to listen for notifications
+
+ If the second parameter `n_types` is omitted, the function in `func`
+ parameter will be called for any type of notifications.
+
+ Args:
+ func (function): python function ex: def foo(val)
+ n_types (str|list): the single type to listen, or a list of types
+ priority (int): the priority level (1=max, +inf=min)
+ """
+ with cls._lock:
+ if not n_types:
+ n_types = [cls._ALL_TYPES_]
+ elif isinstance(n_types, str):
+ n_types = [n_types]
+ elif not isinstance(n_types, list):
+ raise Exception("n_types param is neither a string nor a list")
+ for ev_type in n_types:
+ if not cls._registered_handler(func, ev_type):
+ cls._listeners[ev_type].add((priority, func))
+ logger.debug("NQ: function %s was registered for events of"
+ " type %s", func, ev_type)
+
+ @classmethod
+ def deregister(cls, func, n_types=None):
+ """Removes the listener function from this notification queue
+
+ If the second parameter `n_types` is omitted, the function is removed
+ from all event types, otherwise the function is removed only for the
+ specified event types.
+
+ Args:
+ func (function): python function
+ n_types (str|list): the single event type, or a list of event types
+ """
+ with cls._lock:
+ if not n_types:
+ n_types = list(cls._listeners.keys())
+ elif isinstance(n_types, str):
+ n_types = [n_types]
+ elif not isinstance(n_types, list):
+ raise Exception("n_types param is neither a string nor a list")
+ for ev_type in n_types:
+ listeners = cls._listeners[ev_type]
+ toRemove = None
+ for pr, fn in listeners:
+ if fn == func:
+ toRemove = (pr, fn)
+ break
+ if toRemove:
+ listeners.discard(toRemove)
+ logger.debug("NQ: function %s was deregistered for events "
+ "of type %s", func, ev_type)
+
+ @classmethod
+ def new_notification(cls, notify_type, notify_value):
+ with cls._cond:
+ cls._queue.append((notify_type, notify_value))
+ cls._cond.notify()
+
+ @classmethod
+ def _notify_listeners(cls, events):
+ for ev in events:
+ notify_type, notify_value = ev
+ with cls._lock:
+ listeners = list(cls._listeners[notify_type])
+ listeners.extend(cls._listeners[cls._ALL_TYPES_])
+ listeners.sort(key=lambda lis: lis[0])
+ for listener in listeners:
+ listener[1](notify_value)
+
+ def run(self):
+ logger.debug("notification queue started")
+ while self._running:
+ private_buffer = []
+ logger.debug("NQ: processing queue: %s", len(self._queue))
+ try:
+ while True:
+ private_buffer.append(self._queue.popleft())
+ except IndexError:
+ pass
+ self._notify_listeners(private_buffer)
+ with self._cond:
+ while self._running and not self._queue:
+ self._cond.wait()
+ # flush remaining events
+ logger.debug("NQ: flush remaining events: %s", len(self._queue))
+ self._notify_listeners(self._queue)
+ self._queue.clear()
+ logger.debug("notification queue finished")
+
+
+# pylint: disable=too-many-arguments, protected-access
+class TaskManager(object):
+ FINISHED_TASK_SIZE = 10
+ FINISHED_TASK_TTL = 60.0
+
+ VALUE_DONE = "done"
+ VALUE_EXECUTING = "executing"
+
+ _executing_tasks = set()
+ _finished_tasks = []
+ _lock = threading.Lock()
+
+ _task_local_data = threading.local()
+
+ @classmethod
+ def init(cls):
+ NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished')
+
+ @classmethod
+ def _handle_finished_task(cls, task):
+ logger.info("TM: finished %s", task)
+ with cls._lock:
+ cls._executing_tasks.remove(task)
+ cls._finished_tasks.append(task)
+
+ @classmethod
+ def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None,
+ exception_handler=None):
+ if not args:
+ args = []
+ if not kwargs:
+ kwargs = {}
+ if not executor:
+ executor = ThreadedExecutor()
+ task = Task(name, metadata, fn, args, kwargs, executor,
+ exception_handler)
+ with cls._lock:
+ if task in cls._executing_tasks:
+ logger.debug("TM: task already executing: %s", task)
+ for t in cls._executing_tasks:
+ if t == task:
+ return t
+ logger.debug("TM: created %s", task)
+ cls._executing_tasks.add(task)
+ logger.info("TM: running %s", task)
+ task._run()
+ return task
+
+ @classmethod
+ def current_task(cls):
+ """
+ Returns the current task object.
+ This method should only be called from a threaded task operation code.
+ """
+ return cls._task_local_data.task
+
+ @classmethod
+ def _cleanup_old_tasks(cls, task_list):
+ """
+ The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent
+ finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL
+ value.
+ """
+ now = datetime.now()
+ for idx, t in enumerate(task_list):
+ if idx < cls.FINISHED_TASK_SIZE:
+ continue
+ if now - datetime.fromtimestamp(t[1].end_time) > \
+ timedelta(seconds=cls.FINISHED_TASK_TTL):
+ del cls._finished_tasks[t[0]]
+
+ @classmethod
+ def list(cls, name_glob=None):
+ executing_tasks = []
+ finished_tasks = []
+ with cls._lock:
+ for task in cls._executing_tasks:
+ if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+ executing_tasks.append(task)
+ for idx, task in enumerate(cls._finished_tasks):
+ if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+ finished_tasks.append((idx, task))
+ finished_tasks.sort(key=lambda t: t[1].end_time, reverse=True)
+ cls._cleanup_old_tasks(finished_tasks)
+ executing_tasks.sort(key=lambda t: t.begin_time, reverse=True)
+ return executing_tasks, [t[1] for t in finished_tasks]
+
+ @classmethod
+ def list_serializable(cls, ns_glob=None):
+ ex_t, fn_t = cls.list(ns_glob)
+ return [{
+ 'name': t.name,
+ 'metadata': t.metadata,
+ 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+ 'progress': t.progress
+ } for t in ex_t if t.begin_time], [{
+ 'name': t.name,
+ 'metadata': t.metadata,
+ 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+ 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()),
+ 'duration': t.duration,
+ 'progress': t.progress,
+ 'success': not t.exception,
+ 'ret_value': t.ret_value if not t.exception else None,
+ 'exception': t.ret_value if t.exception and t.ret_value else (
+ {'detail': str(t.exception)} if t.exception else None)
+ } for t in fn_t]
+
+
+# pylint: disable=protected-access
+class TaskExecutor(object):
+ def __init__(self):
+ self.task = None
+
+ def init(self, task):
+ self.task = task
+
+ # pylint: disable=broad-except
+ def start(self):
+ logger.debug("EX: executing task %s", self.task)
+ try:
+ self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+ except Exception as ex:
+ logger.exception("Error while calling %s", self.task)
+ self.finish(None, ex)
+
+ def finish(self, ret_value, exception):
+ if not exception:
+ logger.debug("EX: successfully finished task: %s", self.task)
+ else:
+ logger.debug("EX: task finished with exception: %s", self.task)
+ self.task._complete(ret_value, exception)
+
+
+# pylint: disable=protected-access
+class ThreadedExecutor(TaskExecutor):
+ def __init__(self):
+ super(ThreadedExecutor, self).__init__()
+ self._thread = threading.Thread(target=self._run)
+
+ def start(self):
+ self._thread.start()
+
+ # pylint: disable=broad-except
+ def _run(self):
+ TaskManager._task_local_data.task = self.task
+ try:
+ logger.debug("TEX: executing task %s", self.task)
+ val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+ except Exception as ex:
+ logger.exception("Error while calling %s", self.task)
+ self.finish(None, ex)
+ else:
+ self.finish(val, None)
+
+
+class Task(object):
+ def __init__(self, name, metadata, fn, args, kwargs, executor,
+ exception_handler=None):
+ self.name = name
+ self.metadata = metadata
+ self.fn = fn
+ self.fn_args = args
+ self.fn_kwargs = kwargs
+ self.executor = executor
+ self.ex_handler = exception_handler
+ self.running = False
+ self.event = threading.Event()
+ self.progress = None
+ self.ret_value = None
+ self.begin_time = None
+ self.end_time = None
+ self.duration = 0
+ self.exception = None
+ self.lock = threading.Lock()
+
+ def __hash__(self):
+ return hash((self.name, tuple(sorted(self.metadata.items()))))
+
+ def __eq__(self, other):
+ return self.name == other.name and self.metadata == other.metadata
+
+ def __str__(self):
+ return "Task(ns={}, md={})" \
+ .format(self.name, self.metadata)
+
+ def __repr__(self):
+ return str(self)
+
+ def _run(self):
+ NotificationQueue.register(self._handle_task_finished, 'cd_task_finished', 100)
+ with self.lock:
+ assert not self.running
+ self.executor.init(self)
+ self.set_progress(0, in_lock=True)
+ self.begin_time = time.time()
+ self.running = True
+ self.executor.start()
+
+ def _complete(self, ret_value, exception=None):
+ now = time.time()
+ if exception and self.ex_handler:
+ # pylint: disable=broad-except
+ try:
+ ret_value = self.ex_handler(exception, task=self)
+ except Exception as ex:
+ exception = ex
+ with self.lock:
+ assert self.running, "_complete cannot be called before _run"
+ self.end_time = now
+ self.ret_value = ret_value
+ self.exception = exception
+ self.duration = now - self.begin_time
+ if not self.exception:
+ self.set_progress(100, True)
+ NotificationQueue.new_notification('cd_task_finished', self)
+ logger.debug("TK: execution of %s finished in: %s s", self,
+ self.duration)
+
+ def _handle_task_finished(self, task):
+ if self == task:
+ NotificationQueue.deregister(self._handle_task_finished)
+ self.event.set()
+
+ def wait(self, timeout=None):
+ with self.lock:
+ assert self.running, "wait cannot be called before _run"
+ ev = self.event
+
+ success = ev.wait(timeout=timeout)
+ with self.lock:
+ if success:
+ # the action executed within the timeout
+ if self.exception:
+ # pylint: disable=raising-bad-type
+ # execution raised an exception
+ raise self.exception
+ return TaskManager.VALUE_DONE, self.ret_value
+ # the action is still executing
+ return TaskManager.VALUE_EXECUTING, None
+
+ def inc_progress(self, delta, in_lock=False):
+ if not isinstance(delta, int) or delta < 0:
+ raise Exception("Progress delta value must be a positive integer")
+ if not in_lock:
+ self.lock.acquire()
+ prog = self.progress + delta
+ self.progress = prog if prog <= 100 else 100
+ if not in_lock:
+ self.lock.release()
+
+ def set_progress(self, percentage, in_lock=False):
+ if not isinstance(percentage, int) or percentage < 0 or percentage > 100:
+ raise Exception("Progress value must be in percentage "
+ "(0 <= percentage <= 100)")
+ if not in_lock:
+ self.lock.acquire()
+ self.progress = percentage
+ if not in_lock:
+ self.lock.release()
+
+
+def is_valid_ip_address(addr):
+ """
+ Validate the given IPv4 or IPv6 address.
+
+ >>> is_valid_ip_address('2001:0db8::1234')
+ True
+
+ >>> is_valid_ip_address('192.168.121.1')
+ True
+
+ >>> is_valid_ip_address('1:::1')
+ False
+
+ >>> is_valid_ip_address('8.1.0')
+ False
+
+ >>> is_valid_ip_address('260.1.0.1')
+ False
+
+ :param addr:
+ :type addr: str
+ :return: Returns ``True`` if the IP address is valid,
+ otherwise ``False``.
+ :rtype: bool
+ """
+ return is_valid_ipv4_address(addr) or is_valid_ipv6_address(addr)
+
+
+def is_valid_ipv4_address(addr):
+ """
+ Validate the given IPv4 address.
+
+ >>> is_valid_ipv4_address('0.0.0.0')
+ True
+
+ >>> is_valid_ipv4_address('192.168.121.1')
+ True
+
+ >>> is_valid_ipv4_address('a.b.c.d')
+ False
+
+ >>> is_valid_ipv4_address('172.1.0.a')
+ False
+
+ >>> is_valid_ipv4_address('2001:0db8::1234')
+ False
+
+ >>> is_valid_ipv4_address(None)
+ False
+
+ >>> is_valid_ipv4_address(123456)
+ False
+
+ :param addr:
+ :type addr: str
+ :return: Returns ``True`` if the IPv4 address is valid,
+ otherwise ``False``.
+ :rtype: bool
+ """
+ try:
+ socket.inet_pton(socket.AF_INET, addr)
+ return True
+ except (socket.error, TypeError):
+ return False
+
+
+def is_valid_ipv6_address(addr):
+ """
+ Validate the given IPv6 address.
+
+ >>> is_valid_ipv6_address('2001:0db8::1234')
+ True
+
+ >>> is_valid_ipv6_address('fe80::bc6c:66b0:5af8:f44')
+ True
+
+ >>> is_valid_ipv6_address('192.168.121.1')
+ False
+
+ >>> is_valid_ipv6_address('a:x::1')
+ False
+
+ >>> is_valid_ipv6_address('1200:0000:AB00:1234:O000:2552:7777:1313')
+ False
+
+ >>> is_valid_ipv6_address(None)
+ False
+
+ >>> is_valid_ipv6_address(123456)
+ False
+
+ :param addr:
+ :type addr: str
+ :return: Returns ``True`` if the IPv6 address is valid,
+ otherwise ``False``.
+ :rtype: bool
+ """
+ try:
+ socket.inet_pton(socket.AF_INET6, addr)
+ return True
+ except (socket.error, TypeError):
+ return False
+
+
+def build_url(host, scheme=None, port=None):
+ """
+ Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
+ automatically.
+
+ >>> build_url('example.com', 'https', 443)
+ 'https://example.com:443'
+
+ >>> build_url(host='example.com', port=443)
+ '//example.com:443'
+
+ >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
+ 'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'
+
+ :param scheme: The scheme, e.g. http, https or ftp.
+ :type scheme: str
+ :param host: Consisting of either a registered name (including but not limited to
+ a hostname) or an IP address.
+ :type host: str
+ :type port: int
+ :rtype: str
+ """
+ netloc = host if not is_valid_ipv6_address(host) else '[{}]'.format(host)
+ if port:
+ netloc += ':{}'.format(port)
+ pr = urllib.parse.ParseResult(
+ scheme=scheme if scheme else '',
+ netloc=netloc,
+ path='',
+ params='',
+ query='',
+ fragment='')
+ return pr.geturl()
+
+
+def prepare_url_prefix(url_prefix):
+ """
+ return '' if no prefix, or '/prefix' without slash in the end.
+ """
+ url_prefix = urljoin('/', url_prefix)
+ return url_prefix.rstrip('/')
+
+
+def dict_contains_path(dct, keys):
+ """
+ Tests whether the keys exist recursively in `dictionary`.
+
+ :type dct: dict
+ :type keys: list
+ :rtype: bool
+ """
+ if keys:
+ if not isinstance(dct, dict):
+ return False
+ key = keys.pop(0)
+ if key in dct:
+ dct = dct[key]
+ return dict_contains_path(dct, keys)
+ return False
+ return True
+
+
+if sys.version_info > (3, 0):
+ wraps = functools.wraps
+ _getargspec = inspect.getfullargspec
+else:
+ def wraps(func):
+ def decorator(wrapper):
+ new_wrapper = functools.wraps(func)(wrapper)
+ new_wrapper.__wrapped__ = func # set __wrapped__ even for Python 2
+ return new_wrapper
+ return decorator
+
+ _getargspec = inspect.getargspec
+
+
+def getargspec(func):
+ try:
+ while True:
+ func = func.__wrapped__
+ except AttributeError:
+ pass
+ # pylint: disable=deprecated-method
+ return _getargspec(func)
+
+
+def str_to_bool(val):
+ """
+ Convert a string representation of truth to True or False.
+
+ >>> str_to_bool('true') and str_to_bool('yes') and str_to_bool('1') and str_to_bool(True)
+ True
+
+ >>> str_to_bool('false') and str_to_bool('no') and str_to_bool('0') and str_to_bool(False)
+ False
+
+ >>> str_to_bool('xyz')
+ Traceback (most recent call last):
+ ...
+ ValueError: invalid truth value 'xyz'
+
+ :param val: The value to convert.
+ :type val: str|bool
+ :rtype: bool
+ """
+ if isinstance(val, bool):
+ return val
+ return bool(strtobool(val))
+
+
+def get_request_body_params(request):
+ """
+ Helper function to get parameters from the request body.
+ :param request The CherryPy request object.
+ :type request: cherrypy.Request
+ :return: A dictionary containing the parameters.
+ :rtype: dict
+ """
+ params = {}
+ if request.method not in request.methods_with_bodies:
+ return params
+
+ content_type = request.headers.get('Content-Type', '')
+ if content_type in ['application/json', 'text/javascript']:
+ if not hasattr(request, 'json'):
+ raise cherrypy.HTTPError(400, 'Expected JSON body')
+ if isinstance(request.json, str):
+ params.update(json.loads(request.json))
+ else:
+ params.update(request.json)
+
+ return params
+
+
+def find_object_in_list(key, value, iterable):
+ """
+ Get the first occurrence of an object within a list with
+ the specified key/value.
+
+ >>> find_object_in_list('name', 'bar', [{'name': 'foo'}, {'name': 'bar'}])
+ {'name': 'bar'}
+
+ >>> find_object_in_list('name', 'xyz', [{'name': 'foo'}, {'name': 'bar'}]) is None
+ True
+
+ >>> find_object_in_list('foo', 'bar', [{'xyz': 4815162342}]) is None
+ True
+
+ >>> find_object_in_list('foo', 'bar', []) is None
+ True
+
+ :param key: The name of the key.
+ :param value: The value to search for.
+ :param iterable: The list to process.
+ :return: Returns the found object or None.
+ """
+ for obj in iterable:
+ if key in obj and obj[key] == value:
+ return obj
+ return None