From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/pybind/mgr/dashboard/settings.py | 255 +++++++++++++++++++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100644 src/pybind/mgr/dashboard/settings.py (limited to 'src/pybind/mgr/dashboard/settings.py') diff --git a/src/pybind/mgr/dashboard/settings.py b/src/pybind/mgr/dashboard/settings.py new file mode 100644 index 000000000..3b4a8e4ee --- /dev/null +++ b/src/pybind/mgr/dashboard/settings.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import errno +import inspect +from ast import literal_eval +from typing import Any + +from mgr_module import CLICheckNonemptyFileInput + +from . import mgr + + +class Setting: + """ + Setting representation that allows to set a default value and a list of allowed data types. + :param default_value: The name of the bucket. + :param types: a list consisting of the primary/preferred type and, optionally, + secondary/legacy types for backward compatibility. + """ + + def __init__(self, default_value: Any, types: list): + if not isinstance(types, list): + raise ValueError('Setting types must be a list.') + default_value_type = type(default_value) + if default_value_type not in types: + raise ValueError('Default value type not allowed.') + self.default_value = default_value + self.types = types + + def types_as_str(self): + return ','.join([x.__name__ for x in self.types]) + + def cast(self, value): + for type_index, setting_type in enumerate(self.types): + try: + if setting_type.__name__ == 'bool' and str(value).lower() == 'false': + return False + elif setting_type.__name__ == 'dict': + return literal_eval(value) + return setting_type(value) + except (SyntaxError, TypeError, ValueError) as error: + if type_index == len(self.types) - 1: + raise error + + +class Options(object): + """ + If you need to store some configuration value please add the config option + name as a class attribute to this class. + + Example:: + + GRAFANA_API_HOST = ('localhost', str) + GRAFANA_API_PORT = (3000, int) + """ + ENABLE_BROWSABLE_API = Setting(True, [bool]) + REST_REQUESTS_TIMEOUT = Setting(45, [int]) + + # AUTHENTICATION ATTEMPTS + ACCOUNT_LOCKOUT_ATTEMPTS = Setting(10, [int]) + + # API auditing + AUDIT_API_ENABLED = Setting(False, [bool]) + AUDIT_API_LOG_PAYLOAD = Setting(True, [bool]) + + # RGW settings + RGW_API_ACCESS_KEY = Setting('', [dict, str]) + RGW_API_SECRET_KEY = Setting('', [dict, str]) + RGW_API_ADMIN_RESOURCE = Setting('admin', [str]) + RGW_API_SSL_VERIFY = Setting(True, [bool]) + + # Grafana settings + GRAFANA_API_URL = Setting('', [str]) + GRAFANA_FRONTEND_API_URL = Setting('', [str]) + GRAFANA_API_USERNAME = Setting('admin', [str]) + GRAFANA_API_PASSWORD = Setting('admin', [str]) + GRAFANA_API_SSL_VERIFY = Setting(True, [bool]) + GRAFANA_UPDATE_DASHBOARDS = Setting(False, [bool]) + + # NFS Ganesha settings + GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE = Setting('', [str]) + + # Prometheus settings + PROMETHEUS_API_HOST = Setting('', [str]) + PROMETHEUS_API_SSL_VERIFY = Setting(True, [bool]) + ALERTMANAGER_API_HOST = Setting('', [str]) + ALERTMANAGER_API_SSL_VERIFY = Setting(True, [bool]) + + # iSCSI management settings + ISCSI_API_SSL_VERIFICATION = Setting(True, [bool]) + + # user management settings + # Time span of user passwords to expire in days. + # The default value is '0' which means that user passwords are + # never going to expire. + USER_PWD_EXPIRATION_SPAN = Setting(0, [int]) + # warning levels to notify the user that the password is going + # to expire soon + USER_PWD_EXPIRATION_WARNING_1 = Setting(10, [int]) + USER_PWD_EXPIRATION_WARNING_2 = Setting(5, [int]) + + # Password policy + PWD_POLICY_ENABLED = Setting(True, [bool]) + # Individual checks + PWD_POLICY_CHECK_LENGTH_ENABLED = Setting(True, [bool]) + PWD_POLICY_CHECK_OLDPWD_ENABLED = Setting(True, [bool]) + PWD_POLICY_CHECK_USERNAME_ENABLED = Setting(False, [bool]) + PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = Setting(False, [bool]) + PWD_POLICY_CHECK_COMPLEXITY_ENABLED = Setting(False, [bool]) + PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = Setting(False, [bool]) + PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = Setting(False, [bool]) + # Settings + PWD_POLICY_MIN_LENGTH = Setting(8, [int]) + PWD_POLICY_MIN_COMPLEXITY = Setting(10, [int]) + PWD_POLICY_EXCLUSION_LIST = Setting(','.join(['osd', 'host', 'dashboard', 'pool', + 'block', 'nfs', 'ceph', 'monitors', + 'gateway', 'logs', 'crush', 'maps']), + [str]) + + @staticmethod + def has_default_value(name): + return getattr(Settings, name, None) is None or \ + getattr(Settings, name) == getattr(Options, name).default_value + + +class SettingsMeta(type): + def __getattr__(cls, attr): + setting = getattr(Options, attr) + return setting.cast(mgr.get_module_option(attr, setting.default_value)) + + def __setattr__(cls, attr, value): + if not attr.startswith('_') and hasattr(Options, attr): + mgr.set_module_option(attr, str(value)) + else: + setattr(SettingsMeta, attr, value) + + def __delattr__(cls, attr): + if not attr.startswith('_') and hasattr(Options, attr): + mgr.set_module_option(attr, None) + + +# pylint: disable=no-init +class Settings(object, metaclass=SettingsMeta): + pass + + +def _options_command_map(): + def filter_attr(member): + return not inspect.isroutine(member) + + cmd_map = {} + for option, setting in inspect.getmembers(Options, filter_attr): + if option.startswith('_'): + continue + key_get = 'dashboard get-{}'.format(option.lower().replace('_', '-')) + key_set = 'dashboard set-{}'.format(option.lower().replace('_', '-')) + key_reset = 'dashboard reset-{}'.format(option.lower().replace('_', '-')) + cmd_map[key_get] = {'name': option, 'type': None} + cmd_map[key_set] = {'name': option, 'type': setting.types_as_str()} + cmd_map[key_reset] = {'name': option, 'type': None} + return cmd_map + + +_OPTIONS_COMMAND_MAP = _options_command_map() + + +def options_command_list(): + """ + This function generates a list of ``get`` and ``set`` commands + for each declared configuration option in class ``Options``. + """ + def py2ceph(pytype): + if pytype == str: + return 'CephString' + elif pytype == int: + return 'CephInt' + return 'CephString' + + cmd_list = [] + for cmd, opt in _OPTIONS_COMMAND_MAP.items(): + if cmd.startswith('dashboard get'): + cmd_list.append({ + 'cmd': '{}'.format(cmd), + 'desc': 'Get the {} option value'.format(opt['name']), + 'perm': 'r' + }) + elif cmd.startswith('dashboard set'): + cmd_entry = { + 'cmd': '{} name=value,type={}' + .format(cmd, py2ceph(opt['type'])), + 'desc': 'Set the {} option value'.format(opt['name']), + 'perm': 'w' + } + if handles_secret(cmd): + cmd_entry['cmd'] = cmd + cmd_entry['desc'] = '{} read from -i '.format(cmd_entry['desc']) + cmd_list.append(cmd_entry) + elif cmd.startswith('dashboard reset'): + desc = 'Reset the {} option to its default value'.format( + opt['name']) + cmd_list.append({ + 'cmd': '{}'.format(cmd), + 'desc': desc, + 'perm': 'w' + }) + + return cmd_list + + +def options_schema_list(): + def filter_attr(member): + return not inspect.isroutine(member) + + result = [] + for option, setting in inspect.getmembers(Options, filter_attr): + if option.startswith('_'): + continue + result.append({'name': option, 'default': setting.default_value, + 'type': setting.types_as_str()}) + + return result + + +def handle_option_command(cmd, inbuf): + if cmd['prefix'] not in _OPTIONS_COMMAND_MAP: + return -errno.ENOSYS, '', "Command not found '{}'".format(cmd['prefix']) + + opt = _OPTIONS_COMMAND_MAP[cmd['prefix']] + + if cmd['prefix'].startswith('dashboard reset'): + delattr(Settings, opt['name']) + return 0, 'Option {} reset to default value "{}"'.format( + opt['name'], getattr(Settings, opt['name'])), '' + elif cmd['prefix'].startswith('dashboard get'): + return 0, str(getattr(Settings, opt['name'])), '' + elif cmd['prefix'].startswith('dashboard set'): + if handles_secret(cmd['prefix']): + value, stdout, stderr = get_secret(inbuf=inbuf) + if stderr: + return value, stdout, stderr + else: + value = cmd['value'] + setting = getattr(Options, opt['name']) + setattr(Settings, opt['name'], setting.cast(value)) + return 0, 'Option {} updated'.format(opt['name']), '' + + +def handles_secret(cmd: str) -> bool: + return bool([cmd for secret_word in ['password', 'key'] if (secret_word in cmd)]) + + +@CLICheckNonemptyFileInput(desc='password/secret') +def get_secret(inbuf=None): + return inbuf, None, None -- cgit v1.2.3