diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/dashboard/services | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/dashboard/services')
19 files changed, 5962 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/services/__init__.py b/src/pybind/mgr/dashboard/services/__init__.py new file mode 100644 index 000000000..40a96afc6 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/pybind/mgr/dashboard/services/_paginate.py b/src/pybind/mgr/dashboard/services/_paginate.py new file mode 100644 index 000000000..c8ba300a5 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/_paginate.py @@ -0,0 +1,71 @@ +from typing import Any, Dict, List + +from ..exceptions import DashboardException + + +class ListPaginator: + # pylint: disable=W0102 + def __init__(self, offset: int, limit: int, sort: str, search: str, + input_list: List[Any], default_sort: str, + searchable_params: List[str] = [], sortable_params: List[str] = []): + self.offset = offset + if limit < -1: + raise DashboardException(msg=f'Wrong limit value {limit}', code=400) + self.limit = limit + self.sort = sort + self.search = search + self.input_list = input_list + self.default_sort = default_sort + self.searchable_params = searchable_params + self.sortable_params = sortable_params + self.count = len(self.input_list) + + def get_count(self): + return self.count + + def find_value(self, item: Dict[str, Any], key: str): + # dot separated keys to lookup nested values + keys = key.split('.') + value = item + for nested_key in keys: + if nested_key in value: + value = value[nested_key] + else: + return '' + return value + + def list(self): + end = self.offset + self.limit + # '-1' is a special number to refer to all items in list + if self.limit == -1: + end = len(self.input_list) + + if not self.sort: + self.sort = self.default_sort + + desc = self.sort[0] == '-' + sort_by = self.sort[1:] + + if sort_by not in self.sortable_params: + sort_by = self.default_sort[1:] + + # trim down by search + trimmed_list = [] + if self.search: + for item in self.input_list: + for searchable_param in self.searchable_params: + value = self.find_value(item, searchable_param) + if isinstance(value, str): + if self.search in str(value): + trimmed_list.append(item) + + else: + trimmed_list = self.input_list + + def sort(item): + return self.find_value(item, sort_by) + + sorted_list = sorted(trimmed_list, key=sort, reverse=desc) + self.count = len(sorted_list) + for item in sorted_list[self.offset:end]: + yield item diff --git a/src/pybind/mgr/dashboard/services/access_control.py b/src/pybind/mgr/dashboard/services/access_control.py new file mode 100644 index 000000000..0cbe49bb1 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/access_control.py @@ -0,0 +1,942 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-arguments,too-many-return-statements +# pylint: disable=too-many-branches, too-many-locals, too-many-statements + +import errno +import json +import logging +import re +import threading +import time +from datetime import datetime, timedelta +from string import ascii_lowercase, ascii_uppercase, digits, punctuation +from typing import List, Optional, Sequence + +import bcrypt +from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand +from mgr_util import password_hash + +from .. import mgr +from ..exceptions import PasswordPolicyException, PermissionNotValid, \ + PwdExpirationDateNotValid, RoleAlreadyExists, RoleDoesNotExist, \ + RoleIsAssociatedWithUser, RoleNotInUser, ScopeNotInRole, ScopeNotValid, \ + UserAlreadyExists, UserDoesNotExist +from ..security import Permission, Scope +from ..settings import Settings + +logger = logging.getLogger('access_control') +DEFAULT_FILE_DESC = 'password/secret' + + +_P = Permission # short alias + + +class PasswordPolicy(object): + def __init__(self, password, username=None, old_password=None): + """ + :param password: The new plain password. + :type password: str + :param username: The name of the user. + :type username: str | None + :param old_password: The old plain password. + :type old_password: str | None + """ + self.password = password + self.username = username + self.old_password = old_password + self.forbidden_words = Settings.PWD_POLICY_EXCLUSION_LIST.split(',') + self.complexity_credits = 0 + + @staticmethod + def _check_if_contains_word(password, word): + return re.compile('(?:{0})'.format(word), + flags=re.IGNORECASE).search(password) + + def check_password_complexity(self): + if not Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED: + return Settings.PWD_POLICY_MIN_COMPLEXITY + digit_credit = 1 + small_letter_credit = 1 + big_letter_credit = 2 + special_character_credit = 3 + other_character_credit = 5 + self.complexity_credits = 0 + for ch in self.password: + if ch in ascii_uppercase: + self.complexity_credits += big_letter_credit + elif ch in ascii_lowercase: + self.complexity_credits += small_letter_credit + elif ch in digits: + self.complexity_credits += digit_credit + elif ch in punctuation: + self.complexity_credits += special_character_credit + else: + self.complexity_credits += other_character_credit + return self.complexity_credits + + def check_is_old_password(self): + if not Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED: + return False + return self.old_password and self.password == self.old_password + + def check_if_contains_username(self): + if not Settings.PWD_POLICY_CHECK_USERNAME_ENABLED: + return False + if not self.username: + return False + return self._check_if_contains_word(self.password, self.username) + + def check_if_contains_forbidden_words(self): + if not Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED: + return False + return self._check_if_contains_word(self.password, + '|'.join(self.forbidden_words)) + + def check_if_sequential_characters(self): + if not Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED: + return False + for i in range(1, len(self.password) - 1): + if ord(self.password[i - 1]) + 1 == ord(self.password[i])\ + == ord(self.password[i + 1]) - 1: + return True + return False + + def check_if_repetitive_characters(self): + if not Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED: + return False + for i in range(1, len(self.password) - 1): + if self.password[i - 1] == self.password[i] == self.password[i + 1]: + return True + return False + + def check_password_length(self): + if not Settings.PWD_POLICY_CHECK_LENGTH_ENABLED: + return True + return len(self.password) >= Settings.PWD_POLICY_MIN_LENGTH + + def check_all(self): + """ + Perform all password policy checks. + :raise PasswordPolicyException: If a password policy check fails. + """ + if not Settings.PWD_POLICY_ENABLED: + return + if self.check_password_complexity() < Settings.PWD_POLICY_MIN_COMPLEXITY: + raise PasswordPolicyException('Password is too weak.') + if not self.check_password_length(): + raise PasswordPolicyException('Password is too weak.') + if self.check_is_old_password(): + raise PasswordPolicyException('Password must not be the same as the previous one.') + if self.check_if_contains_username(): + raise PasswordPolicyException('Password must not contain username.') + result = self.check_if_contains_forbidden_words() + if result: + raise PasswordPolicyException('Password must not contain the keyword "{}".'.format( + result.group(0))) + if self.check_if_repetitive_characters(): + raise PasswordPolicyException('Password must not contain repetitive characters.') + if self.check_if_sequential_characters(): + raise PasswordPolicyException('Password must not contain sequential characters.') + + +class Role(object): + def __init__(self, name, description=None, scope_permissions=None): + self.name = name + self.description = description + if scope_permissions is None: + self.scopes_permissions = {} + else: + self.scopes_permissions = scope_permissions + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return self.name == other.name + + def set_scope_permissions(self, scope, permissions): + if not Scope.valid_scope(scope): + raise ScopeNotValid(scope) + for perm in permissions: + if not Permission.valid_permission(perm): + raise PermissionNotValid(perm) + + permissions.sort() + self.scopes_permissions[scope] = permissions + + def del_scope_permissions(self, scope): + if scope not in self.scopes_permissions: + raise ScopeNotInRole(scope, self.name) + del self.scopes_permissions[scope] + + def reset_scope_permissions(self): + self.scopes_permissions = {} + + def authorize(self, scope, permissions): + if scope in self.scopes_permissions: + role_perms = self.scopes_permissions[scope] + for perm in permissions: + if perm not in role_perms: + return False + return True + return False + + def to_dict(self): + return { + 'name': self.name, + 'description': self.description, + 'scopes_permissions': self.scopes_permissions + } + + @classmethod + def from_dict(cls, r_dict): + return Role(r_dict['name'], r_dict['description'], + r_dict['scopes_permissions']) + + +# static pre-defined system roles +# this roles cannot be deleted nor updated + +# admin role provides all permissions for all scopes +ADMIN_ROLE = Role( + 'administrator', 'allows full permissions for all security scopes', { + scope_name: Permission.all_permissions() + for scope_name in Scope.all_scopes() + }) + + +# read-only role provides read-only permission for all scopes +READ_ONLY_ROLE = Role( + 'read-only', + 'allows read permission for all security scope except dashboard settings and config-opt', { + scope_name: [_P.READ] for scope_name in Scope.all_scopes() + if scope_name not in (Scope.DASHBOARD_SETTINGS, Scope.CONFIG_OPT) + }) + + +# block manager role provides all permission for block related scopes +BLOCK_MGR_ROLE = Role( + 'block-manager', 'allows full permissions for rbd-image, rbd-mirroring, and iscsi scopes', { + Scope.RBD_IMAGE: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.POOL: [_P.READ], + Scope.ISCSI: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.RBD_MIRRORING: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +# RadosGW manager role provides all permissions for block related scopes +RGW_MGR_ROLE = Role( + 'rgw-manager', 'allows full permissions for the rgw scope', { + Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +# Cluster manager role provides all permission for OSDs, Monitors, and +# Config options +CLUSTER_MGR_ROLE = Role( + 'cluster-manager', """allows full permissions for the hosts, osd, mon, mgr, + and config-opt scopes""", { + Scope.HOSTS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.OSD: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.MONITOR: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.MANAGER: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.CONFIG_OPT: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.LOG: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +# Pool manager role provides all permissions for pool related scopes +POOL_MGR_ROLE = Role( + 'pool-manager', 'allows full permissions for the pool scope', { + Scope.POOL: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + +# CephFS manager role provides all permissions for CephFS related scopes +CEPHFS_MGR_ROLE = Role( + 'cephfs-manager', 'allows full permissions for the cephfs scope', { + Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + +GANESHA_MGR_ROLE = Role( + 'ganesha-manager', 'allows full permissions for the nfs-ganesha scope', { + Scope.NFS_GANESHA: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +SYSTEM_ROLES = { + ADMIN_ROLE.name: ADMIN_ROLE, + READ_ONLY_ROLE.name: READ_ONLY_ROLE, + BLOCK_MGR_ROLE.name: BLOCK_MGR_ROLE, + RGW_MGR_ROLE.name: RGW_MGR_ROLE, + CLUSTER_MGR_ROLE.name: CLUSTER_MGR_ROLE, + POOL_MGR_ROLE.name: POOL_MGR_ROLE, + CEPHFS_MGR_ROLE.name: CEPHFS_MGR_ROLE, + GANESHA_MGR_ROLE.name: GANESHA_MGR_ROLE, +} + + +class User(object): + def __init__(self, username, password, name=None, email=None, roles=None, + last_update=None, enabled=True, pwd_expiration_date=None, + pwd_update_required=False): + self.username = username + self.password = password + self.name = name + self.email = email + self.invalid_auth_attempt = 0 + if roles is None: + self.roles = set() + else: + self.roles = roles + if last_update is None: + self.refresh_last_update() + else: + self.last_update = last_update + self._enabled = enabled + self.pwd_expiration_date = pwd_expiration_date + if self.pwd_expiration_date is None: + self.refresh_pwd_expiration_date() + self.pwd_update_required = pwd_update_required + + def refresh_last_update(self): + self.last_update = int(time.time()) + + def refresh_pwd_expiration_date(self): + if Settings.USER_PWD_EXPIRATION_SPAN > 0: + expiration_date = datetime.utcnow() + timedelta( + days=Settings.USER_PWD_EXPIRATION_SPAN) + self.pwd_expiration_date = int(time.mktime(expiration_date.timetuple())) + else: + self.pwd_expiration_date = None + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, value): + self._enabled = value + self.refresh_last_update() + + def set_password(self, password): + self.set_password_hash(password_hash(password)) + + def set_password_hash(self, hashed_password): + self.invalid_auth_attempt = 0 + self.password = hashed_password + self.refresh_last_update() + self.refresh_pwd_expiration_date() + self.pwd_update_required = False + + def compare_password(self, password): + """ + Compare the specified password with the user password. + :param password: The plain password to check. + :type password: str + :return: `True` if the passwords are equal, otherwise `False`. + :rtype: bool + """ + pass_hash = password_hash(password, salt_password=self.password) + return pass_hash == self.password + + def is_pwd_expired(self): + if self.pwd_expiration_date: + current_time = int(time.mktime(datetime.utcnow().timetuple())) + return self.pwd_expiration_date < current_time + return False + + def set_roles(self, roles): + self.roles = set(roles) + self.refresh_last_update() + + def add_roles(self, roles): + self.roles = self.roles.union(set(roles)) + self.refresh_last_update() + + def del_roles(self, roles): + for role in roles: + if role not in self.roles: + raise RoleNotInUser(role.name, self.username) + self.roles.difference_update(set(roles)) + self.refresh_last_update() + + def authorize(self, scope, permissions): + if self.pwd_update_required: + return False + + for role in self.roles: + if role.authorize(scope, permissions): + return True + return False + + def permissions_dict(self): + # type: () -> dict + perms = {} # type: dict + for role in self.roles: + for scope, perms_list in role.scopes_permissions.items(): + if scope in perms: + perms_tmp = set(perms[scope]).union(set(perms_list)) + perms[scope] = list(perms_tmp) + else: + perms[scope] = perms_list + + return perms + + def to_dict(self): + return { + 'username': self.username, + 'password': self.password, + 'roles': sorted([r.name for r in self.roles]), + 'name': self.name, + 'email': self.email, + 'lastUpdate': self.last_update, + 'enabled': self.enabled, + 'pwdExpirationDate': self.pwd_expiration_date, + 'pwdUpdateRequired': self.pwd_update_required + } + + @classmethod + def from_dict(cls, u_dict, roles): + return User(u_dict['username'], u_dict['password'], u_dict['name'], + u_dict['email'], {roles[r] for r in u_dict['roles']}, + u_dict['lastUpdate'], u_dict['enabled'], + u_dict['pwdExpirationDate'], u_dict['pwdUpdateRequired']) + + +class AccessControlDB(object): + VERSION = 2 + ACDB_CONFIG_KEY = "accessdb_v" + + def __init__(self, version, users, roles): + self.users = users + self.version = version + self.roles = roles + self.lock = threading.RLock() + + def create_role(self, name, description=None): + with self.lock: + if name in SYSTEM_ROLES or name in self.roles: + raise RoleAlreadyExists(name) + role = Role(name, description) + self.roles[name] = role + return role + + def get_role(self, name): + with self.lock: + if name not in self.roles: + raise RoleDoesNotExist(name) + return self.roles[name] + + def increment_attempt(self, username): + with self.lock: + if username in self.users: + self.users[username].invalid_auth_attempt += 1 + + def reset_attempt(self, username): + with self.lock: + if username in self.users: + self.users[username].invalid_auth_attempt = 0 + + def get_attempt(self, username): + with self.lock: + try: + return self.users[username].invalid_auth_attempt + except KeyError: + return 0 + + def delete_role(self, name): + with self.lock: + if name not in self.roles: + raise RoleDoesNotExist(name) + role = self.roles[name] + + # check if role is not associated with a user + for username, user in self.users.items(): + if role in user.roles: + raise RoleIsAssociatedWithUser(name, username) + + del self.roles[name] + + def create_user(self, username, password, name, email, enabled=True, + pwd_expiration_date=None, pwd_update_required=False): + logger.debug("creating user: username=%s", username) + with self.lock: + if username in self.users: + raise UserAlreadyExists(username) + if pwd_expiration_date and \ + (pwd_expiration_date < int(time.mktime(datetime.utcnow().timetuple()))): + raise PwdExpirationDateNotValid() + user = User(username, password_hash(password), name, email, enabled=enabled, + pwd_expiration_date=pwd_expiration_date, + pwd_update_required=pwd_update_required) + self.users[username] = user + return user + + def get_user(self, username): + with self.lock: + if username not in self.users: + raise UserDoesNotExist(username) + return self.users[username] + + def delete_user(self, username): + with self.lock: + if username not in self.users: + raise UserDoesNotExist(username) + del self.users[username] + + def update_users_with_roles(self, role): + with self.lock: + if not role: + return + for _, user in self.users.items(): + if role in user.roles: + user.refresh_last_update() + + def save(self): + with self.lock: + db = { + 'users': {un: u.to_dict() for un, u in self.users.items()}, + 'roles': {rn: r.to_dict() for rn, r in self.roles.items()}, + 'version': self.version + } + mgr.set_store(self.accessdb_config_key(), json.dumps(db)) + + @classmethod + def accessdb_config_key(cls, version=None): + if version is None: + version = cls.VERSION + return "{}{}".format(cls.ACDB_CONFIG_KEY, version) + + def check_and_update_db(self): + logger.debug("Checking for previous DB versions") + + def check_migrate_v1_to_current(): + # Check if version 1 exists in the DB and migrate it to current version + v1_db = mgr.get_store(self.accessdb_config_key(1)) + if v1_db: + logger.debug("Found database v1 credentials") + v1_db = json.loads(v1_db) + + for user, _ in v1_db['users'].items(): + v1_db['users'][user]['enabled'] = True + v1_db['users'][user]['pwdExpirationDate'] = None + v1_db['users'][user]['pwdUpdateRequired'] = False + + self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()} + self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES)) + for un, u in v1_db.get('users', {}).items()} + + self.save() + + check_migrate_v1_to_current() + + @classmethod + def load(cls): + logger.info("Loading user roles DB version=%s", cls.VERSION) + + json_db = mgr.get_store(cls.accessdb_config_key()) + if json_db is None: + logger.debug("No DB v%s found, creating new...", cls.VERSION) + db = cls(cls.VERSION, {}, {}) + # check if we can update from a previous version database + db.check_and_update_db() + return db + + dict_db = json.loads(json_db) + roles = {rn: Role.from_dict(r) + for rn, r in dict_db.get('roles', {}).items()} + users = {un: User.from_dict(u, dict(roles, **SYSTEM_ROLES)) + for un, u in dict_db.get('users', {}).items()} + return cls(dict_db['version'], users, roles) + + +def load_access_control_db(): + mgr.ACCESS_CTRL_DB = AccessControlDB.load() # type: ignore + + +# CLI dashboard access control scope commands + +@CLIWriteCommand('dashboard set-login-credentials') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def set_login_credentials_cmd(_, username: str, inbuf: str): + ''' + Set the login credentials. Password read from -i <file> + ''' + password = inbuf + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_password(password) + except UserDoesNotExist: + user = mgr.ACCESS_CTRL_DB.create_user(username, password, None, None) + user.set_roles([ADMIN_ROLE]) + + mgr.ACCESS_CTRL_DB.save() + + return 0, '''\ +****************************************************************** +*** WARNING: this command is deprecated. *** +*** Please use the ac-user-* related commands to manage users. *** +****************************************************************** +Username and password updated''', '' + + +@CLIReadCommand('dashboard ac-role-show') +def ac_role_show_cmd(_, rolename: Optional[str] = None): + ''' + Show role info + ''' + if not rolename: + roles = dict(mgr.ACCESS_CTRL_DB.roles) + roles.update(SYSTEM_ROLES) + roles_list = [name for name, _ in roles.items()] + return 0, json.dumps(roles_list), '' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + role = SYSTEM_ROLES[rolename] + return 0, json.dumps(role.to_dict()), '' + + +@CLIWriteCommand('dashboard ac-role-create') +def ac_role_create_cmd(_, rolename: str, description: Optional[str] = None): + ''' + Create a new access control role + ''' + try: + role = mgr.ACCESS_CTRL_DB.create_role(rolename, description) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + + +@CLIWriteCommand('dashboard ac-role-delete') +def ac_role_delete_cmd(_, rolename: str): + ''' + Delete an access control role + ''' + try: + mgr.ACCESS_CTRL_DB.delete_role(rolename) + mgr.ACCESS_CTRL_DB.save() + return 0, "Role '{}' deleted".format(rolename), "" + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot delete system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except RoleIsAssociatedWithUser as ex: + return -errno.EPERM, '', str(ex) + + +@CLIWriteCommand('dashboard ac-role-add-scope-perms') +def ac_role_add_scope_perms_cmd(_, + rolename: str, + scopename: str, + permissions: Sequence[str]): + ''' + Add the scope permissions for a role + ''' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + perms_array = [perm.strip() for perm in permissions] + role.set_scope_permissions(scopename, perms_array) + mgr.ACCESS_CTRL_DB.update_users_with_roles(role) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot update system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except ScopeNotValid as ex: + return -errno.EINVAL, '', str(ex) + "\n Possible values: {}" \ + .format(Scope.all_scopes()) + except PermissionNotValid as ex: + return -errno.EINVAL, '', str(ex) + \ + "\n Possible values: {}" \ + .format(Permission.all_permissions()) + + +@CLIWriteCommand('dashboard ac-role-del-scope-perms') +def ac_role_del_scope_perms_cmd(_, rolename: str, scopename: str): + ''' + Delete the scope permissions for a role + ''' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + role.del_scope_permissions(scopename) + mgr.ACCESS_CTRL_DB.update_users_with_roles(role) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot update system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except ScopeNotInRole as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIReadCommand('dashboard ac-user-show') +def ac_user_show_cmd(_, username: Optional[str] = None): + ''' + Show user info + ''' + if not username: + users = mgr.ACCESS_CTRL_DB.users + users_list = [name for name, _ in users.items()] + return 0, json.dumps(users_list), '' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-create') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def ac_user_create_cmd(_, username: str, inbuf: str, + rolename: Optional[str] = None, + name: Optional[str] = None, + email: Optional[str] = None, + enabled: bool = True, + force_password: bool = False, + pwd_expiration_date: Optional[int] = None, + pwd_update_required: bool = False): + ''' + Create a user. Password read from -i <file> + ''' + password = inbuf + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + role = SYSTEM_ROLES[rolename] + + try: + if not force_password: + pw_check = PasswordPolicy(password, username) + pw_check.check_all() + user = mgr.ACCESS_CTRL_DB.create_user(username, password, name, email, + enabled, pwd_expiration_date, + pwd_update_required) + except PasswordPolicyException as ex: + return -errno.EINVAL, '', str(ex) + except UserAlreadyExists as ex: + return 0, str(ex), '' + + if role: + user.set_roles([role]) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + + +@CLIWriteCommand('dashboard ac-user-enable') +def ac_user_enable(_, username: str): + ''' + Enable a user + ''' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.enabled = True + mgr.ACCESS_CTRL_DB.reset_attempt(username) + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-disable') +def ac_user_disable(_, username: str): + ''' + Disable a user + ''' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.enabled = False + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-delete') +def ac_user_delete_cmd(_, username: str): + ''' + Delete user + ''' + try: + mgr.ACCESS_CTRL_DB.delete_user(username) + mgr.ACCESS_CTRL_DB.save() + return 0, "User '{}' deleted".format(username), "" + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-roles') +def ac_user_set_roles_cmd(_, username: str, roles: Sequence[str]): + ''' + Set user roles + ''' + rolesname = roles + roles: List[Role] = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-add-roles') +def ac_user_add_roles_cmd(_, username: str, roles: Sequence[str]): + ''' + Add roles to user + ''' + rolesname = roles + roles: List[Role] = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.add_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-del-roles') +def ac_user_del_roles_cmd(_, username: str, roles: Sequence[str]): + ''' + Delete roles from user + ''' + rolesname = roles + roles: List[Role] = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.del_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + except RoleNotInUser as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-password') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def ac_user_set_password(_, username: str, inbuf: str, + force_password: bool = False): + ''' + Set user password from -i <file> + ''' + password = inbuf + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if not force_password: + pw_check = PasswordPolicy(password, user.name) + pw_check.check_all() + user.set_password(password) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except PasswordPolicyException as ex: + return -errno.EINVAL, '', str(ex) + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-password-hash') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def ac_user_set_password_hash(_, username: str, inbuf: str): + ''' + Set user password bcrypt hash from -i <file> + ''' + hashed_password = inbuf + try: + # make sure the hashed_password is actually a bcrypt hash + bcrypt.checkpw(b'', hashed_password.encode('utf-8')) + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_password_hash(hashed_password) + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except ValueError: + return -errno.EINVAL, '', 'Invalid password hash' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-info') +def ac_user_set_info(_, username: str, name: str, email: str): + ''' + Set user info + ''' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if name: + user.name = name + if email: + user.email = email + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +class LocalAuthenticator(object): + def __init__(self): + load_access_control_db() + + def get_user(self, username): + return mgr.ACCESS_CTRL_DB.get_user(username) + + def authenticate(self, username, password): + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if user.password: + if user.enabled and user.compare_password(password) \ + and not user.is_pwd_expired(): + return {'permissions': user.permissions_dict(), + 'pwdExpirationDate': user.pwd_expiration_date, + 'pwdUpdateRequired': user.pwd_update_required} + except UserDoesNotExist: + logger.debug("User '%s' does not exist", username) + return None + + def authorize(self, username, scope, permissions): + user = mgr.ACCESS_CTRL_DB.get_user(username) + return user.authorize(scope, permissions) diff --git a/src/pybind/mgr/dashboard/services/auth.py b/src/pybind/mgr/dashboard/services/auth.py new file mode 100644 index 000000000..f13963abf --- /dev/null +++ b/src/pybind/mgr/dashboard/services/auth.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- + +import json +import logging +import os +import threading +import time +import uuid +from base64 import b64encode + +import cherrypy +import jwt + +from .. import mgr +from .access_control import LocalAuthenticator, UserDoesNotExist + +cherrypy.config.update({ + 'response.headers.server': 'Ceph-Dashboard', + 'response.headers.content-security-policy': "frame-ancestors 'self';", + 'response.headers.x-content-type-options': 'nosniff', + 'response.headers.strict-transport-security': 'max-age=63072000; includeSubDomains; preload' +}) + + +class JwtManager(object): + JWT_TOKEN_BLOCKLIST_KEY = "jwt_token_block_list" + JWT_TOKEN_TTL = 28800 # default 8 hours + JWT_ALGORITHM = 'HS256' + _secret = None + + LOCAL_USER = threading.local() + + @staticmethod + def _gen_secret(): + secret = os.urandom(16) + return b64encode(secret).decode('utf-8') + + @classmethod + def init(cls): + cls.logger = logging.getLogger('jwt') # type: ignore + # generate a new secret if it does not exist + secret = mgr.get_store('jwt_secret') + if secret is None: + secret = cls._gen_secret() + mgr.set_store('jwt_secret', secret) + cls._secret = secret + + @classmethod + def gen_token(cls, username): + if not cls._secret: + cls.init() + ttl = mgr.get_module_option('jwt_token_ttl', cls.JWT_TOKEN_TTL) + ttl = int(ttl) + now = int(time.time()) + payload = { + 'iss': 'ceph-dashboard', + 'jti': str(uuid.uuid4()), + 'exp': now + ttl, + 'iat': now, + 'username': username + } + return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM) # type: ignore + + @classmethod + def decode_token(cls, token): + if not cls._secret: + cls.init() + return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM) # type: ignore + + @classmethod + def get_token_from_header(cls): + auth_cookie_name = 'token' + try: + # use cookie + return cherrypy.request.cookie[auth_cookie_name].value + except KeyError: + try: + # fall-back: use Authorization header + auth_header = cherrypy.request.headers.get('authorization') + if auth_header is not None: + scheme, params = auth_header.split(' ', 1) + if scheme.lower() == 'bearer': + return params + except IndexError: + return None + + @classmethod + def set_user(cls, username): + cls.LOCAL_USER.username = username + + @classmethod + def reset_user(cls): + cls.set_user(None) + + @classmethod + def get_username(cls): + return getattr(cls.LOCAL_USER, 'username', None) + + @classmethod + def get_user(cls, token): + try: + dtoken = JwtManager.decode_token(token) + if not JwtManager.is_blocklisted(dtoken['jti']): + user = AuthManager.get_user(dtoken['username']) + if user.last_update <= dtoken['iat']: + return user + cls.logger.debug( # type: ignore + "user info changed after token was issued, iat=%s last_update=%s", + dtoken['iat'], user.last_update + ) + else: + cls.logger.debug('Token is block-listed') # type: ignore + except jwt.ExpiredSignatureError: + cls.logger.debug("Token has expired") # type: ignore + except jwt.InvalidTokenError: + cls.logger.debug("Failed to decode token") # type: ignore + except UserDoesNotExist: + cls.logger.debug( # type: ignore + "Invalid token: user %s does not exist", dtoken['username'] + ) + return None + + @classmethod + def blocklist_token(cls, token): + token = cls.decode_token(token) + blocklist_json = mgr.get_store(cls.JWT_TOKEN_BLOCKLIST_KEY) + if not blocklist_json: + blocklist_json = "{}" + bl_dict = json.loads(blocklist_json) + now = time.time() + + # remove expired tokens + to_delete = [] + for jti, exp in bl_dict.items(): + if exp < now: + to_delete.append(jti) + for jti in to_delete: + del bl_dict[jti] + + bl_dict[token['jti']] = token['exp'] + mgr.set_store(cls.JWT_TOKEN_BLOCKLIST_KEY, json.dumps(bl_dict)) + + @classmethod + def is_blocklisted(cls, jti): + blocklist_json = mgr.get_store(cls.JWT_TOKEN_BLOCKLIST_KEY) + if not blocklist_json: + blocklist_json = "{}" + bl_dict = json.loads(blocklist_json) + return jti in bl_dict + + +class AuthManager(object): + AUTH_PROVIDER = None + + @classmethod + def initialize(cls): + cls.AUTH_PROVIDER = LocalAuthenticator() + + @classmethod + def get_user(cls, username): + return cls.AUTH_PROVIDER.get_user(username) # type: ignore + + @classmethod + def authenticate(cls, username, password): + return cls.AUTH_PROVIDER.authenticate(username, password) # type: ignore + + @classmethod + def authorize(cls, username, scope, permissions): + return cls.AUTH_PROVIDER.authorize(username, scope, permissions) # type: ignore + + +class AuthManagerTool(cherrypy.Tool): + def __init__(self): + super(AuthManagerTool, self).__init__( + 'before_handler', self._check_authentication, priority=20) + self.logger = logging.getLogger('auth') + + def _check_authentication(self): + JwtManager.reset_user() + token = JwtManager.get_token_from_header() + if token: + user = JwtManager.get_user(token) + if user: + self._check_authorization(user.username) + return + + resp_head = cherrypy.response.headers + req_head = cherrypy.request.headers + req_header_cross_origin_url = req_head.get('Access-Control-Allow-Origin') + cross_origin_urls = mgr.get_module_option('cross_origin_url', '') + cross_origin_url_list = [url.strip() for url in cross_origin_urls.split(',')] + + if req_header_cross_origin_url in cross_origin_url_list: + resp_head['Access-Control-Allow-Origin'] = req_header_cross_origin_url + + self.logger.debug('Unauthorized access to %s', + cherrypy.url(relative='server')) + raise cherrypy.HTTPError(401, 'You are not authorized to access ' + 'that resource') + + def _check_authorization(self, username): + self.logger.debug("checking authorization...") + handler = cherrypy.request.handler.callable + controller = handler.__self__ + sec_scope = getattr(controller, '_security_scope', None) + sec_perms = getattr(handler, '_security_permissions', None) + JwtManager.set_user(username) + + if not sec_scope: + # controller does not define any authorization restrictions + return + + self.logger.debug("checking '%s' access to '%s' scope", sec_perms, + sec_scope) + + if not sec_perms: + self.logger.debug("Fail to check permission on: %s:%s", controller, + handler) + raise cherrypy.HTTPError(403, "You don't have permissions to " + "access that resource") + + if not AuthManager.authorize(username, sec_scope, sec_perms): + raise cherrypy.HTTPError(403, "You don't have permissions to " + "access that resource") diff --git a/src/pybind/mgr/dashboard/services/ceph_service.py b/src/pybind/mgr/dashboard/services/ceph_service.py new file mode 100644 index 000000000..53cd0e7ad --- /dev/null +++ b/src/pybind/mgr/dashboard/services/ceph_service.py @@ -0,0 +1,571 @@ +# -*- coding: utf-8 -*- + +import json +import logging + +import rados +from mgr_module import CommandResult +from mgr_util import get_most_recent_rate, get_time_series_rates, name_to_config_section + +from .. import mgr + +try: + from typing import Any, Dict, Optional, Union +except ImportError: + pass # For typing only + +logger = logging.getLogger('ceph_service') + + +class SendCommandError(rados.Error): + def __init__(self, err, prefix, argdict, errno): + self.prefix = prefix + self.argdict = argdict + super(SendCommandError, self).__init__(err, errno) + + +# pylint: disable=too-many-public-methods +class CephService(object): + + OSD_FLAG_NO_SCRUB = 'noscrub' + OSD_FLAG_NO_DEEP_SCRUB = 'nodeep-scrub' + + PG_STATUS_SCRUBBING = 'scrubbing' + PG_STATUS_DEEP_SCRUBBING = 'deep' + + SCRUB_STATUS_DISABLED = 'Disabled' + SCRUB_STATUS_ACTIVE = 'Active' + SCRUB_STATUS_INACTIVE = 'Inactive' + + @classmethod + def get_service_map(cls, service_name): + service_map = {} # type: Dict[str, dict] + for server in mgr.list_servers(): + for service in server['services']: + if service['type'] == service_name: + if server['hostname'] not in service_map: + service_map[server['hostname']] = { + 'server': server, + 'services': [] + } + inst_id = service['id'] + metadata = mgr.get_metadata(service_name, inst_id) + status = mgr.get_daemon_status(service_name, inst_id) + service_map[server['hostname']]['services'].append({ + 'id': inst_id, + 'type': service_name, + 'hostname': server['hostname'], + 'metadata': metadata, + 'status': status + }) + return service_map + + @classmethod + def get_service_list(cls, service_name): + service_map = cls.get_service_map(service_name) + return [svc for _, svcs in service_map.items() for svc in svcs['services']] + + @classmethod + def get_service_data_by_metadata_id(cls, + service_type: str, + metadata_id: str) -> Optional[Dict[str, Any]]: + for server in mgr.list_servers(): + for service in server['services']: + if service['type'] == service_type: + metadata = mgr.get_metadata(service_type, service['id']) + if metadata_id == metadata['id']: + return { + 'id': metadata['id'], + 'service_map_id': str(service['id']), + 'type': service_type, + 'hostname': server['hostname'], + 'metadata': metadata + } + return None + + @classmethod + def get_service(cls, service_type: str, metadata_id: str) -> Optional[Dict[str, Any]]: + svc_data = cls.get_service_data_by_metadata_id(service_type, metadata_id) + if svc_data: + svc_data['status'] = mgr.get_daemon_status(svc_data['type'], svc_data['service_map_id']) + return svc_data + + @classmethod + def get_service_perf_counters(cls, service_type: str, service_id: str) -> Dict[str, Any]: + schema_dict = mgr.get_perf_schema(service_type, service_id) + schema = schema_dict["{}.{}".format(service_type, service_id)] + counters = [] + for key, value in sorted(schema.items()): + counter = {'name': str(key), 'description': value['description']} + # pylint: disable=W0212 + if mgr._stattype_to_str(value['type']) == 'counter': + counter['value'] = cls.get_rate( + service_type, service_id, key) + counter['unit'] = mgr._unit_to_str(value['units']) + else: + counter['value'] = mgr.get_latest( + service_type, service_id, key) + counter['unit'] = '' + counters.append(counter) + + return { + 'service': { + 'type': service_type, + 'id': str(service_id) + }, + 'counters': counters + } + + @classmethod + def get_pool_list(cls, application=None): + osd_map = mgr.get('osd_map') + if not application: + return osd_map['pools'] + return [pool for pool in osd_map['pools'] + if application in pool.get('application_metadata', {})] + + @classmethod + def get_pool_list_with_stats(cls, application=None): + # pylint: disable=too-many-locals + pools = cls.get_pool_list(application) + + pools_w_stats = [] + + pg_summary = mgr.get("pg_summary") + pool_stats = mgr.get_updated_pool_stats() + + for pool in pools: + pool['pg_status'] = pg_summary['by_pool'][pool['pool'].__str__()] + stats = pool_stats[pool['pool']] + s = {} + + for stat_name, stat_series in stats.items(): + rates = get_time_series_rates(stat_series) + s[stat_name] = { + 'latest': stat_series[0][1], + 'rate': get_most_recent_rate(rates), + 'rates': rates + } + pool['stats'] = s + pools_w_stats.append(pool) + return pools_w_stats + + @classmethod + def get_erasure_code_profiles(cls): + def _serialize_ecp(name, ecp): + def serialize_numbers(key): + value = ecp.get(key) + if value is not None: + ecp[key] = int(value) + + ecp['name'] = name + serialize_numbers('k') + serialize_numbers('m') + return ecp + + ret = [] + for name, ecp in mgr.get('osd_map').get('erasure_code_profiles', {}).items(): + ret.append(_serialize_ecp(name, ecp)) + return ret + + @classmethod + def get_pool_name_from_id(cls, pool_id): + # type: (int) -> Union[str, None] + return mgr.rados.pool_reverse_lookup(pool_id) + + @classmethod + def get_pool_by_attribute(cls, attribute, value): + # type: (str, Any) -> Union[dict, None] + pool_list = cls.get_pool_list() + for pool in pool_list: + if attribute in pool and pool[attribute] == value: + return pool + return None + + @classmethod + def get_encryption_config(cls, daemon_name): + kms_vault_configured = False + s3_vault_configured = False + kms_backend: str = '' + sse_s3_backend: str = '' + vault_stats = [] + full_daemon_name = 'rgw.' + daemon_name + + kms_backend = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), + key='rgw_crypt_s3_kms_backend') + sse_s3_backend = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), + key='rgw_crypt_sse_s3_backend') + + if kms_backend.strip() == 'vault': + kms_vault_auth: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_vault_auth') + kms_vault_engine: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_vault_secret_engine') + kms_vault_address: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_vault_addr') + kms_vault_token: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_vault_token_file') # noqa E501 #pylint: disable=line-too-long + if (kms_vault_auth.strip() != "" and kms_vault_engine.strip() != "" and kms_vault_address.strip() != ""): # noqa E501 #pylint: disable=line-too-long + if(kms_vault_auth == 'token' and kms_vault_token.strip() == ""): + kms_vault_configured = False + else: + kms_vault_configured = True + + if sse_s3_backend.strip() == 'vault': + s3_vault_auth: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_sse_s3_vault_auth') + s3_vault_engine: str = CephService.send_command('mon', + 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_sse_s3_vault_secret_engine') # noqa E501 #pylint: disable=line-too-long + s3_vault_address: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_sse_s3_vault_addr') + s3_vault_token: str = CephService.send_command('mon', 'config get', + who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long + key='rgw_crypt_sse_s3_vault_token_file') # noqa E501 #pylint: disable=line-too-long + + if (s3_vault_auth.strip() != "" and s3_vault_engine.strip() != "" and s3_vault_address.strip() != ""): # noqa E501 #pylint: disable=line-too-long + if(s3_vault_auth == 'token' and s3_vault_token.strip() == ""): + s3_vault_configured = False + else: + s3_vault_configured = True + + vault_stats.append(kms_vault_configured) + vault_stats.append(s3_vault_configured) + return vault_stats + + @classmethod + def set_encryption_config(cls, encryption_type, kms_provider, auth_method, + secret_engine, secret_path, namespace, address, + token, daemon_name, ssl_cert, client_cert, client_key): + full_daemon_name = 'rgw.' + daemon_name + if encryption_type == 'aws:kms': + + KMS_CONFIG = [ + ['rgw_crypt_s3_kms_backend', kms_provider], + ['rgw_crypt_vault_auth', auth_method], + ['rgw_crypt_vault_prefix', secret_path], + ['rgw_crypt_vault_namespace', namespace], + ['rgw_crypt_vault_secret_engine', secret_engine], + ['rgw_crypt_vault_addr', address], + ['rgw_crypt_vault_token_file', token], + ['rgw_crypt_vault_ssl_cacert', ssl_cert], + ['rgw_crypt_vault_ssl_clientcert', client_cert], + ['rgw_crypt_vault_ssl_clientkey', client_key] + ] + + for (key, value) in KMS_CONFIG: + if value == 'null': + continue + CephService.send_command('mon', 'config set', + who=name_to_config_section(full_daemon_name), + name=key, value=value) + + if encryption_type == 'AES256': + + SSE_S3_CONFIG = [ + ['rgw_crypt_sse_s3_backend', kms_provider], + ['rgw_crypt_sse_s3_vault_auth', auth_method], + ['rgw_crypt_sse_s3_vault_prefix', secret_path], + ['rgw_crypt_sse_s3_vault_namespace', namespace], + ['rgw_crypt_sse_s3_vault_secret_engine', secret_engine], + ['rgw_crypt_sse_s3_vault_addr', address], + ['rgw_crypt_sse_s3_vault_token_file', token], + ['rgw_crypt_sse_s3_vault_ssl_cacert', ssl_cert], + ['rgw_crypt_sse_s3_vault_ssl_clientcert', client_cert], + ['rgw_crypt_sse_s3_vault_ssl_clientkey', client_key] + ] + + for (key, value) in SSE_S3_CONFIG: + if value == 'null': + continue + CephService.send_command('mon', 'config set', + who=name_to_config_section(full_daemon_name), + name=key, value=value) + + return {} + + @classmethod + def set_multisite_config(cls, realm_name, zonegroup_name, zone_name, daemon_name): + full_daemon_name = 'rgw.' + daemon_name + + KMS_CONFIG = [ + ['rgw_realm', realm_name], + ['rgw_zonegroup', zonegroup_name], + ['rgw_zone', zone_name] + ] + + for (key, value) in KMS_CONFIG: + if value == 'null': + continue + CephService.send_command('mon', 'config set', + who=name_to_config_section(full_daemon_name), + name=key, value=value) + return {} + + @classmethod + def get_realm_tokens(cls): + tokens_info = mgr.remote('rgw', 'get_realm_tokens') + return tokens_info + + @classmethod + def import_realm_token(cls, realm_token, zone_name, port, placement_spec): + tokens_info = mgr.remote('rgw', 'import_realm_token', zone_name=zone_name, + realm_token=realm_token, port=port, placement=placement_spec, + start_radosgw=True) + return tokens_info + + @classmethod + def get_pool_pg_status(cls, pool_name): + # type: (str) -> dict + pool = cls.get_pool_by_attribute('pool_name', pool_name) + if pool is None: + return {} + return mgr.get("pg_summary")['by_pool'][pool['pool'].__str__()] + + @staticmethod + def send_command(srv_type, prefix, srv_spec='', to_json=True, inbuf='', **kwargs): + # type: (str, str, Optional[str], bool, str, Any) -> Any + """ + :type prefix: str + :param srv_type: mon | + :param kwargs: will be added to argdict + :param srv_spec: typically empty. or something like "<fs_id>:0" + :param to_json: if true return as json format + + :raises PermissionError: See rados.make_ex + :raises ObjectNotFound: See rados.make_ex + :raises IOError: See rados.make_ex + :raises NoSpace: See rados.make_ex + :raises ObjectExists: See rados.make_ex + :raises ObjectBusy: See rados.make_ex + :raises NoData: See rados.make_ex + :raises InterruptedOrTimeoutError: See rados.make_ex + :raises TimedOut: See rados.make_ex + :raises ValueError: return code != 0 + """ + argdict = { + "prefix": prefix, + } + if to_json: + argdict["format"] = "json" + argdict.update({k: v for k, v in kwargs.items() if v is not None}) + result = CommandResult("") + mgr.send_command(result, srv_type, srv_spec, json.dumps(argdict), "", inbuf=inbuf) + r, outb, outs = result.wait() + if r != 0: + logger.error("send_command '%s' failed. (r=%s, outs=\"%s\", kwargs=%s)", prefix, r, + outs, kwargs) + + raise SendCommandError(outs, prefix, argdict, r) + + try: + return json.loads(outb or outs) + except Exception: # pylint: disable=broad-except + return outb + + @staticmethod + def _get_smart_data_by_device(device): + # type: (dict) -> Dict[str, dict] + # Check whether the device is associated with daemons. + if 'daemons' in device and device['daemons']: + dev_smart_data: Dict[str, Any] = {} + + # Get a list of all OSD daemons on all hosts that are 'up' + # because SMART data can not be retrieved from daemons that + # are 'down' or 'destroyed'. + osd_tree = CephService.send_command('mon', 'osd tree') + osd_daemons_up = [ + node['name'] for node in osd_tree.get('nodes', {}) + if node.get('status') == 'up' + ] + + # All daemons on the same host can deliver SMART data, + # thus it is not relevant for us which daemon we are using. + # NOTE: the list may contain daemons that are 'down' or 'destroyed'. + for daemon in device['daemons']: + svc_type, svc_id = daemon.split('.', 1) + if 'osd' in svc_type: + if daemon not in osd_daemons_up: + continue + try: + dev_smart_data = CephService.send_command( + svc_type, 'smart', svc_id, devid=device['devid']) + except SendCommandError as error: + logger.warning(str(error)) + # Try to retrieve SMART data from another daemon. + continue + elif 'mon' in svc_type: + try: + dev_smart_data = CephService.send_command( + svc_type, 'device query-daemon-health-metrics', who=daemon) + except SendCommandError as error: + logger.warning(str(error)) + # Try to retrieve SMART data from another daemon. + continue + else: + dev_smart_data = {} + + CephService.log_dev_data_error(dev_smart_data) + + break + + return dev_smart_data + logger.warning('[SMART] No daemons associated with device ID "%s"', + device['devid']) + return {} + + @staticmethod + def log_dev_data_error(dev_smart_data): + for dev_id, dev_data in dev_smart_data.items(): + if 'error' in dev_data: + logger.warning( + '[SMART] Error retrieving smartctl data for device ID "%s": %s', + dev_id, dev_data) + + @staticmethod + def get_devices_by_host(hostname): + # type: (str) -> dict + return CephService.send_command('mon', + 'device ls-by-host', + host=hostname) + + @staticmethod + def get_devices_by_daemon(daemon_type, daemon_id): + # type: (str, str) -> dict + return CephService.send_command('mon', + 'device ls-by-daemon', + who='{}.{}'.format( + daemon_type, daemon_id)) + + @staticmethod + def get_smart_data_by_host(hostname): + # type: (str) -> dict + """ + Get the SMART data of all devices on the given host, regardless + of the daemon (osd, mon, ...). + :param hostname: The name of the host. + :return: A dictionary containing the SMART data of every device + on the given host. The device name is used as the key in the + dictionary. + """ + devices = CephService.get_devices_by_host(hostname) + smart_data = {} # type: dict + if devices: + for device in devices: + if device['devid'] not in smart_data: + smart_data.update( + CephService._get_smart_data_by_device(device)) + else: + logger.debug('[SMART] could not retrieve device list from host %s', hostname) + return smart_data + + @staticmethod + def get_smart_data_by_daemon(daemon_type, daemon_id): + # type: (str, str) -> Dict[str, dict] + """ + Get the SMART data of the devices associated with the given daemon. + :param daemon_type: The daemon type, e.g. 'osd' or 'mon'. + :param daemon_id: The daemon identifier. + :return: A dictionary containing the SMART data of every device + associated with the given daemon. The device name is used as the + key in the dictionary. + """ + devices = CephService.get_devices_by_daemon(daemon_type, daemon_id) + smart_data = {} # type: Dict[str, dict] + if devices: + for device in devices: + if device['devid'] not in smart_data: + smart_data.update( + CephService._get_smart_data_by_device(device)) + else: + msg = '[SMART] could not retrieve device list from daemon with type %s and ' +\ + 'with ID %s' + logger.debug(msg, daemon_type, daemon_id) + return smart_data + + @classmethod + def get_rates(cls, svc_type, svc_name, path): + """ + :return: the derivative of mgr.get_counter() + :rtype: list[tuple[int, float]]""" + data = mgr.get_counter(svc_type, svc_name, path)[path] + return get_time_series_rates(data) + + @classmethod + def get_rate(cls, svc_type, svc_name, path): + """returns most recent rate""" + return get_most_recent_rate(cls.get_rates(svc_type, svc_name, path)) + + @classmethod + def get_client_perf(cls): + pools_stats = mgr.get('osd_pool_stats')['pool_stats'] + + io_stats = { + 'read_bytes_sec': 0, + 'read_op_per_sec': 0, + 'write_bytes_sec': 0, + 'write_op_per_sec': 0, + } + recovery_stats = {'recovering_bytes_per_sec': 0} + + for pool_stats in pools_stats: + client_io = pool_stats['client_io_rate'] + for stat in list(io_stats.keys()): + if stat in client_io: + io_stats[stat] += client_io[stat] + + client_recovery = pool_stats['recovery_rate'] + for stat in list(recovery_stats.keys()): + if stat in client_recovery: + recovery_stats[stat] += client_recovery[stat] + + client_perf = io_stats.copy() + client_perf.update(recovery_stats) + + return client_perf + + @classmethod + def get_scrub_status(cls): + enabled_flags = mgr.get('osd_map')['flags_set'] + if cls.OSD_FLAG_NO_SCRUB in enabled_flags or cls.OSD_FLAG_NO_DEEP_SCRUB in enabled_flags: + return cls.SCRUB_STATUS_DISABLED + + grouped_pg_statuses = mgr.get('pg_summary')['all'] + for grouped_pg_status in grouped_pg_statuses.keys(): + if len(grouped_pg_status.split(cls.PG_STATUS_SCRUBBING)) > 1 \ + or len(grouped_pg_status.split(cls.PG_STATUS_DEEP_SCRUBBING)) > 1: + return cls.SCRUB_STATUS_ACTIVE + + return cls.SCRUB_STATUS_INACTIVE + + @classmethod + def get_pg_info(cls): + pg_summary = mgr.get('pg_summary') + object_stats = {stat: pg_summary['pg_stats_sum']['stat_sum'][stat] for stat in [ + 'num_objects', 'num_object_copies', 'num_objects_degraded', + 'num_objects_misplaced', 'num_objects_unfound']} + + pgs_per_osd = 0.0 + total_osds = len(pg_summary['by_osd']) + if total_osds > 0: + total_pgs = 0.0 + for _, osd_pg_statuses in pg_summary['by_osd'].items(): + for _, pg_amount in osd_pg_statuses.items(): + total_pgs += pg_amount + + pgs_per_osd = total_pgs / total_osds + + return { + 'object_stats': object_stats, + 'statuses': pg_summary['all'], + 'pgs_per_osd': pgs_per_osd, + } diff --git a/src/pybind/mgr/dashboard/services/cephfs.py b/src/pybind/mgr/dashboard/services/cephfs.py new file mode 100644 index 000000000..8e9a07365 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/cephfs.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- + +import datetime +import logging +import os +from contextlib import contextmanager + +import cephfs + +from .. import mgr + +logger = logging.getLogger('cephfs') + + +class CephFS(object): + @classmethod + def list_filesystems(cls): + fsmap = mgr.get("fs_map") + return [{'id': fs['id'], 'name': fs['mdsmap']['fs_name']} + for fs in fsmap['filesystems']] + + @classmethod + def fs_name_from_id(cls, fs_id): + """ + Get the filesystem name from ID. + :param fs_id: The filesystem ID. + :type fs_id: int | str + :return: The filesystem name or None. + :rtype: str | None + """ + fs_map = mgr.get("fs_map") + fs_info = list(filter(lambda x: str(x['id']) == str(fs_id), + fs_map['filesystems'])) + if not fs_info: + return None + return fs_info[0]['mdsmap']['fs_name'] + + def __init__(self, fs_name=None): + logger.debug("initializing cephfs connection") + self.cfs = cephfs.LibCephFS(rados_inst=mgr.rados) + logger.debug("mounting cephfs filesystem: %s", fs_name) + if fs_name: + self.cfs.mount(filesystem_name=fs_name) + else: + self.cfs.mount() + logger.debug("mounted cephfs filesystem") + + def __del__(self): + logger.debug("shutting down cephfs filesystem") + self.cfs.shutdown() + + @contextmanager + def opendir(self, dirpath): + d = None + try: + d = self.cfs.opendir(dirpath) + yield d + finally: + if d: + self.cfs.closedir(d) + + def ls_dir(self, path, depth): + """ + List directories of specified path with additional information. + :param path: The root directory path. + :type path: str | bytes + :param depth: The number of steps to go down the directory tree. + :type depth: int | str + :return: A list of directory dicts which consist of name, path, + parent, snapshots and quotas. + :rtype: list + """ + paths = self._ls_dir(path, int(depth)) + # Convert (bytes => string), prettify paths (strip slashes) + # and append additional information. + return [self.get_directory(p) for p in paths if p != path.encode()] + + def _ls_dir(self, path, depth): + """ + List directories of specified path. + :param path: The root directory path. + :type path: str | bytes + :param depth: The number of steps to go down the directory tree. + :type depth: int + :return: A list of directory paths (bytes encoded). + Example: + ls_dir('/photos', 1) => [ + b'/photos/flowers', b'/photos/cars' + ] + :rtype: list + """ + if isinstance(path, str): + path = path.encode() + logger.debug("get_dir_list dirpath=%s depth=%s", path, + depth) + if depth == 0: + return [path] + logger.debug("opening dirpath=%s", path) + with self.opendir(path) as d: + dent = self.cfs.readdir(d) + paths = [path] + while dent: + logger.debug("found entry=%s", dent.d_name) + if dent.d_name in [b'.', b'..']: + dent = self.cfs.readdir(d) + continue + if dent.is_dir(): + logger.debug("found dir=%s", dent.d_name) + subdir_path = os.path.join(path, dent.d_name) + paths.extend(self._ls_dir(subdir_path, depth - 1)) + dent = self.cfs.readdir(d) + return paths + + def get_directory(self, path): + """ + Transforms path of directory into a meaningful dictionary. + :param path: The root directory path. + :type path: str | bytes + :return: Dict consists of name, path, parent, snapshots and quotas. + :rtype: dict + """ + path = path.decode() + not_root = path != os.sep + return { + 'name': os.path.basename(path) if not_root else path, + 'path': path, + 'parent': os.path.dirname(path) if not_root else None, + 'snapshots': self.ls_snapshots(path), + 'quotas': self.get_quotas(path) if not_root else None + } + + def dir_exists(self, path): + try: + with self.opendir(path): + return True + except cephfs.ObjectNotFound: + return False + + def mk_dirs(self, path): + """ + Create a directory. + :param path: The path of the directory. + """ + if path == os.sep: + raise Exception('Cannot create root directory "/"') + if self.dir_exists(path): + return + logger.info("Creating directory: %s", path) + self.cfs.mkdirs(path, 0o755) + + def rm_dir(self, path): + """ + Remove a directory. + :param path: The path of the directory. + """ + if path == os.sep: + raise Exception('Cannot remove root directory "/"') + if not self.dir_exists(path): + return + logger.info("Removing directory: %s", path) + self.cfs.rmdir(path) + + def mk_snapshot(self, path, name=None, mode=0o755): + """ + Create a snapshot. + :param path: The path of the directory. + :type path: str + :param name: The name of the snapshot. If not specified, + a name using the current time in RFC3339 UTC format + will be generated. + :type name: str | None + :param mode: The permissions the directory should have + once created. + :type mode: int + :return: Returns the name of the snapshot. + :rtype: str + """ + if name is None: + now = datetime.datetime.now() + tz = now.astimezone().tzinfo + name = now.replace(tzinfo=tz).isoformat('T') + client_snapdir = self.cfs.conf_get('client_snapdir') + snapshot_path = os.path.join(path, client_snapdir, name) + logger.info("Creating snapshot: %s", snapshot_path) + self.cfs.mkdir(snapshot_path, mode) + return name + + def ls_snapshots(self, path): + """ + List snapshots for the specified path. + :param path: The path of the directory. + :type path: str + :return: A list of dictionaries containing the name and the + creation time of the snapshot. + :rtype: list + """ + result = [] + client_snapdir = self.cfs.conf_get('client_snapdir') + path = os.path.join(path, client_snapdir).encode() + with self.opendir(path) as d: + dent = self.cfs.readdir(d) + while dent: + if dent.is_dir(): + if dent.d_name not in [b'.', b'..'] and not dent.d_name.startswith(b'_'): + snapshot_path = os.path.join(path, dent.d_name) + stat = self.cfs.stat(snapshot_path) + result.append({ + 'name': dent.d_name.decode(), + 'path': snapshot_path.decode(), + 'created': '{}Z'.format(stat.st_ctime.isoformat('T')) + }) + dent = self.cfs.readdir(d) + return result + + def rm_snapshot(self, path, name): + """ + Remove a snapshot. + :param path: The path of the directory. + :type path: str + :param name: The name of the snapshot. + :type name: str + """ + client_snapdir = self.cfs.conf_get('client_snapdir') + snapshot_path = os.path.join(path, client_snapdir, name) + logger.info("Removing snapshot: %s", snapshot_path) + self.cfs.rmdir(snapshot_path) + + def get_quotas(self, path): + """ + Get the quotas of the specified path. + :param path: The path of the directory/file. + :type path: str + :return: Returns a dictionary containing 'max_bytes' + and 'max_files'. + :rtype: dict + """ + try: + max_bytes = int(self.cfs.getxattr(path, 'ceph.quota.max_bytes')) + except cephfs.NoData: + max_bytes = 0 + try: + max_files = int(self.cfs.getxattr(path, 'ceph.quota.max_files')) + except cephfs.NoData: + max_files = 0 + return {'max_bytes': max_bytes, 'max_files': max_files} + + def set_quotas(self, path, max_bytes=None, max_files=None): + """ + Set the quotas of the specified path. + :param path: The path of the directory/file. + :type path: str + :param max_bytes: The byte limit. + :type max_bytes: int | None + :param max_files: The file limit. + :type max_files: int | None + """ + if max_bytes is not None: + self.cfs.setxattr(path, 'ceph.quota.max_bytes', + str(max_bytes).encode(), 0) + if max_files is not None: + self.cfs.setxattr(path, 'ceph.quota.max_files', + str(max_files).encode(), 0) diff --git a/src/pybind/mgr/dashboard/services/cluster.py b/src/pybind/mgr/dashboard/services/cluster.py new file mode 100644 index 000000000..9caaf1963 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/cluster.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +from enum import Enum +from typing import NamedTuple + +from .. import mgr + + +class ClusterCapacity(NamedTuple): + total_avail_bytes: int + total_bytes: int + total_used_raw_bytes: int + total_objects: int + total_pool_bytes_used: int + average_object_size: int + + +class ClusterModel: + + class Status(Enum): + INSTALLED = 0 + POST_INSTALLED = 1 + + status: Status + + def __init__(self, status=Status.POST_INSTALLED.name): + """ + :param status: The status of the cluster. Assume that the cluster + is already functional by default. + :type status: str + """ + self.status = self.Status[status] + + def dict(self): + return {'status': self.status.name} + + def to_db(self): + mgr.set_store('cluster/status', self.status.name) + + @classmethod + def from_db(cls): + """ + Get the stored cluster status from the configuration key/value store. + If the status is not set, assume it is already fully functional. + """ + return cls(status=mgr.get_store('cluster/status', cls.Status.POST_INSTALLED.name)) + + @classmethod + def get_capacity(cls) -> ClusterCapacity: + df = mgr.get('df') + total_pool_bytes_used = 0 + average_object_size = 0 + total_data_pool_objects = 0 + total_data_pool_bytes_used = 0 + rgw_pools_data = cls.get_rgw_pools() + + for pool in df['pools']: + pool_name = str(pool['name']) + if pool_name in rgw_pools_data: + if pool_name.endswith('.data'): + objects = pool['stats']['objects'] + pool_bytes_used = pool['stats']['bytes_used'] + total_pool_bytes_used += pool_bytes_used + total_data_pool_objects += objects + replica = rgw_pools_data[pool_name] + total_data_pool_bytes_used += pool_bytes_used / replica + + average_object_size = total_data_pool_bytes_used / total_data_pool_objects if total_data_pool_objects != 0 else 0 # noqa E501 #pylint: disable=line-too-long + + return ClusterCapacity( + total_avail_bytes=df['stats']['total_avail_bytes'], + total_bytes=df['stats']['total_bytes'], + total_used_raw_bytes=df['stats']['total_used_raw_bytes'], + total_objects=total_data_pool_objects, + total_pool_bytes_used=total_pool_bytes_used, + average_object_size=average_object_size + )._asdict() + + @classmethod + def get_rgw_pools(cls): + rgw_pool_size = {} + + osd_map = mgr.get('osd_map') + for pool in osd_map['pools']: + if 'rgw' in pool.get('application_metadata', {}): + name = pool['pool_name'] + rgw_pool_size[name] = pool['size'] + return rgw_pool_size diff --git a/src/pybind/mgr/dashboard/services/exception.py b/src/pybind/mgr/dashboard/services/exception.py new file mode 100644 index 000000000..c39209569 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/exception.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- + +import json +import logging +from contextlib import contextmanager + +import cephfs +import cherrypy +import rados +import rbd +from orchestrator import OrchestratorError + +from ..exceptions import DashboardException, ViewCacheNoDataException +from ..rest_client import RequestException +from ..services.ceph_service import SendCommandError + +logger = logging.getLogger('exception') + + +def serialize_dashboard_exception(e, include_http_status=False, task=None): + """ + :type e: Exception + :param include_http_status: Used for Tasks, where the HTTP status code is not available. + """ + from ..tools import ViewCache + if isinstance(e, ViewCacheNoDataException): + return {'status': ViewCache.VALUE_NONE, 'value': None} + + out = dict(detail=str(e)) + try: + out['code'] = e.code + except AttributeError: + pass + component = getattr(e, 'component', None) + out['component'] = component if component else None + if include_http_status: + out['status'] = getattr(e, 'status', 500) # type: ignore + if task: + out['task'] = dict(name=task.name, metadata=task.metadata) # type: ignore + return out + + +# pylint: disable=broad-except +def dashboard_exception_handler(handler, *args, **kwargs): + try: + with handle_rados_error(component=None): # make the None controller the fallback. + return handler(*args, **kwargs) + # pylint: disable=try-except-raise + except (cherrypy.HTTPRedirect, cherrypy.NotFound, cherrypy.HTTPError): + raise + except (ViewCacheNoDataException, DashboardException) as error: + logger.exception('Dashboard Exception') + cherrypy.response.headers['Content-Type'] = 'application/json' + cherrypy.response.status = getattr(error, 'status', 400) + return json.dumps(serialize_dashboard_exception(error)).encode('utf-8') + except Exception as error: + logger.exception('Internal Server Error') + raise error + + +@contextmanager +def handle_cephfs_error(): + try: + yield + except cephfs.OSError as e: + raise DashboardException(e, component='cephfs') from e + + +@contextmanager +def handle_rbd_error(): + try: + yield + except rbd.OSError as e: + raise DashboardException(e, component='rbd') + except rbd.Error as e: + raise DashboardException(e, component='rbd', code=e.__class__.__name__) + + +@contextmanager +def handle_rados_error(component): + try: + yield + except rados.OSError as e: + raise DashboardException(e, component=component) + except rados.Error as e: + raise DashboardException(e, component=component, code=e.__class__.__name__) + + +@contextmanager +def handle_send_command_error(component): + try: + yield + except SendCommandError as e: + raise DashboardException(e, component=component) + + +@contextmanager +def handle_orchestrator_error(component): + try: + yield + except OrchestratorError as e: + raise DashboardException(e, component=component) + + +@contextmanager +def handle_request_error(component): + try: + yield + except RequestException as e: + if e.content: + content = json.loads(e.content) + content_message = content.get('message') + if content_message: + raise DashboardException( + msg=content_message, component=component) + raise DashboardException(e=e, component=component) + + +@contextmanager +def handle_error(component, http_status_code=None): + try: + yield + except Exception as e: # pylint: disable=broad-except + raise DashboardException(e, component=component, http_status_code=http_status_code) + + +@contextmanager +def handle_custom_error(component, http_status_code=None, exceptions=()): + try: + yield + except exceptions as e: + raise DashboardException(e, component=component, http_status_code=http_status_code) diff --git a/src/pybind/mgr/dashboard/services/iscsi_cli.py b/src/pybind/mgr/dashboard/services/iscsi_cli.py new file mode 100644 index 000000000..0e2e0b215 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_cli.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- + +import errno +import json +from typing import Optional + +from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand + +from ..rest_client import RequestException +from .iscsi_client import IscsiClient +from .iscsi_config import InvalidServiceUrl, IscsiGatewayAlreadyExists, \ + IscsiGatewayDoesNotExist, IscsiGatewaysConfig, \ + ManagedByOrchestratorException + + +@CLIReadCommand('dashboard iscsi-gateway-list') +def list_iscsi_gateways(_): + ''' + List iSCSI gateways + ''' + return 0, json.dumps(IscsiGatewaysConfig.get_gateways_config()), '' + + +@CLIWriteCommand('dashboard iscsi-gateway-add') +@CLICheckNonemptyFileInput(desc='iSCSI gateway configuration') +def add_iscsi_gateway(_, inbuf, name: Optional[str] = None): + ''' + Add iSCSI gateway configuration. Gateway URL read from -i <file> + ''' + service_url = inbuf + try: + IscsiGatewaysConfig.validate_service_url(service_url) + if name is None: + name = IscsiClient.instance(service_url=service_url).get_hostname()['data'] + IscsiGatewaysConfig.add_gateway(name, service_url) + return 0, 'Success', '' + except IscsiGatewayAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + except InvalidServiceUrl as ex: + return -errno.EINVAL, '', str(ex) + except ManagedByOrchestratorException as ex: + return -errno.EINVAL, '', str(ex) + except RequestException as ex: + return -errno.EINVAL, '', str(ex) + + +@CLIWriteCommand('dashboard iscsi-gateway-rm') +def remove_iscsi_gateway(_, name: str): + ''' + Remove iSCSI gateway configuration + ''' + try: + IscsiGatewaysConfig.remove_gateway(name) + return 0, 'Success', '' + except IscsiGatewayDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + except ManagedByOrchestratorException as ex: + return -errno.EINVAL, '', str(ex) diff --git a/src/pybind/mgr/dashboard/services/iscsi_client.py b/src/pybind/mgr/dashboard/services/iscsi_client.py new file mode 100644 index 000000000..c222fbb0d --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_client.py @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-public-methods + +import json +import logging + +from requests.auth import HTTPBasicAuth + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + +from ..rest_client import RestClient +from ..settings import Settings +from .iscsi_config import IscsiGatewaysConfig + +logger = logging.getLogger('iscsi_client') + + +class IscsiClient(RestClient): + _CLIENT_NAME = 'iscsi' + _instances = {} # type: dict + + service_url = None + gateway_name = None + + @classmethod + def instance(cls, gateway_name=None, service_url=None): + if not service_url: + if not gateway_name: + gateway_name = list(IscsiGatewaysConfig.get_gateways_config()['gateways'].keys())[0] + gateways_config = IscsiGatewaysConfig.get_gateway_config(gateway_name) + service_url = gateways_config['service_url'] + + instance = cls._instances.get(gateway_name) + if not instance or service_url != instance.service_url or \ + instance.session.verify != Settings.ISCSI_API_SSL_VERIFICATION: + url = urlparse(service_url) + ssl = url.scheme == 'https' + host = url.hostname + port = url.port + username = url.username + password = url.password + if not port: + port = 443 if ssl else 80 + + auth = HTTPBasicAuth(username, password) + instance = IscsiClient(host, port, IscsiClient._CLIENT_NAME, ssl, + auth, Settings.ISCSI_API_SSL_VERIFICATION) + instance.service_url = service_url + instance.gateway_name = gateway_name + if gateway_name: + cls._instances[gateway_name] = instance + + return instance + + @RestClient.api_get('/api/_ping') + def ping(self, request=None): + return request() + + @RestClient.api_get('/api/settings') + def get_settings(self, request=None): + return request() + + @RestClient.api_get('/api/sysinfo/ip_addresses') + def get_ip_addresses(self, request=None): + return request() + + @RestClient.api_get('/api/sysinfo/hostname') + def get_hostname(self, request=None): + return request() + + @RestClient.api_get('/api/config') + def get_config(self, request=None): + return request({ + 'decrypt_passwords': True + }) + + @RestClient.api_put('/api/target/{target_iqn}') + def create_target(self, target_iqn, target_controls, request=None): + logger.debug("[%s] Creating target: %s", self.gateway_name, target_iqn) + return request({ + 'controls': json.dumps(target_controls) + }) + + @RestClient.api_delete('/api/target/{target_iqn}') + def delete_target(self, target_iqn, request=None): + logger.debug("[%s] Deleting target: %s", self.gateway_name, target_iqn) + return request() + + @RestClient.api_put('/api/target/{target_iqn}') + def reconfigure_target(self, target_iqn, target_controls, request=None): + logger.debug("[%s] Reconfiguring target: %s", self.gateway_name, target_iqn) + return request({ + 'mode': 'reconfigure', + 'controls': json.dumps(target_controls) + }) + + @RestClient.api_put('/api/gateway/{target_iqn}/{gateway_name}') + def create_gateway(self, target_iqn, gateway_name, ip_address, request=None): + logger.debug("[%s] Creating gateway: %s/%s", self.gateway_name, target_iqn, + gateway_name) + return request({ + 'ip_address': ','.join(ip_address), + 'skipchecks': 'true' + }) + + @RestClient.api_get('/api/gatewayinfo') + def get_gatewayinfo(self, request=None): + return request() + + @RestClient.api_delete('/api/gateway/{target_iqn}/{gateway_name}') + def delete_gateway(self, target_iqn, gateway_name, request=None): + logger.debug("Deleting gateway: %s/%s", target_iqn, gateway_name) + return request() + + @RestClient.api_put('/api/disk/{pool}/{image}') + def create_disk(self, pool, image, backstore, wwn, request=None): + logger.debug("[%s] Creating disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'mode': 'create', + 'backstore': backstore, + 'wwn': wwn + }) + + @RestClient.api_delete('/api/disk/{pool}/{image}') + def delete_disk(self, pool, image, request=None): + logger.debug("[%s] Deleting disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'preserve_image': 'true' + }) + + @RestClient.api_put('/api/disk/{pool}/{image}') + def reconfigure_disk(self, pool, image, controls, request=None): + logger.debug("[%s] Reconfiguring disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'controls': json.dumps(controls), + 'mode': 'reconfigure' + }) + + @RestClient.api_put('/api/targetlun/{target_iqn}') + def create_target_lun(self, target_iqn, image_id, lun, request=None): + logger.debug("[%s] Creating target lun: %s/%s", self.gateway_name, target_iqn, + image_id) + return request({ + 'disk': image_id, + 'lun_id': lun + }) + + @RestClient.api_delete('/api/targetlun/{target_iqn}') + def delete_target_lun(self, target_iqn, image_id, request=None): + logger.debug("[%s] Deleting target lun: %s/%s", self.gateway_name, target_iqn, + image_id) + return request({ + 'disk': image_id + }) + + @RestClient.api_put('/api/client/{target_iqn}/{client_iqn}') + def create_client(self, target_iqn, client_iqn, request=None): + logger.debug("[%s] Creating client: %s/%s", self.gateway_name, target_iqn, client_iqn) + return request() + + @RestClient.api_delete('/api/client/{target_iqn}/{client_iqn}') + def delete_client(self, target_iqn, client_iqn, request=None): + logger.debug("[%s] Deleting client: %s/%s", self.gateway_name, target_iqn, client_iqn) + return request() + + @RestClient.api_put('/api/clientlun/{target_iqn}/{client_iqn}') + def create_client_lun(self, target_iqn, client_iqn, image_id, request=None): + logger.debug("[%s] Creating client lun: %s/%s", self.gateway_name, target_iqn, + client_iqn) + return request({ + 'disk': image_id + }) + + @RestClient.api_delete('/api/clientlun/{target_iqn}/{client_iqn}') + def delete_client_lun(self, target_iqn, client_iqn, image_id, request=None): + logger.debug("iSCSI[%s] Deleting client lun: %s/%s", self.gateway_name, target_iqn, + client_iqn) + return request({ + 'disk': image_id + }) + + @RestClient.api_put('/api/clientauth/{target_iqn}/{client_iqn}') + def create_client_auth(self, target_iqn, client_iqn, username, password, mutual_username, + mutual_password, request=None): + logger.debug("[%s] Creating client auth: %s/%s/%s/%s/%s/%s", + self.gateway_name, target_iqn, client_iqn, username, password, mutual_username, + mutual_password) + return request({ + 'username': username, + 'password': password, + 'mutual_username': mutual_username, + 'mutual_password': mutual_password + }) + + @RestClient.api_put('/api/hostgroup/{target_iqn}/{group_name}') + def create_group(self, target_iqn, group_name, members, image_ids, request=None): + logger.debug("[%s] Creating group: %s/%s", self.gateway_name, target_iqn, group_name) + return request({ + 'members': ','.join(members), + 'disks': ','.join(image_ids) + }) + + @RestClient.api_put('/api/hostgroup/{target_iqn}/{group_name}') + def update_group(self, target_iqn, group_name, members, image_ids, request=None): + logger.debug("iSCSI[%s] Updating group: %s/%s", self.gateway_name, target_iqn, group_name) + return request({ + 'action': 'remove', + 'members': ','.join(members), + 'disks': ','.join(image_ids) + }) + + @RestClient.api_delete('/api/hostgroup/{target_iqn}/{group_name}') + def delete_group(self, target_iqn, group_name, request=None): + logger.debug("[%s] Deleting group: %s/%s", self.gateway_name, target_iqn, group_name) + return request() + + @RestClient.api_put('/api/discoveryauth') + def update_discoveryauth(self, user, password, mutual_user, mutual_password, request=None): + logger.debug("[%s] Updating discoveryauth: %s/%s/%s/%s", self.gateway_name, user, + password, mutual_user, mutual_password) + return request({ + 'username': user, + 'password': password, + 'mutual_username': mutual_user, + 'mutual_password': mutual_password + }) + + @RestClient.api_put('/api/targetauth/{target_iqn}') + def update_targetacl(self, target_iqn, action, request=None): + logger.debug("[%s] Updating targetacl: %s/%s", self.gateway_name, target_iqn, action) + return request({ + 'action': action + }) + + @RestClient.api_put('/api/targetauth/{target_iqn}') + def update_targetauth(self, target_iqn, user, password, mutual_user, mutual_password, + request=None): + logger.debug("[%s] Updating targetauth: %s/%s/%s/%s/%s", self.gateway_name, + target_iqn, user, password, mutual_user, mutual_password) + return request({ + 'username': user, + 'password': password, + 'mutual_username': mutual_user, + 'mutual_password': mutual_password + }) + + @RestClient.api_get('/api/targetinfo/{target_iqn}') + def get_targetinfo(self, target_iqn, request=None): + # pylint: disable=unused-argument + return request() + + @RestClient.api_get('/api/clientinfo/{target_iqn}/{client_iqn}') + def get_clientinfo(self, target_iqn, client_iqn, request=None): + # pylint: disable=unused-argument + return request() diff --git a/src/pybind/mgr/dashboard/services/iscsi_config.py b/src/pybind/mgr/dashboard/services/iscsi_config.py new file mode 100644 index 000000000..c1898a463 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_config.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- + +import json + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + +from .. import mgr + + +class IscsiGatewayAlreadyExists(Exception): + def __init__(self, gateway_name): + super(IscsiGatewayAlreadyExists, self).__init__( + "iSCSI gateway '{}' already exists".format(gateway_name)) + + +class IscsiGatewayDoesNotExist(Exception): + def __init__(self, hostname): + super(IscsiGatewayDoesNotExist, self).__init__( + "iSCSI gateway '{}' does not exist".format(hostname)) + + +class InvalidServiceUrl(Exception): + def __init__(self, service_url): + super(InvalidServiceUrl, self).__init__( + "Invalid service URL '{}'. " + "Valid format: '<scheme>://<username>:<password>@<host>[:port]'.".format(service_url)) + + +class ManagedByOrchestratorException(Exception): + def __init__(self): + super(ManagedByOrchestratorException, self).__init__( + "iSCSI configuration is managed by the orchestrator") + + +_ISCSI_STORE_KEY = "_iscsi_config" + + +class IscsiGatewaysConfig(object): + @classmethod + def _load_config_from_store(cls): + json_db = mgr.get_store(_ISCSI_STORE_KEY, + '{"gateways": {}}') + config = json.loads(json_db) + cls.update_iscsi_config(config) + return config + + @classmethod + def update_iscsi_config(cls, config): + """ + Since `ceph-iscsi` config v10, gateway names were renamed from host short name to FQDN. + If Ceph Dashboard were configured before v10, we try to update our internal gateways + database automatically. + """ + for gateway_name, gateway_config in list(config['gateways'].items()): + if '.' not in gateway_name: + from ..rest_client import RequestException + from .iscsi_client import IscsiClient # pylint: disable=cyclic-import + try: + service_url = gateway_config['service_url'] + new_gateway_name = IscsiClient.instance( + service_url=service_url).get_hostname()['data'] + if gateway_name != new_gateway_name: + config['gateways'][new_gateway_name] = gateway_config + del config['gateways'][gateway_name] + cls._save_config(config) + except RequestException: + # If gateway is not acessible, it should be removed manually + # or we will try to update automatically next time + continue + + @classmethod + def _save_config(cls, config): + mgr.set_store(_ISCSI_STORE_KEY, json.dumps(config)) + + @classmethod + def validate_service_url(cls, service_url): + url = urlparse(service_url) + if not url.scheme or not url.hostname or not url.username or not url.password: + raise InvalidServiceUrl(service_url) + + @classmethod + def add_gateway(cls, name, service_url): + config = cls.get_gateways_config() + if name in config: + raise IscsiGatewayAlreadyExists(name) + IscsiGatewaysConfig.validate_service_url(service_url) + config['gateways'][name] = {'service_url': service_url} + cls._save_config(config) + + @classmethod + def remove_gateway(cls, name): + config = cls._load_config_from_store() + if name not in config['gateways']: + raise IscsiGatewayDoesNotExist(name) + + del config['gateways'][name] + cls._save_config(config) + + @classmethod + def get_gateways_config(cls): + return cls._load_config_from_store() + + @classmethod + def get_gateway_config(cls, name): + config = IscsiGatewaysConfig.get_gateways_config() + if name not in config['gateways']: + raise IscsiGatewayDoesNotExist(name) + return config['gateways'][name] diff --git a/src/pybind/mgr/dashboard/services/orchestrator.py b/src/pybind/mgr/dashboard/services/orchestrator.py new file mode 100644 index 000000000..e49ab80bf --- /dev/null +++ b/src/pybind/mgr/dashboard/services/orchestrator.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- + +import logging +from functools import wraps +from typing import Any, Dict, List, Optional, Tuple + +from ceph.deployment.service_spec import ServiceSpec +from orchestrator import DaemonDescription, DeviceLightLoc, HostSpec, \ + InventoryFilter, OrchestratorClientMixin, OrchestratorError, OrchResult, \ + ServiceDescription, raise_if_exception + +from .. import mgr +from ._paginate import ListPaginator + +logger = logging.getLogger('orchestrator') + + +# pylint: disable=abstract-method +class OrchestratorAPI(OrchestratorClientMixin): + def __init__(self): + super(OrchestratorAPI, self).__init__() + self.set_mgr(mgr) # type: ignore + + def status(self): + try: + status, message, _module_details = super().available() + logger.info("is orchestrator available: %s, %s", status, message) + return dict(available=status, message=message) + except (RuntimeError, OrchestratorError, ImportError) as e: + return dict( + available=False, + message='Orchestrator is unavailable: {}'.format(str(e))) + + +def wait_api_result(method): + @wraps(method) + def inner(self, *args, **kwargs): + completion = method(self, *args, **kwargs) + raise_if_exception(completion) + return completion.result + return inner + + +class ResourceManager(object): + def __init__(self, api): + self.api = api + + +class HostManger(ResourceManager): + @wait_api_result + def list(self) -> List[HostSpec]: + return self.api.get_hosts() + + @wait_api_result + def enter_maintenance(self, hostname: str, force: bool = False): + return self.api.enter_host_maintenance(hostname, force) + + @wait_api_result + def exit_maintenance(self, hostname: str): + return self.api.exit_host_maintenance(hostname) + + def get(self, hostname: str) -> Optional[HostSpec]: + hosts = [host for host in self.list() if host.hostname == hostname] + return hosts[0] if hosts else None + + @wait_api_result + def add(self, hostname: str, addr: str, labels: List[str]): + return self.api.add_host(HostSpec(hostname, addr=addr, labels=labels)) + + @wait_api_result + def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]: + return self.api.get_facts(hostname) + + @wait_api_result + def remove(self, hostname: str): + return self.api.remove_host(hostname) + + @wait_api_result + def add_label(self, host: str, label: str) -> OrchResult[str]: + return self.api.add_host_label(host, label) + + @wait_api_result + def remove_label(self, host: str, label: str) -> OrchResult[str]: + return self.api.remove_host_label(host, label) + + @wait_api_result + def drain(self, hostname: str): + return self.api.drain_host(hostname) + + +class InventoryManager(ResourceManager): + @wait_api_result + def list(self, hosts=None, refresh=False): + host_filter = InventoryFilter(hosts=hosts) if hosts else None + return self.api.get_inventory(host_filter=host_filter, refresh=refresh) + + +class ServiceManager(ResourceManager): + def list(self, + service_type: Optional[str] = None, + service_name: Optional[str] = None, + offset: int = 0, limit: int = -1, + sort: str = '+service_name', search: str = '') -> Tuple[List[Dict[Any, Any]], int]: + services = self.api.describe_service(service_type, service_name) + services = [service.to_dict() for service in services.result] + paginator = ListPaginator(offset, limit, sort, search, + input_list=services, + searchable_params=['service_name', 'status.running', + 'status.last_refreshed', 'status.size'], + sortable_params=['service_name', 'status.running', + 'status.last_refreshed', 'status.size'], + default_sort='+service_name') + return list(paginator.list()), paginator.get_count() + + @wait_api_result + def get(self, service_name: str) -> ServiceDescription: + return self.api.describe_service(None, service_name) + + @wait_api_result + def list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + hostname: Optional[str] = None) -> List[DaemonDescription]: + return self.api.list_daemons(service_name=service_name, + daemon_type=daemon_type, + host=hostname) + + def reload(self, service_type, service_ids): + if not isinstance(service_ids, list): + service_ids = [service_ids] + + completion_list = [ + self.api.service_action('reload', service_type, service_name, + service_id) + for service_name, service_id in service_ids + ] + self.api.orchestrator_wait(completion_list) + for c in completion_list: + raise_if_exception(c) + + @wait_api_result + def apply(self, + service_spec: Dict, + no_overwrite: Optional[bool] = False) -> OrchResult[List[str]]: + spec = ServiceSpec.from_json(service_spec) + return self.api.apply([spec], no_overwrite) + + @wait_api_result + def remove(self, service_name: str) -> List[str]: + return self.api.remove_service(service_name) + + +class OsdManager(ResourceManager): + @wait_api_result + def create(self, drive_group_specs): + return self.api.apply_drivegroups(drive_group_specs) + + @wait_api_result + def remove(self, osd_ids, replace=False, force=False): + return self.api.remove_osds(osd_ids, replace, force) + + @wait_api_result + def removing_status(self): + return self.api.remove_osds_status() + + +class DaemonManager(ResourceManager): + @wait_api_result + def action(self, daemon_name='', action='', image=None): + return self.api.daemon_action(daemon_name=daemon_name, action=action, image=image) + + +class UpgradeManager(ResourceManager): + @wait_api_result + def list(self, image: Optional[str], tags: bool, + show_all_versions: Optional[bool]) -> Dict[Any, Any]: + return self.api.upgrade_ls(image, tags, show_all_versions) + + @wait_api_result + def status(self): + return self.api.upgrade_status() + + @wait_api_result + def start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, + host_placement: Optional[str] = None, services: Optional[List[str]] = None, + limit: Optional[int] = None) -> str: + return self.api.upgrade_start(image, version, daemon_types, host_placement, services, + limit) + + @wait_api_result + def pause(self) -> str: + return self.api.upgrade_pause() + + @wait_api_result + def resume(self) -> str: + return self.api.upgrade_resume() + + @wait_api_result + def stop(self) -> str: + return self.api.upgrade_stop() + + +class OrchClient(object): + + _instance = None + + @classmethod + def instance(cls): + # type: () -> OrchClient + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def __init__(self): + self.api = OrchestratorAPI() + + self.hosts = HostManger(self.api) + self.inventory = InventoryManager(self.api) + self.services = ServiceManager(self.api) + self.osds = OsdManager(self.api) + self.daemons = DaemonManager(self.api) + self.upgrades = UpgradeManager(self.api) + + def available(self, features: Optional[List[str]] = None) -> bool: + available = self.status()['available'] + if available and features is not None: + return not self.get_missing_features(features) + return available + + def status(self) -> Dict[str, Any]: + status = self.api.status() + status['features'] = {} + if status['available']: + status['features'] = self.api.get_feature_set() + return status + + def get_missing_features(self, features: List[str]) -> List[str]: + supported_features = {k for k, v in self.api.get_feature_set().items() if v['available']} + return list(set(features) - supported_features) + + @wait_api_result + def blink_device_light(self, hostname, device, ident_fault, on): + # type: (str, str, str, bool) -> OrchResult[List[str]] + return self.api.blink_device_light( + ident_fault, on, [DeviceLightLoc(hostname, device, device)]) + + +class OrchFeature(object): + HOST_LIST = 'get_hosts' + HOST_ADD = 'add_host' + HOST_REMOVE = 'remove_host' + HOST_LABEL_ADD = 'add_host_label' + HOST_LABEL_REMOVE = 'remove_host_label' + HOST_MAINTENANCE_ENTER = 'enter_host_maintenance' + HOST_MAINTENANCE_EXIT = 'exit_host_maintenance' + HOST_DRAIN = 'drain_host' + + SERVICE_LIST = 'describe_service' + SERVICE_CREATE = 'apply' + SERVICE_EDIT = 'apply' + SERVICE_DELETE = 'remove_service' + SERVICE_RELOAD = 'service_action' + DAEMON_LIST = 'list_daemons' + + OSD_GET_REMOVE_STATUS = 'remove_osds_status' + + OSD_CREATE = 'apply_drivegroups' + OSD_DELETE = 'remove_osds' + + DEVICE_LIST = 'get_inventory' + DEVICE_BLINK_LIGHT = 'blink_device_light' + + DAEMON_ACTION = 'daemon_action' + + UPGRADE_LIST = 'upgrade_ls' + UPGRADE_STATUS = 'upgrade_status' + UPGRADE_START = 'upgrade_start' + UPGRADE_PAUSE = 'upgrade_pause' + UPGRADE_RESUME = 'upgrade_resume' + UPGRADE_STOP = 'upgrade_stop' diff --git a/src/pybind/mgr/dashboard/services/osd.py b/src/pybind/mgr/dashboard/services/osd.py new file mode 100644 index 000000000..12db733cc --- /dev/null +++ b/src/pybind/mgr/dashboard/services/osd.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +from enum import Enum + + +class OsdDeploymentOptions(str, Enum): + COST_CAPACITY = 'cost_capacity' + THROUGHPUT = 'throughput_optimized' + IOPS = 'iops_optimized' + + +class HostStorageSummary: + def __init__(self, name: str, title=None, desc=None, available=False, + capacity=0, used=0, hdd_used=0, ssd_used=0, nvme_used=0): + self.name = name + self.title = title + self.desc = desc + self.available = available + self.capacity = capacity + self.used = used + self.hdd_used = hdd_used + self.ssd_used = ssd_used + self.nvme_used = nvme_used + + def as_dict(self): + return self.__dict__ diff --git a/src/pybind/mgr/dashboard/services/progress.py b/src/pybind/mgr/dashboard/services/progress.py new file mode 100644 index 000000000..d1afefbac --- /dev/null +++ b/src/pybind/mgr/dashboard/services/progress.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +''' +Progress Mgr Module Helper + +This python module implements helper methods to retrieve the +executing and completed tasks tacked by the progress mgr module +using the same structure of dashboard tasks +''' + + +import logging +from datetime import datetime + +from .. import mgr +from . import rbd # pylint: disable=no-name-in-module + +logger = logging.getLogger('progress') + + +def _progress_event_to_dashboard_task_common(event, task): + if event['refs'] and isinstance(event['refs'], dict): + refs = event['refs'] + if refs['origin'] == "rbd_support": + # rbd mgr module event, we can transform this event into an rbd dashboard task + action_map = { + 'remove': "delete", + 'flatten': "flatten", + 'trash remove': "trash/remove" + } + action = action_map.get(refs['action'], refs['action']) + metadata = {} + if 'image_name' in refs: + metadata['image_spec'] = rbd.get_image_spec(refs['pool_name'], + refs['pool_namespace'], + refs['image_name']) + else: + metadata['image_id_spec'] = rbd.get_image_spec(refs['pool_name'], + refs['pool_namespace'], + refs['image_id']) + task.update({ + 'name': "rbd/{}".format(action), + 'metadata': metadata, + 'begin_time': "{}Z".format(datetime.fromtimestamp(event["started_at"]) + .isoformat()), + }) + return + + task.update({ + # we're prepending the "progress/" prefix to tag tasks that come + # from the progress module + 'name': "progress/{}".format(event['message']), + 'metadata': dict(event.get('refs', {})), + 'begin_time': "{}Z".format(datetime.fromtimestamp(event["started_at"]) + .isoformat()), + }) + + +def _progress_event_to_dashboard_task(event, completed=False): + task = {} + _progress_event_to_dashboard_task_common(event, task) + if not completed: + task.update({ + 'progress': int(100 * event['progress']) + }) + else: + task.update({ + 'end_time': "{}Z".format(datetime.fromtimestamp(event['finished_at']) + .isoformat()), + 'duration': event['finished_at'] - event['started_at'], + 'progress': 100, + 'success': 'failed' not in event, + 'ret_value': None, + 'exception': {'detail': event['failure_message']} if 'failed' in event else None + }) + return task + + +def get_progress_tasks(): + executing_t = [] + finished_t = [] + progress_events = mgr.remote('progress', "_json") + + for ev in progress_events['events']: + logger.debug("event=%s", ev) + executing_t.append(_progress_event_to_dashboard_task(ev)) + + for ev in progress_events['completed']: + logger.debug("finished event=%s", ev) + finished_t.append(_progress_event_to_dashboard_task(ev, True)) + + return executing_t, finished_t diff --git a/src/pybind/mgr/dashboard/services/rbd.py b/src/pybind/mgr/dashboard/services/rbd.py new file mode 100644 index 000000000..bb769ce19 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/rbd.py @@ -0,0 +1,775 @@ +# -*- coding: utf-8 -*- +# pylint: disable=unused-argument +import errno +import json +import math +from enum import IntEnum + +import cherrypy +import rados +import rbd + +from .. import mgr +from ..exceptions import DashboardException +from ..plugins.ttl_cache import ttl_cache, ttl_cache_invalidator +from ._paginate import ListPaginator +from .ceph_service import CephService + +try: + from typing import List, Optional +except ImportError: + pass # For typing only + + +RBD_FEATURES_NAME_MAPPING = { + rbd.RBD_FEATURE_LAYERING: "layering", + rbd.RBD_FEATURE_STRIPINGV2: "striping", + rbd.RBD_FEATURE_EXCLUSIVE_LOCK: "exclusive-lock", + rbd.RBD_FEATURE_OBJECT_MAP: "object-map", + rbd.RBD_FEATURE_FAST_DIFF: "fast-diff", + rbd.RBD_FEATURE_DEEP_FLATTEN: "deep-flatten", + rbd.RBD_FEATURE_JOURNALING: "journaling", + rbd.RBD_FEATURE_DATA_POOL: "data-pool", + rbd.RBD_FEATURE_OPERATIONS: "operations", +} + +RBD_IMAGE_REFS_CACHE_REFERENCE = 'rbd_image_refs' +GET_IOCTX_CACHE = 'get_ioctx' +POOL_NAMESPACES_CACHE = 'pool_namespaces' + + +class MIRROR_IMAGE_MODE(IntEnum): + journal = rbd.RBD_MIRROR_IMAGE_MODE_JOURNAL + snapshot = rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT + + +def _rbd_support_remote(method_name: str, *args, **kwargs): + try: + return mgr.remote('rbd_support', method_name, *args, **kwargs) + except ImportError as ie: + raise DashboardException(f'rbd_support module not found {ie}') + except RuntimeError as ie: + raise DashboardException(f'rbd_support.{method_name} error: {ie}') + + +def format_bitmask(features): + """ + Formats the bitmask: + + @DISABLEDOCTEST: >>> format_bitmask(45) + ['deep-flatten', 'exclusive-lock', 'layering', 'object-map'] + """ + names = [val for key, val in RBD_FEATURES_NAME_MAPPING.items() + if key & features == key] + return sorted(names) + + +def format_features(features): + """ + Converts the features list to bitmask: + + @DISABLEDOCTEST: >>> format_features(['deep-flatten', 'exclusive-lock', + 'layering', 'object-map']) + 45 + + @DISABLEDOCTEST: >>> format_features(None) is None + True + + @DISABLEDOCTEST: >>> format_features('deep-flatten, exclusive-lock') + 32 + """ + if isinstance(features, str): + features = features.split(',') + + if not isinstance(features, list): + return None + + res = 0 + for key, value in RBD_FEATURES_NAME_MAPPING.items(): + if value in features: + res = key | res + return res + + +def _sort_features(features, enable=True): + """ + Sorts image features according to feature dependencies: + + object-map depends on exclusive-lock + journaling depends on exclusive-lock + fast-diff depends on object-map + """ + ORDER = ['exclusive-lock', 'journaling', 'object-map', 'fast-diff'] # noqa: N806 + + def key_func(feat): + try: + return ORDER.index(feat) + except ValueError: + return id(feat) + + features.sort(key=key_func, reverse=not enable) + + +def get_image_spec(pool_name, namespace, rbd_name): + namespace = '{}/'.format(namespace) if namespace else '' + return '{}/{}{}'.format(pool_name, namespace, rbd_name) + + +def parse_image_spec(image_spec): + namespace_spec, image_name = image_spec.rsplit('/', 1) + if '/' in namespace_spec: + pool_name, namespace = namespace_spec.rsplit('/', 1) + else: + pool_name, namespace = namespace_spec, None + return pool_name, namespace, image_name + + +def rbd_call(pool_name, namespace, func, *args, **kwargs): + with mgr.rados.open_ioctx(pool_name) as ioctx: + ioctx.set_namespace(namespace if namespace is not None else '') + return func(ioctx, *args, **kwargs) + + +def rbd_image_call(pool_name, namespace, image_name, func, *args, **kwargs): + def _ioctx_func(ioctx, image_name, func, *args, **kwargs): + with rbd.Image(ioctx, image_name) as img: + return func(ioctx, img, *args, **kwargs) + + return rbd_call(pool_name, namespace, _ioctx_func, image_name, func, *args, **kwargs) + + +class RbdConfiguration(object): + _rbd = rbd.RBD() + + def __init__(self, pool_name: str = '', namespace: str = '', image_name: str = '', + pool_ioctx: Optional[rados.Ioctx] = None, image_ioctx: Optional[rbd.Image] = None): + assert bool(pool_name) != bool(pool_ioctx) # xor + self._pool_name = pool_name + self._namespace = namespace if namespace is not None else '' + self._image_name = image_name + self._pool_ioctx = pool_ioctx + self._image_ioctx = image_ioctx + + @staticmethod + def _ensure_prefix(option): + # type: (str) -> str + return option if option.startswith('conf_') else 'conf_' + option + + def list(self): + # type: () -> List[dict] + def _list(ioctx): + if self._image_name: # image config + try: + # No need to open the context of the image again + # if we already did open it. + if self._image_ioctx: + result = self._image_ioctx.config_list() + else: + with rbd.Image(ioctx, self._image_name) as image: + result = image.config_list() + except rbd.ImageNotFound: + result = [] + else: # pool config + pg_status = list(CephService.get_pool_pg_status(self._pool_name).keys()) + if len(pg_status) == 1 and 'incomplete' in pg_status[0]: + # If config_list would be called with ioctx if it's a bad pool, + # the dashboard would stop working, waiting for the response + # that would not happen. + # + # This is only a workaround for https://tracker.ceph.com/issues/43771 which + # already got rejected as not worth the effort. + # + # Are more complete workaround for the dashboard will be implemented with + # https://tracker.ceph.com/issues/44224 + # + # @TODO: If #44224 is addressed remove this workaround + return [] + result = self._rbd.config_list(ioctx) + return list(result) + + if self._pool_name: + ioctx = mgr.rados.open_ioctx(self._pool_name) + ioctx.set_namespace(self._namespace) + else: + ioctx = self._pool_ioctx + + return _list(ioctx) + + def get(self, option_name): + # type: (str) -> str + option_name = self._ensure_prefix(option_name) + with mgr.rados.open_ioctx(self._pool_name) as pool_ioctx: + pool_ioctx.set_namespace(self._namespace) + if self._image_name: + with rbd.Image(pool_ioctx, self._image_name) as image: + return image.metadata_get(option_name) + return self._rbd.pool_metadata_get(pool_ioctx, option_name) + + def set(self, option_name, option_value): + # type: (str, str) -> None + + option_value = str(option_value) + option_name = self._ensure_prefix(option_name) + + pool_ioctx = self._pool_ioctx + if self._pool_name: # open ioctx + pool_ioctx = mgr.rados.open_ioctx(self._pool_name) + pool_ioctx.__enter__() # type: ignore + pool_ioctx.set_namespace(self._namespace) # type: ignore + + image_ioctx = self._image_ioctx + if self._image_name: + image_ioctx = rbd.Image(pool_ioctx, self._image_name) + image_ioctx.__enter__() # type: ignore + + if image_ioctx: + image_ioctx.metadata_set(option_name, option_value) # type: ignore + else: + self._rbd.pool_metadata_set(pool_ioctx, option_name, option_value) + + if self._image_name: # Name provided, so we opened it and now have to close it + image_ioctx.__exit__(None, None, None) # type: ignore + if self._pool_name: + pool_ioctx.__exit__(None, None, None) # type: ignore + + def remove(self, option_name): + """ + Removes an option by name. Will not raise an error, if the option hasn't been found. + :type option_name str + """ + def _remove(ioctx): + try: + if self._image_name: + with rbd.Image(ioctx, self._image_name) as image: + image.metadata_remove(option_name) + else: + self._rbd.pool_metadata_remove(ioctx, option_name) + except KeyError: + pass + + option_name = self._ensure_prefix(option_name) + + if self._pool_name: + with mgr.rados.open_ioctx(self._pool_name) as pool_ioctx: + pool_ioctx.set_namespace(self._namespace) + _remove(pool_ioctx) + else: + _remove(self._pool_ioctx) + + def set_configuration(self, configuration): + if configuration: + for option_name, option_value in configuration.items(): + if option_value is not None: + self.set(option_name, option_value) + else: + self.remove(option_name) + + +class RbdService(object): + _rbd_inst = rbd.RBD() + + # set of image features that can be enable on existing images + ALLOW_ENABLE_FEATURES = {"exclusive-lock", "object-map", "fast-diff", "journaling"} + + # set of image features that can be disabled on existing images + ALLOW_DISABLE_FEATURES = {"exclusive-lock", "object-map", "fast-diff", "deep-flatten", + "journaling"} + + @classmethod + def _rbd_disk_usage(cls, image, snaps, whole_object=True): + class DUCallback(object): + def __init__(self): + self.used_size = 0 + + def __call__(self, offset, length, exists): + if exists: + self.used_size += length + + snap_map = {} + prev_snap = None + total_used_size = 0 + for _, size, name in snaps: + image.set_snap(name) + du_callb = DUCallback() + image.diff_iterate(0, size, prev_snap, du_callb, + whole_object=whole_object) + snap_map[name] = du_callb.used_size + total_used_size += du_callb.used_size + prev_snap = name + + return total_used_size, snap_map + + @classmethod + def _rbd_image(cls, ioctx, pool_name, namespace, image_name): # pylint: disable=R0912 + with rbd.Image(ioctx, image_name) as img: + stat = img.stat() + mirror_info = img.mirror_image_get_info() + mirror_mode = img.mirror_image_get_mode() + if mirror_mode == rbd.RBD_MIRROR_IMAGE_MODE_JOURNAL and mirror_info['state'] != rbd.RBD_MIRROR_IMAGE_DISABLED: # noqa E501 #pylint: disable=line-too-long + stat['mirror_mode'] = 'journal' + elif mirror_mode == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + stat['mirror_mode'] = 'snapshot' + schedule_status = json.loads(_rbd_support_remote( + 'mirror_snapshot_schedule_status')[1]) + for scheduled_image in schedule_status['scheduled_images']: + if scheduled_image['image'] == get_image_spec(pool_name, namespace, image_name): + stat['schedule_info'] = scheduled_image + else: + stat['mirror_mode'] = 'Disabled' + + stat['name'] = image_name + + stat['primary'] = None + if mirror_info['state'] == rbd.RBD_MIRROR_IMAGE_ENABLED: + stat['primary'] = mirror_info['primary'] + + if img.old_format(): + stat['unique_id'] = get_image_spec(pool_name, namespace, stat['block_name_prefix']) + stat['id'] = stat['unique_id'] + stat['image_format'] = 1 + else: + stat['unique_id'] = get_image_spec(pool_name, namespace, img.id()) + stat['id'] = img.id() + stat['image_format'] = 2 + + stat['pool_name'] = pool_name + stat['namespace'] = namespace + features = img.features() + stat['features'] = features + stat['features_name'] = format_bitmask(features) + + # the following keys are deprecated + del stat['parent_pool'] + del stat['parent_name'] + + stat['timestamp'] = "{}Z".format(img.create_timestamp() + .isoformat()) + + stat['stripe_count'] = img.stripe_count() + stat['stripe_unit'] = img.stripe_unit() + + data_pool_name = CephService.get_pool_name_from_id( + img.data_pool_id()) + if data_pool_name == pool_name: + data_pool_name = None + stat['data_pool'] = data_pool_name + + stat['parent'] = cls._rbd_image_stat_parent(img) + + # snapshots + stat['snapshots'] = [] + for snap in img.list_snaps(): + try: + snap['mirror_mode'] = MIRROR_IMAGE_MODE(img.mirror_image_get_mode()).name + except ValueError as ex: + raise DashboardException(f'Unknown RBD Mirror mode: {ex}') + + snap['timestamp'] = "{}Z".format( + img.get_snap_timestamp(snap['id']).isoformat()) + + snap['is_protected'] = None + if mirror_mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + snap['is_protected'] = img.is_protected_snap(snap['name']) + snap['used_bytes'] = None + snap['children'] = [] + + if mirror_mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + img.set_snap(snap['name']) + for child_pool_name, child_image_name in img.list_children(): + snap['children'].append({ + 'pool_name': child_pool_name, + 'image_name': child_image_name + }) + stat['snapshots'].append(snap) + + # disk usage + img_flags = img.flags() + if 'fast-diff' in stat['features_name'] and \ + not rbd.RBD_FLAG_FAST_DIFF_INVALID & img_flags and \ + mirror_mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + snaps = [(s['id'], s['size'], s['name']) + for s in stat['snapshots']] + snaps.sort(key=lambda s: s[0]) + snaps += [(snaps[-1][0] + 1 if snaps else 0, stat['size'], None)] + total_prov_bytes, snaps_prov_bytes = cls._rbd_disk_usage( + img, snaps, True) + stat['total_disk_usage'] = total_prov_bytes + for snap, prov_bytes in snaps_prov_bytes.items(): + if snap is None: + stat['disk_usage'] = prov_bytes + continue + for ss in stat['snapshots']: + if ss['name'] == snap: + ss['disk_usage'] = prov_bytes + break + else: + stat['total_disk_usage'] = None + stat['disk_usage'] = None + + stat['configuration'] = RbdConfiguration( + pool_ioctx=ioctx, image_name=image_name, image_ioctx=img).list() + + stat['metadata'] = RbdImageMetadataService(img).list() + + return stat + + @classmethod + def _rbd_image_stat_parent(cls, img): + stat_parent = None + try: + stat_parent = img.get_parent_image_spec() + except rbd.ImageNotFound: + # no parent image + stat_parent = None + return stat_parent + + @classmethod + @ttl_cache(10, label=GET_IOCTX_CACHE) + def get_ioctx(cls, pool_name, namespace=''): + ioctx = mgr.rados.open_ioctx(pool_name) + ioctx.set_namespace(namespace) + return ioctx + + @classmethod + @ttl_cache(30, label=RBD_IMAGE_REFS_CACHE_REFERENCE) + def _rbd_image_refs(cls, pool_name, namespace=''): + # We add and set the namespace here so that we cache by ioctx and namespace. + images = [] + ioctx = cls.get_ioctx(pool_name, namespace) + images = cls._rbd_inst.list2(ioctx) + return images + + @classmethod + @ttl_cache(30, label=POOL_NAMESPACES_CACHE) + def _pool_namespaces(cls, pool_name, namespace=None): + namespaces = [] + if namespace: + namespaces = [namespace] + else: + ioctx = cls.get_ioctx(pool_name, namespace=rados.LIBRADOS_ALL_NSPACES) + namespaces = cls._rbd_inst.namespace_list(ioctx) + # images without namespace + namespaces.append('') + return namespaces + + @classmethod + def _rbd_image_stat(cls, ioctx, pool_name, namespace, image_name): + return cls._rbd_image(ioctx, pool_name, namespace, image_name) + + @classmethod + def _rbd_image_stat_removing(cls, ioctx, pool_name, namespace, image_id): + img = cls._rbd_inst.trash_get(ioctx, image_id) + img_spec = get_image_spec(pool_name, namespace, image_id) + + if img['source'] == 'REMOVING': + img['unique_id'] = img_spec + img['pool_name'] = pool_name + img['namespace'] = namespace + img['deletion_time'] = "{}Z".format(img['deletion_time'].isoformat()) + img['deferment_end_time'] = "{}Z".format(img['deferment_end_time'].isoformat()) + return img + raise rbd.ImageNotFound('No image {} in status `REMOVING` found.'.format(img_spec), + errno=errno.ENOENT) + + @classmethod + def _rbd_pool_image_refs(cls, pool_names: List[str], namespace: Optional[str] = None): + joint_refs = [] + for pool in pool_names: + for current_namespace in cls._pool_namespaces(pool, namespace=namespace): + image_refs = cls._rbd_image_refs(pool, current_namespace) + for image in image_refs: + image['namespace'] = current_namespace + image['pool_name'] = pool + joint_refs.append(image) + return joint_refs + + @classmethod + def rbd_pool_list(cls, pool_names: List[str], namespace: Optional[str] = None, offset: int = 0, + limit: int = 5, search: str = '', sort: str = ''): + image_refs = cls._rbd_pool_image_refs(pool_names, namespace) + params = ['name', 'pool_name', 'namespace'] + paginator = ListPaginator(offset, limit, sort, search, image_refs, + searchable_params=params, sortable_params=params, + default_sort='+name') + + result = [] + for image_ref in paginator.list(): + with mgr.rados.open_ioctx(image_ref['pool_name']) as ioctx: + ioctx.set_namespace(image_ref['namespace']) + # Check if the RBD has been deleted partially. This happens for example if + # the deletion process of the RBD has been started and was interrupted. + + try: + stat = cls._rbd_image_stat( + ioctx, image_ref['pool_name'], image_ref['namespace'], image_ref['name']) + except rbd.ImageNotFound: + try: + stat = cls._rbd_image_stat_removing( + ioctx, image_ref['pool_name'], image_ref['namespace'], image_ref['id']) + except rbd.ImageNotFound: + continue + result.append(stat) + return result, paginator.get_count() + + @classmethod + def get_image(cls, image_spec): + pool_name, namespace, image_name = parse_image_spec(image_spec) + ioctx = mgr.rados.open_ioctx(pool_name) + if namespace: + ioctx.set_namespace(namespace) + try: + return cls._rbd_image(ioctx, pool_name, namespace, image_name) + except rbd.ImageNotFound: + raise cherrypy.HTTPError(404, 'Image not found') + + @classmethod + @ttl_cache_invalidator(RBD_IMAGE_REFS_CACHE_REFERENCE) + def create(cls, name, pool_name, size, namespace=None, + obj_size=None, features=None, stripe_unit=None, stripe_count=None, + data_pool=None, configuration=None, metadata=None): + size = int(size) + + def _create(ioctx): + rbd_inst = cls._rbd_inst + + # Set order + l_order = None + if obj_size and obj_size > 0: + l_order = int(round(math.log(float(obj_size), 2))) + + # Set features + feature_bitmask = format_features(features) + + rbd_inst.create(ioctx, name, size, order=l_order, old_format=False, + features=feature_bitmask, stripe_unit=stripe_unit, + stripe_count=stripe_count, data_pool=data_pool) + RbdConfiguration(pool_ioctx=ioctx, namespace=namespace, + image_name=name).set_configuration(configuration) + if metadata: + with rbd.Image(ioctx, name) as image: + RbdImageMetadataService(image).set_metadata(metadata) + rbd_call(pool_name, namespace, _create) + + @classmethod + @ttl_cache_invalidator(RBD_IMAGE_REFS_CACHE_REFERENCE) + def set(cls, image_spec, name=None, size=None, features=None, + configuration=None, metadata=None, enable_mirror=None, primary=None, + force=False, resync=False, mirror_mode=None, schedule_interval='', + remove_scheduling=False): + # pylint: disable=too-many-branches + pool_name, namespace, image_name = parse_image_spec(image_spec) + + def _edit(ioctx, image): + rbd_inst = cls._rbd_inst + # check rename image + if name and name != image_name: + rbd_inst.rename(ioctx, image_name, name) + + # check resize + if size and size != image.size(): + image.resize(size) + + mirror_image_info = image.mirror_image_get_info() + if enable_mirror and mirror_image_info['state'] == rbd.RBD_MIRROR_IMAGE_DISABLED: + RbdMirroringService.enable_image( + image_name, pool_name, namespace, + MIRROR_IMAGE_MODE[mirror_mode]) + elif (enable_mirror is False + and mirror_image_info['state'] == rbd.RBD_MIRROR_IMAGE_ENABLED): + RbdMirroringService.disable_image( + image_name, pool_name, namespace) + + # check enable/disable features + if features is not None: + curr_features = format_bitmask(image.features()) + # check disabled features + _sort_features(curr_features, enable=False) + for feature in curr_features: + if (feature not in features + and feature in cls.ALLOW_DISABLE_FEATURES + and feature in format_bitmask(image.features())): + f_bitmask = format_features([feature]) + image.update_features(f_bitmask, False) + # check enabled features + _sort_features(features) + for feature in features: + if (feature not in curr_features + and feature in cls.ALLOW_ENABLE_FEATURES + and feature not in format_bitmask(image.features())): + f_bitmask = format_features([feature]) + image.update_features(f_bitmask, True) + + RbdConfiguration(pool_ioctx=ioctx, image_name=image_name).set_configuration( + configuration) + if metadata: + RbdImageMetadataService(image).set_metadata(metadata) + + if primary and not mirror_image_info['primary']: + RbdMirroringService.promote_image( + image_name, pool_name, namespace, force) + elif primary is False and mirror_image_info['primary']: + RbdMirroringService.demote_image( + image_name, pool_name, namespace) + + if resync: + RbdMirroringService.resync_image(image_name, pool_name, namespace) + + if schedule_interval: + RbdMirroringService.snapshot_schedule_add(image_spec, schedule_interval) + + if remove_scheduling: + RbdMirroringService.snapshot_schedule_remove(image_spec) + + return rbd_image_call(pool_name, namespace, image_name, _edit) + + @classmethod + @ttl_cache_invalidator(RBD_IMAGE_REFS_CACHE_REFERENCE) + def delete(cls, image_spec): + pool_name, namespace, image_name = parse_image_spec(image_spec) + + image = RbdService.get_image(image_spec) + snapshots = image['snapshots'] + for snap in snapshots: + RbdSnapshotService.remove_snapshot(image_spec, snap['name'], snap['is_protected']) + + rbd_inst = rbd.RBD() + return rbd_call(pool_name, namespace, rbd_inst.remove, image_name) + + @classmethod + @ttl_cache_invalidator(RBD_IMAGE_REFS_CACHE_REFERENCE) + def copy(cls, image_spec, dest_pool_name, dest_namespace, dest_image_name, + snapshot_name=None, obj_size=None, features=None, + stripe_unit=None, stripe_count=None, data_pool=None, + configuration=None, metadata=None): + pool_name, namespace, image_name = parse_image_spec(image_spec) + + def _src_copy(s_ioctx, s_img): + def _copy(d_ioctx): + # Set order + l_order = None + if obj_size and obj_size > 0: + l_order = int(round(math.log(float(obj_size), 2))) + + # Set features + feature_bitmask = format_features(features) + + if snapshot_name: + s_img.set_snap(snapshot_name) + + s_img.copy(d_ioctx, dest_image_name, feature_bitmask, l_order, + stripe_unit, stripe_count, data_pool) + RbdConfiguration(pool_ioctx=d_ioctx, image_name=dest_image_name).set_configuration( + configuration) + if metadata: + with rbd.Image(d_ioctx, dest_image_name) as image: + RbdImageMetadataService(image).set_metadata(metadata) + + return rbd_call(dest_pool_name, dest_namespace, _copy) + + return rbd_image_call(pool_name, namespace, image_name, _src_copy) + + @classmethod + @ttl_cache_invalidator(RBD_IMAGE_REFS_CACHE_REFERENCE) + def flatten(cls, image_spec): + def _flatten(ioctx, image): + image.flatten() + + pool_name, namespace, image_name = parse_image_spec(image_spec) + return rbd_image_call(pool_name, namespace, image_name, _flatten) + + @classmethod + def move_image_to_trash(cls, image_spec, delay): + pool_name, namespace, image_name = parse_image_spec(image_spec) + rbd_inst = cls._rbd_inst + return rbd_call(pool_name, namespace, rbd_inst.trash_move, image_name, delay) + + +class RbdSnapshotService(object): + + @classmethod + def remove_snapshot(cls, image_spec, snapshot_name, unprotect=False): + def _remove_snapshot(ioctx, img, snapshot_name, unprotect): + if unprotect: + img.unprotect_snap(snapshot_name) + img.remove_snap(snapshot_name) + + pool_name, namespace, image_name = parse_image_spec(image_spec) + return rbd_image_call(pool_name, namespace, image_name, + _remove_snapshot, snapshot_name, unprotect) + + +class RBDSchedulerInterval: + def __init__(self, interval: str): + self.amount = int(interval[:-1]) + self.unit = interval[-1] + if self.unit not in 'mhd': + raise ValueError(f'Invalid interval unit {self.unit}') + + def __str__(self): + return f'{self.amount}{self.unit}' + + +class RbdMirroringService: + + @classmethod + def enable_image(cls, image_name: str, pool_name: str, namespace: str, mode: MIRROR_IMAGE_MODE): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_enable(mode)) + + @classmethod + def disable_image(cls, image_name: str, pool_name: str, namespace: str, force: bool = False): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_disable(force)) + + @classmethod + def promote_image(cls, image_name: str, pool_name: str, namespace: str, force: bool = False): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_promote(force)) + + @classmethod + def demote_image(cls, image_name: str, pool_name: str, namespace: str): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_demote()) + + @classmethod + def resync_image(cls, image_name: str, pool_name: str, namespace: str): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_resync()) + + @classmethod + def snapshot_schedule_add(cls, image_spec: str, interval: str): + _rbd_support_remote('mirror_snapshot_schedule_add', image_spec, + str(RBDSchedulerInterval(interval))) + + @classmethod + def snapshot_schedule_remove(cls, image_spec: str): + _rbd_support_remote('mirror_snapshot_schedule_remove', image_spec) + + +class RbdImageMetadataService(object): + def __init__(self, image): + self._image = image + + def list(self): + result = self._image.metadata_list() + # filter out configuration metadata + return {v[0]: v[1] for v in result if not v[0].startswith('conf_')} + + def get(self, name): + return self._image.metadata_get(name) + + def set(self, name, value): + self._image.metadata_set(name, value) + + def remove(self, name): + try: + self._image.metadata_remove(name) + except KeyError: + pass + + def set_metadata(self, metadata): + for name, value in metadata.items(): + if value is not None: + self.set(name, value) + else: + self.remove(name) diff --git a/src/pybind/mgr/dashboard/services/rgw_client.py b/src/pybind/mgr/dashboard/services/rgw_client.py new file mode 100644 index 000000000..5120806d8 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/rgw_client.py @@ -0,0 +1,1638 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0302 +# pylint: disable=too-many-branches +# pylint: disable=too-many-lines + +import ipaddress +import json +import logging +import os +import re +import xml.etree.ElementTree as ET # noqa: N814 +from subprocess import SubprocessError + +from mgr_util import build_url, name_to_config_section + +from .. import mgr +from ..awsauth import S3Auth +from ..exceptions import DashboardException +from ..rest_client import RequestException, RestClient +from ..settings import Settings +from ..tools import dict_contains_path, dict_get, json_str_to_object, str_to_bool +from .ceph_service import CephService + +try: + from typing import Any, Dict, List, Optional, Tuple, Union +except ImportError: + pass # For typing only + +logger = logging.getLogger('rgw_client') + + +class NoRgwDaemonsException(Exception): + def __init__(self): + super().__init__('No RGW service is running.') + + +class NoCredentialsException(Exception): + def __init__(self): + super(NoCredentialsException, self).__init__( + 'No RGW credentials found, ' + 'please consult the documentation on how to enable RGW for ' + 'the dashboard.') + + +class RgwAdminException(Exception): + pass + + +class RgwDaemon: + """Simple representation of a daemon.""" + host: str + name: str + port: int + ssl: bool + realm_name: str + zonegroup_name: str + zone_name: str + + +def _get_daemons() -> Dict[str, RgwDaemon]: + """ + Retrieve RGW daemon info from MGR. + """ + service_map = mgr.get('service_map') + if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']): + raise NoRgwDaemonsException + + daemons = {} + daemon_map = service_map['services']['rgw']['daemons'] + for key in daemon_map.keys(): + if dict_contains_path(daemon_map[key], ['metadata', 'frontend_config#0']): + daemon = _determine_rgw_addr(daemon_map[key]) + daemon.name = daemon_map[key]['metadata']['id'] + daemon.realm_name = daemon_map[key]['metadata']['realm_name'] + daemon.zonegroup_name = daemon_map[key]['metadata']['zonegroup_name'] + daemon.zone_name = daemon_map[key]['metadata']['zone_name'] + daemons[daemon.name] = daemon + logger.info('Found RGW daemon with configuration: host=%s, port=%d, ssl=%s', + daemon.host, daemon.port, str(daemon.ssl)) + if not daemons: + raise NoRgwDaemonsException + + return daemons + + +def _determine_rgw_addr(daemon_info: Dict[str, Any]) -> RgwDaemon: + """ + Parse RGW daemon info to determine the configured host (IP address) and port. + """ + daemon = RgwDaemon() + rgw_dns_name = CephService.send_command('mon', 'config get', + who=name_to_config_section('rgw.' + daemon_info['metadata']['id']), # noqa E501 #pylint: disable=line-too-long + key='rgw_dns_name').rstrip() + + daemon.port, daemon.ssl = _parse_frontend_config(daemon_info['metadata']['frontend_config#0']) + + if rgw_dns_name: + daemon.host = rgw_dns_name + elif daemon.ssl: + daemon.host = daemon_info['metadata']['hostname'] + else: + daemon.host = _parse_addr(daemon_info['addr']) + + return daemon + + +def _parse_addr(value) -> str: + """ + Get the IP address the RGW is running on. + + >>> _parse_addr('192.168.178.3:49774/1534999298') + '192.168.178.3' + + >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298') + '2001:db8:85a3::8a2e:370:7334' + + >>> _parse_addr('xyz') + Traceback (most recent call last): + ... + LookupError: Failed to determine RGW address + + >>> _parse_addr('192.168.178.a:8080/123456789') + Traceback (most recent call last): + ... + LookupError: Invalid RGW address '192.168.178.a' found + + >>> _parse_addr('[2001:0db8:1234]:443/123456789') + Traceback (most recent call last): + ... + LookupError: Invalid RGW address '2001:0db8:1234' found + + >>> _parse_addr('2001:0db8::1234:49774/1534999298') + Traceback (most recent call last): + ... + LookupError: Failed to determine RGW address + + :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'. + :type: str + :raises LookupError if parsing fails to determine the IP address. + :return: The IP address. + :rtype: str + """ + match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value) + if match: + # IPv4: + # Group 0: 192.168.178.3:49774/1534999298 + # Group 3: 192.168.178.3 + # IPv6: + # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298 + # Group 1: [ + # Group 2: 2001:db8:85a3::8a2e:370:7334 + addr = match.group(3) if match.group(3) else match.group(2) + try: + ipaddress.ip_address(addr) + return addr + except ValueError: + raise LookupError('Invalid RGW address \'{}\' found'.format(addr)) + raise LookupError('Failed to determine RGW address') + + +def _parse_frontend_config(config) -> Tuple[int, bool]: + """ + Get the port the RGW is running on. Due the complexity of the + syntax not all variations are supported. + + If there are multiple (ssl_)ports/(ssl_)endpoints options, then + the first found option will be returned. + + Get more details about the configuration syntax here: + http://docs.ceph.com/en/latest/radosgw/frontends/ + https://civetweb.github.io/civetweb/UserManual.html + + :param config: The configuration string to parse. + :type config: str + :raises LookupError if parsing fails to determine the port. + :return: A tuple containing the port number and the information + whether SSL is used. + :rtype: (int, boolean) + """ + match = re.search(r'^(beast|civetweb)\s+.+$', config) + if match: + if match.group(1) == 'beast': + match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)', + config) + if match: + option_name = match.group(1) + if option_name in ['port', 'ssl_port']: + match = re.search(r'(\d+)', match.group(2)) + if match: + port = int(match.group(1)) + ssl = option_name == 'ssl_port' + return port, ssl + if option_name in ['endpoint', 'ssl_endpoint']: + match = re.search(r'([\d.]+|\[.+\])(:(\d+))?', + match.group(2)) # type: ignore + if match: + port = int(match.group(3)) if \ + match.group(2) is not None else 443 if \ + option_name == 'ssl_endpoint' else \ + 80 + ssl = option_name == 'ssl_endpoint' + return port, ssl + if match.group(1) == 'civetweb': # type: ignore + match = re.search(r'port=(.*:)?(\d+)(s)?', config) + if match: + port = int(match.group(2)) + ssl = match.group(3) == 's' + return port, ssl + raise LookupError('Failed to determine RGW port from "{}"'.format(config)) + + +def _parse_secrets(user: str, data: dict) -> Tuple[str, str]: + for key in data.get('keys', []): + if key.get('user') == user and data.get('system') in ['true', True]: + access_key = key.get('access_key') + secret_key = key.get('secret_key') + return access_key, secret_key + return '', '' + + +def _get_user_keys(user: str, realm: Optional[str] = None) -> Tuple[str, str]: + access_key = '' + secret_key = '' + rgw_user_info_cmd = ['user', 'info', '--uid', user] + cmd_realm_option = ['--rgw-realm', realm] if realm else [] + if realm: + rgw_user_info_cmd += cmd_realm_option + try: + _, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd) + if out: + access_key, secret_key = _parse_secrets(user, out) + if not access_key: + rgw_create_user_cmd = [ + 'user', 'create', + '--uid', user, + '--display-name', 'Ceph Dashboard', + '--system', + ] + cmd_realm_option + _, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd) + if out: + access_key, secret_key = _parse_secrets(user, out) + if not access_key: + logger.error('Unable to create rgw user "%s": %s', user, err) + except SubprocessError as error: + logger.exception(error) + + return access_key, secret_key + + +def configure_rgw_credentials(): + logger.info('Configuring dashboard RGW credentials') + user = 'dashboard' + realms = [] + access_key = '' + secret_key = '' + try: + _, out, err = mgr.send_rgwadmin_command(['realm', 'list']) + if out: + realms = out.get('realms', []) + if err: + logger.error('Unable to list RGW realms: %s', err) + if realms: + realm_access_keys = {} + realm_secret_keys = {} + for realm in realms: + realm_access_key, realm_secret_key = _get_user_keys(user, realm) + if realm_access_key: + realm_access_keys[realm] = realm_access_key + realm_secret_keys[realm] = realm_secret_key + if realm_access_keys: + access_key = json.dumps(realm_access_keys) + secret_key = json.dumps(realm_secret_keys) + else: + access_key, secret_key = _get_user_keys(user) + + assert access_key and secret_key + Settings.RGW_API_ACCESS_KEY = access_key + Settings.RGW_API_SECRET_KEY = secret_key + except (AssertionError, SubprocessError) as error: + logger.exception(error) + raise NoCredentialsException + + +# pylint: disable=R0904 +class RgwClient(RestClient): + _host = None + _port = None + _ssl = None + _user_instances = {} # type: Dict[str, Dict[str, RgwClient]] + _config_instances = {} # type: Dict[str, RgwClient] + _rgw_settings_snapshot = None + _daemons: Dict[str, RgwDaemon] = {} + daemon: RgwDaemon + got_keys_from_config: bool + userid: str + + @staticmethod + def _handle_response_status_code(status_code: int) -> int: + # Do not return auth error codes (so they are not handled as ceph API user auth errors). + return 404 if status_code in [401, 403] else status_code + + @staticmethod + def _get_daemon_connection_info(daemon_name: str) -> dict: + try: + realm_name = RgwClient._daemons[daemon_name].realm_name + access_key = Settings.RGW_API_ACCESS_KEY[realm_name] + secret_key = Settings.RGW_API_SECRET_KEY[realm_name] + except TypeError: + # Legacy string values. + access_key = Settings.RGW_API_ACCESS_KEY + secret_key = Settings.RGW_API_SECRET_KEY + except KeyError as error: + raise DashboardException(msg='Credentials not found for RGW Daemon: {}'.format(error), + http_status_code=404, + component='rgw') + + return {'access_key': access_key, 'secret_key': secret_key} + + def _get_daemon_zone_info(self): # type: () -> dict + return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None)) + + def _get_realms_info(self): # type: () -> dict + return json_str_to_object(self.proxy('GET', 'realm?list', None, None)) + + def _get_realm_info(self, realm_id: str) -> Dict[str, Any]: + return json_str_to_object(self.proxy('GET', f'realm?id={realm_id}', None, None)) + + @staticmethod + def _rgw_settings(): + return (Settings.RGW_API_ACCESS_KEY, + Settings.RGW_API_SECRET_KEY, + Settings.RGW_API_ADMIN_RESOURCE, + Settings.RGW_API_SSL_VERIFY) + + @staticmethod + def instance(userid: Optional[str] = None, + daemon_name: Optional[str] = None) -> 'RgwClient': + # pylint: disable=too-many-branches + + RgwClient._daemons = _get_daemons() + + # The API access key and secret key are mandatory for a minimal configuration. + if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY): + configure_rgw_credentials() + + if not daemon_name: + # Select 1st daemon: + daemon_name = next(iter(RgwClient._daemons.keys())) + + # Discard all cached instances if any rgw setting has changed + if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings(): + RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings() + RgwClient.drop_instance() + + if daemon_name not in RgwClient._config_instances: + connection_info = RgwClient._get_daemon_connection_info(daemon_name) + RgwClient._config_instances[daemon_name] = RgwClient(connection_info['access_key'], + connection_info['secret_key'], + daemon_name) + + if not userid or userid == RgwClient._config_instances[daemon_name].userid: + return RgwClient._config_instances[daemon_name] + + if daemon_name not in RgwClient._user_instances \ + or userid not in RgwClient._user_instances[daemon_name]: + # Get the access and secret keys for the specified user. + keys = RgwClient._config_instances[daemon_name].get_user_keys(userid) + if not keys: + raise RequestException( + "User '{}' does not have any keys configured.".format( + userid)) + instance = RgwClient(keys['access_key'], + keys['secret_key'], + daemon_name, + userid) + RgwClient._user_instances.update({daemon_name: {userid: instance}}) + + return RgwClient._user_instances[daemon_name][userid] + + @staticmethod + def admin_instance(daemon_name: Optional[str] = None) -> 'RgwClient': + return RgwClient.instance(daemon_name=daemon_name) + + @staticmethod + def drop_instance(instance: Optional['RgwClient'] = None): + """ + Drop a cached instance or all. + """ + if instance: + if instance.got_keys_from_config: + del RgwClient._config_instances[instance.daemon.name] + else: + del RgwClient._user_instances[instance.daemon.name][instance.userid] + else: + RgwClient._config_instances.clear() + RgwClient._user_instances.clear() + + def _reset_login(self): + if self.got_keys_from_config: + raise RequestException('Authentication failed for the "{}" user: wrong credentials' + .format(self.userid), status_code=401) + logger.info("Fetching new keys for user: %s", self.userid) + keys = RgwClient.admin_instance(daemon_name=self.daemon.name).get_user_keys(self.userid) + self.auth = S3Auth(keys['access_key'], keys['secret_key'], + service_url=self.service_url) + + def __init__(self, + access_key: str, + secret_key: str, + daemon_name: str, + user_id: Optional[str] = None) -> None: + try: + daemon = RgwClient._daemons[daemon_name] + except KeyError as error: + raise DashboardException(msg='RGW Daemon not found: {}'.format(error), + http_status_code=404, + component='rgw') + ssl_verify = Settings.RGW_API_SSL_VERIFY + self.admin_path = Settings.RGW_API_ADMIN_RESOURCE + self.service_url = build_url(host=daemon.host, port=daemon.port) + + self.auth = S3Auth(access_key, secret_key, service_url=self.service_url) + super(RgwClient, self).__init__(daemon.host, + daemon.port, + 'RGW', + daemon.ssl, + self.auth, + ssl_verify=ssl_verify) + self.got_keys_from_config = not user_id + try: + self.userid = self._get_user_id(self.admin_path) if self.got_keys_from_config \ + else user_id + except RequestException as error: + logger.exception(error) + msg = 'Error connecting to Object Gateway' + if error.status_code == 404: + msg = '{}: {}'.format(msg, str(error)) + raise DashboardException(msg=msg, + http_status_code=error.status_code, + component='rgw') + self.daemon = daemon + + logger.info("Created new connection: daemon=%s, host=%s, port=%s, ssl=%d, sslverify=%d", + daemon.name, daemon.host, daemon.port, daemon.ssl, ssl_verify) + + @RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)') + def is_service_online(self, request=None) -> bool: + """ + Consider the service as online if the response contains the + specified keys. Nothing more is checked here. + """ + _ = request({'format': 'json'}) + return True + + @RestClient.api_get('/{admin_path}/metadata/user?myself', + resp_structure='data > user_id') + def _get_user_id(self, admin_path, request=None): + # pylint: disable=unused-argument + """ + Get the user ID of the user that is used to communicate with the + RGW Admin Ops API. + :rtype: str + :return: The user ID of the user that is used to sign the + RGW Admin Ops API calls. + """ + response = request() + return response['data']['user_id'] + + @RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]') + def _user_exists(self, admin_path, user_id, request=None): + # pylint: disable=unused-argument + response = request() + if user_id: + return user_id in response + return self.userid in response + + def user_exists(self, user_id=None): + return self._user_exists(self.admin_path, user_id) + + @RestClient.api_get('/{admin_path}/metadata/user?key={userid}', + resp_structure='data > system') + def _is_system_user(self, admin_path, userid, request=None) -> bool: + # pylint: disable=unused-argument + response = request() + return response['data']['system'] + + def is_system_user(self) -> bool: + return self._is_system_user(self.admin_path, self.userid) + + @RestClient.api_get( + '/{admin_path}/user', + resp_structure='tenant & user_id & email & keys[*] > ' + ' (user & access_key & secret_key)') + def _admin_get_user_keys(self, admin_path, userid, request=None): + # pylint: disable=unused-argument + colon_idx = userid.find(':') + user = userid if colon_idx == -1 else userid[:colon_idx] + response = request({'uid': user}) + for key in response['keys']: + if key['user'] == userid: + return { + 'access_key': key['access_key'], + 'secret_key': key['secret_key'] + } + return None + + def get_user_keys(self, userid): + return self._admin_get_user_keys(self.admin_path, userid) + + @RestClient.api('/{admin_path}/{path}') + def _proxy_request( + self, # pylint: disable=too-many-arguments + admin_path, + path, + method, + params, + data, + request=None): + # pylint: disable=unused-argument + return request(method=method, params=params, data=data, + raw_content=True) + + def proxy(self, method, path, params, data): + logger.debug("proxying method=%s path=%s params=%s data=%s", + method, path, params, data) + return self._proxy_request(self.admin_path, path, method, + params, data) + + @RestClient.api_get('/', resp_structure='[1][*] > Name') + def get_buckets(self, request=None): + """ + Get a list of names from all existing buckets of this user. + :return: Returns a list of bucket names. + """ + response = request({'format': 'json'}) + return [bucket['Name'] for bucket in response[1]] + + @RestClient.api_get('/{bucket_name}') + def bucket_exists(self, bucket_name, userid, request=None): + """ + Check if the specified bucket exists for this user. + :param bucket_name: The name of the bucket. + :return: Returns True if the bucket exists, otherwise False. + """ + # pylint: disable=unused-argument + try: + request() + my_buckets = self.get_buckets() + if bucket_name not in my_buckets: + raise RequestException( + 'Bucket "{}" belongs to other user'.format(bucket_name), + 403) + return True + except RequestException as e: + if e.status_code == 404: + return False + + raise e + + @RestClient.api_put('/{bucket_name}') + def create_bucket(self, bucket_name, zonegroup=None, + placement_target=None, lock_enabled=False, + request=None): + logger.info("Creating bucket: %s, zonegroup: %s, placement_target: %s", + bucket_name, zonegroup, placement_target) + data = None + if zonegroup and placement_target: + create_bucket_configuration = ET.Element('CreateBucketConfiguration') + location_constraint = ET.SubElement(create_bucket_configuration, 'LocationConstraint') + location_constraint.text = '{}:{}'.format(zonegroup, placement_target) + data = ET.tostring(create_bucket_configuration, encoding='unicode') + + headers = None # type: Optional[dict] + if lock_enabled: + headers = {'x-amz-bucket-object-lock-enabled': 'true'} + + return request(data=data, headers=headers) + + def get_placement_targets(self): # type: () -> dict + zone = self._get_daemon_zone_info() + placement_targets = [] # type: List[Dict] + for placement_pool in zone['placement_pools']: + placement_targets.append( + { + 'name': placement_pool['key'], + 'data_pool': placement_pool['val']['storage_classes']['STANDARD']['data_pool'] + } + ) + + return {'zonegroup': self.daemon.zonegroup_name, + 'placement_targets': placement_targets} + + def get_realms(self): # type: () -> List + realms_info = self._get_realms_info() + if 'realms' in realms_info and realms_info['realms']: + return realms_info['realms'] + return [] + + def get_default_realm(self): + realms_info = self._get_realms_info() + if 'default_info' in realms_info and realms_info['default_info']: + realm_info = self._get_realm_info(realms_info['default_info']) + if 'name' in realm_info and realm_info['name']: + return realm_info['name'] + return None + + @RestClient.api_get('/{bucket_name}?versioning') + def get_bucket_versioning(self, bucket_name, request=None): + """ + Get bucket versioning. + :param str bucket_name: the name of the bucket. + :return: versioning info + :rtype: Dict + """ + # pylint: disable=unused-argument + result = request() + if 'Status' not in result: + result['Status'] = 'Suspended' + if 'MfaDelete' not in result: + result['MfaDelete'] = 'Disabled' + return result + + @RestClient.api_put('/{bucket_name}?versioning') + def set_bucket_versioning(self, bucket_name, versioning_state, mfa_delete, + mfa_token_serial, mfa_token_pin, request=None): + """ + Set bucket versioning. + :param str bucket_name: the name of the bucket. + :param str versioning_state: + https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html + :param str mfa_delete: MFA Delete state. + :param str mfa_token_serial: + https://docs.ceph.com/docs/master/radosgw/mfa/ + :param str mfa_token_pin: value of a TOTP token at a certain time (auth code) + :return: None + """ + # pylint: disable=unused-argument + versioning_configuration = ET.Element('VersioningConfiguration') + status_element = ET.SubElement(versioning_configuration, 'Status') + status_element.text = versioning_state + + headers = {} + if mfa_delete and mfa_token_serial and mfa_token_pin: + headers['x-amz-mfa'] = '{} {}'.format(mfa_token_serial, mfa_token_pin) + mfa_delete_element = ET.SubElement(versioning_configuration, 'MfaDelete') + mfa_delete_element.text = mfa_delete + + data = ET.tostring(versioning_configuration, encoding='unicode') + + try: + request(data=data, headers=headers) + except RequestException as error: + msg = str(error) + if mfa_delete and mfa_token_serial and mfa_token_pin \ + and 'AccessDenied' in error.content.decode(): + msg = 'Bad MFA credentials: {}'.format(msg) + raise DashboardException(msg=msg, + http_status_code=error.status_code, + component='rgw') + + @RestClient.api_get('/{bucket_name}?encryption') + def get_bucket_encryption(self, bucket_name, request=None): + # pylint: disable=unused-argument + try: + result = request() # type: ignore + result['Status'] = 'Enabled' + return result + except RequestException as e: + if e.content: + content = json_str_to_object(e.content) + if content.get( + 'Code') == 'ServerSideEncryptionConfigurationNotFoundError': + return { + 'Status': 'Disabled', + } + raise e + + @RestClient.api_delete('/{bucket_name}?encryption') + def delete_bucket_encryption(self, bucket_name, request=None): + # pylint: disable=unused-argument + result = request() # type: ignore + return result + + @RestClient.api_put('/{bucket_name}?encryption') + def set_bucket_encryption(self, bucket_name, key_id, + sse_algorithm, request: Optional[object] = None): + # pylint: disable=unused-argument + encryption_configuration = ET.Element('ServerSideEncryptionConfiguration') + rule_element = ET.SubElement(encryption_configuration, 'Rule') + default_encryption_element = ET.SubElement(rule_element, + 'ApplyServerSideEncryptionByDefault') + sse_algo_element = ET.SubElement(default_encryption_element, + 'SSEAlgorithm') + sse_algo_element.text = sse_algorithm + if sse_algorithm == 'aws:kms': + kms_master_key_element = ET.SubElement(default_encryption_element, + 'KMSMasterKeyID') + kms_master_key_element.text = key_id + data = ET.tostring(encryption_configuration, encoding='unicode') + try: + _ = request(data=data) # type: ignore + except RequestException as e: + raise DashboardException(msg=str(e), component='rgw') + + @RestClient.api_get('/{bucket_name}?object-lock') + def get_bucket_locking(self, bucket_name, request=None): + # type: (str, Optional[object]) -> dict + """ + Gets the locking configuration for a bucket. The locking + configuration will be applied by default to every new object + placed in the specified bucket. + :param bucket_name: The name of the bucket. + :type bucket_name: str + :return: The locking configuration. + :rtype: Dict + """ + # pylint: disable=unused-argument + + # Try to get the Object Lock configuration. If there is none, + # then return default values. + try: + result = request() # type: ignore + return { + 'lock_enabled': dict_get(result, 'ObjectLockEnabled') == 'Enabled', + 'lock_mode': dict_get(result, 'Rule.DefaultRetention.Mode'), + 'lock_retention_period_days': dict_get(result, 'Rule.DefaultRetention.Days', 0), + 'lock_retention_period_years': dict_get(result, 'Rule.DefaultRetention.Years', 0) + } + except RequestException as e: + if e.content: + content = json_str_to_object(e.content) + if content.get( + 'Code') == 'ObjectLockConfigurationNotFoundError': + return { + 'lock_enabled': False, + 'lock_mode': 'compliance', + 'lock_retention_period_days': None, + 'lock_retention_period_years': None + } + raise e + + @RestClient.api_put('/{bucket_name}?object-lock') + def set_bucket_locking(self, + bucket_name: str, + mode: str, + retention_period_days: Optional[Union[int, str]] = None, + retention_period_years: Optional[Union[int, str]] = None, + request: Optional[object] = None) -> None: + """ + Places the locking configuration on the specified bucket. The + locking configuration will be applied by default to every new + object placed in the specified bucket. + :param bucket_name: The name of the bucket. + :type bucket_name: str + :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`. + :type mode: str + :param retention_period_days: + :type retention_period_days: int + :param retention_period_years: + :type retention_period_years: int + :rtype: None + """ + # pylint: disable=unused-argument + + retention_period_days, retention_period_years = self.perform_validations( + retention_period_days, retention_period_years, mode) + + # Generate the XML data like this: + # <ObjectLockConfiguration> + # <ObjectLockEnabled>string</ObjectLockEnabled> + # <Rule> + # <DefaultRetention> + # <Days>integer</Days> + # <Mode>string</Mode> + # <Years>integer</Years> + # </DefaultRetention> + # </Rule> + # </ObjectLockConfiguration> + locking_configuration = ET.Element('ObjectLockConfiguration') + enabled_element = ET.SubElement(locking_configuration, + 'ObjectLockEnabled') + enabled_element.text = 'Enabled' # Locking can't be disabled. + rule_element = ET.SubElement(locking_configuration, 'Rule') + default_retention_element = ET.SubElement(rule_element, + 'DefaultRetention') + mode_element = ET.SubElement(default_retention_element, 'Mode') + mode_element.text = mode.upper() + if retention_period_days: + days_element = ET.SubElement(default_retention_element, 'Days') + days_element.text = str(retention_period_days) + if retention_period_years: + years_element = ET.SubElement(default_retention_element, 'Years') + years_element.text = str(retention_period_years) + + data = ET.tostring(locking_configuration, encoding='unicode') + + try: + _ = request(data=data) # type: ignore + except RequestException as e: + raise DashboardException(msg=str(e), component='rgw') + + def list_roles(self) -> List[Dict[str, Any]]: + rgw_list_roles_command = ['role', 'list'] + code, roles, err = mgr.send_rgwadmin_command(rgw_list_roles_command) + if code < 0: + logger.warning('Error listing roles with code %d: %s', code, err) + return [] + + return roles + + def create_role(self, role_name: str, role_path: str, role_assume_policy_doc: str) -> None: + try: + json.loads(role_assume_policy_doc) + except: # noqa: E722 + raise DashboardException('Assume role policy document is not a valid json') + + # valid values: + # pylint: disable=C0301 + # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html#cfn-iam-role-path # noqa: E501 + if len(role_name) > 64: + raise DashboardException( + f'Role name "{role_name}" is invalid. Should be 64 characters or less') + + role_name_regex = '[0-9a-zA-Z_+=,.@-]+' + if not re.fullmatch(role_name_regex, role_name): + raise DashboardException( + f'Role name "{role_name}" is invalid. Valid characters are "{role_name_regex}"') + + if not os.path.isabs(role_path): + raise DashboardException( + f'Role path "{role_path}" is invalid. It should be an absolute path') + if role_path[-1] != '/': + raise DashboardException( + f'Role path "{role_path}" is invalid. It should start and end with a slash') + path_regex = '(\u002F)|(\u002F[\u0021-\u007E]+\u002F)' + if not re.fullmatch(path_regex, role_path): + raise DashboardException( + (f'Role path "{role_path}" is invalid.' + f'Role path should follow the pattern "{path_regex}"')) + + rgw_create_role_command = ['role', 'create', '--role-name', role_name, '--path', role_path] + if role_assume_policy_doc: + rgw_create_role_command += ['--assume-role-policy-doc', f"{role_assume_policy_doc}"] + + code, _roles, _err = mgr.send_rgwadmin_command(rgw_create_role_command, + stdout_as_json=False) + if code != 0: + # pylint: disable=C0301 + link = 'https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html#cfn-iam-role-path' # noqa: E501 + msg = (f'Error creating role with code {code}: ' + 'Looks like the document has a wrong format.' + f' For more information about the format look at {link}') + raise DashboardException(msg=msg, component='rgw') + + def perform_validations(self, retention_period_days, retention_period_years, mode): + try: + retention_period_days = int(retention_period_days) if retention_period_days else 0 + retention_period_years = int(retention_period_years) if retention_period_years else 0 + if retention_period_days < 0 or retention_period_years < 0: + raise ValueError + except (TypeError, ValueError): + msg = "Retention period must be a positive integer." + raise DashboardException(msg=msg, component='rgw') + if retention_period_days and retention_period_years: + # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html + msg = "Retention period requires either Days or Years. "\ + "You can't specify both at the same time." + raise DashboardException(msg=msg, component='rgw') + if not retention_period_days and not retention_period_years: + msg = "Retention period requires either Days or Years. "\ + "You must specify at least one." + raise DashboardException(msg=msg, component='rgw') + if not isinstance(mode, str) or mode.upper() not in ['COMPLIANCE', 'GOVERNANCE']: + msg = "Retention mode must be either COMPLIANCE or GOVERNANCE." + raise DashboardException(msg=msg, component='rgw') + return retention_period_days, retention_period_years + + +class RgwMultisite: + def migrate_to_multisite(self, realm_name: str, zonegroup_name: str, zone_name: str, + zonegroup_endpoints: str, zone_endpoints: str, access_key: str, + secret_key: str): + rgw_realm_create_cmd = ['realm', 'create', '--rgw-realm', realm_name, '--default'] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_realm_create_cmd, False) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to create realm', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + rgw_zonegroup_edit_cmd = ['zonegroup', 'rename', '--rgw-zonegroup', 'default', + '--zonegroup-new-name', zonegroup_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zonegroup_edit_cmd, False) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to rename zonegroup to {}'.format(zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + rgw_zone_edit_cmd = ['zone', 'rename', '--rgw-zone', + 'default', '--zone-new-name', zone_name, + '--rgw-zonegroup', zonegroup_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_edit_cmd, False) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to rename zone to {}'.format(zone_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + rgw_zonegroup_modify_cmd = ['zonegroup', 'modify', + '--rgw-realm', realm_name, + '--rgw-zonegroup', zonegroup_name] + if zonegroup_endpoints: + rgw_zonegroup_modify_cmd.append('--endpoints') + rgw_zonegroup_modify_cmd.append(zonegroup_endpoints) + rgw_zonegroup_modify_cmd.append('--master') + rgw_zonegroup_modify_cmd.append('--default') + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zonegroup_modify_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to modify zonegroup {}'.format(zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + rgw_zone_modify_cmd = ['zone', 'modify', '--rgw-realm', realm_name, + '--rgw-zonegroup', zonegroup_name, + '--rgw-zone', zone_name] + if zone_endpoints: + rgw_zone_modify_cmd.append('--endpoints') + rgw_zone_modify_cmd.append(zone_endpoints) + rgw_zone_modify_cmd.append('--master') + rgw_zone_modify_cmd.append('--default') + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_modify_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to modify zone', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + if access_key and secret_key: + rgw_zone_modify_cmd = ['zone', 'modify', '--rgw-zone', zone_name, + '--access-key', access_key, '--secret', secret_key] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_modify_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to modify zone', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def create_realm(self, realm_name: str, default: bool): + rgw_realm_create_cmd = ['realm', 'create'] + cmd_create_realm_options = ['--rgw-realm', realm_name] + if default != 'false': + cmd_create_realm_options.append('--default') + rgw_realm_create_cmd += cmd_create_realm_options + try: + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_realm_create_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to create realm', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def list_realms(self): + rgw_realm_list = {} + rgw_realm_list_cmd = ['realm', 'list'] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_realm_list_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to fetch realm list', + http_status_code=500, component='rgw') + rgw_realm_list = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return rgw_realm_list + + def get_realm(self, realm_name: str): + realm_info = {} + rgw_realm_info_cmd = ['realm', 'get', '--rgw-realm', realm_name] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_realm_info_cmd) + if exit_code > 0: + raise DashboardException('Unable to get realm info', + http_status_code=500, component='rgw') + realm_info = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return realm_info + + def get_all_realms_info(self): + all_realms_info = {} + realms_info = [] + rgw_realm_list = self.list_realms() + if 'realms' in rgw_realm_list: + if rgw_realm_list['realms'] != []: + for rgw_realm in rgw_realm_list['realms']: + realm_info = self.get_realm(rgw_realm) + realms_info.append(realm_info) + all_realms_info['realms'] = realms_info # type: ignore + else: + all_realms_info['realms'] = [] # type: ignore + if 'default_info' in rgw_realm_list and rgw_realm_list['default_info'] != '': + all_realms_info['default_realm'] = rgw_realm_list['default_info'] # type: ignore + else: + all_realms_info['default_realm'] = '' # type: ignore + return all_realms_info + + def edit_realm(self, realm_name: str, new_realm_name: str, default: str = ''): + rgw_realm_edit_cmd = [] + if new_realm_name != realm_name: + rgw_realm_edit_cmd = ['realm', 'rename', '--rgw-realm', + realm_name, '--realm-new-name', new_realm_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_realm_edit_cmd, False) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to edit realm', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + if default and str_to_bool(default): + rgw_realm_edit_cmd = ['realm', 'default', '--rgw-realm', new_realm_name] + try: + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_realm_edit_cmd, False) + if exit_code > 0: + raise DashboardException(msg='Unable to set {} as default realm'.format(new_realm_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def delete_realm(self, realm_name: str): + rgw_delete_realm_cmd = ['realm', 'rm', '--rgw-realm', realm_name] + try: + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_realm_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to delete realm', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def create_zonegroup(self, realm_name: str, zonegroup_name: str, + default: bool, master: bool, endpoints: str): + rgw_zonegroup_create_cmd = ['zonegroup', 'create'] + cmd_create_zonegroup_options = ['--rgw-zonegroup', zonegroup_name] + if realm_name != 'null': + cmd_create_zonegroup_options.append('--rgw-realm') + cmd_create_zonegroup_options.append(realm_name) + if default != 'false': + cmd_create_zonegroup_options.append('--default') + if master != 'false': + cmd_create_zonegroup_options.append('--master') + if endpoints: + cmd_create_zonegroup_options.append('--endpoints') + cmd_create_zonegroup_options.append(endpoints) + rgw_zonegroup_create_cmd += cmd_create_zonegroup_options + try: + exit_code, out, err = mgr.send_rgwadmin_command(rgw_zonegroup_create_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to get realm info', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return out + + def list_zonegroups(self): + rgw_zonegroup_list = {} + rgw_zonegroup_list_cmd = ['zonegroup', 'list'] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_zonegroup_list_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to fetch zonegroup list', + http_status_code=500, component='rgw') + rgw_zonegroup_list = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return rgw_zonegroup_list + + def get_zonegroup(self, zonegroup_name: str): + zonegroup_info = {} + if zonegroup_name != 'default': + rgw_zonegroup_info_cmd = ['zonegroup', 'get', '--rgw-zonegroup', zonegroup_name] + else: + rgw_zonegroup_info_cmd = ['zonegroup', 'get', '--rgw-zonegroup', + zonegroup_name, '--rgw-realm', 'default'] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_zonegroup_info_cmd) + if exit_code > 0: + raise DashboardException('Unable to get zonegroup info', + http_status_code=500, component='rgw') + zonegroup_info = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return zonegroup_info + + def get_all_zonegroups_info(self): + all_zonegroups_info = {} + zonegroups_info = [] + rgw_zonegroup_list = self.list_zonegroups() + if 'zonegroups' in rgw_zonegroup_list: + if rgw_zonegroup_list['zonegroups'] != []: + for rgw_zonegroup in rgw_zonegroup_list['zonegroups']: + zonegroup_info = self.get_zonegroup(rgw_zonegroup) + zonegroups_info.append(zonegroup_info) + all_zonegroups_info['zonegroups'] = zonegroups_info # type: ignore + else: + all_zonegroups_info['zonegroups'] = [] # type: ignore + if 'default_info' in rgw_zonegroup_list and rgw_zonegroup_list['default_info'] != '': + all_zonegroups_info['default_zonegroup'] = rgw_zonegroup_list['default_info'] + else: + all_zonegroups_info['default_zonegroup'] = '' # type: ignore + return all_zonegroups_info + + def delete_zonegroup(self, zonegroup_name: str, delete_pools: str, pools: List[str]): + if delete_pools == 'true': + zonegroup_info = self.get_zonegroup(zonegroup_name) + rgw_delete_zonegroup_cmd = ['zonegroup', 'delete', '--rgw-zonegroup', zonegroup_name] + try: + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_zonegroup_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to delete zonegroup', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + if delete_pools == 'true': + for zone in zonegroup_info['zones']: + self.delete_zone(zone['name'], 'true', pools) + + def modify_zonegroup(self, realm_name: str, zonegroup_name: str, default: str, master: str, + endpoints: str): + + rgw_zonegroup_modify_cmd = ['zonegroup', 'modify', + '--rgw-realm', realm_name, + '--rgw-zonegroup', zonegroup_name] + if endpoints: + rgw_zonegroup_modify_cmd.append('--endpoints') + rgw_zonegroup_modify_cmd.append(endpoints) + if master and str_to_bool(master): + rgw_zonegroup_modify_cmd.append('--master') + if default and str_to_bool(default): + rgw_zonegroup_modify_cmd.append('--default') + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zonegroup_modify_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to modify zonegroup {}'.format(zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + def add_or_remove_zone(self, zonegroup_name: str, zone_name: str, action: str): + if action == 'add': + rgw_zonegroup_add_zone_cmd = ['zonegroup', 'add', '--rgw-zonegroup', + zonegroup_name, '--rgw-zone', zone_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zonegroup_add_zone_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to add zone {} to zonegroup {}'.format(zone_name, zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + if action == 'remove': + rgw_zonegroup_rm_zone_cmd = ['zonegroup', 'remove', + '--rgw-zonegroup', zonegroup_name, '--rgw-zone', zone_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zonegroup_rm_zone_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to remove zone {} from zonegroup {}'.format(zone_name, zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + def get_placement_targets_by_zonegroup(self, zonegroup_name: str): + rgw_get_placement_cmd = ['zonegroup', 'placement', + 'list', '--rgw-zonegroup', zonegroup_name] + try: + exit_code, out, err = mgr.send_rgwadmin_command(rgw_get_placement_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to get placement targets', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return out + + def add_placement_targets(self, zonegroup_name: str, placement_targets: List[Dict]): + rgw_add_placement_cmd = ['zonegroup', 'placement', 'add'] + for placement_target in placement_targets: + cmd_add_placement_options = ['--rgw-zonegroup', zonegroup_name, + '--placement-id', placement_target['placement_id']] + if placement_target['tags']: + cmd_add_placement_options += ['--tags', placement_target['tags']] + rgw_add_placement_cmd += cmd_add_placement_options + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_add_placement_cmd) + if exit_code > 0: + raise DashboardException(e=err, + msg='Unable to add placement target {} to zonegroup {}'.format(placement_target['placement_id'], zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + storage_classes = placement_target['storage_class'].split(",") if placement_target['storage_class'] else [] # noqa E501 #pylint: disable=line-too-long + if storage_classes: + for sc in storage_classes: + cmd_add_placement_options = ['--storage-class', sc] + try: + exit_code, _, err = mgr.send_rgwadmin_command( + rgw_add_placement_cmd + cmd_add_placement_options) + if exit_code > 0: + raise DashboardException(e=err, + msg='Unable to add placement target {} to zonegroup {}'.format(placement_target['placement_id'], zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + def modify_placement_targets(self, zonegroup_name: str, placement_targets: List[Dict]): + rgw_add_placement_cmd = ['zonegroup', 'placement', 'modify'] + for placement_target in placement_targets: + cmd_add_placement_options = ['--rgw-zonegroup', zonegroup_name, + '--placement-id', placement_target['placement_id']] + if placement_target['tags']: + cmd_add_placement_options += ['--tags', placement_target['tags']] + rgw_add_placement_cmd += cmd_add_placement_options + storage_classes = placement_target['storage_class'].split(",") if placement_target['storage_class'] else [] # noqa E501 #pylint: disable=line-too-long + if storage_classes: + for sc in storage_classes: + cmd_add_placement_options = [] + cmd_add_placement_options = ['--storage-class', sc] + try: + exit_code, _, err = mgr.send_rgwadmin_command( + rgw_add_placement_cmd + cmd_add_placement_options) + if exit_code > 0: + raise DashboardException(e=err, + msg='Unable to add placement target {} to zonegroup {}'.format(placement_target['placement_id'], zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + else: + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_add_placement_cmd) + if exit_code > 0: + raise DashboardException(e=err, + msg='Unable to add placement target {} to zonegroup {}'.format(placement_target['placement_id'], zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + # pylint: disable=W0102 + def edit_zonegroup(self, realm_name: str, zonegroup_name: str, new_zonegroup_name: str, + default: str = '', master: str = '', endpoints: str = '', + add_zones: List[str] = [], remove_zones: List[str] = [], + placement_targets: List[Dict[str, str]] = []): + rgw_zonegroup_edit_cmd = [] + if new_zonegroup_name != zonegroup_name: + rgw_zonegroup_edit_cmd = ['zonegroup', 'rename', '--rgw-zonegroup', zonegroup_name, + '--zonegroup-new-name', new_zonegroup_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zonegroup_edit_cmd, False) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to rename zonegroup to {}'.format(new_zonegroup_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + self.modify_zonegroup(realm_name, new_zonegroup_name, default, master, endpoints) + if add_zones: + for zone_name in add_zones: + self.add_or_remove_zone(new_zonegroup_name, zone_name, 'add') + if remove_zones: + for zone_name in remove_zones: + self.add_or_remove_zone(new_zonegroup_name, zone_name, 'remove') + existing_placement_targets = self.get_placement_targets_by_zonegroup(new_zonegroup_name) + existing_placement_targets_ids = [pt['key'] for pt in existing_placement_targets] + if placement_targets: + for pt in placement_targets: + if pt['placement_id'] in existing_placement_targets_ids: + self.modify_placement_targets(new_zonegroup_name, placement_targets) + else: + self.add_placement_targets(new_zonegroup_name, placement_targets) + + def update_period(self): + rgw_update_period_cmd = ['period', 'update', '--commit'] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_update_period_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to update period', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def create_zone(self, zone_name, zonegroup_name, default, master, endpoints, access_key, + secret_key): + rgw_zone_create_cmd = ['zone', 'create'] + cmd_create_zone_options = ['--rgw-zone', zone_name] + if zonegroup_name != 'null': + cmd_create_zone_options.append('--rgw-zonegroup') + cmd_create_zone_options.append(zonegroup_name) + if default != 'false': + cmd_create_zone_options.append('--default') + if master != 'false': + cmd_create_zone_options.append('--master') + if endpoints != 'null': + cmd_create_zone_options.append('--endpoints') + cmd_create_zone_options.append(endpoints) + if access_key is not None: + cmd_create_zone_options.append('--access-key') + cmd_create_zone_options.append(access_key) + if secret_key is not None: + cmd_create_zone_options.append('--secret') + cmd_create_zone_options.append(secret_key) + rgw_zone_create_cmd += cmd_create_zone_options + try: + exit_code, out, err = mgr.send_rgwadmin_command(rgw_zone_create_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to create zone', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + self.update_period() + return out + + def parse_secrets(self, user, data): + for key in data.get('keys', []): + if key.get('user') == user: + access_key = key.get('access_key') + secret_key = key.get('secret_key') + return access_key, secret_key + return '', '' + + def modify_zone(self, zone_name: str, zonegroup_name: str, default: str, master: str, + endpoints: str, access_key: str, secret_key: str): + rgw_zone_modify_cmd = ['zone', 'modify', '--rgw-zonegroup', + zonegroup_name, '--rgw-zone', zone_name] + if endpoints: + rgw_zone_modify_cmd.append('--endpoints') + rgw_zone_modify_cmd.append(endpoints) + if default and str_to_bool(default): + rgw_zone_modify_cmd.append('--default') + if master and str_to_bool(master): + rgw_zone_modify_cmd.append('--master') + if access_key is not None: + rgw_zone_modify_cmd.append('--access-key') + rgw_zone_modify_cmd.append(access_key) + if secret_key is not None: + rgw_zone_modify_cmd.append('--secret') + rgw_zone_modify_cmd.append(secret_key) + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_modify_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to modify zone', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + def add_placement_targets_zone(self, zone_name: str, placement_target: str, data_pool: str, + index_pool: str, data_extra_pool: str): + rgw_zone_add_placement_cmd = ['zone', 'placement', 'add', '--rgw-zone', zone_name, + '--placement-id', placement_target, '--data-pool', data_pool, + '--index-pool', index_pool, + '--data-extra-pool', data_extra_pool] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_add_placement_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to add placement target {} to zone {}'.format(placement_target, zone_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + def add_storage_class_zone(self, zone_name: str, placement_target: str, storage_class: str, + data_pool: str, compression: str): + rgw_zone_add_storage_class_cmd = ['zone', 'placement', 'add', '--rgw-zone', zone_name, + '--placement-id', placement_target, + '--storage-class', storage_class, + '--data-pool', data_pool, + '--compression', compression] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_add_storage_class_cmd) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to add storage class {} to zone {}'.format(storage_class, zone_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + + def edit_zone(self, zone_name: str, new_zone_name: str, zonegroup_name: str, default: str = '', + master: str = '', endpoints: str = '', access_key: str = '', secret_key: str = '', + placement_target: str = '', data_pool: str = '', index_pool: str = '', + data_extra_pool: str = '', storage_class: str = '', data_pool_class: str = '', + compression: str = ''): + if new_zone_name != zone_name: + rgw_zone_rename_cmd = ['zone', 'rename', '--rgw-zone', + zone_name, '--zone-new-name', new_zone_name] + try: + exit_code, _, err = mgr.send_rgwadmin_command(rgw_zone_rename_cmd, False) + if exit_code > 0: + raise DashboardException(e=err, msg='Unable to rename zone to {}'.format(new_zone_name), # noqa E501 #pylint: disable=line-too-long + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + self.modify_zone(new_zone_name, zonegroup_name, default, master, endpoints, access_key, + secret_key) + self.add_placement_targets_zone(new_zone_name, placement_target, + data_pool, index_pool, data_extra_pool) + self.add_storage_class_zone(new_zone_name, placement_target, storage_class, + data_pool_class, compression) + + def list_zones(self): + rgw_zone_list = {} + rgw_zone_list_cmd = ['zone', 'list'] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_zone_list_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to fetch zone list', + http_status_code=500, component='rgw') + rgw_zone_list = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return rgw_zone_list + + def get_zone(self, zone_name: str): + zone_info = {} + rgw_zone_info_cmd = ['zone', 'get', '--rgw-zone', zone_name] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_zone_info_cmd) + if exit_code > 0: + raise DashboardException('Unable to get zone info', + http_status_code=500, component='rgw') + zone_info = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return zone_info + + def get_all_zones_info(self): + all_zones_info = {} + zones_info = [] + rgw_zone_list = self.list_zones() + if 'zones' in rgw_zone_list: + if rgw_zone_list['zones'] != []: + for rgw_zone in rgw_zone_list['zones']: + zone_info = self.get_zone(rgw_zone) + zones_info.append(zone_info) + all_zones_info['zones'] = zones_info # type: ignore + else: + all_zones_info['zones'] = [] + if 'default_info' in rgw_zone_list and rgw_zone_list['default_info'] != '': + all_zones_info['default_zone'] = rgw_zone_list['default_info'] # type: ignore + else: + all_zones_info['default_zone'] = '' # type: ignore + return all_zones_info + + def delete_zone(self, zone_name: str, delete_pools: str, pools: List[str], + zonegroup_name: str = '',): + rgw_remove_zone_from_zonegroup_cmd = ['zonegroup', 'remove', '--rgw-zonegroup', + zonegroup_name, '--rgw-zone', zone_name] + rgw_delete_zone_cmd = ['zone', 'delete', '--rgw-zone', zone_name] + if zonegroup_name: + try: + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_remove_zone_from_zonegroup_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to remove zone from zonegroup', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + try: + exit_code, _, _ = mgr.send_rgwadmin_command(rgw_delete_zone_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to delete zone', + http_status_code=500, component='rgw') + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + self.update_period() + if delete_pools == 'true': + self.delete_pools(pools) + + def delete_pools(self, pools): + for pool in pools: + if mgr.rados.pool_exists(pool): + mgr.rados.delete_pool(pool) + + def create_system_user(self, userName: str, zoneName: str): + rgw_user_create_cmd = ['user', 'create', '--uid', userName, + '--display-name', userName, '--rgw-zone', zoneName, '--system'] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_user_create_cmd) + if exit_code > 0: + raise DashboardException(msg='Unable to create system user', + http_status_code=500, component='rgw') + return out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + def get_user_list(self, zoneName: str): + all_users_info = [] + user_list = [] + rgw_user_list_cmd = ['user', 'list', '--rgw-zone', zoneName] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_user_list_cmd) + if exit_code > 0: + raise DashboardException('Unable to get user list', + http_status_code=500, component='rgw') + user_list = out + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + + if len(user_list) > 0: + for user_name in user_list: + rgw_user_info_cmd = ['user', 'info', '--uid', user_name, '--rgw-zone', zoneName] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_user_info_cmd) + if exit_code > 0: + raise DashboardException('Unable to get user info', + http_status_code=500, component='rgw') + all_users_info.append(out) + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return all_users_info + + def get_multisite_status(self): + is_multisite_configured = True + rgw_realm_list = self.list_realms() + rgw_zonegroup_list = self.list_zonegroups() + rgw_zone_list = self.list_zones() + if len(rgw_realm_list['realms']) < 1 and len(rgw_zonegroup_list['zonegroups']) < 1 \ + and len(rgw_zone_list['zones']) < 1: + is_multisite_configured = False + return is_multisite_configured + + def get_multisite_sync_status(self): + rgw_multisite_sync_status_cmd = ['sync', 'status'] + try: + exit_code, out, _ = mgr.send_rgwadmin_command(rgw_multisite_sync_status_cmd, False) + if exit_code > 0: + raise DashboardException('Unable to get sync status', + http_status_code=500, component='rgw') + if out: + return self.process_data(out) + except SubprocessError as error: + raise DashboardException(error, http_status_code=500, component='rgw') + return {} + + def process_data(self, data): + primary_zone_data, metadata_sync_data = self.extract_metadata_and_primary_zone_data(data) + replica_zones_info = [] + if metadata_sync_data != {}: + datasync_info = self.extract_datasync_info(data) + replica_zones_info = [self.extract_replica_zone_data(item) for item in datasync_info] + + replica_zones_info_object = { + 'metadataSyncInfo': metadata_sync_data, + 'dataSyncInfo': replica_zones_info, + 'primaryZoneData': primary_zone_data + } + + return replica_zones_info_object + + def extract_metadata_and_primary_zone_data(self, data): + primary_zone_info, metadata_sync_infoormation = self.extract_zones_data(data) + + primary_zone_tree = primary_zone_info.split('\n') if primary_zone_info else [] + realm = self.get_primary_zonedata(primary_zone_tree[0]) + zonegroup = self.get_primary_zonedata(primary_zone_tree[1]) + zone = self.get_primary_zonedata(primary_zone_tree[2]) + + primary_zone_data = [realm, zonegroup, zone] + zonegroup_info = self.get_zonegroup(zonegroup) + metadata_sync_data = {} + if len(zonegroup_info['zones']) > 1: + metadata_sync_data = self.extract_metadata_sync_data(metadata_sync_infoormation) + + return primary_zone_data, metadata_sync_data + + def extract_zones_data(self, data): + result = data + primary_zone_info = result.split('metadata sync')[0] if 'metadata sync' in result else None + metadata_sync_infoormation = result.split('metadata sync')[1] if 'metadata sync' in result else None # noqa E501 #pylint: disable=line-too-long + return primary_zone_info, metadata_sync_infoormation + + def extract_metadata_sync_data(self, metadata_sync_infoormation): + metadata_sync_info = metadata_sync_infoormation.split('data sync source')[0].strip() if 'data sync source' in metadata_sync_infoormation else None # noqa E501 #pylint: disable=line-too-long + + if metadata_sync_info == 'no sync (zone is master)': + return metadata_sync_info + + metadata_sync_data = {} + metadata_sync_info_array = metadata_sync_info.split('\n') if metadata_sync_info else [] + metadata_sync_data['syncstatus'] = metadata_sync_info_array[0].strip() if len(metadata_sync_info_array) > 0 else None # noqa E501 #pylint: disable=line-too-long + + for item in metadata_sync_info_array: + self.extract_metadata_sync_info(metadata_sync_data, item) + + metadata_sync_data['fullSyncStatus'] = metadata_sync_info_array + return metadata_sync_data + + def extract_metadata_sync_info(self, metadata_sync_data, item): + if 'oldest incremental change not applied:' in item: + metadata_sync_data['timestamp'] = item.split('applied:')[1].split()[0].strip() + + def extract_datasync_info(self, data): + metadata_sync_infoormation = data.split('metadata sync')[1] if 'metadata sync' in data else None # noqa E501 #pylint: disable=line-too-long + if 'data sync source' in metadata_sync_infoormation: + datasync_info = metadata_sync_infoormation.split('data sync source')[1].split('source:') + return datasync_info + return [] + + def extract_replica_zone_data(self, datasync_item): + replica_zone_data = {} + datasync_info_array = datasync_item.split('\n') + replica_zone_name = self.get_primary_zonedata(datasync_info_array[0]) + replica_zone_data['name'] = replica_zone_name.strip() + replica_zone_data['syncstatus'] = datasync_info_array[1].strip() + replica_zone_data['fullSyncStatus'] = datasync_info_array + for item in datasync_info_array: + self.extract_metadata_sync_info(replica_zone_data, item) + return replica_zone_data + + def get_primary_zonedata(self, data): + regex = r'\(([^)]+)\)' + match = re.search(regex, data) + + if match and match.group(1): + return match.group(1) + + return '' diff --git a/src/pybind/mgr/dashboard/services/settings.py b/src/pybind/mgr/dashboard/services/settings.py new file mode 100644 index 000000000..373d3966a --- /dev/null +++ b/src/pybind/mgr/dashboard/services/settings.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +from contextlib import contextmanager + +import cherrypy + + +class SettingsService: + @contextmanager + # pylint: disable=no-self-argument + def attribute_handler(name): + """ + :type name: str|dict[str, str] + :rtype: str|dict[str, str] + """ + if isinstance(name, dict): + result = { + _to_native(key): value + for key, value in name.items() + } + else: + result = _to_native(name) + + try: + yield result + except AttributeError: # pragma: no cover - handling is too obvious + raise cherrypy.NotFound(result) # pragma: no cover - handling is too obvious + + +def _to_native(setting): + return setting.upper().replace('-', '_') diff --git a/src/pybind/mgr/dashboard/services/sso.py b/src/pybind/mgr/dashboard/services/sso.py new file mode 100644 index 000000000..2290e6ea3 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/sso.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-return-statements,too-many-branches + +import errno +import json +import logging +import os +import threading +import warnings +from urllib import parse + +from .. import mgr +from ..tools import prepare_url_prefix + +logger = logging.getLogger('sso') + +try: + from onelogin.saml2.errors import OneLogin_Saml2_Error as Saml2Error + from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser as Saml2Parser + from onelogin.saml2.settings import OneLogin_Saml2_Settings as Saml2Settings + + python_saml_imported = True +except ImportError: + python_saml_imported = False + + +class Saml2(object): + def __init__(self, onelogin_settings): + self.onelogin_settings = onelogin_settings + + def get_username_attribute(self): + return self.onelogin_settings['sp']['attributeConsumingService']['requestedAttributes'][0][ + 'name'] + + def to_dict(self): + return { + 'onelogin_settings': self.onelogin_settings + } + + @classmethod + def from_dict(cls, s_dict): + return Saml2(s_dict['onelogin_settings']) + + +class SsoDB(object): + VERSION = 1 + SSODB_CONFIG_KEY = "ssodb_v" + + def __init__(self, version, protocol, saml2): + self.version = version + self.protocol = protocol + self.saml2 = saml2 + self.lock = threading.RLock() + + def save(self): + with self.lock: + db = { + 'protocol': self.protocol, + 'saml2': self.saml2.to_dict(), + 'version': self.version + } + mgr.set_store(self.ssodb_config_key(), json.dumps(db)) + + @classmethod + def ssodb_config_key(cls, version=None): + if version is None: + version = cls.VERSION + return "{}{}".format(cls.SSODB_CONFIG_KEY, version) + + def check_and_update_db(self): + logger.debug("Checking for previous DB versions") + if self.VERSION != 1: + raise NotImplementedError() + + @classmethod + def load(cls): + logger.info("Loading SSO DB version=%s", cls.VERSION) + + json_db = mgr.get_store(cls.ssodb_config_key(), None) + if json_db is None: + logger.debug("No DB v%s found, creating new...", cls.VERSION) + db = cls(cls.VERSION, '', Saml2({})) + # check if we can update from a previous version database + db.check_and_update_db() + return db + + dict_db = json.loads(json_db) # type: dict + return cls(dict_db['version'], dict_db.get('protocol'), + Saml2.from_dict(dict_db.get('saml2'))) + + +def load_sso_db(): + mgr.SSO_DB = SsoDB.load() # type: ignore + + +SSO_COMMANDS = [ + { + 'cmd': 'dashboard sso enable saml2', + 'desc': 'Enable SAML2 Single Sign-On', + 'perm': 'w' + }, + { + 'cmd': 'dashboard sso disable', + 'desc': 'Disable Single Sign-On', + 'perm': 'w' + }, + { + 'cmd': 'dashboard sso status', + 'desc': 'Get Single Sign-On status', + 'perm': 'r' + }, + { + 'cmd': 'dashboard sso show saml2', + 'desc': 'Show SAML2 configuration', + 'perm': 'r' + }, + { + 'cmd': 'dashboard sso setup saml2 ' + 'name=ceph_dashboard_base_url,type=CephString ' + 'name=idp_metadata,type=CephString ' + 'name=idp_username_attribute,type=CephString,req=false ' + 'name=idp_entity_id,type=CephString,req=false ' + 'name=sp_x_509_cert,type=CephFilepath,req=false ' + 'name=sp_private_key,type=CephFilepath,req=false', + 'desc': 'Setup SAML2 Single Sign-On', + 'perm': 'w' + } +] + + +def _get_optional_attr(cmd, attr, default): + if attr in cmd: + if cmd[attr] != '': + return cmd[attr] + return default + + +def handle_sso_command(cmd): + ret = -errno.ENOSYS, '', '' + if cmd['prefix'] not in ['dashboard sso enable saml2', + 'dashboard sso disable', + 'dashboard sso status', + 'dashboard sso show saml2', + 'dashboard sso setup saml2']: + return -errno.ENOSYS, '', '' + + if not python_saml_imported: + return -errno.EPERM, '', 'Required library not found: `python3-saml`' + + if cmd['prefix'] == 'dashboard sso disable': + mgr.SSO_DB.protocol = '' + mgr.SSO_DB.save() + return 0, 'SSO is "disabled".', '' + + if cmd['prefix'] == 'dashboard sso enable saml2': + configured = _is_sso_configured() + if configured: + mgr.SSO_DB.protocol = 'saml2' + mgr.SSO_DB.save() + return 0, 'SSO is "enabled" with "SAML2" protocol.', '' + return -errno.EPERM, '', 'Single Sign-On is not configured: ' \ + 'use `ceph dashboard sso setup saml2`' + + if cmd['prefix'] == 'dashboard sso status': + if mgr.SSO_DB.protocol == 'saml2': + return 0, 'SSO is "enabled" with "SAML2" protocol.', '' + + return 0, 'SSO is "disabled".', '' + + if cmd['prefix'] == 'dashboard sso show saml2': + return 0, json.dumps(mgr.SSO_DB.saml2.to_dict()), '' + + if cmd['prefix'] == 'dashboard sso setup saml2': + ret = _handle_saml_setup(cmd) + return ret + + return -errno.ENOSYS, '', '' + + +def _is_sso_configured(): + configured = True + try: + Saml2Settings(mgr.SSO_DB.saml2.onelogin_settings) + except Saml2Error: + configured = False + return configured + + +def _handle_saml_setup(cmd): + err, sp_x_509_cert, sp_private_key, has_sp_cert = _read_saml_files(cmd) + if err: + ret = -errno.EINVAL, '', err + else: + _set_saml_settings(cmd, sp_x_509_cert, sp_private_key, has_sp_cert) + ret = 0, json.dumps(mgr.SSO_DB.saml2.onelogin_settings), '' + return ret + + +def _read_saml_files(cmd): + sp_x_509_cert_path = _get_optional_attr(cmd, 'sp_x_509_cert', '') + sp_private_key_path = _get_optional_attr(cmd, 'sp_private_key', '') + has_sp_cert = sp_x_509_cert_path != "" and sp_private_key_path != "" + sp_x_509_cert = '' + sp_private_key = '' + err = None + if sp_x_509_cert_path and not sp_private_key_path: + err = 'Missing parameter `sp_private_key`.' + elif not sp_x_509_cert_path and sp_private_key_path: + err = 'Missing parameter `sp_x_509_cert`.' + elif has_sp_cert: + sp_x_509_cert, err = _try_read_file(sp_x_509_cert_path) + sp_private_key, err = _try_read_file(sp_private_key_path) + return err, sp_x_509_cert, sp_private_key, has_sp_cert + + +def _try_read_file(path): + res = "" + ret = "" + try: + with open(path, 'r', encoding='utf-8') as f: + res = f.read() + except FileNotFoundError: + ret = '`{}` not found.'.format(path) + return res, ret + + +def _set_saml_settings(cmd, sp_x_509_cert, sp_private_key, has_sp_cert): + ceph_dashboard_base_url = cmd['ceph_dashboard_base_url'] + idp_metadata = cmd['idp_metadata'] + idp_username_attribute = _get_optional_attr( + cmd, 'idp_username_attribute', 'uid') + idp_entity_id = _get_optional_attr(cmd, 'idp_entity_id', None) + idp_settings = _parse_saml_settings(idp_metadata, idp_entity_id) + + url_prefix = prepare_url_prefix( + mgr.get_module_option('url_prefix', default='')) + settings = { + 'sp': { + 'entityId': '{}{}/auth/saml2/metadata'.format(ceph_dashboard_base_url, url_prefix), + 'assertionConsumerService': { + 'url': '{}{}/auth/saml2'.format(ceph_dashboard_base_url, url_prefix), + 'binding': "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" + }, + 'attributeConsumingService': { + 'serviceName': "Ceph Dashboard", + "serviceDescription": "Ceph Dashboard Service", + "requestedAttributes": [ + { + "name": idp_username_attribute, + "isRequired": True + } + ] + }, + 'singleLogoutService': { + 'url': '{}{}/auth/saml2/logout'.format(ceph_dashboard_base_url, url_prefix), + 'binding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect' + }, + "x509cert": sp_x_509_cert, + "privateKey": sp_private_key + }, + 'security': { + "nameIdEncrypted": has_sp_cert, + "authnRequestsSigned": has_sp_cert, + "logoutRequestSigned": has_sp_cert, + "logoutResponseSigned": has_sp_cert, + "signMetadata": has_sp_cert, + "wantMessagesSigned": has_sp_cert, + "wantAssertionsSigned": has_sp_cert, + "wantAssertionsEncrypted": has_sp_cert, + # Not all Identity Providers support this. + "wantNameIdEncrypted": False, + "metadataValidUntil": '', + "wantAttributeStatement": False + } + } + settings = Saml2Parser.merge_settings(settings, idp_settings) + mgr.SSO_DB.saml2.onelogin_settings = settings + mgr.SSO_DB.protocol = 'saml2' + mgr.SSO_DB.save() + + +def _parse_saml_settings(idp_metadata, idp_entity_id): + if os.path.isfile(idp_metadata): + warnings.warn( + "Please prepend 'file://' to indicate a local SAML2 IdP file", DeprecationWarning) + with open(idp_metadata, 'r', encoding='utf-8') as f: + idp_settings = Saml2Parser.parse(f.read(), entity_id=idp_entity_id) + elif parse.urlparse(idp_metadata)[0] in ('http', 'https', 'file'): + idp_settings = Saml2Parser.parse_remote( + url=idp_metadata, validate_cert=False, entity_id=idp_entity_id) + else: + idp_settings = Saml2Parser.parse(idp_metadata, entity_id=idp_entity_id) + return idp_settings diff --git a/src/pybind/mgr/dashboard/services/tcmu_service.py b/src/pybind/mgr/dashboard/services/tcmu_service.py new file mode 100644 index 000000000..a81b6e8f2 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/tcmu_service.py @@ -0,0 +1,113 @@ +from mgr_util import get_most_recent_rate + +from dashboard.services.ceph_service import CephService + +from .. import mgr + +try: + from typing import Dict +except ImportError: + pass # Just for type checking + + +SERVICE_TYPE = 'tcmu-runner' + + +class TcmuService(object): + # pylint: disable=too-many-nested-blocks + # pylint: disable=too-many-branches + @staticmethod + def get_iscsi_info(): + daemons = {} # type: Dict[str, dict] + images = {} # type: Dict[str, dict] + daemon = None + for service in CephService.get_service_list(SERVICE_TYPE): + metadata = service['metadata'] + if metadata is None: + continue + status = service['status'] + hostname = service['hostname'] + + daemon = daemons.get(hostname, None) + if daemon is None: + daemon = { + 'server_hostname': hostname, + 'version': metadata['ceph_version'], + 'optimized_paths': 0, + 'non_optimized_paths': 0 + } + daemons[hostname] = daemon + + service_id = service['id'] + device_id = service_id.split(':')[-1] + image = images.get(device_id) + if image is None: + image = { + 'device_id': device_id, + 'pool_name': metadata['pool_name'], + 'name': metadata['image_name'], + 'id': metadata.get('image_id', None), + 'optimized_paths': [], + 'non_optimized_paths': [] + } + images[device_id] = image + + if status.get('lock_owner', 'false') == 'true': + daemon['optimized_paths'] += 1 + image['optimized_paths'].append(hostname) + + perf_key_prefix = "librbd-{id}-{pool}-{name}.".format( + id=metadata.get('image_id', ''), + pool=metadata['pool_name'], + name=metadata['image_name']) + perf_key = "{}lock_acquired_time".format(perf_key_prefix) + perf_value = mgr.get_counter('tcmu-runner', + service_id, + perf_key)[perf_key] + if perf_value: + lock_acquired_time = perf_value[-1][1] / 1000000000 + else: + lock_acquired_time = 0 + if lock_acquired_time > image.get('optimized_since', 0): + image['optimized_daemon'] = hostname + image['optimized_since'] = lock_acquired_time + image['stats'] = {} + image['stats_history'] = {} + for s in ['rd', 'wr', 'rd_bytes', 'wr_bytes']: + perf_key = "{}{}".format(perf_key_prefix, s) + rates = CephService.get_rates('tcmu-runner', service_id, perf_key) + image['stats'][s] = get_most_recent_rate(rates) + image['stats_history'][s] = rates + else: + daemon['non_optimized_paths'] += 1 + image['non_optimized_paths'].append(hostname) + + # clear up races w/ tcmu-runner clients that haven't detected + # loss of optimized path + TcmuService.remove_undetected_clients(images, daemons, daemon) + + return { + 'daemons': sorted(daemons.values(), + key=lambda d: d['server_hostname']), + 'images': sorted(images.values(), key=lambda i: ['id']), + } + + @staticmethod + def get_image_info(pool_name, image_name, get_iscsi_info): + for image in get_iscsi_info['images']: + if image['pool_name'] == pool_name and image['name'] == image_name: + return image + return None + + @staticmethod + def remove_undetected_clients(images, daemons, daemon): + for image in images.values(): + optimized_daemon = image.get('optimized_daemon', None) + if optimized_daemon: + for daemon_name in image['optimized_paths']: + if daemon_name != optimized_daemon: + daemon = daemons[daemon_name] + daemon['optimized_paths'] -= 1 + daemon['non_optimized_paths'] += 1 + image['non_optimized_paths'].append(daemon_name) + image['optimized_paths'] = [optimized_daemon] |