From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/pybind/mgr/dashboard/services/__init__.py | 2 + .../mgr/dashboard/services/access_control.py | 953 +++++++++++++++++++++ src/pybind/mgr/dashboard/services/auth.py | 215 +++++ src/pybind/mgr/dashboard/services/ceph_service.py | 423 +++++++++ src/pybind/mgr/dashboard/services/cephfs.py | 263 ++++++ src/pybind/mgr/dashboard/services/cluster.py | 35 + src/pybind/mgr/dashboard/services/exception.py | 118 +++ src/pybind/mgr/dashboard/services/iscsi_cli.py | 59 ++ src/pybind/mgr/dashboard/services/iscsi_client.py | 259 ++++++ src/pybind/mgr/dashboard/services/iscsi_config.py | 112 +++ src/pybind/mgr/dashboard/services/orchestrator.py | 232 +++++ src/pybind/mgr/dashboard/services/osd.py | 25 + src/pybind/mgr/dashboard/services/progress.py | 92 ++ src/pybind/mgr/dashboard/services/rbd.py | 580 +++++++++++++ src/pybind/mgr/dashboard/services/rgw_client.py | 764 +++++++++++++++++ src/pybind/mgr/dashboard/services/sso.py | 257 ++++++ src/pybind/mgr/dashboard/services/tcmu_service.py | 108 +++ 17 files changed, 4497 insertions(+) create mode 100644 src/pybind/mgr/dashboard/services/__init__.py create mode 100644 src/pybind/mgr/dashboard/services/access_control.py create mode 100644 src/pybind/mgr/dashboard/services/auth.py create mode 100644 src/pybind/mgr/dashboard/services/ceph_service.py create mode 100644 src/pybind/mgr/dashboard/services/cephfs.py create mode 100644 src/pybind/mgr/dashboard/services/cluster.py create mode 100644 src/pybind/mgr/dashboard/services/exception.py create mode 100644 src/pybind/mgr/dashboard/services/iscsi_cli.py create mode 100644 src/pybind/mgr/dashboard/services/iscsi_client.py create mode 100644 src/pybind/mgr/dashboard/services/iscsi_config.py create mode 100644 src/pybind/mgr/dashboard/services/orchestrator.py create mode 100644 src/pybind/mgr/dashboard/services/osd.py create mode 100644 src/pybind/mgr/dashboard/services/progress.py create mode 100644 src/pybind/mgr/dashboard/services/rbd.py create mode 100644 src/pybind/mgr/dashboard/services/rgw_client.py create mode 100644 src/pybind/mgr/dashboard/services/sso.py create mode 100644 src/pybind/mgr/dashboard/services/tcmu_service.py (limited to 'src/pybind/mgr/dashboard/services') diff --git a/src/pybind/mgr/dashboard/services/__init__.py b/src/pybind/mgr/dashboard/services/__init__.py new file mode 100644 index 000000000..139759b65 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import diff --git a/src/pybind/mgr/dashboard/services/access_control.py b/src/pybind/mgr/dashboard/services/access_control.py new file mode 100644 index 000000000..27d849cc4 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/access_control.py @@ -0,0 +1,953 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-arguments,too-many-return-statements +# pylint: disable=too-many-branches, too-many-locals, too-many-statements +from __future__ import absolute_import + +import errno +import json +import logging +import re +import threading +import time +from datetime import datetime, timedelta +from string import ascii_lowercase, ascii_uppercase, digits, punctuation +from typing import List, Optional, Sequence + +import bcrypt +from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand + +from .. import mgr +from ..exceptions import PasswordPolicyException, PermissionNotValid, \ + PwdExpirationDateNotValid, RoleAlreadyExists, RoleDoesNotExist, \ + RoleIsAssociatedWithUser, RoleNotInUser, ScopeNotInRole, ScopeNotValid, \ + UserAlreadyExists, UserDoesNotExist +from ..security import Permission, Scope +from ..settings import Settings + +logger = logging.getLogger('access_control') +DEFAULT_FILE_DESC = 'password/secret' + + +# password hashing algorithm +def password_hash(password, salt_password=None): + if not password: + return None + if not salt_password: + salt_password = bcrypt.gensalt() + else: + salt_password = salt_password.encode('utf8') + return bcrypt.hashpw(password.encode('utf8'), salt_password).decode('utf8') + + +_P = Permission # short alias + + +class PasswordPolicy(object): + def __init__(self, password, username=None, old_password=None): + """ + :param password: The new plain password. + :type password: str + :param username: The name of the user. + :type username: str | None + :param old_password: The old plain password. + :type old_password: str | None + """ + self.password = password + self.username = username + self.old_password = old_password + self.forbidden_words = Settings.PWD_POLICY_EXCLUSION_LIST.split(',') + self.complexity_credits = 0 + + @staticmethod + def _check_if_contains_word(password, word): + return re.compile('(?:{0})'.format(word), + flags=re.IGNORECASE).search(password) + + def check_password_complexity(self): + if not Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED: + return Settings.PWD_POLICY_MIN_COMPLEXITY + digit_credit = 1 + small_letter_credit = 1 + big_letter_credit = 2 + special_character_credit = 3 + other_character_credit = 5 + self.complexity_credits = 0 + for ch in self.password: + if ch in ascii_uppercase: + self.complexity_credits += big_letter_credit + elif ch in ascii_lowercase: + self.complexity_credits += small_letter_credit + elif ch in digits: + self.complexity_credits += digit_credit + elif ch in punctuation: + self.complexity_credits += special_character_credit + else: + self.complexity_credits += other_character_credit + return self.complexity_credits + + def check_is_old_password(self): + if not Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED: + return False + return self.old_password and self.password == self.old_password + + def check_if_contains_username(self): + if not Settings.PWD_POLICY_CHECK_USERNAME_ENABLED: + return False + if not self.username: + return False + return self._check_if_contains_word(self.password, self.username) + + def check_if_contains_forbidden_words(self): + if not Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED: + return False + return self._check_if_contains_word(self.password, + '|'.join(self.forbidden_words)) + + def check_if_sequential_characters(self): + if not Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED: + return False + for i in range(1, len(self.password) - 1): + if ord(self.password[i - 1]) + 1 == ord(self.password[i])\ + == ord(self.password[i + 1]) - 1: + return True + return False + + def check_if_repetitive_characters(self): + if not Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED: + return False + for i in range(1, len(self.password) - 1): + if self.password[i - 1] == self.password[i] == self.password[i + 1]: + return True + return False + + def check_password_length(self): + if not Settings.PWD_POLICY_CHECK_LENGTH_ENABLED: + return True + return len(self.password) >= Settings.PWD_POLICY_MIN_LENGTH + + def check_all(self): + """ + Perform all password policy checks. + :raise PasswordPolicyException: If a password policy check fails. + """ + if not Settings.PWD_POLICY_ENABLED: + return + if self.check_password_complexity() < Settings.PWD_POLICY_MIN_COMPLEXITY: + raise PasswordPolicyException('Password is too weak.') + if not self.check_password_length(): + raise PasswordPolicyException('Password is too weak.') + if self.check_is_old_password(): + raise PasswordPolicyException('Password must not be the same as the previous one.') + if self.check_if_contains_username(): + raise PasswordPolicyException('Password must not contain username.') + result = self.check_if_contains_forbidden_words() + if result: + raise PasswordPolicyException('Password must not contain the keyword "{}".'.format( + result.group(0))) + if self.check_if_repetitive_characters(): + raise PasswordPolicyException('Password must not contain repetitive characters.') + if self.check_if_sequential_characters(): + raise PasswordPolicyException('Password must not contain sequential characters.') + + +class Role(object): + def __init__(self, name, description=None, scope_permissions=None): + self.name = name + self.description = description + if scope_permissions is None: + self.scopes_permissions = {} + else: + self.scopes_permissions = scope_permissions + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return self.name == other.name + + def set_scope_permissions(self, scope, permissions): + if not Scope.valid_scope(scope): + raise ScopeNotValid(scope) + for perm in permissions: + if not Permission.valid_permission(perm): + raise PermissionNotValid(perm) + + permissions.sort() + self.scopes_permissions[scope] = permissions + + def del_scope_permissions(self, scope): + if scope not in self.scopes_permissions: + raise ScopeNotInRole(scope, self.name) + del self.scopes_permissions[scope] + + def reset_scope_permissions(self): + self.scopes_permissions = {} + + def authorize(self, scope, permissions): + if scope in self.scopes_permissions: + role_perms = self.scopes_permissions[scope] + for perm in permissions: + if perm not in role_perms: + return False + return True + return False + + def to_dict(self): + return { + 'name': self.name, + 'description': self.description, + 'scopes_permissions': self.scopes_permissions + } + + @classmethod + def from_dict(cls, r_dict): + return Role(r_dict['name'], r_dict['description'], + r_dict['scopes_permissions']) + + +# static pre-defined system roles +# this roles cannot be deleted nor updated + +# admin role provides all permissions for all scopes +ADMIN_ROLE = Role( + 'administrator', 'allows full permissions for all security scopes', { + scope_name: Permission.all_permissions() + for scope_name in Scope.all_scopes() + }) + + +# read-only role provides read-only permission for all scopes +READ_ONLY_ROLE = Role( + 'read-only', + 'allows read permission for all security scope except dashboard settings and config-opt', { + scope_name: [_P.READ] for scope_name in Scope.all_scopes() + if scope_name not in (Scope.DASHBOARD_SETTINGS, Scope.CONFIG_OPT) + }) + + +# block manager role provides all permission for block related scopes +BLOCK_MGR_ROLE = Role( + 'block-manager', 'allows full permissions for rbd-image, rbd-mirroring, and iscsi scopes', { + Scope.RBD_IMAGE: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.POOL: [_P.READ], + Scope.ISCSI: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.RBD_MIRRORING: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +# RadosGW manager role provides all permissions for block related scopes +RGW_MGR_ROLE = Role( + 'rgw-manager', 'allows full permissions for the rgw scope', { + Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +# Cluster manager role provides all permission for OSDs, Monitors, and +# Config options +CLUSTER_MGR_ROLE = Role( + 'cluster-manager', """allows full permissions for the hosts, osd, mon, mgr, + and config-opt scopes""", { + Scope.HOSTS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.OSD: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.MONITOR: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.MANAGER: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.CONFIG_OPT: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.LOG: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +# Pool manager role provides all permissions for pool related scopes +POOL_MGR_ROLE = Role( + 'pool-manager', 'allows full permissions for the pool scope', { + Scope.POOL: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + +# CephFS manager role provides all permissions for CephFS related scopes +CEPHFS_MGR_ROLE = Role( + 'cephfs-manager', 'allows full permissions for the cephfs scope', { + Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + +GANESHA_MGR_ROLE = Role( + 'ganesha-manager', 'allows full permissions for the nfs-ganesha scope', { + Scope.NFS_GANESHA: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], + }) + + +SYSTEM_ROLES = { + ADMIN_ROLE.name: ADMIN_ROLE, + READ_ONLY_ROLE.name: READ_ONLY_ROLE, + BLOCK_MGR_ROLE.name: BLOCK_MGR_ROLE, + RGW_MGR_ROLE.name: RGW_MGR_ROLE, + CLUSTER_MGR_ROLE.name: CLUSTER_MGR_ROLE, + POOL_MGR_ROLE.name: POOL_MGR_ROLE, + CEPHFS_MGR_ROLE.name: CEPHFS_MGR_ROLE, + GANESHA_MGR_ROLE.name: GANESHA_MGR_ROLE, +} + + +class User(object): + def __init__(self, username, password, name=None, email=None, roles=None, + last_update=None, enabled=True, pwd_expiration_date=None, + pwd_update_required=False): + self.username = username + self.password = password + self.name = name + self.email = email + self.invalid_auth_attempt = 0 + if roles is None: + self.roles = set() + else: + self.roles = roles + if last_update is None: + self.refresh_last_update() + else: + self.last_update = last_update + self._enabled = enabled + self.pwd_expiration_date = pwd_expiration_date + if self.pwd_expiration_date is None: + self.refresh_pwd_expiration_date() + self.pwd_update_required = pwd_update_required + + def refresh_last_update(self): + self.last_update = int(time.time()) + + def refresh_pwd_expiration_date(self): + if Settings.USER_PWD_EXPIRATION_SPAN > 0: + expiration_date = datetime.utcnow() + timedelta( + days=Settings.USER_PWD_EXPIRATION_SPAN) + self.pwd_expiration_date = int(time.mktime(expiration_date.timetuple())) + else: + self.pwd_expiration_date = None + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, value): + self._enabled = value + self.refresh_last_update() + + def set_password(self, password): + self.set_password_hash(password_hash(password)) + + def set_password_hash(self, hashed_password): + self.invalid_auth_attempt = 0 + self.password = hashed_password + self.refresh_last_update() + self.refresh_pwd_expiration_date() + self.pwd_update_required = False + + def compare_password(self, password): + """ + Compare the specified password with the user password. + :param password: The plain password to check. + :type password: str + :return: `True` if the passwords are equal, otherwise `False`. + :rtype: bool + """ + pass_hash = password_hash(password, salt_password=self.password) + return pass_hash == self.password + + def is_pwd_expired(self): + if self.pwd_expiration_date: + current_time = int(time.mktime(datetime.utcnow().timetuple())) + return self.pwd_expiration_date < current_time + return False + + def set_roles(self, roles): + self.roles = set(roles) + self.refresh_last_update() + + def add_roles(self, roles): + self.roles = self.roles.union(set(roles)) + self.refresh_last_update() + + def del_roles(self, roles): + for role in roles: + if role not in self.roles: + raise RoleNotInUser(role.name, self.username) + self.roles.difference_update(set(roles)) + self.refresh_last_update() + + def authorize(self, scope, permissions): + if self.pwd_update_required: + return False + + for role in self.roles: + if role.authorize(scope, permissions): + return True + return False + + def permissions_dict(self): + # type: () -> dict + perms = {} # type: dict + for role in self.roles: + for scope, perms_list in role.scopes_permissions.items(): + if scope in perms: + perms_tmp = set(perms[scope]).union(set(perms_list)) + perms[scope] = list(perms_tmp) + else: + perms[scope] = perms_list + + return perms + + def to_dict(self): + return { + 'username': self.username, + 'password': self.password, + 'roles': sorted([r.name for r in self.roles]), + 'name': self.name, + 'email': self.email, + 'lastUpdate': self.last_update, + 'enabled': self.enabled, + 'pwdExpirationDate': self.pwd_expiration_date, + 'pwdUpdateRequired': self.pwd_update_required + } + + @classmethod + def from_dict(cls, u_dict, roles): + return User(u_dict['username'], u_dict['password'], u_dict['name'], + u_dict['email'], {roles[r] for r in u_dict['roles']}, + u_dict['lastUpdate'], u_dict['enabled'], + u_dict['pwdExpirationDate'], u_dict['pwdUpdateRequired']) + + +class AccessControlDB(object): + VERSION = 2 + ACDB_CONFIG_KEY = "accessdb_v" + + def __init__(self, version, users, roles): + self.users = users + self.version = version + self.roles = roles + self.lock = threading.RLock() + + def create_role(self, name, description=None): + with self.lock: + if name in SYSTEM_ROLES or name in self.roles: + raise RoleAlreadyExists(name) + role = Role(name, description) + self.roles[name] = role + return role + + def get_role(self, name): + with self.lock: + if name not in self.roles: + raise RoleDoesNotExist(name) + return self.roles[name] + + def increment_attempt(self, username): + with self.lock: + if username in self.users: + self.users[username].invalid_auth_attempt += 1 + + def reset_attempt(self, username): + with self.lock: + if username in self.users: + self.users[username].invalid_auth_attempt = 0 + + def get_attempt(self, username): + with self.lock: + try: + return self.users[username].invalid_auth_attempt + except KeyError: + return 0 + + def delete_role(self, name): + with self.lock: + if name not in self.roles: + raise RoleDoesNotExist(name) + role = self.roles[name] + + # check if role is not associated with a user + for username, user in self.users.items(): + if role in user.roles: + raise RoleIsAssociatedWithUser(name, username) + + del self.roles[name] + + def create_user(self, username, password, name, email, enabled=True, + pwd_expiration_date=None, pwd_update_required=False): + logger.debug("creating user: username=%s", username) + with self.lock: + if username in self.users: + raise UserAlreadyExists(username) + if pwd_expiration_date and \ + (pwd_expiration_date < int(time.mktime(datetime.utcnow().timetuple()))): + raise PwdExpirationDateNotValid() + user = User(username, password_hash(password), name, email, enabled=enabled, + pwd_expiration_date=pwd_expiration_date, + pwd_update_required=pwd_update_required) + self.users[username] = user + return user + + def get_user(self, username): + with self.lock: + if username not in self.users: + raise UserDoesNotExist(username) + return self.users[username] + + def delete_user(self, username): + with self.lock: + if username not in self.users: + raise UserDoesNotExist(username) + del self.users[username] + + def update_users_with_roles(self, role): + with self.lock: + if not role: + return + for _, user in self.users.items(): + if role in user.roles: + user.refresh_last_update() + + def save(self): + with self.lock: + db = { + 'users': {un: u.to_dict() for un, u in self.users.items()}, + 'roles': {rn: r.to_dict() for rn, r in self.roles.items()}, + 'version': self.version + } + mgr.set_store(self.accessdb_config_key(), json.dumps(db)) + + @classmethod + def accessdb_config_key(cls, version=None): + if version is None: + version = cls.VERSION + return "{}{}".format(cls.ACDB_CONFIG_KEY, version) + + def check_and_update_db(self): + logger.debug("Checking for previous DB versions") + + def check_migrate_v1_to_current(): + # Check if version 1 exists in the DB and migrate it to current version + v1_db = mgr.get_store(self.accessdb_config_key(1)) + if v1_db: + logger.debug("Found database v1 credentials") + v1_db = json.loads(v1_db) + + for user, _ in v1_db['users'].items(): + v1_db['users'][user]['enabled'] = True + v1_db['users'][user]['pwdExpirationDate'] = None + v1_db['users'][user]['pwdUpdateRequired'] = False + + self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()} + self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES)) + for un, u in v1_db.get('users', {}).items()} + + self.save() + + check_migrate_v1_to_current() + + @classmethod + def load(cls): + logger.info("Loading user roles DB version=%s", cls.VERSION) + + json_db = mgr.get_store(cls.accessdb_config_key()) + if json_db is None: + logger.debug("No DB v%s found, creating new...", cls.VERSION) + db = cls(cls.VERSION, {}, {}) + # check if we can update from a previous version database + db.check_and_update_db() + return db + + dict_db = json.loads(json_db) + roles = {rn: Role.from_dict(r) + for rn, r in dict_db.get('roles', {}).items()} + users = {un: User.from_dict(u, dict(roles, **SYSTEM_ROLES)) + for un, u in dict_db.get('users', {}).items()} + return cls(dict_db['version'], users, roles) + + +def load_access_control_db(): + mgr.ACCESS_CTRL_DB = AccessControlDB.load() + + +# CLI dashboard access control scope commands + +@CLIWriteCommand('dashboard set-login-credentials') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def set_login_credentials_cmd(_, username: str, inbuf: str): + ''' + Set the login credentials. Password read from -i + ''' + password = inbuf + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_password(password) + except UserDoesNotExist: + user = mgr.ACCESS_CTRL_DB.create_user(username, password, None, None) + user.set_roles([ADMIN_ROLE]) + + mgr.ACCESS_CTRL_DB.save() + + return 0, '''\ +****************************************************************** +*** WARNING: this command is deprecated. *** +*** Please use the ac-user-* related commands to manage users. *** +****************************************************************** +Username and password updated''', '' + + +@CLIReadCommand('dashboard ac-role-show') +def ac_role_show_cmd(_, rolename: Optional[str] = None): + ''' + Show role info + ''' + if not rolename: + roles = dict(mgr.ACCESS_CTRL_DB.roles) + roles.update(SYSTEM_ROLES) + roles_list = [name for name, _ in roles.items()] + return 0, json.dumps(roles_list), '' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + role = SYSTEM_ROLES[rolename] + return 0, json.dumps(role.to_dict()), '' + + +@CLIWriteCommand('dashboard ac-role-create') +def ac_role_create_cmd(_, rolename: str, description: Optional[str] = None): + ''' + Create a new access control role + ''' + try: + role = mgr.ACCESS_CTRL_DB.create_role(rolename, description) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + + +@CLIWriteCommand('dashboard ac-role-delete') +def ac_role_delete_cmd(_, rolename: str): + ''' + Delete an access control role + ''' + try: + mgr.ACCESS_CTRL_DB.delete_role(rolename) + mgr.ACCESS_CTRL_DB.save() + return 0, "Role '{}' deleted".format(rolename), "" + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot delete system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except RoleIsAssociatedWithUser as ex: + return -errno.EPERM, '', str(ex) + + +@CLIWriteCommand('dashboard ac-role-add-scope-perms') +def ac_role_add_scope_perms_cmd(_, + rolename: str, + scopename: str, + permissions: Sequence[str]): + ''' + Add the scope permissions for a role + ''' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + perms_array = [perm.strip() for perm in permissions] + role.set_scope_permissions(scopename, perms_array) + mgr.ACCESS_CTRL_DB.update_users_with_roles(role) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot update system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except ScopeNotValid as ex: + return -errno.EINVAL, '', str(ex) + "\n Possible values: {}" \ + .format(Scope.all_scopes()) + except PermissionNotValid as ex: + return -errno.EINVAL, '', str(ex) + \ + "\n Possible values: {}" \ + .format(Permission.all_permissions()) + + +@CLIWriteCommand('dashboard ac-role-del-scope-perms') +def ac_role_del_scope_perms_cmd(_, rolename: str, scopename: str): + ''' + Delete the scope permissions for a role + ''' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + role.del_scope_permissions(scopename) + mgr.ACCESS_CTRL_DB.update_users_with_roles(role) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot update system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except ScopeNotInRole as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIReadCommand('dashboard ac-user-show') +def ac_user_show_cmd(_, username: Optional[str] = None): + ''' + Show user info + ''' + if not username: + users = mgr.ACCESS_CTRL_DB.users + users_list = [name for name, _ in users.items()] + return 0, json.dumps(users_list), '' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-create') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def ac_user_create_cmd(_, username: str, inbuf: str, + rolename: Optional[str] = None, + name: Optional[str] = None, + email: Optional[str] = None, + enabled: bool = True, + force_password: bool = False, + pwd_expiration_date: Optional[int] = None, + pwd_update_required: bool = False): + ''' + Create a user. Password read from -i + ''' + password = inbuf + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + role = SYSTEM_ROLES[rolename] + + try: + if not force_password: + pw_check = PasswordPolicy(password, username) + pw_check.check_all() + user = mgr.ACCESS_CTRL_DB.create_user(username, password, name, email, + enabled, pwd_expiration_date, + pwd_update_required) + except PasswordPolicyException as ex: + return -errno.EINVAL, '', str(ex) + except UserAlreadyExists as ex: + return 0, str(ex), '' + + if role: + user.set_roles([role]) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + + +@CLIWriteCommand('dashboard ac-user-enable') +def ac_user_enable(_, username: str): + ''' + Enable a user + ''' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.enabled = True + mgr.ACCESS_CTRL_DB.reset_attempt(username) + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-disable') +def ac_user_disable(_, username: str): + ''' + Disable a user + ''' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.enabled = False + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-delete') +def ac_user_delete_cmd(_, username: str): + ''' + Delete user + ''' + try: + mgr.ACCESS_CTRL_DB.delete_user(username) + mgr.ACCESS_CTRL_DB.save() + return 0, "User '{}' deleted".format(username), "" + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-roles') +def ac_user_set_roles_cmd(_, username: str, roles: Sequence[str]): + ''' + Set user roles + ''' + rolesname = roles + roles: List[Role] = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-add-roles') +def ac_user_add_roles_cmd(_, username: str, roles: Sequence[str]): + ''' + Add roles to user + ''' + rolesname = roles + roles: List[Role] = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.add_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-del-roles') +def ac_user_del_roles_cmd(_, username: str, roles: Sequence[str]): + ''' + Delete roles from user + ''' + rolesname = roles + roles: List[Role] = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.del_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + except RoleNotInUser as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-password') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def ac_user_set_password(_, username: str, inbuf: str, + force_password: bool = False): + ''' + Set user password from -i + ''' + password = inbuf + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if not force_password: + pw_check = PasswordPolicy(password, user.name) + pw_check.check_all() + user.set_password(password) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except PasswordPolicyException as ex: + return -errno.EINVAL, '', str(ex) + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-password-hash') +@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC) +def ac_user_set_password_hash(_, username: str, inbuf: str): + ''' + Set user password bcrypt hash from -i + ''' + hashed_password = inbuf + try: + # make sure the hashed_password is actually a bcrypt hash + bcrypt.checkpw(b'', hashed_password.encode('utf-8')) + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_password_hash(hashed_password) + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except ValueError: + return -errno.EINVAL, '', 'Invalid password hash' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-info') +def ac_user_set_info(_, username: str, name: str, email: str): + ''' + Set user info + ''' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if name: + user.name = name + if email: + user.email = email + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +class LocalAuthenticator(object): + def __init__(self): + load_access_control_db() + + def get_user(self, username): + return mgr.ACCESS_CTRL_DB.get_user(username) + + def authenticate(self, username, password): + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if user.password: + if user.enabled and user.compare_password(password) \ + and not user.is_pwd_expired(): + return {'permissions': user.permissions_dict(), + 'pwdExpirationDate': user.pwd_expiration_date, + 'pwdUpdateRequired': user.pwd_update_required} + except UserDoesNotExist: + logger.debug("User '%s' does not exist", username) + return None + + def authorize(self, username, scope, permissions): + user = mgr.ACCESS_CTRL_DB.get_user(username) + return user.authorize(scope, permissions) diff --git a/src/pybind/mgr/dashboard/services/auth.py b/src/pybind/mgr/dashboard/services/auth.py new file mode 100644 index 000000000..fc883f055 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/auth.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json +import logging +import os +import threading +import time +import uuid +from base64 import b64encode + +import cherrypy +import jwt + +from .. import mgr +from .access_control import LocalAuthenticator, UserDoesNotExist + +cherrypy.config.update({ + 'response.headers.server': 'Ceph-Dashboard', + 'response.headers.content-security-policy': "frame-ancestors 'self';", + 'response.headers.x-content-type-options': 'nosniff', + 'response.headers.strict-transport-security': 'max-age=63072000; includeSubDomains; preload' +}) + + +class JwtManager(object): + JWT_TOKEN_BLOCKLIST_KEY = "jwt_token_block_list" + JWT_TOKEN_TTL = 28800 # default 8 hours + JWT_ALGORITHM = 'HS256' + _secret = None + + LOCAL_USER = threading.local() + + @staticmethod + def _gen_secret(): + secret = os.urandom(16) + return b64encode(secret).decode('utf-8') + + @classmethod + def init(cls): + cls.logger = logging.getLogger('jwt') # type: ignore + # generate a new secret if it does not exist + secret = mgr.get_store('jwt_secret') + if secret is None: + secret = cls._gen_secret() + mgr.set_store('jwt_secret', secret) + cls._secret = secret + + @classmethod + def gen_token(cls, username): + if not cls._secret: + cls.init() + ttl = mgr.get_module_option('jwt_token_ttl', cls.JWT_TOKEN_TTL) + ttl = int(ttl) + now = int(time.time()) + payload = { + 'iss': 'ceph-dashboard', + 'jti': str(uuid.uuid4()), + 'exp': now + ttl, + 'iat': now, + 'username': username + } + return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM) # type: ignore + + @classmethod + def decode_token(cls, token): + if not cls._secret: + cls.init() + return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM) # type: ignore + + @classmethod + def get_token_from_header(cls): + auth_cookie_name = 'token' + try: + # use cookie + return cherrypy.request.cookie[auth_cookie_name].value + except KeyError: + try: + # fall-back: use Authorization header + auth_header = cherrypy.request.headers.get('authorization') + if auth_header is not None: + scheme, params = auth_header.split(' ', 1) + if scheme.lower() == 'bearer': + return params + except IndexError: + return None + + @classmethod + def set_user(cls, username): + cls.LOCAL_USER.username = username + + @classmethod + def reset_user(cls): + cls.set_user(None) + + @classmethod + def get_username(cls): + return getattr(cls.LOCAL_USER, 'username', None) + + @classmethod + def get_user(cls, token): + try: + dtoken = JwtManager.decode_token(token) + if not JwtManager.is_blocklisted(dtoken['jti']): + user = AuthManager.get_user(dtoken['username']) + if user.last_update <= dtoken['iat']: + return user + cls.logger.debug( # type: ignore + "user info changed after token was issued, iat=%s last_update=%s", + dtoken['iat'], user.last_update + ) + else: + cls.logger.debug('Token is block-listed') # type: ignore + except jwt.ExpiredSignatureError: + cls.logger.debug("Token has expired") # type: ignore + except jwt.InvalidTokenError: + cls.logger.debug("Failed to decode token") # type: ignore + except UserDoesNotExist: + cls.logger.debug( # type: ignore + "Invalid token: user %s does not exist", dtoken['username'] + ) + return None + + @classmethod + def blocklist_token(cls, token): + token = cls.decode_token(token) + blocklist_json = mgr.get_store(cls.JWT_TOKEN_BLOCKLIST_KEY) + if not blocklist_json: + blocklist_json = "{}" + bl_dict = json.loads(blocklist_json) + now = time.time() + + # remove expired tokens + to_delete = [] + for jti, exp in bl_dict.items(): + if exp < now: + to_delete.append(jti) + for jti in to_delete: + del bl_dict[jti] + + bl_dict[token['jti']] = token['exp'] + mgr.set_store(cls.JWT_TOKEN_BLOCKLIST_KEY, json.dumps(bl_dict)) + + @classmethod + def is_blocklisted(cls, jti): + blocklist_json = mgr.get_store(cls.JWT_TOKEN_BLOCKLIST_KEY) + if not blocklist_json: + blocklist_json = "{}" + bl_dict = json.loads(blocklist_json) + return jti in bl_dict + + +class AuthManager(object): + AUTH_PROVIDER = None + + @classmethod + def initialize(cls): + cls.AUTH_PROVIDER = LocalAuthenticator() + + @classmethod + def get_user(cls, username): + return cls.AUTH_PROVIDER.get_user(username) # type: ignore + + @classmethod + def authenticate(cls, username, password): + return cls.AUTH_PROVIDER.authenticate(username, password) # type: ignore + + @classmethod + def authorize(cls, username, scope, permissions): + return cls.AUTH_PROVIDER.authorize(username, scope, permissions) # type: ignore + + +class AuthManagerTool(cherrypy.Tool): + def __init__(self): + super(AuthManagerTool, self).__init__( + 'before_handler', self._check_authentication, priority=20) + self.logger = logging.getLogger('auth') + + def _check_authentication(self): + JwtManager.reset_user() + token = JwtManager.get_token_from_header() + if token: + user = JwtManager.get_user(token) + if user: + self._check_authorization(user.username) + return + self.logger.debug('Unauthorized access to %s', + cherrypy.url(relative='server')) + raise cherrypy.HTTPError(401, 'You are not authorized to access ' + 'that resource') + + def _check_authorization(self, username): + self.logger.debug("checking authorization...") + handler = cherrypy.request.handler.callable + controller = handler.__self__ + sec_scope = getattr(controller, '_security_scope', None) + sec_perms = getattr(handler, '_security_permissions', None) + JwtManager.set_user(username) + + if not sec_scope: + # controller does not define any authorization restrictions + return + + self.logger.debug("checking '%s' access to '%s' scope", sec_perms, + sec_scope) + + if not sec_perms: + self.logger.debug("Fail to check permission on: %s:%s", controller, + handler) + raise cherrypy.HTTPError(403, "You don't have permissions to " + "access that resource") + + if not AuthManager.authorize(username, sec_scope, sec_perms): + raise cherrypy.HTTPError(403, "You don't have permissions to " + "access that resource") diff --git a/src/pybind/mgr/dashboard/services/ceph_service.py b/src/pybind/mgr/dashboard/services/ceph_service.py new file mode 100644 index 000000000..675d0425a --- /dev/null +++ b/src/pybind/mgr/dashboard/services/ceph_service.py @@ -0,0 +1,423 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json +import logging + +import rados +from mgr_module import CommandResult +from mgr_util import get_most_recent_rate, get_time_series_rates + +from .. import mgr + +try: + from typing import Any, Dict, Optional, Union +except ImportError: + pass # For typing only + +logger = logging.getLogger('ceph_service') + + +class SendCommandError(rados.Error): + def __init__(self, err, prefix, argdict, errno): + self.prefix = prefix + self.argdict = argdict + super(SendCommandError, self).__init__(err, errno) + + +# pylint: disable=too-many-public-methods +class CephService(object): + + OSD_FLAG_NO_SCRUB = 'noscrub' + OSD_FLAG_NO_DEEP_SCRUB = 'nodeep-scrub' + + PG_STATUS_SCRUBBING = 'scrubbing' + PG_STATUS_DEEP_SCRUBBING = 'deep' + + SCRUB_STATUS_DISABLED = 'Disabled' + SCRUB_STATUS_ACTIVE = 'Active' + SCRUB_STATUS_INACTIVE = 'Inactive' + + @classmethod + def get_service_map(cls, service_name): + service_map = {} # type: Dict[str, dict] + for server in mgr.list_servers(): + for service in server['services']: + if service['type'] == service_name: + if server['hostname'] not in service_map: + service_map[server['hostname']] = { + 'server': server, + 'services': [] + } + inst_id = service['id'] + metadata = mgr.get_metadata(service_name, inst_id) + status = mgr.get_daemon_status(service_name, inst_id) + service_map[server['hostname']]['services'].append({ + 'id': inst_id, + 'type': service_name, + 'hostname': server['hostname'], + 'metadata': metadata, + 'status': status + }) + return service_map + + @classmethod + def get_service_list(cls, service_name): + service_map = cls.get_service_map(service_name) + return [svc for _, svcs in service_map.items() for svc in svcs['services']] + + @classmethod + def get_service_data_by_metadata_id(cls, + service_type: str, + metadata_id: str) -> Optional[Dict[str, Any]]: + for server in mgr.list_servers(): + for service in server['services']: + if service['type'] == service_type: + metadata = mgr.get_metadata(service_type, service['id']) + if metadata_id == metadata['id']: + return { + 'id': metadata['id'], + 'service_map_id': str(service['id']), + 'type': service_type, + 'hostname': server['hostname'], + 'metadata': metadata + } + return None + + @classmethod + def get_service(cls, service_type: str, metadata_id: str) -> Optional[Dict[str, Any]]: + svc_data = cls.get_service_data_by_metadata_id(service_type, metadata_id) + if svc_data: + svc_data['status'] = mgr.get_daemon_status(svc_data['type'], svc_data['service_map_id']) + return svc_data + + @classmethod + def get_service_perf_counters(cls, service_type: str, service_id: str) -> Dict[str, Any]: + schema_dict = mgr.get_perf_schema(service_type, service_id) + schema = schema_dict["{}.{}".format(service_type, service_id)] + counters = [] + for key, value in sorted(schema.items()): + counter = {'name': str(key), 'description': value['description']} + # pylint: disable=W0212 + if mgr._stattype_to_str(value['type']) == 'counter': + counter['value'] = cls.get_rate( + service_type, service_id, key) + counter['unit'] = mgr._unit_to_str(value['units']) + else: + counter['value'] = mgr.get_latest( + service_type, service_id, key) + counter['unit'] = '' + counters.append(counter) + + return { + 'service': { + 'type': service_type, + 'id': str(service_id) + }, + 'counters': counters + } + + @classmethod + def get_pool_list(cls, application=None): + osd_map = mgr.get('osd_map') + if not application: + return osd_map['pools'] + return [pool for pool in osd_map['pools'] + if application in pool.get('application_metadata', {})] + + @classmethod + def get_pool_list_with_stats(cls, application=None): + # pylint: disable=too-many-locals + pools = cls.get_pool_list(application) + + pools_w_stats = [] + + pg_summary = mgr.get("pg_summary") + pool_stats = mgr.get_updated_pool_stats() + + for pool in pools: + pool['pg_status'] = pg_summary['by_pool'][pool['pool'].__str__()] + stats = pool_stats[pool['pool']] + s = {} + + for stat_name, stat_series in stats.items(): + rates = get_time_series_rates(stat_series) + s[stat_name] = { + 'latest': stat_series[0][1], + 'rate': get_most_recent_rate(rates), + 'rates': rates + } + pool['stats'] = s + pools_w_stats.append(pool) + return pools_w_stats + + @classmethod + def get_erasure_code_profiles(cls): + def _serialize_ecp(name, ecp): + def serialize_numbers(key): + value = ecp.get(key) + if value is not None: + ecp[key] = int(value) + + ecp['name'] = name + serialize_numbers('k') + serialize_numbers('m') + return ecp + + ret = [] + for name, ecp in mgr.get('osd_map').get('erasure_code_profiles', {}).items(): + ret.append(_serialize_ecp(name, ecp)) + return ret + + @classmethod + def get_pool_name_from_id(cls, pool_id): + # type: (int) -> Union[str, None] + return mgr.rados.pool_reverse_lookup(pool_id) + + @classmethod + def get_pool_by_attribute(cls, attribute, value): + # type: (str, Any) -> Union[dict, None] + pool_list = cls.get_pool_list() + for pool in pool_list: + if attribute in pool and pool[attribute] == value: + return pool + return None + + @classmethod + def get_pool_pg_status(cls, pool_name): + # type: (str) -> dict + pool = cls.get_pool_by_attribute('pool_name', pool_name) + if pool is None: + return {} + return mgr.get("pg_summary")['by_pool'][pool['pool'].__str__()] + + @staticmethod + def send_command(srv_type, prefix, srv_spec='', **kwargs): + # type: (str, str, Optional[str], Any) -> Any + """ + :type prefix: str + :param srv_type: mon | + :param kwargs: will be added to argdict + :param srv_spec: typically empty. or something like ":0" + + :raises PermissionError: See rados.make_ex + :raises ObjectNotFound: See rados.make_ex + :raises IOError: See rados.make_ex + :raises NoSpace: See rados.make_ex + :raises ObjectExists: See rados.make_ex + :raises ObjectBusy: See rados.make_ex + :raises NoData: See rados.make_ex + :raises InterruptedOrTimeoutError: See rados.make_ex + :raises TimedOut: See rados.make_ex + :raises ValueError: return code != 0 + """ + argdict = { + "prefix": prefix, + "format": "json", + } + argdict.update({k: v for k, v in kwargs.items() if v is not None}) + result = CommandResult("") + mgr.send_command(result, srv_type, srv_spec, json.dumps(argdict), "") + r, outb, outs = result.wait() + if r != 0: + logger.error("send_command '%s' failed. (r=%s, outs=\"%s\", kwargs=%s)", prefix, r, + outs, kwargs) + + raise SendCommandError(outs, prefix, argdict, r) + + try: + return json.loads(outb or outs) + except Exception: # pylint: disable=broad-except + return outb + + @staticmethod + def _get_smart_data_by_device(device): + # type: (dict) -> Dict[str, dict] + # Check whether the device is associated with daemons. + if 'daemons' in device and device['daemons']: + dev_smart_data: Dict[str, Any] = {} + + # Get a list of all OSD daemons on all hosts that are 'up' + # because SMART data can not be retrieved from daemons that + # are 'down' or 'destroyed'. + osd_tree = CephService.send_command('mon', 'osd tree') + osd_daemons_up = [ + node['name'] for node in osd_tree.get('nodes', {}) + if node.get('status') == 'up' + ] + + # All daemons on the same host can deliver SMART data, + # thus it is not relevant for us which daemon we are using. + # NOTE: the list may contain daemons that are 'down' or 'destroyed'. + for daemon in device['daemons']: + svc_type, svc_id = daemon.split('.', 1) + if 'osd' in svc_type: + if daemon not in osd_daemons_up: + continue + try: + dev_smart_data = CephService.send_command( + svc_type, 'smart', svc_id, devid=device['devid']) + except SendCommandError as error: + logger.warning(str(error)) + # Try to retrieve SMART data from another daemon. + continue + elif 'mon' in svc_type: + try: + dev_smart_data = CephService.send_command( + svc_type, 'device query-daemon-health-metrics', who=daemon) + except SendCommandError as error: + logger.warning(str(error)) + # Try to retrieve SMART data from another daemon. + continue + else: + dev_smart_data = {} + for dev_id, dev_data in dev_smart_data.items(): + if 'error' in dev_data: + logger.warning( + '[SMART] Error retrieving smartctl data for device ID "%s": %s', + dev_id, dev_data) + break + + return dev_smart_data + logger.warning('[SMART] No daemons associated with device ID "%s"', + device['devid']) + return {} + + @staticmethod + def get_devices_by_host(hostname): + # type: (str) -> dict + return CephService.send_command('mon', + 'device ls-by-host', + host=hostname) + + @staticmethod + def get_devices_by_daemon(daemon_type, daemon_id): + # type: (str, str) -> dict + return CephService.send_command('mon', + 'device ls-by-daemon', + who='{}.{}'.format( + daemon_type, daemon_id)) + + @staticmethod + def get_smart_data_by_host(hostname): + # type: (str) -> dict + """ + Get the SMART data of all devices on the given host, regardless + of the daemon (osd, mon, ...). + :param hostname: The name of the host. + :return: A dictionary containing the SMART data of every device + on the given host. The device name is used as the key in the + dictionary. + """ + devices = CephService.get_devices_by_host(hostname) + smart_data = {} # type: dict + if devices: + for device in devices: + if device['devid'] not in smart_data: + smart_data.update( + CephService._get_smart_data_by_device(device)) + else: + logger.debug('[SMART] could not retrieve device list from host %s', hostname) + return smart_data + + @staticmethod + def get_smart_data_by_daemon(daemon_type, daemon_id): + # type: (str, str) -> Dict[str, dict] + """ + Get the SMART data of the devices associated with the given daemon. + :param daemon_type: The daemon type, e.g. 'osd' or 'mon'. + :param daemon_id: The daemon identifier. + :return: A dictionary containing the SMART data of every device + associated with the given daemon. The device name is used as the + key in the dictionary. + """ + devices = CephService.get_devices_by_daemon(daemon_type, daemon_id) + smart_data = {} # type: Dict[str, dict] + if devices: + for device in devices: + if device['devid'] not in smart_data: + smart_data.update( + CephService._get_smart_data_by_device(device)) + else: + msg = '[SMART] could not retrieve device list from daemon with type %s and ' +\ + 'with ID %s' + logger.debug(msg, daemon_type, daemon_id) + return smart_data + + @classmethod + def get_rates(cls, svc_type, svc_name, path): + """ + :return: the derivative of mgr.get_counter() + :rtype: list[tuple[int, float]]""" + data = mgr.get_counter(svc_type, svc_name, path)[path] + return get_time_series_rates(data) + + @classmethod + def get_rate(cls, svc_type, svc_name, path): + """returns most recent rate""" + return get_most_recent_rate(cls.get_rates(svc_type, svc_name, path)) + + @classmethod + def get_client_perf(cls): + pools_stats = mgr.get('osd_pool_stats')['pool_stats'] + + io_stats = { + 'read_bytes_sec': 0, + 'read_op_per_sec': 0, + 'write_bytes_sec': 0, + 'write_op_per_sec': 0, + } + recovery_stats = {'recovering_bytes_per_sec': 0} + + for pool_stats in pools_stats: + client_io = pool_stats['client_io_rate'] + for stat in list(io_stats.keys()): + if stat in client_io: + io_stats[stat] += client_io[stat] + + client_recovery = pool_stats['recovery_rate'] + for stat in list(recovery_stats.keys()): + if stat in client_recovery: + recovery_stats[stat] += client_recovery[stat] + + client_perf = io_stats.copy() + client_perf.update(recovery_stats) + + return client_perf + + @classmethod + def get_scrub_status(cls): + enabled_flags = mgr.get('osd_map')['flags_set'] + if cls.OSD_FLAG_NO_SCRUB in enabled_flags or cls.OSD_FLAG_NO_DEEP_SCRUB in enabled_flags: + return cls.SCRUB_STATUS_DISABLED + + grouped_pg_statuses = mgr.get('pg_summary')['all'] + for grouped_pg_status in grouped_pg_statuses.keys(): + if len(grouped_pg_status.split(cls.PG_STATUS_SCRUBBING)) > 1 \ + or len(grouped_pg_status.split(cls.PG_STATUS_DEEP_SCRUBBING)) > 1: + return cls.SCRUB_STATUS_ACTIVE + + return cls.SCRUB_STATUS_INACTIVE + + @classmethod + def get_pg_info(cls): + pg_summary = mgr.get('pg_summary') + object_stats = {stat: pg_summary['pg_stats_sum']['stat_sum'][stat] for stat in [ + 'num_objects', 'num_object_copies', 'num_objects_degraded', + 'num_objects_misplaced', 'num_objects_unfound']} + + pgs_per_osd = 0.0 + total_osds = len(pg_summary['by_osd']) + if total_osds > 0: + total_pgs = 0.0 + for _, osd_pg_statuses in pg_summary['by_osd'].items(): + for _, pg_amount in osd_pg_statuses.items(): + total_pgs += pg_amount + + pgs_per_osd = total_pgs / total_osds + + return { + 'object_stats': object_stats, + 'statuses': pg_summary['all'], + 'pgs_per_osd': pgs_per_osd, + } diff --git a/src/pybind/mgr/dashboard/services/cephfs.py b/src/pybind/mgr/dashboard/services/cephfs.py new file mode 100644 index 000000000..604cf4a77 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/cephfs.py @@ -0,0 +1,263 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import datetime +import logging +import os +from contextlib import contextmanager + +import cephfs + +from .. import mgr + +logger = logging.getLogger('cephfs') + + +class CephFS(object): + @classmethod + def list_filesystems(cls): + fsmap = mgr.get("fs_map") + return [{'id': fs['id'], 'name': fs['mdsmap']['fs_name']} + for fs in fsmap['filesystems']] + + @classmethod + def fs_name_from_id(cls, fs_id): + """ + Get the filesystem name from ID. + :param fs_id: The filesystem ID. + :type fs_id: int | str + :return: The filesystem name or None. + :rtype: str | None + """ + fs_map = mgr.get("fs_map") + fs_info = list(filter(lambda x: str(x['id']) == str(fs_id), + fs_map['filesystems'])) + if not fs_info: + return None + return fs_info[0]['mdsmap']['fs_name'] + + def __init__(self, fs_name=None): + logger.debug("initializing cephfs connection") + self.cfs = cephfs.LibCephFS(rados_inst=mgr.rados) + logger.debug("mounting cephfs filesystem: %s", fs_name) + if fs_name: + self.cfs.mount(filesystem_name=fs_name) + else: + self.cfs.mount() + logger.debug("mounted cephfs filesystem") + + def __del__(self): + logger.debug("shutting down cephfs filesystem") + self.cfs.shutdown() + + @contextmanager + def opendir(self, dirpath): + d = None + try: + d = self.cfs.opendir(dirpath) + yield d + finally: + if d: + self.cfs.closedir(d) + + def ls_dir(self, path, depth): + """ + List directories of specified path with additional information. + :param path: The root directory path. + :type path: str | bytes + :param depth: The number of steps to go down the directory tree. + :type depth: int | str + :return: A list of directory dicts which consist of name, path, + parent, snapshots and quotas. + :rtype: list + """ + paths = self._ls_dir(path, int(depth)) + # Convert (bytes => string), prettify paths (strip slashes) + # and append additional information. + return [self.get_directory(p) for p in paths if p != path.encode()] + + def _ls_dir(self, path, depth): + """ + List directories of specified path. + :param path: The root directory path. + :type path: str | bytes + :param depth: The number of steps to go down the directory tree. + :type depth: int + :return: A list of directory paths (bytes encoded). + Example: + ls_dir('/photos', 1) => [ + b'/photos/flowers', b'/photos/cars' + ] + :rtype: list + """ + if isinstance(path, str): + path = path.encode() + logger.debug("get_dir_list dirpath=%s depth=%s", path, + depth) + if depth == 0: + return [path] + logger.debug("opening dirpath=%s", path) + with self.opendir(path) as d: + dent = self.cfs.readdir(d) + paths = [path] + while dent: + logger.debug("found entry=%s", dent.d_name) + if dent.d_name in [b'.', b'..']: + dent = self.cfs.readdir(d) + continue + if dent.is_dir(): + logger.debug("found dir=%s", dent.d_name) + subdir_path = os.path.join(path, dent.d_name) + paths.extend(self._ls_dir(subdir_path, depth - 1)) + dent = self.cfs.readdir(d) + return paths + + def get_directory(self, path): + """ + Transforms path of directory into a meaningful dictionary. + :param path: The root directory path. + :type path: str | bytes + :return: Dict consists of name, path, parent, snapshots and quotas. + :rtype: dict + """ + path = path.decode() + not_root = path != os.sep + return { + 'name': os.path.basename(path) if not_root else path, + 'path': path, + 'parent': os.path.dirname(path) if not_root else None, + 'snapshots': self.ls_snapshots(path), + 'quotas': self.get_quotas(path) if not_root else None + } + + def dir_exists(self, path): + try: + with self.opendir(path): + return True + except cephfs.ObjectNotFound: + return False + + def mk_dirs(self, path): + """ + Create a directory. + :param path: The path of the directory. + """ + if path == os.sep: + raise Exception('Cannot create root directory "/"') + if self.dir_exists(path): + return + logger.info("Creating directory: %s", path) + self.cfs.mkdirs(path, 0o755) + + def rm_dir(self, path): + """ + Remove a directory. + :param path: The path of the directory. + """ + if path == os.sep: + raise Exception('Cannot remove root directory "/"') + if not self.dir_exists(path): + return + logger.info("Removing directory: %s", path) + self.cfs.rmdir(path) + + def mk_snapshot(self, path, name=None, mode=0o755): + """ + Create a snapshot. + :param path: The path of the directory. + :type path: str + :param name: The name of the snapshot. If not specified, + a name using the current time in RFC3339 UTC format + will be generated. + :type name: str | None + :param mode: The permissions the directory should have + once created. + :type mode: int + :return: Returns the name of the snapshot. + :rtype: str + """ + if name is None: + now = datetime.datetime.now() + tz = now.astimezone().tzinfo + name = now.replace(tzinfo=tz).isoformat('T') + client_snapdir = self.cfs.conf_get('client_snapdir') + snapshot_path = os.path.join(path, client_snapdir, name) + logger.info("Creating snapshot: %s", snapshot_path) + self.cfs.mkdir(snapshot_path, mode) + return name + + def ls_snapshots(self, path): + """ + List snapshots for the specified path. + :param path: The path of the directory. + :type path: str + :return: A list of dictionaries containing the name and the + creation time of the snapshot. + :rtype: list + """ + result = [] + client_snapdir = self.cfs.conf_get('client_snapdir') + path = os.path.join(path, client_snapdir).encode() + with self.opendir(path) as d: + dent = self.cfs.readdir(d) + while dent: + if dent.is_dir(): + if dent.d_name not in [b'.', b'..'] and not dent.d_name.startswith(b'_'): + snapshot_path = os.path.join(path, dent.d_name) + stat = self.cfs.stat(snapshot_path) + result.append({ + 'name': dent.d_name.decode(), + 'path': snapshot_path.decode(), + 'created': '{}Z'.format(stat.st_ctime.isoformat('T')) + }) + dent = self.cfs.readdir(d) + return result + + def rm_snapshot(self, path, name): + """ + Remove a snapshot. + :param path: The path of the directory. + :type path: str + :param name: The name of the snapshot. + :type name: str + """ + client_snapdir = self.cfs.conf_get('client_snapdir') + snapshot_path = os.path.join(path, client_snapdir, name) + logger.info("Removing snapshot: %s", snapshot_path) + self.cfs.rmdir(snapshot_path) + + def get_quotas(self, path): + """ + Get the quotas of the specified path. + :param path: The path of the directory/file. + :type path: str + :return: Returns a dictionary containing 'max_bytes' + and 'max_files'. + :rtype: dict + """ + try: + max_bytes = int(self.cfs.getxattr(path, 'ceph.quota.max_bytes')) + except cephfs.NoData: + max_bytes = 0 + try: + max_files = int(self.cfs.getxattr(path, 'ceph.quota.max_files')) + except cephfs.NoData: + max_files = 0 + return {'max_bytes': max_bytes, 'max_files': max_files} + + def set_quotas(self, path, max_bytes=None, max_files=None): + """ + Set the quotas of the specified path. + :param path: The path of the directory/file. + :type path: str + :param max_bytes: The byte limit. + :type max_bytes: int | None + :param max_files: The file limit. + :type max_files: int | None + """ + if max_bytes is not None: + self.cfs.setxattr(path, 'ceph.quota.max_bytes', + str(max_bytes).encode(), 0) + if max_files is not None: + self.cfs.setxattr(path, 'ceph.quota.max_files', + str(max_files).encode(), 0) diff --git a/src/pybind/mgr/dashboard/services/cluster.py b/src/pybind/mgr/dashboard/services/cluster.py new file mode 100644 index 000000000..a057f2438 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/cluster.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +from enum import Enum + +from .. import mgr + + +class ClusterModel: + + class Status(Enum): + INSTALLED = 0 + POST_INSTALLED = 1 + + status: Status + + def __init__(self, status=Status.POST_INSTALLED.name): + """ + :param status: The status of the cluster. Assume that the cluster + is already functional by default. + :type status: str + """ + self.status = self.Status[status] + + def dict(self): + return {'status': self.status.name} + + def to_db(self): + mgr.set_store('cluster/status', self.status.name) + + @classmethod + def from_db(cls): + """ + Get the stored cluster status from the configuration key/value store. + If the status is not set, assume it is already fully functional. + """ + return cls(status=mgr.get_store('cluster/status', cls.Status.POST_INSTALLED.name)) diff --git a/src/pybind/mgr/dashboard/services/exception.py b/src/pybind/mgr/dashboard/services/exception.py new file mode 100644 index 000000000..02a827e1b --- /dev/null +++ b/src/pybind/mgr/dashboard/services/exception.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json +import logging +from contextlib import contextmanager + +import cephfs +import cherrypy +import rados +import rbd +from orchestrator import OrchestratorError + +from ..exceptions import DashboardException, ViewCacheNoDataException +from ..services.ceph_service import SendCommandError + +logger = logging.getLogger('exception') + + +def serialize_dashboard_exception(e, include_http_status=False, task=None): + """ + :type e: Exception + :param include_http_status: Used for Tasks, where the HTTP status code is not available. + """ + from ..tools import ViewCache + if isinstance(e, ViewCacheNoDataException): + return {'status': ViewCache.VALUE_NONE, 'value': None} + + out = dict(detail=str(e)) + try: + out['code'] = e.code + except AttributeError: + pass + component = getattr(e, 'component', None) + out['component'] = component if component else None + if include_http_status: + out['status'] = getattr(e, 'status', 500) + if task: + out['task'] = dict(name=task.name, metadata=task.metadata) # type: ignore + return out + + +# pylint: disable=broad-except +def dashboard_exception_handler(handler, *args, **kwargs): + try: + with handle_rados_error(component=None): # make the None controller the fallback. + return handler(*args, **kwargs) + # pylint: disable=try-except-raise + except (cherrypy.HTTPRedirect, cherrypy.NotFound, cherrypy.HTTPError): + raise + except (ViewCacheNoDataException, DashboardException) as error: + logger.exception('Dashboard Exception') + cherrypy.response.headers['Content-Type'] = 'application/json' + cherrypy.response.status = getattr(error, 'status', 400) + return json.dumps(serialize_dashboard_exception(error)).encode('utf-8') + except Exception as error: + logger.exception('Internal Server Error') + raise error + + +@contextmanager +def handle_cephfs_error(): + try: + yield + except cephfs.OSError as e: + raise DashboardException(e, component='cephfs') from e + + +@contextmanager +def handle_rbd_error(): + try: + yield + except rbd.OSError as e: + raise DashboardException(e, component='rbd') + except rbd.Error as e: + raise DashboardException(e, component='rbd', code=e.__class__.__name__) + + +@contextmanager +def handle_rados_error(component): + try: + yield + except rados.OSError as e: + raise DashboardException(e, component=component) + except rados.Error as e: + raise DashboardException(e, component=component, code=e.__class__.__name__) + + +@contextmanager +def handle_send_command_error(component): + try: + yield + except SendCommandError as e: + raise DashboardException(e, component=component) + + +@contextmanager +def handle_orchestrator_error(component): + try: + yield + except OrchestratorError as e: + raise DashboardException(e, component=component) + + +@contextmanager +def handle_error(component, http_status_code=None): + try: + yield + except Exception as e: # pylint: disable=broad-except + raise DashboardException(e, component=component, http_status_code=http_status_code) + + +@contextmanager +def handle_custom_error(component, http_status_code=None, exceptions=()): + try: + yield + except exceptions as e: + raise DashboardException(e, component=component, http_status_code=http_status_code) diff --git a/src/pybind/mgr/dashboard/services/iscsi_cli.py b/src/pybind/mgr/dashboard/services/iscsi_cli.py new file mode 100644 index 000000000..71e6c9f3d --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_cli.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import errno +import json +from typing import Optional + +from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand + +from ..rest_client import RequestException +from .iscsi_client import IscsiClient +from .iscsi_config import InvalidServiceUrl, IscsiGatewayAlreadyExists, \ + IscsiGatewayDoesNotExist, IscsiGatewaysConfig, \ + ManagedByOrchestratorException + + +@CLIReadCommand('dashboard iscsi-gateway-list') +def list_iscsi_gateways(_): + ''' + List iSCSI gateways + ''' + return 0, json.dumps(IscsiGatewaysConfig.get_gateways_config()), '' + + +@CLIWriteCommand('dashboard iscsi-gateway-add') +@CLICheckNonemptyFileInput(desc='iSCSI gateway configuration') +def add_iscsi_gateway(_, inbuf, name: Optional[str] = None): + ''' + Add iSCSI gateway configuration. Gateway URL read from -i + ''' + service_url = inbuf + try: + IscsiGatewaysConfig.validate_service_url(service_url) + if name is None: + name = IscsiClient.instance(service_url=service_url).get_hostname()['data'] + IscsiGatewaysConfig.add_gateway(name, service_url) + return 0, 'Success', '' + except IscsiGatewayAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + except InvalidServiceUrl as ex: + return -errno.EINVAL, '', str(ex) + except ManagedByOrchestratorException as ex: + return -errno.EINVAL, '', str(ex) + except RequestException as ex: + return -errno.EINVAL, '', str(ex) + + +@CLIWriteCommand('dashboard iscsi-gateway-rm') +def remove_iscsi_gateway(_, name: str): + ''' + Remove iSCSI gateway configuration + ''' + try: + IscsiGatewaysConfig.remove_gateway(name) + return 0, 'Success', '' + except IscsiGatewayDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + except ManagedByOrchestratorException as ex: + return -errno.EINVAL, '', str(ex) diff --git a/src/pybind/mgr/dashboard/services/iscsi_client.py b/src/pybind/mgr/dashboard/services/iscsi_client.py new file mode 100644 index 000000000..9cc2600e1 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_client.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-public-methods +from __future__ import absolute_import + +import json +import logging + +from requests.auth import HTTPBasicAuth + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + +from ..rest_client import RestClient +from ..settings import Settings +from .iscsi_config import IscsiGatewaysConfig + +logger = logging.getLogger('iscsi_client') + + +class IscsiClient(RestClient): + _CLIENT_NAME = 'iscsi' + _instances = {} # type: dict + + service_url = None + gateway_name = None + + @classmethod + def instance(cls, gateway_name=None, service_url=None): + if not service_url: + if not gateway_name: + gateway_name = list(IscsiGatewaysConfig.get_gateways_config()['gateways'].keys())[0] + gateways_config = IscsiGatewaysConfig.get_gateway_config(gateway_name) + service_url = gateways_config['service_url'] + + instance = cls._instances.get(gateway_name) + if not instance or service_url != instance.service_url or \ + instance.session.verify != Settings.ISCSI_API_SSL_VERIFICATION: + url = urlparse(service_url) + ssl = url.scheme == 'https' + host = url.hostname + port = url.port + username = url.username + password = url.password + if not port: + port = 443 if ssl else 80 + + auth = HTTPBasicAuth(username, password) + instance = IscsiClient(host, port, IscsiClient._CLIENT_NAME, ssl, + auth, Settings.ISCSI_API_SSL_VERIFICATION) + instance.service_url = service_url + instance.gateway_name = gateway_name + if gateway_name: + cls._instances[gateway_name] = instance + + return instance + + @RestClient.api_get('/api/_ping') + def ping(self, request=None): + return request() + + @RestClient.api_get('/api/settings') + def get_settings(self, request=None): + return request() + + @RestClient.api_get('/api/sysinfo/ip_addresses') + def get_ip_addresses(self, request=None): + return request() + + @RestClient.api_get('/api/sysinfo/hostname') + def get_hostname(self, request=None): + return request() + + @RestClient.api_get('/api/config') + def get_config(self, request=None): + return request({ + 'decrypt_passwords': True + }) + + @RestClient.api_put('/api/target/{target_iqn}') + def create_target(self, target_iqn, target_controls, request=None): + logger.debug("[%s] Creating target: %s", self.gateway_name, target_iqn) + return request({ + 'controls': json.dumps(target_controls) + }) + + @RestClient.api_delete('/api/target/{target_iqn}') + def delete_target(self, target_iqn, request=None): + logger.debug("[%s] Deleting target: %s", self.gateway_name, target_iqn) + return request() + + @RestClient.api_put('/api/target/{target_iqn}') + def reconfigure_target(self, target_iqn, target_controls, request=None): + logger.debug("[%s] Reconfiguring target: %s", self.gateway_name, target_iqn) + return request({ + 'mode': 'reconfigure', + 'controls': json.dumps(target_controls) + }) + + @RestClient.api_put('/api/gateway/{target_iqn}/{gateway_name}') + def create_gateway(self, target_iqn, gateway_name, ip_address, request=None): + logger.debug("[%s] Creating gateway: %s/%s", self.gateway_name, target_iqn, + gateway_name) + return request({ + 'ip_address': ','.join(ip_address), + 'skipchecks': 'true' + }) + + @RestClient.api_get('/api/gatewayinfo') + def get_gatewayinfo(self, request=None): + return request() + + @RestClient.api_delete('/api/gateway/{target_iqn}/{gateway_name}') + def delete_gateway(self, target_iqn, gateway_name, request=None): + logger.debug("Deleting gateway: %s/%s", target_iqn, gateway_name) + return request() + + @RestClient.api_put('/api/disk/{pool}/{image}') + def create_disk(self, pool, image, backstore, wwn, request=None): + logger.debug("[%s] Creating disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'mode': 'create', + 'backstore': backstore, + 'wwn': wwn + }) + + @RestClient.api_delete('/api/disk/{pool}/{image}') + def delete_disk(self, pool, image, request=None): + logger.debug("[%s] Deleting disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'preserve_image': 'true' + }) + + @RestClient.api_put('/api/disk/{pool}/{image}') + def reconfigure_disk(self, pool, image, controls, request=None): + logger.debug("[%s] Reconfiguring disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'controls': json.dumps(controls), + 'mode': 'reconfigure' + }) + + @RestClient.api_put('/api/targetlun/{target_iqn}') + def create_target_lun(self, target_iqn, image_id, lun, request=None): + logger.debug("[%s] Creating target lun: %s/%s", self.gateway_name, target_iqn, + image_id) + return request({ + 'disk': image_id, + 'lun_id': lun + }) + + @RestClient.api_delete('/api/targetlun/{target_iqn}') + def delete_target_lun(self, target_iqn, image_id, request=None): + logger.debug("[%s] Deleting target lun: %s/%s", self.gateway_name, target_iqn, + image_id) + return request({ + 'disk': image_id + }) + + @RestClient.api_put('/api/client/{target_iqn}/{client_iqn}') + def create_client(self, target_iqn, client_iqn, request=None): + logger.debug("[%s] Creating client: %s/%s", self.gateway_name, target_iqn, client_iqn) + return request() + + @RestClient.api_delete('/api/client/{target_iqn}/{client_iqn}') + def delete_client(self, target_iqn, client_iqn, request=None): + logger.debug("[%s] Deleting client: %s/%s", self.gateway_name, target_iqn, client_iqn) + return request() + + @RestClient.api_put('/api/clientlun/{target_iqn}/{client_iqn}') + def create_client_lun(self, target_iqn, client_iqn, image_id, request=None): + logger.debug("[%s] Creating client lun: %s/%s", self.gateway_name, target_iqn, + client_iqn) + return request({ + 'disk': image_id + }) + + @RestClient.api_delete('/api/clientlun/{target_iqn}/{client_iqn}') + def delete_client_lun(self, target_iqn, client_iqn, image_id, request=None): + logger.debug("iSCSI[%s] Deleting client lun: %s/%s", self.gateway_name, target_iqn, + client_iqn) + return request({ + 'disk': image_id + }) + + @RestClient.api_put('/api/clientauth/{target_iqn}/{client_iqn}') + def create_client_auth(self, target_iqn, client_iqn, username, password, mutual_username, + mutual_password, request=None): + logger.debug("[%s] Creating client auth: %s/%s/%s/%s/%s/%s", + self.gateway_name, target_iqn, client_iqn, username, password, mutual_username, + mutual_password) + return request({ + 'username': username, + 'password': password, + 'mutual_username': mutual_username, + 'mutual_password': mutual_password + }) + + @RestClient.api_put('/api/hostgroup/{target_iqn}/{group_name}') + def create_group(self, target_iqn, group_name, members, image_ids, request=None): + logger.debug("[%s] Creating group: %s/%s", self.gateway_name, target_iqn, group_name) + return request({ + 'members': ','.join(members), + 'disks': ','.join(image_ids) + }) + + @RestClient.api_put('/api/hostgroup/{target_iqn}/{group_name}') + def update_group(self, target_iqn, group_name, members, image_ids, request=None): + logger.debug("iSCSI[%s] Updating group: %s/%s", self.gateway_name, target_iqn, group_name) + return request({ + 'action': 'remove', + 'members': ','.join(members), + 'disks': ','.join(image_ids) + }) + + @RestClient.api_delete('/api/hostgroup/{target_iqn}/{group_name}') + def delete_group(self, target_iqn, group_name, request=None): + logger.debug("[%s] Deleting group: %s/%s", self.gateway_name, target_iqn, group_name) + return request() + + @RestClient.api_put('/api/discoveryauth') + def update_discoveryauth(self, user, password, mutual_user, mutual_password, request=None): + logger.debug("[%s] Updating discoveryauth: %s/%s/%s/%s", self.gateway_name, user, + password, mutual_user, mutual_password) + return request({ + 'username': user, + 'password': password, + 'mutual_username': mutual_user, + 'mutual_password': mutual_password + }) + + @RestClient.api_put('/api/targetauth/{target_iqn}') + def update_targetacl(self, target_iqn, action, request=None): + logger.debug("[%s] Updating targetacl: %s/%s", self.gateway_name, target_iqn, action) + return request({ + 'action': action + }) + + @RestClient.api_put('/api/targetauth/{target_iqn}') + def update_targetauth(self, target_iqn, user, password, mutual_user, mutual_password, + request=None): + logger.debug("[%s] Updating targetauth: %s/%s/%s/%s/%s", self.gateway_name, + target_iqn, user, password, mutual_user, mutual_password) + return request({ + 'username': user, + 'password': password, + 'mutual_username': mutual_user, + 'mutual_password': mutual_password + }) + + @RestClient.api_get('/api/targetinfo/{target_iqn}') + def get_targetinfo(self, target_iqn, request=None): + # pylint: disable=unused-argument + return request() + + @RestClient.api_get('/api/clientinfo/{target_iqn}/{client_iqn}') + def get_clientinfo(self, target_iqn, client_iqn, request=None): + # pylint: disable=unused-argument + return request() diff --git a/src/pybind/mgr/dashboard/services/iscsi_config.py b/src/pybind/mgr/dashboard/services/iscsi_config.py new file mode 100644 index 000000000..7cdd5cd02 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_config.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + +from .. import mgr + + +class IscsiGatewayAlreadyExists(Exception): + def __init__(self, gateway_name): + super(IscsiGatewayAlreadyExists, self).__init__( + "iSCSI gateway '{}' already exists".format(gateway_name)) + + +class IscsiGatewayDoesNotExist(Exception): + def __init__(self, hostname): + super(IscsiGatewayDoesNotExist, self).__init__( + "iSCSI gateway '{}' does not exist".format(hostname)) + + +class InvalidServiceUrl(Exception): + def __init__(self, service_url): + super(InvalidServiceUrl, self).__init__( + "Invalid service URL '{}'. " + "Valid format: '://:@[:port]'.".format(service_url)) + + +class ManagedByOrchestratorException(Exception): + def __init__(self): + super(ManagedByOrchestratorException, self).__init__( + "iSCSI configuration is managed by the orchestrator") + + +_ISCSI_STORE_KEY = "_iscsi_config" + + +class IscsiGatewaysConfig(object): + @classmethod + def _load_config_from_store(cls): + json_db = mgr.get_store(_ISCSI_STORE_KEY, + '{"gateways": {}}') + config = json.loads(json_db) + cls.update_iscsi_config(config) + return config + + @classmethod + def update_iscsi_config(cls, config): + """ + Since `ceph-iscsi` config v10, gateway names were renamed from host short name to FQDN. + If Ceph Dashboard were configured before v10, we try to update our internal gateways + database automatically. + """ + for gateway_name, gateway_config in list(config['gateways'].items()): + if '.' not in gateway_name: + from ..rest_client import RequestException + from .iscsi_client import IscsiClient # pylint: disable=cyclic-import + try: + service_url = gateway_config['service_url'] + new_gateway_name = IscsiClient.instance( + service_url=service_url).get_hostname()['data'] + if gateway_name != new_gateway_name: + config['gateways'][new_gateway_name] = gateway_config + del config['gateways'][gateway_name] + cls._save_config(config) + except RequestException: + # If gateway is not acessible, it should be removed manually + # or we will try to update automatically next time + continue + + @classmethod + def _save_config(cls, config): + mgr.set_store(_ISCSI_STORE_KEY, json.dumps(config)) + + @classmethod + def validate_service_url(cls, service_url): + url = urlparse(service_url) + if not url.scheme or not url.hostname or not url.username or not url.password: + raise InvalidServiceUrl(service_url) + + @classmethod + def add_gateway(cls, name, service_url): + config = cls.get_gateways_config() + if name in config: + raise IscsiGatewayAlreadyExists(name) + IscsiGatewaysConfig.validate_service_url(service_url) + config['gateways'][name] = {'service_url': service_url} + cls._save_config(config) + + @classmethod + def remove_gateway(cls, name): + config = cls._load_config_from_store() + if name not in config['gateways']: + raise IscsiGatewayDoesNotExist(name) + + del config['gateways'][name] + cls._save_config(config) + + @classmethod + def get_gateways_config(cls): + return cls._load_config_from_store() + + @classmethod + def get_gateway_config(cls, name): + config = IscsiGatewaysConfig.get_gateways_config() + if name not in config['gateways']: + raise IscsiGatewayDoesNotExist(name) + return config['gateways'][name] diff --git a/src/pybind/mgr/dashboard/services/orchestrator.py b/src/pybind/mgr/dashboard/services/orchestrator.py new file mode 100644 index 000000000..3ca9a0f37 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/orchestrator.py @@ -0,0 +1,232 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import logging +from functools import wraps +from typing import Any, Dict, List, Optional + +from ceph.deployment.service_spec import ServiceSpec +from orchestrator import DaemonDescription, DeviceLightLoc, HostSpec, \ + InventoryFilter, OrchestratorClientMixin, OrchestratorError, OrchResult, \ + ServiceDescription, raise_if_exception + +from .. import mgr + +logger = logging.getLogger('orchestrator') + + +# pylint: disable=abstract-method +class OrchestratorAPI(OrchestratorClientMixin): + def __init__(self): + super(OrchestratorAPI, self).__init__() + self.set_mgr(mgr) # type: ignore + + def status(self): + try: + status, message, _module_details = super().available() + logger.info("is orchestrator available: %s, %s", status, message) + return dict(available=status, message=message) + except (RuntimeError, OrchestratorError, ImportError) as e: + return dict( + available=False, + message='Orchestrator is unavailable: {}'.format(str(e))) + + +def wait_api_result(method): + @wraps(method) + def inner(self, *args, **kwargs): + completion = method(self, *args, **kwargs) + raise_if_exception(completion) + return completion.result + return inner + + +class ResourceManager(object): + def __init__(self, api): + self.api = api + + +class HostManger(ResourceManager): + @wait_api_result + def list(self) -> List[HostSpec]: + return self.api.get_hosts() + + @wait_api_result + def enter_maintenance(self, hostname: str, force: bool = False): + return self.api.enter_host_maintenance(hostname, force) + + @wait_api_result + def exit_maintenance(self, hostname: str): + return self.api.exit_host_maintenance(hostname) + + def get(self, hostname: str) -> Optional[HostSpec]: + hosts = [host for host in self.list() if host.hostname == hostname] + return hosts[0] if hosts else None + + @wait_api_result + def add(self, hostname: str, addr: str, labels: List[str]): + return self.api.add_host(HostSpec(hostname, addr=addr, labels=labels)) + + @wait_api_result + def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]: + return self.api.get_facts(hostname) + + @wait_api_result + def remove(self, hostname: str): + return self.api.remove_host(hostname) + + @wait_api_result + def add_label(self, host: str, label: str) -> OrchResult[str]: + return self.api.add_host_label(host, label) + + @wait_api_result + def remove_label(self, host: str, label: str) -> OrchResult[str]: + return self.api.remove_host_label(host, label) + + @wait_api_result + def drain(self, hostname: str): + return self.api.drain_host(hostname) + + +class InventoryManager(ResourceManager): + @wait_api_result + def list(self, hosts=None, refresh=False): + host_filter = InventoryFilter(hosts=hosts) if hosts else None + return self.api.get_inventory(host_filter=host_filter, refresh=refresh) + + +class ServiceManager(ResourceManager): + @wait_api_result + def list(self, + service_type: Optional[str] = None, + service_name: Optional[str] = None) -> List[ServiceDescription]: + return self.api.describe_service(service_type, service_name) + + @wait_api_result + def get(self, service_name: str) -> ServiceDescription: + return self.api.describe_service(None, service_name) + + @wait_api_result + def list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + hostname: Optional[str] = None) -> List[DaemonDescription]: + return self.api.list_daemons(service_name=service_name, + daemon_type=daemon_type, + host=hostname) + + def reload(self, service_type, service_ids): + if not isinstance(service_ids, list): + service_ids = [service_ids] + + completion_list = [ + self.api.service_action('reload', service_type, service_name, + service_id) + for service_name, service_id in service_ids + ] + self.api.orchestrator_wait(completion_list) + for c in completion_list: + raise_if_exception(c) + + @wait_api_result + def apply(self, + service_spec: Dict, + no_overwrite: Optional[bool] = False) -> OrchResult[List[str]]: + spec = ServiceSpec.from_json(service_spec) + return self.api.apply([spec], no_overwrite) + + @wait_api_result + def remove(self, service_name: str) -> List[str]: + return self.api.remove_service(service_name) + + +class OsdManager(ResourceManager): + @wait_api_result + def create(self, drive_group_specs): + return self.api.apply_drivegroups(drive_group_specs) + + @wait_api_result + def remove(self, osd_ids, replace=False, force=False): + return self.api.remove_osds(osd_ids, replace, force) + + @wait_api_result + def removing_status(self): + return self.api.remove_osds_status() + + +class DaemonManager(ResourceManager): + @wait_api_result + def action(self, daemon_name='', action='', image=None): + return self.api.daemon_action(daemon_name=daemon_name, action=action, image=image) + + +class OrchClient(object): + + _instance = None + + @classmethod + def instance(cls): + # type: () -> OrchClient + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def __init__(self): + self.api = OrchestratorAPI() + + self.hosts = HostManger(self.api) + self.inventory = InventoryManager(self.api) + self.services = ServiceManager(self.api) + self.osds = OsdManager(self.api) + self.daemons = DaemonManager(self.api) + + def available(self, features: Optional[List[str]] = None) -> bool: + available = self.status()['available'] + if available and features is not None: + return not self.get_missing_features(features) + return available + + def status(self) -> Dict[str, Any]: + status = self.api.status() + status['features'] = {} + if status['available']: + status['features'] = self.api.get_feature_set() + return status + + def get_missing_features(self, features: List[str]) -> List[str]: + supported_features = {k for k, v in self.api.get_feature_set().items() if v['available']} + return list(set(features) - supported_features) + + @wait_api_result + def blink_device_light(self, hostname, device, ident_fault, on): + # type: (str, str, str, bool) -> OrchResult[List[str]] + return self.api.blink_device_light( + ident_fault, on, [DeviceLightLoc(hostname, device, device)]) + + +class OrchFeature(object): + HOST_LIST = 'get_hosts' + HOST_ADD = 'add_host' + HOST_REMOVE = 'remove_host' + HOST_LABEL_ADD = 'add_host_label' + HOST_LABEL_REMOVE = 'remove_host_label' + HOST_MAINTENANCE_ENTER = 'enter_host_maintenance' + HOST_MAINTENANCE_EXIT = 'exit_host_maintenance' + HOST_DRAIN = 'drain_host' + + SERVICE_LIST = 'describe_service' + SERVICE_CREATE = 'apply' + SERVICE_EDIT = 'apply' + SERVICE_DELETE = 'remove_service' + SERVICE_RELOAD = 'service_action' + DAEMON_LIST = 'list_daemons' + + OSD_GET_REMOVE_STATUS = 'remove_osds_status' + + OSD_CREATE = 'apply_drivegroups' + OSD_DELETE = 'remove_osds' + + DEVICE_LIST = 'get_inventory' + DEVICE_BLINK_LIGHT = 'blink_device_light' + + DAEMON_ACTION = 'daemon_action' diff --git a/src/pybind/mgr/dashboard/services/osd.py b/src/pybind/mgr/dashboard/services/osd.py new file mode 100644 index 000000000..12db733cc --- /dev/null +++ b/src/pybind/mgr/dashboard/services/osd.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +from enum import Enum + + +class OsdDeploymentOptions(str, Enum): + COST_CAPACITY = 'cost_capacity' + THROUGHPUT = 'throughput_optimized' + IOPS = 'iops_optimized' + + +class HostStorageSummary: + def __init__(self, name: str, title=None, desc=None, available=False, + capacity=0, used=0, hdd_used=0, ssd_used=0, nvme_used=0): + self.name = name + self.title = title + self.desc = desc + self.available = available + self.capacity = capacity + self.used = used + self.hdd_used = hdd_used + self.ssd_used = ssd_used + self.nvme_used = nvme_used + + def as_dict(self): + return self.__dict__ diff --git a/src/pybind/mgr/dashboard/services/progress.py b/src/pybind/mgr/dashboard/services/progress.py new file mode 100644 index 000000000..96272beff --- /dev/null +++ b/src/pybind/mgr/dashboard/services/progress.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +''' +Progress Mgr Module Helper + +This python module implements helper methods to retrieve the +executing and completed tasks tacked by the progress mgr module +using the same structure of dashboard tasks +''' + +from __future__ import absolute_import + +import logging +from datetime import datetime + +from .. import mgr +from . import rbd # pylint: disable=no-name-in-module + +logger = logging.getLogger('progress') + + +def _progress_event_to_dashboard_task_common(event, task): + if event['refs'] and isinstance(event['refs'], dict): + refs = event['refs'] + if refs['origin'] == "rbd_support": + # rbd mgr module event, we can transform this event into an rbd dashboard task + action_map = { + 'remove': "delete", + 'flatten': "flatten", + 'trash remove': "trash/remove" + } + action = action_map.get(refs['action'], refs['action']) + metadata = {} + if 'image_name' in refs: + metadata['image_spec'] = rbd.get_image_spec(refs['pool_name'], + refs['pool_namespace'], + refs['image_name']) + else: + metadata['image_id_spec'] = rbd.get_image_spec(refs['pool_name'], + refs['pool_namespace'], + refs['image_id']) + task.update({ + 'name': "rbd/{}".format(action), + 'metadata': metadata, + 'begin_time': "{}Z".format(datetime.fromtimestamp(event["started_at"]) + .isoformat()), + }) + return + + task.update({ + # we're prepending the "progress/" prefix to tag tasks that come + # from the progress module + 'name': "progress/{}".format(event['message']), + 'metadata': dict(event.get('refs', {})), + 'begin_time': "{}Z".format(datetime.fromtimestamp(event["started_at"]) + .isoformat()), + }) + + +def _progress_event_to_dashboard_task(event, completed=False): + task = {} + _progress_event_to_dashboard_task_common(event, task) + if not completed: + task.update({ + 'progress': int(100 * event['progress']) + }) + else: + task.update({ + 'end_time': "{}Z".format(datetime.fromtimestamp(event['finished_at']) + .isoformat()), + 'duration': event['finished_at'] - event['started_at'], + 'progress': 100, + 'success': 'failed' not in event, + 'ret_value': None, + 'exception': {'detail': event['failure_message']} if 'failed' in event else None + }) + return task + + +def get_progress_tasks(): + executing_t = [] + finished_t = [] + progress_events = mgr.remote('progress', "_json") + + for ev in progress_events['events']: + logger.debug("event=%s", ev) + executing_t.append(_progress_event_to_dashboard_task(ev)) + + for ev in progress_events['completed']: + logger.debug("finished event=%s", ev) + finished_t.append(_progress_event_to_dashboard_task(ev, True)) + + return executing_t, finished_t diff --git a/src/pybind/mgr/dashboard/services/rbd.py b/src/pybind/mgr/dashboard/services/rbd.py new file mode 100644 index 000000000..f14aa244f --- /dev/null +++ b/src/pybind/mgr/dashboard/services/rbd.py @@ -0,0 +1,580 @@ +# -*- coding: utf-8 -*- +# pylint: disable=unused-argument +from __future__ import absolute_import + +import errno +import json +from enum import IntEnum + +import cherrypy +import rados +import rbd + +from .. import mgr +from ..exceptions import DashboardException +from ..plugins.ttl_cache import ttl_cache +from .ceph_service import CephService + +try: + from typing import List, Optional +except ImportError: + pass # For typing only + + +RBD_FEATURES_NAME_MAPPING = { + rbd.RBD_FEATURE_LAYERING: "layering", + rbd.RBD_FEATURE_STRIPINGV2: "striping", + rbd.RBD_FEATURE_EXCLUSIVE_LOCK: "exclusive-lock", + rbd.RBD_FEATURE_OBJECT_MAP: "object-map", + rbd.RBD_FEATURE_FAST_DIFF: "fast-diff", + rbd.RBD_FEATURE_DEEP_FLATTEN: "deep-flatten", + rbd.RBD_FEATURE_JOURNALING: "journaling", + rbd.RBD_FEATURE_DATA_POOL: "data-pool", + rbd.RBD_FEATURE_OPERATIONS: "operations", +} + + +class MIRROR_IMAGE_MODE(IntEnum): + journal = rbd.RBD_MIRROR_IMAGE_MODE_JOURNAL + snapshot = rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT + + +def _rbd_support_remote(method_name: str, *args, level_spec: str = '', + interval: str = '', start_time: str = '', **kwargs): + # pacific specific implementation of rbd mirror schedule snapshot remote methods + prefixes = { + 'mirror_snapshot_schedule_status': 'rbd mirror snapshot schedule status', + 'mirror_snapshot_schedule_add': 'rbd mirror snapshot schedule add', + 'mirror_snapshot_schedule_remove': 'rbd mirror snapshot schedule remove', + } + cmd = { + 'level_spec': level_spec, + 'prefix': prefixes[method_name] + } + if interval: + cmd['interval'] = interval + if start_time: + cmd['start_time'] = start_time + + try: + res = mgr.remote('rbd_support', 'handle_command', None, cmd, *args, **kwargs) + return res + except ImportError as ie: + raise DashboardException(f'rbd_support module not found {ie}') + except RuntimeError as ie: + raise DashboardException(f'rbd_support.{method_name} error: {ie}') + except ValueError as ie: + raise DashboardException(f'rbd_support handle_command {prefixes[method_name]} error: {ie}') + + +def format_bitmask(features): + """ + Formats the bitmask: + + @DISABLEDOCTEST: >>> format_bitmask(45) + ['deep-flatten', 'exclusive-lock', 'layering', 'object-map'] + """ + names = [val for key, val in RBD_FEATURES_NAME_MAPPING.items() + if key & features == key] + return sorted(names) + + +def format_features(features): + """ + Converts the features list to bitmask: + + @DISABLEDOCTEST: >>> format_features(['deep-flatten', 'exclusive-lock', + 'layering', 'object-map']) + 45 + + @DISABLEDOCTEST: >>> format_features(None) is None + True + + @DISABLEDOCTEST: >>> format_features('deep-flatten, exclusive-lock') + 32 + """ + if isinstance(features, str): + features = features.split(',') + + if not isinstance(features, list): + return None + + res = 0 + for key, value in RBD_FEATURES_NAME_MAPPING.items(): + if value in features: + res = key | res + return res + + +def get_image_spec(pool_name, namespace, rbd_name): + namespace = '{}/'.format(namespace) if namespace else '' + return '{}/{}{}'.format(pool_name, namespace, rbd_name) + + +def parse_image_spec(image_spec): + namespace_spec, image_name = image_spec.rsplit('/', 1) + if '/' in namespace_spec: + pool_name, namespace = namespace_spec.rsplit('/', 1) + else: + pool_name, namespace = namespace_spec, None + return pool_name, namespace, image_name + + +def rbd_call(pool_name, namespace, func, *args, **kwargs): + with mgr.rados.open_ioctx(pool_name) as ioctx: + ioctx.set_namespace(namespace if namespace is not None else '') + return func(ioctx, *args, **kwargs) + + +def rbd_image_call(pool_name, namespace, image_name, func, *args, **kwargs): + def _ioctx_func(ioctx, image_name, func, *args, **kwargs): + with rbd.Image(ioctx, image_name) as img: + return func(ioctx, img, *args, **kwargs) + + return rbd_call(pool_name, namespace, _ioctx_func, image_name, func, *args, **kwargs) + + +class RbdConfiguration(object): + _rbd = rbd.RBD() + + def __init__(self, pool_name: str = '', namespace: str = '', image_name: str = '', + pool_ioctx: Optional[rados.Ioctx] = None, image_ioctx: Optional[rbd.Image] = None): + assert bool(pool_name) != bool(pool_ioctx) # xor + self._pool_name = pool_name + self._namespace = namespace if namespace is not None else '' + self._image_name = image_name + self._pool_ioctx = pool_ioctx + self._image_ioctx = image_ioctx + + @staticmethod + def _ensure_prefix(option): + # type: (str) -> str + return option if option.startswith('conf_') else 'conf_' + option + + def list(self): + # type: () -> List[dict] + def _list(ioctx): + if self._image_name: # image config + try: + # No need to open the context of the image again + # if we already did open it. + if self._image_ioctx: + result = self._image_ioctx.config_list() + else: + with rbd.Image(ioctx, self._image_name) as image: + result = image.config_list() + except rbd.ImageNotFound: + result = [] + else: # pool config + pg_status = list(CephService.get_pool_pg_status(self._pool_name).keys()) + if len(pg_status) == 1 and 'incomplete' in pg_status[0]: + # If config_list would be called with ioctx if it's a bad pool, + # the dashboard would stop working, waiting for the response + # that would not happen. + # + # This is only a workaround for https://tracker.ceph.com/issues/43771 which + # already got rejected as not worth the effort. + # + # Are more complete workaround for the dashboard will be implemented with + # https://tracker.ceph.com/issues/44224 + # + # @TODO: If #44224 is addressed remove this workaround + return [] + result = self._rbd.config_list(ioctx) + return list(result) + + if self._pool_name: + ioctx = mgr.rados.open_ioctx(self._pool_name) + ioctx.set_namespace(self._namespace) + else: + ioctx = self._pool_ioctx + + return _list(ioctx) + + def get(self, option_name): + # type: (str) -> str + option_name = self._ensure_prefix(option_name) + with mgr.rados.open_ioctx(self._pool_name) as pool_ioctx: + pool_ioctx.set_namespace(self._namespace) + if self._image_name: + with rbd.Image(pool_ioctx, self._image_name) as image: + return image.metadata_get(option_name) + return self._rbd.pool_metadata_get(pool_ioctx, option_name) + + def set(self, option_name, option_value): + # type: (str, str) -> None + + option_value = str(option_value) + option_name = self._ensure_prefix(option_name) + + pool_ioctx = self._pool_ioctx + if self._pool_name: # open ioctx + pool_ioctx = mgr.rados.open_ioctx(self._pool_name) + pool_ioctx.__enter__() # type: ignore + pool_ioctx.set_namespace(self._namespace) # type: ignore + + image_ioctx = self._image_ioctx + if self._image_name: + image_ioctx = rbd.Image(pool_ioctx, self._image_name) + image_ioctx.__enter__() # type: ignore + + if image_ioctx: + image_ioctx.metadata_set(option_name, option_value) # type: ignore + else: + self._rbd.pool_metadata_set(pool_ioctx, option_name, option_value) + + if self._image_name: # Name provided, so we opened it and now have to close it + image_ioctx.__exit__(None, None, None) # type: ignore + if self._pool_name: + pool_ioctx.__exit__(None, None, None) # type: ignore + + def remove(self, option_name): + """ + Removes an option by name. Will not raise an error, if the option hasn't been found. + :type option_name str + """ + def _remove(ioctx): + try: + if self._image_name: + with rbd.Image(ioctx, self._image_name) as image: + image.metadata_remove(option_name) + else: + self._rbd.pool_metadata_remove(ioctx, option_name) + except KeyError: + pass + + option_name = self._ensure_prefix(option_name) + + if self._pool_name: + with mgr.rados.open_ioctx(self._pool_name) as pool_ioctx: + pool_ioctx.set_namespace(self._namespace) + _remove(pool_ioctx) + else: + _remove(self._pool_ioctx) + + def set_configuration(self, configuration): + if configuration: + for option_name, option_value in configuration.items(): + if option_value is not None: + self.set(option_name, option_value) + else: + self.remove(option_name) + + +class RbdService(object): + _rbd_inst = rbd.RBD() + + @classmethod + def _rbd_disk_usage(cls, image, snaps, whole_object=True): + class DUCallback(object): + def __init__(self): + self.used_size = 0 + + def __call__(self, offset, length, exists): + if exists: + self.used_size += length + + snap_map = {} + prev_snap = None + total_used_size = 0 + for _, size, name in snaps: + image.set_snap(name) + du_callb = DUCallback() + image.diff_iterate(0, size, prev_snap, du_callb, + whole_object=whole_object) + snap_map[name] = du_callb.used_size + total_used_size += du_callb.used_size + prev_snap = name + + return total_used_size, snap_map + + @classmethod + def _rbd_image(cls, ioctx, pool_name, namespace, image_name): # pylint: disable=R0912 + with rbd.Image(ioctx, image_name) as img: + stat = img.stat() + mirror_mode = img.mirror_image_get_mode() + if mirror_mode == rbd.RBD_MIRROR_IMAGE_MODE_JOURNAL: + stat['mirror_mode'] = 'journal' + elif mirror_mode == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + stat['mirror_mode'] = 'snapshot' + schedule_status = json.loads(_rbd_support_remote( + 'mirror_snapshot_schedule_status')[1]) + for scheduled_image in schedule_status['scheduled_images']: + if scheduled_image['image'] == get_image_spec(pool_name, namespace, image_name): + stat['schedule_info'] = scheduled_image + else: + stat['mirror_mode'] = 'unknown' + + stat['name'] = image_name + if img.old_format(): + stat['unique_id'] = get_image_spec(pool_name, namespace, stat['block_name_prefix']) + stat['id'] = stat['unique_id'] + stat['image_format'] = 1 + else: + stat['unique_id'] = get_image_spec(pool_name, namespace, img.id()) + stat['id'] = img.id() + stat['image_format'] = 2 + + stat['pool_name'] = pool_name + stat['namespace'] = namespace + features = img.features() + stat['features'] = features + stat['features_name'] = format_bitmask(features) + + # the following keys are deprecated + del stat['parent_pool'] + del stat['parent_name'] + + stat['timestamp'] = "{}Z".format(img.create_timestamp() + .isoformat()) + + stat['stripe_count'] = img.stripe_count() + stat['stripe_unit'] = img.stripe_unit() + + data_pool_name = CephService.get_pool_name_from_id( + img.data_pool_id()) + if data_pool_name == pool_name: + data_pool_name = None + stat['data_pool'] = data_pool_name + + try: + stat['parent'] = img.get_parent_image_spec() + except rbd.ImageNotFound: + # no parent image + stat['parent'] = None + + # snapshots + stat['snapshots'] = [] + for snap in img.list_snaps(): + try: + snap['mirror_mode'] = MIRROR_IMAGE_MODE(img.mirror_image_get_mode()).name + except ValueError as ex: + raise DashboardException(f'Unknown RBD Mirror mode: {ex}') + + snap['timestamp'] = "{}Z".format( + img.get_snap_timestamp(snap['id']).isoformat()) + + snap['is_protected'] = None + if mirror_mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + snap['is_protected'] = img.is_protected_snap(snap['name']) + snap['used_bytes'] = None + snap['children'] = [] + + if mirror_mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + img.set_snap(snap['name']) + for child_pool_name, child_image_name in img.list_children(): + snap['children'].append({ + 'pool_name': child_pool_name, + 'image_name': child_image_name + }) + stat['snapshots'].append(snap) + + # disk usage + img_flags = img.flags() + if 'fast-diff' in stat['features_name'] and \ + not rbd.RBD_FLAG_FAST_DIFF_INVALID & img_flags and \ + mirror_mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + snaps = [(s['id'], s['size'], s['name']) + for s in stat['snapshots']] + snaps.sort(key=lambda s: s[0]) + snaps += [(snaps[-1][0] + 1 if snaps else 0, stat['size'], None)] + total_prov_bytes, snaps_prov_bytes = cls._rbd_disk_usage( + img, snaps, True) + stat['total_disk_usage'] = total_prov_bytes + for snap, prov_bytes in snaps_prov_bytes.items(): + if snap is None: + stat['disk_usage'] = prov_bytes + continue + for ss in stat['snapshots']: + if ss['name'] == snap: + ss['disk_usage'] = prov_bytes + break + else: + stat['total_disk_usage'] = None + stat['disk_usage'] = None + + stat['configuration'] = RbdConfiguration( + pool_ioctx=ioctx, image_name=image_name, image_ioctx=img).list() + + return stat + + @classmethod + @ttl_cache(10) + def get_ioctx(cls, pool_name, namespace=''): + ioctx = mgr.rados.open_ioctx(pool_name) + ioctx.set_namespace(namespace) + return ioctx + + @classmethod + @ttl_cache(30) + def _rbd_image_refs(cls, pool_name, namespace=''): + # We add and set the namespace here so that we cache by ioctx and namespace. + images = [] + ioctx = cls.get_ioctx(pool_name, namespace) + images = cls._rbd_inst.list2(ioctx) + return images + + @classmethod + @ttl_cache(30) + def _pool_namespaces(cls, pool_name, namespace=None): + namespaces = [] + if namespace: + namespaces = [namespace] + else: + ioctx = cls.get_ioctx(pool_name, namespace=rados.LIBRADOS_ALL_NSPACES) + namespaces = cls._rbd_inst.namespace_list(ioctx) + # images without namespace + namespaces.append('') + return namespaces + + @classmethod + def _rbd_image_stat(cls, ioctx, pool_name, namespace, image_name): + return cls._rbd_image(ioctx, pool_name, namespace, image_name) + + @classmethod + def _rbd_image_stat_removing(cls, ioctx, pool_name, namespace, image_id): + img = cls._rbd_inst.trash_get(ioctx, image_id) + img_spec = get_image_spec(pool_name, namespace, image_id) + + if img['source'] == 'REMOVING': + img['unique_id'] = img_spec + img['pool_name'] = pool_name + img['namespace'] = namespace + img['deletion_time'] = "{}Z".format(img['deletion_time'].isoformat()) + img['deferment_end_time'] = "{}Z".format(img['deferment_end_time'].isoformat()) + return img + raise rbd.ImageNotFound('No image {} in status `REMOVING` found.'.format(img_spec), + errno=errno.ENOENT) + + @classmethod + def _rbd_pool_image_refs(cls, pool_names: List[str], namespace: Optional[str] = None): + joint_refs = [] + for pool in pool_names: + for current_namespace in cls._pool_namespaces(pool, namespace=namespace): + image_refs = cls._rbd_image_refs(pool, current_namespace) + for image in image_refs: + image['namespace'] = current_namespace + image['pool_name'] = pool + joint_refs.append(image) + return joint_refs + + @classmethod + def rbd_pool_list(cls, pool_names: List[str], namespace: Optional[str] = None, offset: int = 0, + limit: int = 5, search: str = '', sort: str = ''): + offset = int(offset) + limit = int(limit) + # let's use -1 to denotate we want ALL images for now. Iscsi currently gathers + # all images therefore, we need this. + if limit < -1: + raise DashboardException(msg=f'Wrong limit value {limit}', code=400) + + refs = cls._rbd_pool_image_refs(pool_names, namespace) + image_refs = [] + # transform to list so that we can count + for ref in refs: + if search in ref['name']: + image_refs.append(ref) + elif search in ref['pool_name']: + image_refs.append(ref) + elif search in ref['namespace']: + image_refs.append(ref) + + result = [] + end = offset + limit + if len(sort) < 2: + sort = '+name' + descending = sort[0] == '-' + sort_by = sort[1:] + if sort_by not in ['name', 'pool_name', 'namespace']: + sort_by = 'name' + if limit == -1: + end = len(image_refs) + for image_ref in sorted(image_refs, key=lambda v: v[sort_by], + reverse=descending)[offset:end]: + ioctx = cls.get_ioctx(image_ref['pool_name'], namespace=image_ref['namespace']) + try: + stat = cls._rbd_image_stat( + ioctx, image_ref['pool_name'], image_ref['namespace'], image_ref['name']) + except rbd.ImageNotFound: + # Check if the RBD has been deleted partially. This happens for example if + # the deletion process of the RBD has been started and was interrupted. + try: + stat = cls._rbd_image_stat_removing( + ioctx, image_ref['pool_name'], image_ref['namespace'], image_ref['id']) + except rbd.ImageNotFound: + continue + result.append(stat) + return result, len(image_refs) + + @classmethod + def get_image(cls, image_spec): + pool_name, namespace, image_name = parse_image_spec(image_spec) + ioctx = mgr.rados.open_ioctx(pool_name) + if namespace: + ioctx.set_namespace(namespace) + try: + return cls._rbd_image(ioctx, pool_name, namespace, image_name) + except rbd.ImageNotFound: + raise cherrypy.HTTPError(404, 'Image not found') + + +class RbdSnapshotService(object): + + @classmethod + def remove_snapshot(cls, image_spec, snapshot_name, unprotect=False): + def _remove_snapshot(ioctx, img, snapshot_name, unprotect): + if unprotect: + img.unprotect_snap(snapshot_name) + img.remove_snap(snapshot_name) + + pool_name, namespace, image_name = parse_image_spec(image_spec) + return rbd_image_call(pool_name, namespace, image_name, + _remove_snapshot, snapshot_name, unprotect) + + +class RBDSchedulerInterval: + def __init__(self, interval: str): + self.amount = int(interval[:-1]) + self.unit = interval[-1] + if self.unit not in 'mhd': + raise ValueError(f'Invalid interval unit {self.unit}') + + def __str__(self): + return f'{self.amount}{self.unit}' + + +class RbdMirroringService: + + @classmethod + def enable_image(cls, image_name: str, pool_name: str, namespace: str, mode: MIRROR_IMAGE_MODE): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_enable(mode)) + + @classmethod + def disable_image(cls, image_name: str, pool_name: str, namespace: str, force: bool = False): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_disable(force)) + + @classmethod + def promote_image(cls, image_name: str, pool_name: str, namespace: str, force: bool = False): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_promote(force)) + + @classmethod + def demote_image(cls, image_name: str, pool_name: str, namespace: str): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_demote()) + + @classmethod + def resync_image(cls, image_name: str, pool_name: str, namespace: str): + rbd_image_call(pool_name, namespace, image_name, + lambda ioctx, image: image.mirror_image_resync()) + + @classmethod + def snapshot_schedule_add(cls, image_spec: str, interval: str): + _rbd_support_remote('mirror_snapshot_schedule_add', level_spec=image_spec, + interval=str(RBDSchedulerInterval(interval))) + + @classmethod + def snapshot_schedule_remove(cls, image_spec: str): + _rbd_support_remote('mirror_snapshot_schedule_remove', level_spec=image_spec) diff --git a/src/pybind/mgr/dashboard/services/rgw_client.py b/src/pybind/mgr/dashboard/services/rgw_client.py new file mode 100644 index 000000000..4c401f2a4 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/rgw_client.py @@ -0,0 +1,764 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import ipaddress +import json +import logging +import re +import xml.etree.ElementTree as ET # noqa: N814 +from distutils.util import strtobool +from subprocess import SubprocessError + +from mgr_util import build_url + +from .. import mgr +from ..awsauth import S3Auth +from ..exceptions import DashboardException +from ..rest_client import RequestException, RestClient +from ..settings import Settings +from ..tools import dict_contains_path, dict_get, json_str_to_object + +try: + from typing import Any, Dict, List, Optional, Tuple, Union +except ImportError: + pass # For typing only + +logger = logging.getLogger('rgw_client') + + +class NoRgwDaemonsException(Exception): + def __init__(self): + super().__init__('No RGW service is running.') + + +class NoCredentialsException(Exception): + def __init__(self): + super(NoCredentialsException, self).__init__( + 'No RGW credentials found, ' + 'please consult the documentation on how to enable RGW for ' + 'the dashboard.') + + +class RgwAdminException(Exception): + pass + + +class RgwDaemon: + """Simple representation of a daemon.""" + host: str + name: str + port: int + ssl: bool + realm_name: str + zonegroup_name: str + zone_name: str + + +def _get_daemons() -> Dict[str, RgwDaemon]: + """ + Retrieve RGW daemon info from MGR. + """ + service_map = mgr.get('service_map') + if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']): + raise NoRgwDaemonsException + + daemons = {} + daemon_map = service_map['services']['rgw']['daemons'] + for key in daemon_map.keys(): + if dict_contains_path(daemon_map[key], ['metadata', 'frontend_config#0']): + daemon = _determine_rgw_addr(daemon_map[key]) + daemon.name = daemon_map[key]['metadata']['id'] + daemon.realm_name = daemon_map[key]['metadata']['realm_name'] + daemon.zonegroup_name = daemon_map[key]['metadata']['zonegroup_name'] + daemon.zone_name = daemon_map[key]['metadata']['zone_name'] + daemons[daemon.name] = daemon + logger.info('Found RGW daemon with configuration: host=%s, port=%d, ssl=%s', + daemon.host, daemon.port, str(daemon.ssl)) + if not daemons: + raise NoRgwDaemonsException + + return daemons + + +def _determine_rgw_addr(daemon_info: Dict[str, Any]) -> RgwDaemon: + """ + Parse RGW daemon info to determine the configured host (IP address) and port. + """ + daemon = RgwDaemon() + daemon.host = daemon_info['metadata']['hostname'] + daemon.port, daemon.ssl = _parse_frontend_config(daemon_info['metadata']['frontend_config#0']) + + return daemon + + +def _parse_addr(value) -> str: + """ + Get the IP address the RGW is running on. + + >>> _parse_addr('192.168.178.3:49774/1534999298') + '192.168.178.3' + + >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298') + '2001:db8:85a3::8a2e:370:7334' + + >>> _parse_addr('xyz') + Traceback (most recent call last): + ... + LookupError: Failed to determine RGW address + + >>> _parse_addr('192.168.178.a:8080/123456789') + Traceback (most recent call last): + ... + LookupError: Invalid RGW address '192.168.178.a' found + + >>> _parse_addr('[2001:0db8:1234]:443/123456789') + Traceback (most recent call last): + ... + LookupError: Invalid RGW address '2001:0db8:1234' found + + >>> _parse_addr('2001:0db8::1234:49774/1534999298') + Traceback (most recent call last): + ... + LookupError: Failed to determine RGW address + + :param value: The string to process. The syntax is ':/'. + :type: str + :raises LookupError if parsing fails to determine the IP address. + :return: The IP address. + :rtype: str + """ + match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value) + if match: + # IPv4: + # Group 0: 192.168.178.3:49774/1534999298 + # Group 3: 192.168.178.3 + # IPv6: + # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298 + # Group 1: [ + # Group 2: 2001:db8:85a3::8a2e:370:7334 + addr = match.group(3) if match.group(3) else match.group(2) + try: + ipaddress.ip_address(addr) + return addr + except ValueError: + raise LookupError('Invalid RGW address \'{}\' found'.format(addr)) + raise LookupError('Failed to determine RGW address') + + +def _parse_frontend_config(config) -> Tuple[int, bool]: + """ + Get the port the RGW is running on. Due the complexity of the + syntax not all variations are supported. + + If there are multiple (ssl_)ports/(ssl_)endpoints options, then + the first found option will be returned. + + Get more details about the configuration syntax here: + http://docs.ceph.com/en/latest/radosgw/frontends/ + https://civetweb.github.io/civetweb/UserManual.html + + :param config: The configuration string to parse. + :type config: str + :raises LookupError if parsing fails to determine the port. + :return: A tuple containing the port number and the information + whether SSL is used. + :rtype: (int, boolean) + """ + match = re.search(r'^(beast|civetweb)\s+.+$', config) + if match: + if match.group(1) == 'beast': + match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)', + config) + if match: + option_name = match.group(1) + if option_name in ['port', 'ssl_port']: + match = re.search(r'(\d+)', match.group(2)) + if match: + port = int(match.group(1)) + ssl = option_name == 'ssl_port' + return port, ssl + if option_name in ['endpoint', 'ssl_endpoint']: + match = re.search(r'([\d.]+|\[.+\])(:(\d+))?', + match.group(2)) # type: ignore + if match: + port = int(match.group(3)) if \ + match.group(2) is not None else 443 if \ + option_name == 'ssl_endpoint' else \ + 80 + ssl = option_name == 'ssl_endpoint' + return port, ssl + if match.group(1) == 'civetweb': # type: ignore + match = re.search(r'port=(.*:)?(\d+)(s)?', config) + if match: + port = int(match.group(2)) + ssl = match.group(3) == 's' + return port, ssl + raise LookupError('Failed to determine RGW port from "{}"'.format(config)) + + +def _parse_secrets(user: str, data: dict) -> Tuple[str, str]: + for key in data.get('keys', []): + if key.get('user') == user and data.get('system') in ['true', True]: + access_key = key.get('access_key') + secret_key = key.get('secret_key') + return access_key, secret_key + return '', '' + + +def _get_user_keys(user: str, realm: Optional[str] = None) -> Tuple[str, str]: + access_key = '' + secret_key = '' + rgw_user_info_cmd = ['user', 'info', '--uid', user] + cmd_realm_option = ['--rgw-realm', realm] if realm else [] + if realm: + rgw_user_info_cmd += cmd_realm_option + try: + _, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd) + if out: + access_key, secret_key = _parse_secrets(user, out) + if not access_key: + rgw_create_user_cmd = [ + 'user', 'create', + '--uid', user, + '--display-name', 'Ceph Dashboard', + '--system', + ] + cmd_realm_option + _, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd) + if out: + access_key, secret_key = _parse_secrets(user, out) + if not access_key: + logger.error('Unable to create rgw user "%s": %s', user, err) + except SubprocessError as error: + logger.exception(error) + + return access_key, secret_key + + +def configure_rgw_credentials(): + logger.info('Configuring dashboard RGW credentials') + user = 'dashboard' + realms = [] + access_key = '' + secret_key = '' + try: + _, out, err = mgr.send_rgwadmin_command(['realm', 'list']) + if out: + realms = out.get('realms', []) + if err: + logger.error('Unable to list RGW realms: %s', err) + if realms: + realm_access_keys = {} + realm_secret_keys = {} + for realm in realms: + realm_access_key, realm_secret_key = _get_user_keys(user, realm) + if realm_access_key: + realm_access_keys[realm] = realm_access_key + realm_secret_keys[realm] = realm_secret_key + if realm_access_keys: + access_key = json.dumps(realm_access_keys) + secret_key = json.dumps(realm_secret_keys) + else: + access_key, secret_key = _get_user_keys(user) + + assert access_key and secret_key + Settings.RGW_API_ACCESS_KEY = access_key + Settings.RGW_API_SECRET_KEY = secret_key + except (AssertionError, SubprocessError) as error: + logger.exception(error) + raise NoCredentialsException + + +class RgwClient(RestClient): + _host = None + _port = None + _ssl = None + _user_instances = {} # type: Dict[str, Dict[str, RgwClient]] + _config_instances = {} # type: Dict[str, RgwClient] + _rgw_settings_snapshot = None + _daemons: Dict[str, RgwDaemon] = {} + daemon: RgwDaemon + got_keys_from_config: bool + userid: str + + @staticmethod + def _handle_response_status_code(status_code: int) -> int: + # Do not return auth error codes (so they are not handled as ceph API user auth errors). + return 404 if status_code in [401, 403] else status_code + + @staticmethod + def _get_daemon_connection_info(daemon_name: str) -> dict: + try: + realm_name = RgwClient._daemons[daemon_name].realm_name + access_key = Settings.RGW_API_ACCESS_KEY[realm_name] + secret_key = Settings.RGW_API_SECRET_KEY[realm_name] + except TypeError: + # Legacy string values. + access_key = Settings.RGW_API_ACCESS_KEY + secret_key = Settings.RGW_API_SECRET_KEY + except KeyError as error: + raise DashboardException(msg='Credentials not found for RGW Daemon: {}'.format(error), + http_status_code=404, + component='rgw') + + return {'access_key': access_key, 'secret_key': secret_key} + + def _get_daemon_zone_info(self): # type: () -> dict + return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None)) + + def _get_realms_info(self): # type: () -> dict + return json_str_to_object(self.proxy('GET', 'realm?list', None, None)) + + def _get_realm_info(self, realm_id: str) -> Dict[str, Any]: + return json_str_to_object(self.proxy('GET', f'realm?id={realm_id}', None, None)) + + @staticmethod + def _rgw_settings(): + return (Settings.RGW_API_ACCESS_KEY, + Settings.RGW_API_SECRET_KEY, + Settings.RGW_API_ADMIN_RESOURCE, + Settings.RGW_API_SSL_VERIFY) + + @staticmethod + def instance(userid: Optional[str] = None, + daemon_name: Optional[str] = None) -> 'RgwClient': + # pylint: disable=too-many-branches + + RgwClient._daemons = _get_daemons() + + # The API access key and secret key are mandatory for a minimal configuration. + if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY): + configure_rgw_credentials() + + if not daemon_name: + # Select 1st daemon: + daemon_name = next(iter(RgwClient._daemons.keys())) + + # Discard all cached instances if any rgw setting has changed + if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings(): + RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings() + RgwClient.drop_instance() + + if daemon_name not in RgwClient._config_instances: + connection_info = RgwClient._get_daemon_connection_info(daemon_name) + RgwClient._config_instances[daemon_name] = RgwClient(connection_info['access_key'], + connection_info['secret_key'], + daemon_name) + + if not userid or userid == RgwClient._config_instances[daemon_name].userid: + return RgwClient._config_instances[daemon_name] + + if daemon_name not in RgwClient._user_instances \ + or userid not in RgwClient._user_instances[daemon_name]: + # Get the access and secret keys for the specified user. + keys = RgwClient._config_instances[daemon_name].get_user_keys(userid) + if not keys: + raise RequestException( + "User '{}' does not have any keys configured.".format( + userid)) + instance = RgwClient(keys['access_key'], + keys['secret_key'], + daemon_name, + userid) + RgwClient._user_instances.update({daemon_name: {userid: instance}}) + + return RgwClient._user_instances[daemon_name][userid] + + @staticmethod + def admin_instance(daemon_name: Optional[str] = None) -> 'RgwClient': + return RgwClient.instance(daemon_name=daemon_name) + + @staticmethod + def drop_instance(instance: Optional['RgwClient'] = None): + """ + Drop a cached instance or all. + """ + if instance: + if instance.got_keys_from_config: + del RgwClient._config_instances[instance.daemon.name] + else: + del RgwClient._user_instances[instance.daemon.name][instance.userid] + else: + RgwClient._config_instances.clear() + RgwClient._user_instances.clear() + + def _reset_login(self): + if self.got_keys_from_config: + raise RequestException('Authentication failed for the "{}" user: wrong credentials' + .format(self.userid), status_code=401) + logger.info("Fetching new keys for user: %s", self.userid) + keys = RgwClient.admin_instance(daemon_name=self.daemon.name).get_user_keys(self.userid) + self.auth = S3Auth(keys['access_key'], keys['secret_key'], + service_url=self.service_url) + + def __init__(self, + access_key: str, + secret_key: str, + daemon_name: str, + user_id: Optional[str] = None) -> None: + try: + daemon = RgwClient._daemons[daemon_name] + except KeyError as error: + raise DashboardException(msg='RGW Daemon not found: {}'.format(error), + http_status_code=404, + component='rgw') + ssl_verify = Settings.RGW_API_SSL_VERIFY + self.admin_path = Settings.RGW_API_ADMIN_RESOURCE + self.service_url = build_url(host=daemon.host, port=daemon.port) + + self.auth = S3Auth(access_key, secret_key, service_url=self.service_url) + super(RgwClient, self).__init__(daemon.host, + daemon.port, + 'RGW', + daemon.ssl, + self.auth, + ssl_verify=ssl_verify) + self.got_keys_from_config = not user_id + try: + self.userid = self._get_user_id(self.admin_path) if self.got_keys_from_config \ + else user_id + except RequestException as error: + logger.exception(error) + msg = 'Error connecting to Object Gateway' + if error.status_code == 404: + msg = '{}: {}'.format(msg, str(error)) + raise DashboardException(msg=msg, + http_status_code=error.status_code, + component='rgw') + self.daemon = daemon + + logger.info("Created new connection: daemon=%s, host=%s, port=%s, ssl=%d, sslverify=%d", + daemon.name, daemon.host, daemon.port, daemon.ssl, ssl_verify) + + @RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)') + def is_service_online(self, request=None) -> bool: + """ + Consider the service as online if the response contains the + specified keys. Nothing more is checked here. + """ + _ = request({'format': 'json'}) + return True + + @RestClient.api_get('/{admin_path}/metadata/user?myself', + resp_structure='data > user_id') + def _get_user_id(self, admin_path, request=None): + # pylint: disable=unused-argument + """ + Get the user ID of the user that is used to communicate with the + RGW Admin Ops API. + :rtype: str + :return: The user ID of the user that is used to sign the + RGW Admin Ops API calls. + """ + response = request() + return response['data']['user_id'] + + @RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]') + def _user_exists(self, admin_path, user_id, request=None): + # pylint: disable=unused-argument + response = request() + if user_id: + return user_id in response + return self.userid in response + + def user_exists(self, user_id=None): + return self._user_exists(self.admin_path, user_id) + + @RestClient.api_get('/{admin_path}/metadata/user?key={userid}', + resp_structure='data > system') + def _is_system_user(self, admin_path, userid, request=None) -> bool: + # pylint: disable=unused-argument + response = request() + return strtobool(response['data']['system']) + + def is_system_user(self) -> bool: + return self._is_system_user(self.admin_path, self.userid) + + @RestClient.api_get( + '/{admin_path}/user', + resp_structure='tenant & user_id & email & keys[*] > ' + ' (user & access_key & secret_key)') + def _admin_get_user_keys(self, admin_path, userid, request=None): + # pylint: disable=unused-argument + colon_idx = userid.find(':') + user = userid if colon_idx == -1 else userid[:colon_idx] + response = request({'uid': user}) + for key in response['keys']: + if key['user'] == userid: + return { + 'access_key': key['access_key'], + 'secret_key': key['secret_key'] + } + return None + + def get_user_keys(self, userid): + return self._admin_get_user_keys(self.admin_path, userid) + + @RestClient.api('/{admin_path}/{path}') + def _proxy_request( + self, # pylint: disable=too-many-arguments + admin_path, + path, + method, + params, + data, + request=None): + # pylint: disable=unused-argument + return request(method=method, params=params, data=data, + raw_content=True) + + def proxy(self, method, path, params, data): + logger.debug("proxying method=%s path=%s params=%s data=%s", + method, path, params, data) + return self._proxy_request(self.admin_path, path, method, + params, data) + + @RestClient.api_get('/', resp_structure='[1][*] > Name') + def get_buckets(self, request=None): + """ + Get a list of names from all existing buckets of this user. + :return: Returns a list of bucket names. + """ + response = request({'format': 'json'}) + return [bucket['Name'] for bucket in response[1]] + + @RestClient.api_get('/{bucket_name}') + def bucket_exists(self, bucket_name, userid, request=None): + """ + Check if the specified bucket exists for this user. + :param bucket_name: The name of the bucket. + :return: Returns True if the bucket exists, otherwise False. + """ + # pylint: disable=unused-argument + try: + request() + my_buckets = self.get_buckets() + if bucket_name not in my_buckets: + raise RequestException( + 'Bucket "{}" belongs to other user'.format(bucket_name), + 403) + return True + except RequestException as e: + if e.status_code == 404: + return False + + raise e + + @RestClient.api_put('/{bucket_name}') + def create_bucket(self, bucket_name, zonegroup=None, + placement_target=None, lock_enabled=False, + request=None): + logger.info("Creating bucket: %s, zonegroup: %s, placement_target: %s", + bucket_name, zonegroup, placement_target) + data = None + if zonegroup and placement_target: + create_bucket_configuration = ET.Element('CreateBucketConfiguration') + location_constraint = ET.SubElement(create_bucket_configuration, 'LocationConstraint') + location_constraint.text = '{}:{}'.format(zonegroup, placement_target) + data = ET.tostring(create_bucket_configuration, encoding='unicode') + + headers = None # type: Optional[dict] + if lock_enabled: + headers = {'x-amz-bucket-object-lock-enabled': 'true'} + + return request(data=data, headers=headers) + + def get_placement_targets(self): # type: () -> dict + zone = self._get_daemon_zone_info() + placement_targets = [] # type: List[Dict] + for placement_pool in zone['placement_pools']: + placement_targets.append( + { + 'name': placement_pool['key'], + 'data_pool': placement_pool['val']['storage_classes']['STANDARD']['data_pool'] + } + ) + + return {'zonegroup': self.daemon.zonegroup_name, + 'placement_targets': placement_targets} + + def get_realms(self): # type: () -> List + realms_info = self._get_realms_info() + if 'realms' in realms_info and realms_info['realms']: + return realms_info['realms'] + + return [] + + def get_default_realm(self) -> str: + realms_info = self._get_realms_info() + if 'default_info' in realms_info and realms_info['default_info']: + realm_info = self._get_realm_info(realms_info['default_info']) + if 'name' in realm_info and realm_info['name']: + return realm_info['name'] + raise DashboardException(msg='Default realm not found.', + http_status_code=404, + component='rgw') + + @RestClient.api_get('/{bucket_name}?versioning') + def get_bucket_versioning(self, bucket_name, request=None): + """ + Get bucket versioning. + :param str bucket_name: the name of the bucket. + :return: versioning info + :rtype: Dict + """ + # pylint: disable=unused-argument + result = request() + if 'Status' not in result: + result['Status'] = 'Suspended' + if 'MfaDelete' not in result: + result['MfaDelete'] = 'Disabled' + return result + + @RestClient.api_put('/{bucket_name}?versioning') + def set_bucket_versioning(self, bucket_name, versioning_state, mfa_delete, + mfa_token_serial, mfa_token_pin, request=None): + """ + Set bucket versioning. + :param str bucket_name: the name of the bucket. + :param str versioning_state: + https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html + :param str mfa_delete: MFA Delete state. + :param str mfa_token_serial: + https://docs.ceph.com/docs/master/radosgw/mfa/ + :param str mfa_token_pin: value of a TOTP token at a certain time (auth code) + :return: None + """ + # pylint: disable=unused-argument + versioning_configuration = ET.Element('VersioningConfiguration') + status_element = ET.SubElement(versioning_configuration, 'Status') + status_element.text = versioning_state + + headers = {} + if mfa_delete and mfa_token_serial and mfa_token_pin: + headers['x-amz-mfa'] = '{} {}'.format(mfa_token_serial, mfa_token_pin) + mfa_delete_element = ET.SubElement(versioning_configuration, 'MfaDelete') + mfa_delete_element.text = mfa_delete + + data = ET.tostring(versioning_configuration, encoding='unicode') + + try: + request(data=data, headers=headers) + except RequestException as error: + msg = str(error) + if mfa_delete and mfa_token_serial and mfa_token_pin \ + and 'AccessDenied' in error.content.decode(): + msg = 'Bad MFA credentials: {}'.format(msg) + raise DashboardException(msg=msg, + http_status_code=error.status_code, + component='rgw') + + @RestClient.api_get('/{bucket_name}?object-lock') + def get_bucket_locking(self, bucket_name, request=None): + # type: (str, Optional[object]) -> dict + """ + Gets the locking configuration for a bucket. The locking + configuration will be applied by default to every new object + placed in the specified bucket. + :param bucket_name: The name of the bucket. + :type bucket_name: str + :return: The locking configuration. + :rtype: Dict + """ + # pylint: disable=unused-argument + + # Try to get the Object Lock configuration. If there is none, + # then return default values. + try: + result = request() # type: ignore + return { + 'lock_enabled': dict_get(result, 'ObjectLockEnabled') == 'Enabled', + 'lock_mode': dict_get(result, 'Rule.DefaultRetention.Mode'), + 'lock_retention_period_days': dict_get(result, 'Rule.DefaultRetention.Days', 0), + 'lock_retention_period_years': dict_get(result, 'Rule.DefaultRetention.Years', 0) + } + except RequestException as e: + if e.content: + content = json_str_to_object(e.content) + if content.get( + 'Code') == 'ObjectLockConfigurationNotFoundError': + return { + 'lock_enabled': False, + 'lock_mode': 'compliance', + 'lock_retention_period_days': None, + 'lock_retention_period_years': None + } + raise e + + @RestClient.api_put('/{bucket_name}?object-lock') + def set_bucket_locking(self, + bucket_name: str, + mode: str, + retention_period_days: Optional[Union[int, str]] = None, + retention_period_years: Optional[Union[int, str]] = None, + request: Optional[object] = None) -> None: + """ + Places the locking configuration on the specified bucket. The + locking configuration will be applied by default to every new + object placed in the specified bucket. + :param bucket_name: The name of the bucket. + :type bucket_name: str + :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`. + :type mode: str + :param retention_period_days: + :type retention_period_days: int + :param retention_period_years: + :type retention_period_years: int + :rtype: None + """ + # pylint: disable=unused-argument + + # Do some validations. + try: + retention_period_days = int(retention_period_days) if retention_period_days else 0 + retention_period_years = int(retention_period_years) if retention_period_years else 0 + if retention_period_days < 0 or retention_period_years < 0: + raise ValueError + except (TypeError, ValueError): + msg = "Retention period must be a positive integer." + raise DashboardException(msg=msg, component='rgw') + if retention_period_days and retention_period_years: + # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html + msg = "Retention period requires either Days or Years. "\ + "You can't specify both at the same time." + raise DashboardException(msg=msg, component='rgw') + if not retention_period_days and not retention_period_years: + msg = "Retention period requires either Days or Years. "\ + "You must specify at least one." + raise DashboardException(msg=msg, component='rgw') + if not isinstance(mode, str) or mode.upper() not in ['COMPLIANCE', 'GOVERNANCE']: + msg = "Retention mode must be either COMPLIANCE or GOVERNANCE." + raise DashboardException(msg=msg, component='rgw') + + # Generate the XML data like this: + # + # string + # + # + # integer + # string + # integer + # + # + # + locking_configuration = ET.Element('ObjectLockConfiguration') + enabled_element = ET.SubElement(locking_configuration, + 'ObjectLockEnabled') + enabled_element.text = 'Enabled' # Locking can't be disabled. + rule_element = ET.SubElement(locking_configuration, 'Rule') + default_retention_element = ET.SubElement(rule_element, + 'DefaultRetention') + mode_element = ET.SubElement(default_retention_element, 'Mode') + mode_element.text = mode.upper() + if retention_period_days: + days_element = ET.SubElement(default_retention_element, 'Days') + days_element.text = str(retention_period_days) + if retention_period_years: + years_element = ET.SubElement(default_retention_element, 'Years') + years_element.text = str(retention_period_years) + + data = ET.tostring(locking_configuration, encoding='unicode') + + try: + _ = request(data=data) # type: ignore + except RequestException as e: + raise DashboardException(msg=str(e), component='rgw') diff --git a/src/pybind/mgr/dashboard/services/sso.py b/src/pybind/mgr/dashboard/services/sso.py new file mode 100644 index 000000000..f2f2a6a7d --- /dev/null +++ b/src/pybind/mgr/dashboard/services/sso.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-return-statements,too-many-branches +from __future__ import absolute_import + +import errno +import json +import logging +import os +import threading +import warnings +from urllib import parse + +from .. import mgr +from ..tools import prepare_url_prefix + +logger = logging.getLogger('sso') + +try: + from onelogin.saml2.errors import OneLogin_Saml2_Error as Saml2Error + from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser as Saml2Parser + from onelogin.saml2.settings import OneLogin_Saml2_Settings as Saml2Settings + + python_saml_imported = True +except ImportError: + python_saml_imported = False + + +class Saml2(object): + def __init__(self, onelogin_settings): + self.onelogin_settings = onelogin_settings + + def get_username_attribute(self): + return self.onelogin_settings['sp']['attributeConsumingService']['requestedAttributes'][0][ + 'name'] + + def to_dict(self): + return { + 'onelogin_settings': self.onelogin_settings + } + + @classmethod + def from_dict(cls, s_dict): + return Saml2(s_dict['onelogin_settings']) + + +class SsoDB(object): + VERSION = 1 + SSODB_CONFIG_KEY = "ssodb_v" + + def __init__(self, version, protocol, saml2): + self.version = version + self.protocol = protocol + self.saml2 = saml2 + self.lock = threading.RLock() + + def save(self): + with self.lock: + db = { + 'protocol': self.protocol, + 'saml2': self.saml2.to_dict(), + 'version': self.version + } + mgr.set_store(self.ssodb_config_key(), json.dumps(db)) + + @classmethod + def ssodb_config_key(cls, version=None): + if version is None: + version = cls.VERSION + return "{}{}".format(cls.SSODB_CONFIG_KEY, version) + + def check_and_update_db(self): + logger.debug("Checking for previous DB versions") + if self.VERSION != 1: + raise NotImplementedError() + + @classmethod + def load(cls): + logger.info("Loading SSO DB version=%s", cls.VERSION) + + json_db = mgr.get_store(cls.ssodb_config_key(), None) + if json_db is None: + logger.debug("No DB v%s found, creating new...", cls.VERSION) + db = cls(cls.VERSION, '', Saml2({})) + # check if we can update from a previous version database + db.check_and_update_db() + return db + + dict_db = json.loads(json_db) # type: dict + return cls(dict_db['version'], dict_db.get('protocol'), + Saml2.from_dict(dict_db.get('saml2'))) + + +def load_sso_db(): + mgr.SSO_DB = SsoDB.load() + + +SSO_COMMANDS = [ + { + 'cmd': 'dashboard sso enable saml2', + 'desc': 'Enable SAML2 Single Sign-On', + 'perm': 'w' + }, + { + 'cmd': 'dashboard sso disable', + 'desc': 'Disable Single Sign-On', + 'perm': 'w' + }, + { + 'cmd': 'dashboard sso status', + 'desc': 'Get Single Sign-On status', + 'perm': 'r' + }, + { + 'cmd': 'dashboard sso show saml2', + 'desc': 'Show SAML2 configuration', + 'perm': 'r' + }, + { + 'cmd': 'dashboard sso setup saml2 ' + 'name=ceph_dashboard_base_url,type=CephString ' + 'name=idp_metadata,type=CephString ' + 'name=idp_username_attribute,type=CephString,req=false ' + 'name=idp_entity_id,type=CephString,req=false ' + 'name=sp_x_509_cert,type=CephFilepath,req=false ' + 'name=sp_private_key,type=CephFilepath,req=false', + 'desc': 'Setup SAML2 Single Sign-On', + 'perm': 'w' + } +] + + +def _get_optional_attr(cmd, attr, default): + if attr in cmd: + if cmd[attr] != '': + return cmd[attr] + return default + + +def handle_sso_command(cmd): + if cmd['prefix'] not in ['dashboard sso enable saml2', + 'dashboard sso disable', + 'dashboard sso status', + 'dashboard sso show saml2', + 'dashboard sso setup saml2']: + return -errno.ENOSYS, '', '' + + if cmd['prefix'] == 'dashboard sso disable': + mgr.SSO_DB.protocol = '' + mgr.SSO_DB.save() + return 0, 'SSO is "disabled".', '' + + if not python_saml_imported: + return -errno.EPERM, '', 'Required library not found: `python3-saml`' + + if cmd['prefix'] == 'dashboard sso enable saml2': + try: + Saml2Settings(mgr.SSO_DB.saml2.onelogin_settings) + except Saml2Error: + return -errno.EPERM, '', 'Single Sign-On is not configured: ' \ + 'use `ceph dashboard sso setup saml2`' + mgr.SSO_DB.protocol = 'saml2' + mgr.SSO_DB.save() + return 0, 'SSO is "enabled" with "SAML2" protocol.', '' + + if cmd['prefix'] == 'dashboard sso status': + if mgr.SSO_DB.protocol == 'saml2': + return 0, 'SSO is "enabled" with "SAML2" protocol.', '' + + return 0, 'SSO is "disabled".', '' + + if cmd['prefix'] == 'dashboard sso show saml2': + return 0, json.dumps(mgr.SSO_DB.saml2.to_dict()), '' + + if cmd['prefix'] == 'dashboard sso setup saml2': + ceph_dashboard_base_url = cmd['ceph_dashboard_base_url'] + idp_metadata = cmd['idp_metadata'] + idp_username_attribute = _get_optional_attr(cmd, 'idp_username_attribute', 'uid') + idp_entity_id = _get_optional_attr(cmd, 'idp_entity_id', None) + sp_x_509_cert_path = _get_optional_attr(cmd, 'sp_x_509_cert', '') + sp_private_key_path = _get_optional_attr(cmd, 'sp_private_key', '') + if sp_x_509_cert_path and not sp_private_key_path: + return -errno.EINVAL, '', 'Missing parameter `sp_private_key`.' + if not sp_x_509_cert_path and sp_private_key_path: + return -errno.EINVAL, '', 'Missing parameter `sp_x_509_cert`.' + has_sp_cert = sp_x_509_cert_path != "" and sp_private_key_path != "" + if has_sp_cert: + try: + with open(sp_x_509_cert_path, 'r', encoding='utf-8') as f: + sp_x_509_cert = f.read() + except FileNotFoundError: + return -errno.EINVAL, '', '`{}` not found.'.format(sp_x_509_cert_path) + try: + with open(sp_private_key_path, 'r', encoding='utf-8') as f: + sp_private_key = f.read() + except FileNotFoundError: + return -errno.EINVAL, '', '`{}` not found.'.format(sp_private_key_path) + else: + sp_x_509_cert = '' + sp_private_key = '' + + if os.path.isfile(idp_metadata): + warnings.warn( + "Please prepend 'file://' to indicate a local SAML2 IdP file", DeprecationWarning) + with open(idp_metadata, 'r', encoding='utf-8') as f: + idp_settings = Saml2Parser.parse(f.read(), entity_id=idp_entity_id) + elif parse.urlparse(idp_metadata)[0] in ('http', 'https', 'file'): + idp_settings = Saml2Parser.parse_remote( + url=idp_metadata, validate_cert=False, entity_id=idp_entity_id) + else: + idp_settings = Saml2Parser.parse(idp_metadata, entity_id=idp_entity_id) + + url_prefix = prepare_url_prefix(mgr.get_module_option('url_prefix', default='')) + settings = { + 'sp': { + 'entityId': '{}{}/auth/saml2/metadata'.format(ceph_dashboard_base_url, url_prefix), + 'assertionConsumerService': { + 'url': '{}{}/auth/saml2'.format(ceph_dashboard_base_url, url_prefix), + 'binding': "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" + }, + 'attributeConsumingService': { + 'serviceName': "Ceph Dashboard", + "serviceDescription": "Ceph Dashboard Service", + "requestedAttributes": [ + { + "name": idp_username_attribute, + "isRequired": True + } + ] + }, + 'singleLogoutService': { + 'url': '{}{}/auth/saml2/logout'.format(ceph_dashboard_base_url, url_prefix), + 'binding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect' + }, + "x509cert": sp_x_509_cert, + "privateKey": sp_private_key + }, + 'security': { + "nameIdEncrypted": has_sp_cert, + "authnRequestsSigned": has_sp_cert, + "logoutRequestSigned": has_sp_cert, + "logoutResponseSigned": has_sp_cert, + "signMetadata": has_sp_cert, + "wantMessagesSigned": has_sp_cert, + "wantAssertionsSigned": has_sp_cert, + "wantAssertionsEncrypted": has_sp_cert, + "wantNameIdEncrypted": False, # Not all Identity Providers support this. + "metadataValidUntil": '', + "wantAttributeStatement": False + } + } + settings = Saml2Parser.merge_settings(settings, idp_settings) + mgr.SSO_DB.saml2.onelogin_settings = settings + mgr.SSO_DB.protocol = 'saml2' + mgr.SSO_DB.save() + return 0, json.dumps(mgr.SSO_DB.saml2.onelogin_settings), '' + + return -errno.ENOSYS, '', '' diff --git a/src/pybind/mgr/dashboard/services/tcmu_service.py b/src/pybind/mgr/dashboard/services/tcmu_service.py new file mode 100644 index 000000000..aceeaff3d --- /dev/null +++ b/src/pybind/mgr/dashboard/services/tcmu_service.py @@ -0,0 +1,108 @@ +from mgr_util import get_most_recent_rate + +from dashboard.services.ceph_service import CephService + +from .. import mgr + +try: + from typing import Dict +except ImportError: + pass # Just for type checking + + +SERVICE_TYPE = 'tcmu-runner' + + +class TcmuService(object): + # pylint: disable=too-many-nested-blocks + # pylint: disable=too-many-branches + @staticmethod + def get_iscsi_info(): + daemons = {} # type: Dict[str, dict] + images = {} # type: Dict[str, dict] + for service in CephService.get_service_list(SERVICE_TYPE): + metadata = service['metadata'] + if metadata is None: + continue + status = service['status'] + hostname = service['hostname'] + + daemon = daemons.get(hostname, None) + if daemon is None: + daemon = { + 'server_hostname': hostname, + 'version': metadata['ceph_version'], + 'optimized_paths': 0, + 'non_optimized_paths': 0 + } + daemons[hostname] = daemon + + service_id = service['id'] + device_id = service_id.split(':')[-1] + image = images.get(device_id) + if image is None: + image = { + 'device_id': device_id, + 'pool_name': metadata['pool_name'], + 'name': metadata['image_name'], + 'id': metadata.get('image_id', None), + 'optimized_paths': [], + 'non_optimized_paths': [] + } + images[device_id] = image + + if status.get('lock_owner', 'false') == 'true': + daemon['optimized_paths'] += 1 + image['optimized_paths'].append(hostname) + + perf_key_prefix = "librbd-{id}-{pool}-{name}.".format( + id=metadata.get('image_id', ''), + pool=metadata['pool_name'], + name=metadata['image_name']) + perf_key = "{}lock_acquired_time".format(perf_key_prefix) + perf_value = mgr.get_counter('tcmu-runner', + service_id, + perf_key)[perf_key] + if perf_value: + lock_acquired_time = perf_value[-1][1] / 1000000000 + else: + lock_acquired_time = 0 + if lock_acquired_time > image.get('optimized_since', 0): + image['optimized_daemon'] = hostname + image['optimized_since'] = lock_acquired_time + image['stats'] = {} + image['stats_history'] = {} + for s in ['rd', 'wr', 'rd_bytes', 'wr_bytes']: + perf_key = "{}{}".format(perf_key_prefix, s) + rates = CephService.get_rates('tcmu-runner', service_id, perf_key) + image['stats'][s] = get_most_recent_rate(rates) + image['stats_history'][s] = rates + else: + daemon['non_optimized_paths'] += 1 + image['non_optimized_paths'].append(hostname) + + # clear up races w/ tcmu-runner clients that haven't detected + # loss of optimized path + for image in images.values(): + optimized_daemon = image.get('optimized_daemon', None) + if optimized_daemon: + for daemon_name in image['optimized_paths']: + if daemon_name != optimized_daemon: + daemon = daemons[daemon_name] + daemon['optimized_paths'] -= 1 + daemon['non_optimized_paths'] += 1 + image['non_optimized_paths'].append(daemon_name) + image['optimized_paths'] = [optimized_daemon] + + return { + 'daemons': sorted(daemons.values(), + key=lambda d: d['server_hostname']), + 'images': sorted(images.values(), key=lambda i: ['id']), + } + + @staticmethod + def get_image_info(pool_name, image_name, get_iscsi_info): + for image in get_iscsi_info['images']: + if image['pool_name'] == pool_name and image['name'] == image_name: + return image + return None -- cgit v1.2.3