diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/dashboard/services | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/dashboard/services')
-rw-r--r-- | src/pybind/mgr/dashboard/services/__init__.py | 2 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/access_control.py | 670 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/auth.py | 207 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/ceph_service.py | 254 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/cephfs.py | 77 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/cephx.py | 29 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/exception.py | 122 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/ganesha.py | 998 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/iscsi_cli.py | 50 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/iscsi_client.py | 257 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/iscsi_config.py | 135 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/orchestrator.py | 38 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/rbd.py | 177 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/rgw_client.py | 437 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/sso.py | 268 | ||||
-rw-r--r-- | src/pybind/mgr/dashboard/services/tcmu_service.py | 96 |
16 files changed, 3817 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/services/__init__.py b/src/pybind/mgr/dashboard/services/__init__.py new file mode 100644 index 00000000..139759b6 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import diff --git a/src/pybind/mgr/dashboard/services/access_control.py b/src/pybind/mgr/dashboard/services/access_control.py new file mode 100644 index 00000000..9ea7a01c --- /dev/null +++ b/src/pybind/mgr/dashboard/services/access_control.py @@ -0,0 +1,670 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-arguments,too-many-return-statements +# pylint: disable=too-many-branches, too-many-locals, too-many-statements +from __future__ import absolute_import + +import errno +import json +import threading +import time +import six + +import bcrypt + +from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand + +from .. import mgr, logger +from ..security import Scope, Permission +from ..exceptions import RoleAlreadyExists, RoleDoesNotExist, ScopeNotValid, \ + PermissionNotValid, RoleIsAssociatedWithUser, \ + UserAlreadyExists, UserDoesNotExist, ScopeNotInRole, \ + RoleNotInUser + + +# password hashing algorithm +def password_hash(password, salt_password=None): + if not password: + return None + if six.PY2: + password = unicode(password, 'utf-8') if isinstance(password, str) else password + if not salt_password: + salt_password = bcrypt.gensalt() + else: + salt_password = salt_password.encode('utf8') + return bcrypt.hashpw(password.encode('utf8'), salt_password).decode('utf8') + + +_P = Permission # short alias + + +class Role(object): + def __init__(self, name, description=None, scope_permissions=None): + self.name = name + self.description = description + if scope_permissions is None: + self.scopes_permissions = {} + else: + self.scopes_permissions = scope_permissions + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return self.name == other.name + + def set_scope_permissions(self, scope, permissions): + if not Scope.valid_scope(scope): + raise ScopeNotValid(scope) + for perm in permissions: + if not Permission.valid_permission(perm): + raise PermissionNotValid(perm) + + permissions.sort() + self.scopes_permissions[scope] = permissions + + def del_scope_permissions(self, scope): + if scope not in self.scopes_permissions: + raise ScopeNotInRole(scope, self.name) + del self.scopes_permissions[scope] + + def reset_scope_permissions(self): + self.scopes_permissions = {} + + def authorize(self, scope, permissions): + if scope in self.scopes_permissions: + role_perms = self.scopes_permissions[scope] + for perm in permissions: + if perm not in role_perms: + return False + return True + return False + + def to_dict(self): + return { + 'name': self.name, + 'description': self.description, + 'scopes_permissions': self.scopes_permissions + } + + @classmethod + def from_dict(cls, r_dict): + return Role(r_dict['name'], r_dict['description'], + r_dict['scopes_permissions']) + + +# static pre-defined system roles +# this roles cannot be deleted nor updated + +# admin role provides all permissions for all scopes +ADMIN_ROLE = Role('administrator', 'Administrator', { + scope_name: Permission.all_permissions() + for scope_name in Scope.all_scopes() +}) + + +# read-only role provides read-only permission for all scopes +READ_ONLY_ROLE = Role('read-only', 'Read-Only', { + scope_name: [_P.READ] for scope_name in Scope.all_scopes() + if scope_name != Scope.DASHBOARD_SETTINGS +}) + + +# block manager role provides all permission for block related scopes +BLOCK_MGR_ROLE = Role('block-manager', 'Block Manager', { + Scope.RBD_IMAGE: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.POOL: [_P.READ], + Scope.ISCSI: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.RBD_MIRRORING: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], +}) + + +# RadosGW manager role provides all permissions for block related scopes +RGW_MGR_ROLE = Role('rgw-manager', 'RGW Manager', { + Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], +}) + + +# Cluster manager role provides all permission for OSDs, Monitors, and +# Config options +CLUSTER_MGR_ROLE = Role('cluster-manager', 'Cluster Manager', { + Scope.HOSTS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.OSD: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.MONITOR: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.MANAGER: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.CONFIG_OPT: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.LOG: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], +}) + + +# Pool manager role provides all permissions for pool related scopes +POOL_MGR_ROLE = Role('pool-manager', 'Pool Manager', { + Scope.POOL: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], +}) + +# CephFS manager role provides all permissions for CephFS related scopes +CEPHFS_MGR_ROLE = Role('cephfs-manager', 'CephFS Manager', { + Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], +}) + +GANESHA_MGR_ROLE = Role('ganesha-manager', 'NFS Ganesha Manager', { + Scope.NFS_GANESHA: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE], + Scope.GRAFANA: [_P.READ], +}) + + +SYSTEM_ROLES = { + ADMIN_ROLE.name: ADMIN_ROLE, + READ_ONLY_ROLE.name: READ_ONLY_ROLE, + BLOCK_MGR_ROLE.name: BLOCK_MGR_ROLE, + RGW_MGR_ROLE.name: RGW_MGR_ROLE, + CLUSTER_MGR_ROLE.name: CLUSTER_MGR_ROLE, + POOL_MGR_ROLE.name: POOL_MGR_ROLE, + CEPHFS_MGR_ROLE.name: CEPHFS_MGR_ROLE, + GANESHA_MGR_ROLE.name: GANESHA_MGR_ROLE, +} + + +class User(object): + def __init__(self, username, password, name=None, email=None, roles=None, + lastUpdate=None): + self.username = username + self.password = password + self.name = name + self.email = email + if roles is None: + self.roles = set() + else: + self.roles = roles + if lastUpdate is None: + self.refreshLastUpdate() + else: + self.lastUpdate = lastUpdate + + def refreshLastUpdate(self): + self.lastUpdate = int(time.time()) + + def set_password(self, password): + self.password = password_hash(password) + self.refreshLastUpdate() + + def set_roles(self, roles): + self.roles = set(roles) + self.refreshLastUpdate() + + def add_roles(self, roles): + self.roles = self.roles.union(set(roles)) + self.refreshLastUpdate() + + def del_roles(self, roles): + for role in roles: + if role not in self.roles: + raise RoleNotInUser(role.name, self.username) + self.roles.difference_update(set(roles)) + self.refreshLastUpdate() + + def authorize(self, scope, permissions): + for role in self.roles: + if role.authorize(scope, permissions): + return True + return False + + def permissions_dict(self): + perms = {} + for role in self.roles: + for scope, perms_list in role.scopes_permissions.items(): + if scope in perms: + perms_tmp = set(perms[scope]).union(set(perms_list)) + perms[scope] = list(perms_tmp) + else: + perms[scope] = perms_list + + return perms + + def to_dict(self): + return { + 'username': self.username, + 'password': self.password, + 'roles': sorted([r.name for r in self.roles]), + 'name': self.name, + 'email': self.email, + 'lastUpdate': self.lastUpdate + } + + @classmethod + def from_dict(cls, u_dict, roles): + return User(u_dict['username'], u_dict['password'], u_dict['name'], + u_dict['email'], {roles[r] for r in u_dict['roles']}, + u_dict['lastUpdate']) + + +class AccessControlDB(object): + VERSION = 1 + ACDB_CONFIG_KEY = "accessdb_v" + + def __init__(self, version, users, roles): + self.users = users + self.version = version + self.roles = roles + self.lock = threading.RLock() + + def create_role(self, name, description=None): + with self.lock: + if name in SYSTEM_ROLES or name in self.roles: + raise RoleAlreadyExists(name) + role = Role(name, description) + self.roles[name] = role + return role + + def get_role(self, name): + with self.lock: + if name not in self.roles: + raise RoleDoesNotExist(name) + return self.roles[name] + + def delete_role(self, name): + with self.lock: + if name not in self.roles: + raise RoleDoesNotExist(name) + role = self.roles[name] + + # check if role is not associated with a user + for username, user in self.users.items(): + if role in user.roles: + raise RoleIsAssociatedWithUser(name, username) + + del self.roles[name] + + def create_user(self, username, password, name, email): + logger.debug("AC: creating user: username=%s", username) + with self.lock: + if username in self.users: + raise UserAlreadyExists(username) + user = User(username, password_hash(password), name, email) + self.users[username] = user + return user + + def get_user(self, username): + with self.lock: + if username not in self.users: + raise UserDoesNotExist(username) + return self.users[username] + + def delete_user(self, username): + with self.lock: + if username not in self.users: + raise UserDoesNotExist(username) + del self.users[username] + + def update_users_with_roles(self, role): + with self.lock: + if not role: + return + for _, user in self.users.items(): + if role in user.roles: + user.refreshLastUpdate() + + def save(self): + with self.lock: + db = { + 'users': {un: u.to_dict() for un, u in self.users.items()}, + 'roles': {rn: r.to_dict() for rn, r in self.roles.items()}, + 'version': self.version + } + mgr.set_store(self.accessdb_config_key(), json.dumps(db)) + + @classmethod + def accessdb_config_key(cls, version=None): + if version is None: + version = cls.VERSION + return "{}{}".format(cls.ACDB_CONFIG_KEY, version) + + def check_and_update_db(self): + logger.debug("AC: Checking for previews DB versions") + if self.VERSION == 1: # current version + # check if there is username/password from previous version + username = mgr.get_module_option('username', None) + password = mgr.get_module_option('password', None) + if username and password: + logger.debug("AC: Found single user credentials: user=%s", + username) + # found user credentials + user = self.create_user(username, "", None, None) + # password is already hashed, so setting manually + user.password = password + user.add_roles([ADMIN_ROLE]) + self.save() + else: + raise NotImplementedError() + + @classmethod + def load(cls): + logger.info("AC: Loading user roles DB version=%s", cls.VERSION) + + json_db = mgr.get_store(cls.accessdb_config_key()) + if json_db is None: + logger.debug("AC: No DB v%s found, creating new...", cls.VERSION) + db = cls(cls.VERSION, {}, {}) + # check if we can update from a previous version database + db.check_and_update_db() + return db + + db = json.loads(json_db) + roles = {rn: Role.from_dict(r) + for rn, r in db.get('roles', {}).items()} + users = {un: User.from_dict(u, dict(roles, **SYSTEM_ROLES)) + for un, u in db.get('users', {}).items()} + return cls(db['version'], users, roles) + + +def load_access_control_db(): + mgr.ACCESS_CTRL_DB = AccessControlDB.load() + + +# CLI dashboard access control scope commands + +@CLIWriteCommand('dashboard set-login-credentials', + 'name=username,type=CephString', + 'Set the login credentials. Password read from -i <file>') +@CLICheckNonemptyFileInput +def set_login_credentials_cmd(_, username, inbuf): + password = inbuf + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_password(password) + except UserDoesNotExist: + user = mgr.ACCESS_CTRL_DB.create_user(username, password, None, None) + user.set_roles([ADMIN_ROLE]) + + mgr.ACCESS_CTRL_DB.save() + + return 0, '''\ +****************************************************************** +*** WARNING: this command is deprecated. *** +*** Please use the ac-user-* related commands to manage users. *** +****************************************************************** +Username and password updated''', '' + + +@CLIReadCommand('dashboard ac-role-show', + 'name=rolename,type=CephString,req=false', + 'Show role info') +def ac_role_show_cmd(_, rolename=None): + if not rolename: + roles = dict(mgr.ACCESS_CTRL_DB.roles) + roles.update(SYSTEM_ROLES) + roles_list = [name for name, _ in roles.items()] + return 0, json.dumps(roles_list), '' + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + role = SYSTEM_ROLES[rolename] + return 0, json.dumps(role.to_dict()), '' + + +@CLIWriteCommand('dashboard ac-role-create', + 'name=rolename,type=CephString ' + 'name=description,type=CephString,req=false', + 'Create a new access control role') +def ac_role_create_cmd(_, rolename, description=None): + try: + role = mgr.ACCESS_CTRL_DB.create_role(rolename, description) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + + +@CLIWriteCommand('dashboard ac-role-delete', + 'name=rolename,type=CephString', + 'Delete an access control role') +def ac_role_delete_cmd(_, rolename): + try: + mgr.ACCESS_CTRL_DB.delete_role(rolename) + mgr.ACCESS_CTRL_DB.save() + return 0, "Role '{}' deleted".format(rolename), "" + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot delete system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except RoleIsAssociatedWithUser as ex: + return -errno.EPERM, '', str(ex) + + +@CLIWriteCommand('dashboard ac-role-add-scope-perms', + 'name=rolename,type=CephString ' + 'name=scopename,type=CephString ' + 'name=permissions,type=CephString,n=N', + 'Add the scope permissions for a role') +def ac_role_add_scope_perms_cmd(_, rolename, scopename, permissions): + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + perms_array = [perm.strip() for perm in permissions] + role.set_scope_permissions(scopename, perms_array) + mgr.ACCESS_CTRL_DB.update_users_with_roles(role) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot update system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except ScopeNotValid as ex: + return -errno.EINVAL, '', str(ex) + "\n Possible values: {}" \ + .format(Scope.all_scopes()) + except PermissionNotValid as ex: + return -errno.EINVAL, '', str(ex) + \ + "\n Possible values: {}" \ + .format(Permission.all_permissions()) + + +@CLIWriteCommand('dashboard ac-role-del-scope-perms', + 'name=rolename,type=CephString ' + 'name=scopename,type=CephString', + 'Delete the scope permissions for a role') +def ac_role_del_scope_perms_cmd(_, rolename, scopename): + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) + role.del_scope_permissions(scopename) + mgr.ACCESS_CTRL_DB.update_users_with_roles(role) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(role.to_dict()), '' + except RoleDoesNotExist as ex: + if rolename in SYSTEM_ROLES: + return -errno.EPERM, '', "Cannot update system role '{}'" \ + .format(rolename) + return -errno.ENOENT, '', str(ex) + except ScopeNotInRole as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIReadCommand('dashboard ac-user-show', + 'name=username,type=CephString,req=false', + 'Show user info') +def ac_user_show_cmd(_, username=None): + if not username: + users = mgr.ACCESS_CTRL_DB.users + users_list = [name for name, _ in users.items()] + return 0, json.dumps(users_list), '' + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-create', + 'name=username,type=CephString ' + 'name=rolename,type=CephString,req=false ' + 'name=name,type=CephString,req=false ' + 'name=email,type=CephString,req=false', + 'Create a user. Password read from -i <file>') +@CLICheckNonemptyFileInput +def ac_user_create_cmd(_, username, inbuf, rolename=None, name=None, + email=None): + password = inbuf + try: + role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + role = SYSTEM_ROLES[rolename] + + try: + user = mgr.ACCESS_CTRL_DB.create_user(username, password, name, email) + except UserAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + + if role: + user.set_roles([role]) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + + +@CLIWriteCommand('dashboard ac-user-delete', + 'name=username,type=CephString', + 'Delete user') +def ac_user_delete_cmd(_, username): + try: + mgr.ACCESS_CTRL_DB.delete_user(username) + mgr.ACCESS_CTRL_DB.save() + return 0, "User '{}' deleted".format(username), "" + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-roles', + 'name=username,type=CephString ' + 'name=roles,type=CephString,n=N', + 'Set user roles') +def ac_user_set_roles_cmd(_, username, roles): + rolesname = roles + roles = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-add-roles', + 'name=username,type=CephString ' + 'name=roles,type=CephString,n=N', + 'Add roles to user') +def ac_user_add_roles_cmd(_, username, roles): + rolesname = roles + roles = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.add_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-del-roles', + 'name=username,type=CephString ' + 'name=roles,type=CephString,n=N', + 'Delete roles from user') +def ac_user_del_roles_cmd(_, username, roles): + rolesname = roles + roles = [] + for rolename in rolesname: + try: + roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename)) + except RoleDoesNotExist as ex: + if rolename not in SYSTEM_ROLES: + return -errno.ENOENT, '', str(ex) + roles.append(SYSTEM_ROLES[rolename]) + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.del_roles(roles) + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + except RoleNotInUser as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-password', + 'name=username,type=CephString', + 'Set user password from -i <file>') +@CLICheckNonemptyFileInput +def ac_user_set_password(_, username, inbuf): + password = inbuf + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + user.set_password(password) + + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +@CLIWriteCommand('dashboard ac-user-set-info', + 'name=username,type=CephString ' + 'name=name,type=CephString ' + 'name=email,type=CephString', + 'Set user info') +def ac_user_set_info(_, username, name, email): + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if name: + user.name = name + if email: + user.email = email + mgr.ACCESS_CTRL_DB.save() + return 0, json.dumps(user.to_dict()), '' + except UserDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + + +class LocalAuthenticator(object): + def __init__(self): + load_access_control_db() + + def get_user(self, username): + return mgr.ACCESS_CTRL_DB.get_user(username) + + def authenticate(self, username, password): + try: + user = mgr.ACCESS_CTRL_DB.get_user(username) + if user.password: + pass_hash = password_hash(password, user.password) + if pass_hash == user.password: + return user.permissions_dict() + except UserDoesNotExist: + logger.debug("User '%s' does not exist", username) + return None + + def authorize(self, username, scope, permissions): + user = mgr.ACCESS_CTRL_DB.get_user(username) + return user.authorize(scope, permissions) diff --git a/src/pybind/mgr/dashboard/services/auth.py b/src/pybind/mgr/dashboard/services/auth.py new file mode 100644 index 00000000..239efae8 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/auth.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from base64 import b64encode +import json +import os +import threading +import time +import uuid + +import cherrypy +import jwt + +from .access_control import LocalAuthenticator, UserDoesNotExist +from .. import mgr, logger + +cherrypy.config.update({ + 'response.headers.server': 'Ceph-Dashboard', + 'response.headers.content-security-policy': "frame-ancestors 'self';", + 'response.headers.x-content-type-options': 'nosniff', + 'response.headers.strict-transport-security': 'max-age=63072000; includeSubDomains; preload' +}) + + +class JwtManager(object): + JWT_TOKEN_BLACKLIST_KEY = "jwt_token_black_list" + JWT_TOKEN_TTL = 28800 # default 8 hours + JWT_ALGORITHM = 'HS256' + _secret = None + + LOCAL_USER = threading.local() + + @staticmethod + def _gen_secret(): + secret = os.urandom(16) + return b64encode(secret).decode('utf-8') + + @classmethod + def init(cls): + # generate a new secret if it does not exist + secret = mgr.get_store('jwt_secret') + if secret is None: + secret = cls._gen_secret() + mgr.set_store('jwt_secret', secret) + cls._secret = secret + + @classmethod + def gen_token(cls, username): + if not cls._secret: + cls.init() + ttl = mgr.get_module_option('jwt_token_ttl', cls.JWT_TOKEN_TTL) + ttl = int(ttl) + now = int(time.time()) + payload = { + 'iss': 'ceph-dashboard', + 'jti': str(uuid.uuid4()), + 'exp': now + ttl, + 'iat': now, + 'username': username + } + return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM) + + @classmethod + def decode_token(cls, token): + if not cls._secret: + cls.init() + return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM) + + @classmethod + def get_token_from_header(cls): + auth_cookie_name = 'token' + try: + # use cookie + return cherrypy.request.cookie[auth_cookie_name].value + except KeyError: + try: + # fall-back: use Authorization header + auth_header = cherrypy.request.headers.get('authorization') + if auth_header is not None: + scheme, params = auth_header.split(' ', 1) + if scheme.lower() == 'bearer': + return params + except IndexError: + return None + + @classmethod + def set_user(cls, token): + cls.LOCAL_USER.username = token['username'] + + @classmethod + def reset_user(cls): + cls.set_user({'username': None, 'permissions': None}) + + @classmethod + def get_username(cls): + return getattr(cls.LOCAL_USER, 'username', None) + + @classmethod + def blacklist_token(cls, token): + token = cls.decode_token(token) + blacklist_json = mgr.get_store(cls.JWT_TOKEN_BLACKLIST_KEY) + if not blacklist_json: + blacklist_json = "{}" + bl_dict = json.loads(blacklist_json) + now = time.time() + + # remove expired tokens + to_delete = [] + for jti, exp in bl_dict.items(): + if exp < now: + to_delete.append(jti) + for jti in to_delete: + del bl_dict[jti] + + bl_dict[token['jti']] = token['exp'] + mgr.set_store(cls.JWT_TOKEN_BLACKLIST_KEY, json.dumps(bl_dict)) + + @classmethod + def is_blacklisted(cls, jti): + blacklist_json = mgr.get_store(cls.JWT_TOKEN_BLACKLIST_KEY) + if not blacklist_json: + blacklist_json = "{}" + bl_dict = json.loads(blacklist_json) + return jti in bl_dict + + +class AuthManager(object): + AUTH_PROVIDER = None + + @classmethod + def initialize(cls): + cls.AUTH_PROVIDER = LocalAuthenticator() + + @classmethod + def get_user(cls, username): + return cls.AUTH_PROVIDER.get_user(username) + + @classmethod + def authenticate(cls, username, password): + return cls.AUTH_PROVIDER.authenticate(username, password) + + @classmethod + def authorize(cls, username, scope, permissions): + return cls.AUTH_PROVIDER.authorize(username, scope, permissions) + + +class AuthManagerTool(cherrypy.Tool): + def __init__(self): + super(AuthManagerTool, self).__init__( + 'before_handler', self._check_authentication, priority=20) + + def _check_authentication(self): + JwtManager.reset_user() + token = JwtManager.get_token_from_header() + logger.debug("AMT: token: %s", token) + if token: + try: + token = JwtManager.decode_token(token) + if not JwtManager.is_blacklisted(token['jti']): + user = AuthManager.get_user(token['username']) + if user.lastUpdate <= token['iat']: + self._check_authorization(token) + return + + logger.debug("AMT: user info changed after token was" + " issued, iat=%s lastUpdate=%s", + token['iat'], user.lastUpdate) + else: + logger.debug('AMT: Token is black-listed') + except jwt.exceptions.ExpiredSignatureError: + logger.debug("AMT: Token has expired") + except jwt.exceptions.InvalidTokenError: + logger.debug("AMT: Failed to decode token") + except UserDoesNotExist: + logger.debug("AMT: Invalid token: user %s does not exist", + token['username']) + + logger.debug('AMT: Unauthorized access to %s', + cherrypy.url(relative='server')) + raise cherrypy.HTTPError(401, 'You are not authorized to access ' + 'that resource') + + def _check_authorization(self, token): + logger.debug("AMT: checking authorization...") + username = token['username'] + handler = cherrypy.request.handler.callable + controller = handler.__self__ + sec_scope = getattr(controller, '_security_scope', None) + sec_perms = getattr(handler, '_security_permissions', None) + JwtManager.set_user(token) + + if not sec_scope: + # controller does not define any authorization restrictions + return + + logger.debug("AMT: checking '%s' access to '%s' scope", sec_perms, + sec_scope) + + if not sec_perms: + logger.debug("Fail to check permission on: %s:%s", controller, + handler) + raise cherrypy.HTTPError(403, "You don't have permissions to " + "access that resource") + + if not AuthManager.authorize(username, sec_scope, sec_perms): + raise cherrypy.HTTPError(403, "You don't have permissions to " + "access that resource") diff --git a/src/pybind/mgr/dashboard/services/ceph_service.py b/src/pybind/mgr/dashboard/services/ceph_service.py new file mode 100644 index 00000000..6d38fa41 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/ceph_service.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json + +import rados + +from mgr_module import CommandResult +from mgr_util import get_time_series_rates, get_most_recent_rate + +from .. import logger, mgr + +try: + from typing import Dict, Any, Union # pylint: disable=unused-import +except ImportError: + pass # For typing only + + +class SendCommandError(rados.Error): + def __init__(self, err, prefix, argdict, errno): + self.prefix = prefix + self.argdict = argdict + super(SendCommandError, self).__init__(err, errno) + + +class CephService(object): + + OSD_FLAG_NO_SCRUB = 'noscrub' + OSD_FLAG_NO_DEEP_SCRUB = 'nodeep-scrub' + + PG_STATUS_SCRUBBING = 'scrubbing' + PG_STATUS_DEEP_SCRUBBING = 'deep' + + SCRUB_STATUS_DISABLED = 'Disabled' + SCRUB_STATUS_ACTIVE = 'Active' + SCRUB_STATUS_INACTIVE = 'Inactive' + + @classmethod + def get_service_map(cls, service_name): + service_map = {} # type: Dict[str, Dict[str, Any]] + for server in mgr.list_servers(): + for service in server['services']: + if service['type'] == service_name: + if server['hostname'] not in service_map: + service_map[server['hostname']] = { + 'server': server, + 'services': [] + } + inst_id = service['id'] + metadata = mgr.get_metadata(service_name, inst_id) + status = mgr.get_daemon_status(service_name, inst_id) + service_map[server['hostname']]['services'].append({ + 'id': inst_id, + 'type': service_name, + 'hostname': server['hostname'], + 'metadata': metadata, + 'status': status + }) + return service_map + + @classmethod + def get_service_list(cls, service_name): + service_map = cls.get_service_map(service_name) + return [svc for _, svcs in service_map.items() for svc in svcs['services']] + + @classmethod + def get_service(cls, service_name, service_id): + for server in mgr.list_servers(): + for service in server['services']: + if service['type'] == service_name: + inst_id = service['id'] + if inst_id == service_id: + metadata = mgr.get_metadata(service_name, inst_id) + status = mgr.get_daemon_status(service_name, inst_id) + return { + 'id': inst_id, + 'type': service_name, + 'hostname': server['hostname'], + 'metadata': metadata, + 'status': status + } + return None + + @classmethod + def get_pool_list(cls, application=None): + osd_map = mgr.get('osd_map') + if not application: + return osd_map['pools'] + return [pool for pool in osd_map['pools'] + if application in pool.get('application_metadata', {})] + + @classmethod + def get_pool_list_with_stats(cls, application=None): + # pylint: disable=too-many-locals + pools = cls.get_pool_list(application) + + pools_w_stats = [] + + pg_summary = mgr.get("pg_summary") + pool_stats = mgr.get_updated_pool_stats() + + for pool in pools: + pool['pg_status'] = pg_summary['by_pool'][pool['pool'].__str__()] + stats = pool_stats[pool['pool']] + s = {} + + for stat_name, stat_series in stats.items(): + rates = get_time_series_rates(stat_series) + s[stat_name] = { + 'latest': stat_series[0][1], + 'rate': get_most_recent_rate(rates), + 'rates': rates + } + pool['stats'] = s + pools_w_stats.append(pool) + return pools_w_stats + + @classmethod + def get_pool_name_from_id(cls, pool_id): + # type: (int) -> Union[str, None] + return mgr.rados.pool_reverse_lookup(pool_id) + + @classmethod + def get_pool_by_attribute(cls, attribute, value): + # type: (str, Any) -> Union[dict, None] + pool_list = cls.get_pool_list() + for pool in pool_list: + if attribute in pool and pool[attribute] == value: + return pool + return None + + @classmethod + def get_pool_pg_status(cls, pool_name): + # type: (str) -> dict + pool = cls.get_pool_by_attribute('pool_name', pool_name) + if pool is None: + return {} + return mgr.get("pg_summary")['by_pool'][pool['pool'].__str__()] + + @classmethod + def send_command(cls, srv_type, prefix, srv_spec='', **kwargs): + """ + :type prefix: str + :param srv_type: mon | + :param kwargs: will be added to argdict + :param srv_spec: typically empty. or something like "<fs_id>:0" + + :raises PermissionError: See rados.make_ex + :raises ObjectNotFound: See rados.make_ex + :raises IOError: See rados.make_ex + :raises NoSpace: See rados.make_ex + :raises ObjectExists: See rados.make_ex + :raises ObjectBusy: See rados.make_ex + :raises NoData: See rados.make_ex + :raises InterruptedOrTimeoutError: See rados.make_ex + :raises TimedOut: See rados.make_ex + :raises ValueError: return code != 0 + """ + argdict = { + "prefix": prefix, + "format": "json", + } + argdict.update({k: v for k, v in kwargs.items() if v is not None}) + result = CommandResult("") + mgr.send_command(result, srv_type, srv_spec, json.dumps(argdict), "") + r, outb, outs = result.wait() + if r != 0: + msg = "send_command '{}' failed. (r={}, outs=\"{}\", kwargs={})".format(prefix, r, outs, + kwargs) + logger.error(msg) + raise SendCommandError(outs, prefix, argdict, r) + else: + try: + return json.loads(outb) + except Exception: # pylint: disable=broad-except + return outb + + @classmethod + def get_rates(cls, svc_type, svc_name, path): + """ + :return: the derivative of mgr.get_counter() + :rtype: list[tuple[int, float]]""" + data = mgr.get_counter(svc_type, svc_name, path)[path] + return get_time_series_rates(data) + + @classmethod + def get_rate(cls, svc_type, svc_name, path): + """returns most recent rate""" + return get_most_recent_rate(cls.get_rates(svc_type, svc_name, path)) + + @classmethod + def get_client_perf(cls): + pools_stats = mgr.get('osd_pool_stats')['pool_stats'] + + io_stats = { + 'read_bytes_sec': 0, + 'read_op_per_sec': 0, + 'write_bytes_sec': 0, + 'write_op_per_sec': 0, + } + recovery_stats = {'recovering_bytes_per_sec': 0} + + for pool_stats in pools_stats: + client_io = pool_stats['client_io_rate'] + for stat in list(io_stats.keys()): + if stat in client_io: + io_stats[stat] += client_io[stat] + + client_recovery = pool_stats['recovery_rate'] + for stat in list(recovery_stats.keys()): + if stat in client_recovery: + recovery_stats[stat] += client_recovery[stat] + + client_perf = io_stats.copy() + client_perf.update(recovery_stats) + + return client_perf + + @classmethod + def get_scrub_status(cls): + enabled_flags = mgr.get('osd_map')['flags_set'] + if cls.OSD_FLAG_NO_SCRUB in enabled_flags or cls.OSD_FLAG_NO_DEEP_SCRUB in enabled_flags: + return cls.SCRUB_STATUS_DISABLED + + grouped_pg_statuses = mgr.get('pg_summary')['all'] + for grouped_pg_status in grouped_pg_statuses.keys(): + if len(grouped_pg_status.split(cls.PG_STATUS_SCRUBBING)) > 1 \ + or len(grouped_pg_status.split(cls.PG_STATUS_DEEP_SCRUBBING)) > 1: + return cls.SCRUB_STATUS_ACTIVE + + return cls.SCRUB_STATUS_INACTIVE + + @classmethod + def get_pg_info(cls): + pg_summary = mgr.get('pg_summary') + object_stats = {stat: pg_summary['pg_stats_sum']['stat_sum'][stat] for stat in [ + 'num_objects', 'num_object_copies', 'num_objects_degraded', + 'num_objects_misplaced', 'num_objects_unfound']} + + pgs_per_osd = 0.0 + total_osds = len(pg_summary['by_osd']) + if total_osds > 0: + total_pgs = 0.0 + for _, osd_pg_statuses in pg_summary['by_osd'].items(): + for _, pg_amount in osd_pg_statuses.items(): + total_pgs += pg_amount + + pgs_per_osd = total_pgs / total_osds + + return { + 'object_stats': object_stats, + 'statuses': pg_summary['all'], + 'pgs_per_osd': pgs_per_osd, + } diff --git a/src/pybind/mgr/dashboard/services/cephfs.py b/src/pybind/mgr/dashboard/services/cephfs.py new file mode 100644 index 00000000..bb75b4e2 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/cephfs.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from contextlib import contextmanager + +import cephfs + +from .. import mgr, logger + + +class CephFS(object): + @classmethod + def list_filesystems(cls): + fsmap = mgr.get("fs_map") + return [{'id': fs['id'], 'name': fs['mdsmap']['fs_name']} + for fs in fsmap['filesystems']] + + def __init__(self, fs_name=None): + logger.debug("[CephFS] initializing cephfs connection") + self.cfs = cephfs.LibCephFS(rados_inst=mgr.rados) + logger.debug("[CephFS] mounting cephfs filesystem: %s", fs_name) + if fs_name: + self.cfs.mount(filesystem_name=fs_name) + else: + self.cfs.mount() + logger.debug("[CephFS] mounted cephfs filesystem") + + def __del__(self): + logger.debug("[CephFS] shutting down cephfs filesystem") + self.cfs.shutdown() + + @contextmanager + def opendir(self, dirpath): + d = None + try: + d = self.cfs.opendir(dirpath) + yield d + finally: + if d: + self.cfs.closedir(d) + + def get_dir_list(self, dirpath, level): + logger.debug("[CephFS] get_dir_list dirpath=%s level=%s", dirpath, + level) + if level == 0: + return [dirpath] + logger.debug("[CephFS] opening dirpath=%s", dirpath) + with self.opendir(dirpath) as d: + dent = self.cfs.readdir(d) + paths = [dirpath] + while dent: + logger.debug("[CephFS] found entry=%s", dent.d_name) + if dent.d_name in ['.', '..']: + dent = self.cfs.readdir(d) + continue + if dent.is_dir(): + logger.debug("[CephFS] found dir=%s", dent.d_name) + subdirpath = '{}{}/'.format(dirpath, dent.d_name) + paths.extend(self.get_dir_list(subdirpath, level-1)) + dent = self.cfs.readdir(d) + return paths + + def dir_exists(self, dirpath): + try: + with self.opendir(dirpath): + return True + except cephfs.ObjectNotFound: + return False + + def mkdirs(self, dirpath): + if dirpath == '/': + raise Exception('Cannot create root directory "/"') + if self.dir_exists(dirpath): + return + + logger.info("[CephFS] Creating directory: %s", dirpath) + self.cfs.mkdirs("{}".format(dirpath).encode('utf-8'), 0o755) diff --git a/src/pybind/mgr/dashboard/services/cephx.py b/src/pybind/mgr/dashboard/services/cephx.py new file mode 100644 index 00000000..ccda3879 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/cephx.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from .ceph_service import CephService + + +class CephX(object): + @classmethod + def _entities_map(cls, entity_type=None): + auth_dump = CephService.send_command("mon", "auth list") + result = {} + for auth_entry in auth_dump['auth_dump']: + entity = auth_entry['entity'] + if not entity_type or entity.startswith('{}.'.format(entity_type)): + entity_id = entity[entity.find('.')+1:] + result[entity_id] = auth_entry + return result + + @classmethod + def _clients_map(cls): + return cls._entities_map("client") + + @classmethod + def list_clients(cls): + return [client for client in cls._clients_map()] + + @classmethod + def get_client_key(cls, client_id): + return cls._clients_map()[client_id]['key'] diff --git a/src/pybind/mgr/dashboard/services/exception.py b/src/pybind/mgr/dashboard/services/exception.py new file mode 100644 index 00000000..b5c0bd58 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/exception.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json +import sys +from contextlib import contextmanager + +import cherrypy + +import rbd +import rados + +from .. import logger +from ..services.ceph_service import SendCommandError +from ..exceptions import ViewCacheNoDataException, DashboardException +from ..tools import wraps + +if sys.version_info < (3, 0): + # Monkey-patch a __call__ method into @contextmanager to make + # it compatible to Python 3 + + from contextlib import GeneratorContextManager # pylint: disable=no-name-in-module + + def init(self, *args): + if len(args) == 1: + self.gen = args[0] + elif len(args) == 3: + self.func, self.args, self.kwargs = args + else: + raise TypeError() + + def enter(self): + if hasattr(self, 'func'): + self.gen = self.func(*self.args, **self.kwargs) + try: + return self.gen.next() + except StopIteration: + raise RuntimeError("generator didn't yield") + + def call(self, f): + @wraps(f) + def wrapper(*args, **kwargs): + with self: + return f(*args, **kwargs) + + return wrapper + + GeneratorContextManager.__init__ = init + GeneratorContextManager.__enter__ = enter + GeneratorContextManager.__call__ = call + + # pylint: disable=function-redefined + def contextmanager(func): + + @wraps(func) + def helper(*args, **kwds): + return GeneratorContextManager(func, args, kwds) + + return helper + + +def serialize_dashboard_exception(e, include_http_status=False, task=None): + """ + :type e: Exception + :param include_http_status: Used for Tasks, where the HTTP status code is not available. + """ + from ..tools import ViewCache + if isinstance(e, ViewCacheNoDataException): + return {'status': ViewCache.VALUE_NONE, 'value': None} + + out = dict(detail=str(e)) + try: + out['code'] = e.code + except AttributeError: + pass + component = getattr(e, 'component', None) + out['component'] = component if component else None + if include_http_status: + out['status'] = getattr(e, 'status', 500) + if task: + out['task'] = dict(name=task.name, metadata=task.metadata) + return out + + +def dashboard_exception_handler(handler, *args, **kwargs): + try: + with handle_rados_error(component=None): # make the None controller the fallback. + return handler(*args, **kwargs) + # Don't catch cherrypy.* Exceptions. + except (ViewCacheNoDataException, DashboardException) as e: + logger.exception('dashboard_exception_handler') + cherrypy.response.headers['Content-Type'] = 'application/json' + cherrypy.response.status = getattr(e, 'status', 400) + return json.dumps(serialize_dashboard_exception(e)).encode('utf-8') + + +@contextmanager +def handle_rbd_error(): + try: + yield + except rbd.OSError as e: + raise DashboardException(e, component='rbd') + except rbd.Error as e: + raise DashboardException(e, component='rbd', code=e.__class__.__name__) + + +@contextmanager +def handle_rados_error(component): + try: + yield + except rados.OSError as e: + raise DashboardException(e, component=component) + except rados.Error as e: + raise DashboardException(e, component=component, code=e.__class__.__name__) + + +@contextmanager +def handle_send_command_error(component): + try: + yield + except SendCommandError as e: + raise DashboardException(e, component=component) diff --git a/src/pybind/mgr/dashboard/services/ganesha.py b/src/pybind/mgr/dashboard/services/ganesha.py new file mode 100644 index 00000000..4053d20c --- /dev/null +++ b/src/pybind/mgr/dashboard/services/ganesha.py @@ -0,0 +1,998 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import re + +from orchestrator import OrchestratorError +from .cephfs import CephFS +from .cephx import CephX +from .orchestrator import OrchClient +from .rgw_client import RgwClient, RequestException, NoCredentialsException +from .. import mgr, logger +from ..settings import Settings +from ..exceptions import DashboardException + + +class NFSException(DashboardException): + def __init__(self, msg): + super(NFSException, self).__init__(component="nfs", msg=msg) + + +class Ganesha(object): + @classmethod + def _get_clusters_locations(cls): + result = {} + location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE + if not location_list_str: + raise NFSException("Ganesha config location is not configured. " + "Please set the GANESHA_RADOS_POOL_NAMESPACE " + "setting.") + location_list = [l.strip() for l in location_list_str.split(",")] + for location in location_list: + cluster = None + pool = None + namespace = None + if not location: + raise NFSException("Invalid Ganesha cluster RADOS " + "[cluster_id:]pool/namespace setting: {}" + .format(location)) + if location.count(':') < 1: + # default cluster_id + if location.count('/') > 1: + raise NFSException("Invalid Ganesha RADOS pool/namespace " + "setting: {}".format(location)) + # in this case accept pool/namespace only + cluster = "_default_" + if location.count('/') == 0: + pool, namespace = location, None + else: + pool, namespace = location.split('/', 1) + else: + cluster = location[:location.find(':')] + pool_nm = location[location.find(':')+1:] + if pool_nm.count('/') == 0: + pool, namespace = pool_nm, None + else: + pool, namespace = pool_nm.split('/', 1) + + if cluster in result: + raise NFSException("Duplicate Ganesha cluster definition in " + "the setting: {}".format(location_list_str)) + result[cluster] = (pool, namespace) + + return result + + @classmethod + def get_ganesha_clusters(cls): + return [cluster_id for cluster_id in cls._get_clusters_locations()] + + @staticmethod + def _get_orch_nfs_instances(): + try: + return OrchClient().list_service_info("nfs") + except (RuntimeError, OrchestratorError, ImportError): + return [] + + @classmethod + def get_daemons_status(cls): + instances = cls._get_orch_nfs_instances() + if not instances: + return None + + result = {} + for instance in instances: + if instance.service is None: + instance.service = "_default_" + if instance.service not in result: + result[instance.service] = {} + result[instance.service][instance.nodename] = { + 'status': instance.status, + 'desc': instance.status_desc, + } + return result + + @classmethod + def parse_rados_url(cls, rados_url): + if not rados_url.startswith("rados://"): + raise NFSException("Invalid NFS Ganesha RADOS configuration URL: {}" + .format(rados_url)) + rados_url = rados_url[8:] + url_comps = rados_url.split("/") + if len(url_comps) < 2 or len(url_comps) > 3: + raise NFSException("Invalid NFS Ganesha RADOS configuration URL: " + "rados://{}".format(rados_url)) + if len(url_comps) == 2: + return url_comps[0], None, url_comps[1] + return url_comps + + @classmethod + def make_rados_url(cls, pool, namespace, obj): + if namespace: + return "rados://{}/{}/{}".format(pool, namespace, obj) + return "rados://{}/{}".format(pool, obj) + + @classmethod + def get_pool_and_namespace(cls, cluster_id): + instances = cls._get_orch_nfs_instances() + # we assume that every instance stores there configuration in the + # same RADOS pool/namespace + if instances: + location = instances[0].rados_config_location + pool, ns, _ = cls.parse_rados_url(location) + return pool, ns + locations = cls._get_clusters_locations() + if cluster_id not in locations: + raise NFSException("Cluster not found: cluster_id={}" + .format(cluster_id)) + return locations[cluster_id] + + @classmethod + def reload_daemons(cls, cluster_id, daemons_id): + logger.debug("[NFS] issued reload of daemons: %s", daemons_id) + if not OrchClient().available(): + logger.debug("[NFS] orchestrator not available") + return + reload_list = [] + daemons = cls.get_daemons_status() + if cluster_id not in daemons: + raise NFSException("Cluster not found: cluster_id={}" + .format(cluster_id)) + for daemon_id in daemons_id: + if daemon_id not in daemons[cluster_id]: + continue + if daemons[cluster_id][daemon_id] == 1: + reload_list.append((cluster_id, daemon_id)) + OrchClient().reload_service("nfs", reload_list) + + @classmethod + def fsals_available(cls): + result = [] + if CephFS.list_filesystems(): + result.append("CEPH") + try: + if RgwClient.admin_instance().is_service_online() and \ + RgwClient.admin_instance().is_system_user(): + result.append("RGW") + except (NoCredentialsException, RequestException, LookupError): + pass + return result + + +class GaneshaConfParser(object): + def __init__(self, raw_config): + self.pos = 0 + self.text = "" + self.clean_config(raw_config) + + def clean_config(self, raw_config): + for line in raw_config.split("\n"): + cardinal_idx = line.find('#') + if cardinal_idx == -1: + self.text += line + else: + # remove comments + self.text += line[:cardinal_idx] + if line.startswith("%"): + self.text += "\n" + + def remove_all_whitespaces(self): + new_text = "" + in_string = False + in_section = False + for i, cha in enumerate(self.text): + if in_section: + if cha != '"' and self.text[i-1] != '\\': + new_text += cha + elif cha == '\n': + new_text += cha + in_section = False + elif i == (len(self.text)-1): + if cha != '"' and self.text[i-1] != '\\': + new_text += cha + in_section = False + elif not in_section and (i == 0 or self.text[i-1] == '\n') and cha == '%': + in_section = True + new_text += cha + elif in_string or cha not in [' ', '\n', '\t']: + new_text += cha + elif cha == '"' and self.text[i-1] != '\\': + in_string = not in_string + self.text = new_text + + def stream(self): + return self.text[self.pos:] + + def parse_block_name(self): + idx = self.stream().find('{') + if idx == -1: + raise Exception("Cannot find block name") + block_name = self.stream()[:idx] + self.pos += idx+1 + return block_name + + def parse_block_or_section(self): + if self.stream().startswith("%url "): + # section line + self.pos += 5 + idx = self.stream().find('\n') + if idx == -1: + value = self.stream() + self.pos += len(self.stream()) + else: + value = self.stream()[:idx] + self.pos += idx+1 + block_dict = {'block_name': '%url', 'value': value} + return block_dict + + block_name = self.parse_block_name().upper() + block_dict = {'block_name': block_name} + self.parse_block_body(block_dict) + if self.stream()[0] != '}': + raise Exception("No closing bracket '}' found at the end of block") + self.pos += 1 + return block_dict + + def parse_parameter_value(self, raw_value): + colon_idx = raw_value.find(',') + + if colon_idx == -1: + try: + return int(raw_value) + except ValueError: + if raw_value == "true": + return True + if raw_value == "false": + return False + if raw_value.find('"') == 0: + return raw_value[1:-1] + return raw_value + else: + return [self.parse_parameter_value(v.strip()) + for v in raw_value.split(',')] + + def parse_stanza(self, block_dict): + equal_idx = self.stream().find('=') + semicolon_idx = self.stream().find(';') + if equal_idx == -1: + raise Exception("Malformed stanza: no equal symbol found.") + parameter_name = self.stream()[:equal_idx].lower() + parameter_value = self.stream()[equal_idx+1:semicolon_idx] + block_dict[parameter_name] = self.parse_parameter_value( + parameter_value) + self.pos += semicolon_idx+1 + + def parse_block_body(self, block_dict): + last_pos = self.pos + while True: + semicolon_idx = self.stream().find(';') + lbracket_idx = self.stream().find('{') + rbracket_idx = self.stream().find('}') + + if rbracket_idx == 0: + # block end + return + + if (semicolon_idx != -1 and lbracket_idx != -1 + and semicolon_idx < lbracket_idx) \ + or (semicolon_idx != -1 and lbracket_idx == -1): + self.parse_stanza(block_dict) + elif (semicolon_idx != -1 and lbracket_idx != -1 + and semicolon_idx > lbracket_idx) or ( + semicolon_idx == -1 and lbracket_idx != -1): + if '_blocks_' not in block_dict: + block_dict['_blocks_'] = [] + block_dict['_blocks_'].append(self.parse_block_or_section()) + else: + raise Exception("Malformed stanza: no semicolon found.") + + if last_pos == self.pos: + raise Exception("Infinite loop while parsing block content") + last_pos = self.pos + + def parse(self): + self.remove_all_whitespaces() + blocks = [] + while self.stream(): + block_dict = self.parse_block_or_section() + blocks.append(block_dict) + return blocks + + @staticmethod + def _indentation(depth, size=4): + conf_str = "" + for _ in range(0, depth*size): + conf_str += " " + return conf_str + + @staticmethod + def write_block_body(block, depth=0): + def format_val(key, val): + if isinstance(val, list): + return ', '.join([format_val(key, v) for v in val]) + if isinstance(val, bool): + return str(val).lower() + if isinstance(val, int) or (block['block_name'] == 'CLIENT' + and key == 'clients'): + return '{}'.format(val) + return '"{}"'.format(val) + + conf_str = "" + for key, val in block.items(): + if key == 'block_name': + continue + elif key == '_blocks_': + for blo in val: + conf_str += GaneshaConfParser.write_block(blo, depth) + elif val: + conf_str += GaneshaConfParser._indentation(depth) + conf_str += '{} = {};\n'.format(key, format_val(key, val)) + return conf_str + + @staticmethod + def write_block(block, depth): + if block['block_name'] == "%url": + return '%url "{}"\n\n'.format(block['value']) + + conf_str = "" + conf_str += GaneshaConfParser._indentation(depth) + conf_str += format(block['block_name']) + conf_str += " {\n" + conf_str += GaneshaConfParser.write_block_body(block, depth+1) + conf_str += GaneshaConfParser._indentation(depth) + conf_str += "}\n\n" + return conf_str + + @staticmethod + def write_conf(blocks): + if not isinstance(blocks, list): + blocks = [blocks] + conf_str = "" + for block in blocks: + conf_str += GaneshaConfParser.write_block(block, 0) + return conf_str + + +class FSal(object): + def __init__(self, name): + self.name = name + + @classmethod + def validate_path(cls, _): + raise NotImplementedError() + + def validate(self): + raise NotImplementedError() + + def fill_keys(self): + raise NotImplementedError() + + def create_path(self, path): + raise NotImplementedError() + + @staticmethod + def from_fsal_block(fsal_block): + if fsal_block['name'] == "CEPH": + return CephFSFSal.from_fsal_block(fsal_block) + if fsal_block['name'] == 'RGW': + return RGWFSal.from_fsal_block(fsal_block) + return None + + def to_fsal_block(self): + raise NotImplementedError() + + @staticmethod + def from_dict(fsal_dict): + if fsal_dict['name'] == "CEPH": + return CephFSFSal.from_dict(fsal_dict) + if fsal_dict['name'] == 'RGW': + return RGWFSal.from_dict(fsal_dict) + return None + + def to_dict(self): + raise NotImplementedError() + + +class RGWFSal(FSal): + def __init__(self, name, rgw_user_id, access_key, secret_key): + super(RGWFSal, self).__init__(name) + self.rgw_user_id = rgw_user_id + self.access_key = access_key + self.secret_key = secret_key + + @classmethod + def validate_path(cls, path): + return path == "/" or re.match(r'^[^/><|&()#?]+$', path) + + def validate(self): + if not self.rgw_user_id: + raise NFSException('RGW user must be specified') + + if not RgwClient.admin_instance().user_exists(self.rgw_user_id): + raise NFSException("RGW user '{}' does not exist" + .format(self.rgw_user_id)) + + def fill_keys(self): + keys = RgwClient.admin_instance().get_user_keys(self.rgw_user_id) + self.access_key = keys['access_key'] + self.secret_key = keys['secret_key'] + + def create_path(self, path): + if path == '/': # nothing to do + return + rgw = RgwClient.instance(self.rgw_user_id) + try: + exists = rgw.bucket_exists(path, self.rgw_user_id) + logger.debug('Checking existence of RGW bucket "%s" for user "%s": %s', + path, self.rgw_user_id, exists) + except RequestException as exp: + if exp.status_code == 403: + raise NFSException('Cannot create bucket "{}" as it already ' + 'exists, and belongs to other user.' + .format(path)) + else: + raise exp + if not exists: + logger.info('Creating new RGW bucket "%s" for user "%s"', path, + self.rgw_user_id) + rgw.create_bucket(path) + + @classmethod + def from_fsal_block(cls, fsal_block): + return cls(fsal_block['name'], + fsal_block['user_id'], + fsal_block['access_key_id'], + fsal_block['secret_access_key']) + + def to_fsal_block(self): + return { + 'block_name': 'FSAL', + 'name': self.name, + 'user_id': self.rgw_user_id, + 'access_key_id': self.access_key, + 'secret_access_key': self.secret_key + } + + @classmethod + def from_dict(cls, fsal_dict): + return cls(fsal_dict['name'], fsal_dict['rgw_user_id'], None, None) + + def to_dict(self): + return { + 'name': self.name, + 'rgw_user_id': self.rgw_user_id + } + + +class CephFSFSal(FSal): + def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None, + cephx_key=None): + super(CephFSFSal, self).__init__(name) + self.fs_name = fs_name + self.user_id = user_id + self.sec_label_xattr = sec_label_xattr + self.cephx_key = cephx_key + + @classmethod + def validate_path(cls, path): + return re.match(r'^/[^><|&()?]*$', path) + + def validate(self): + if self.user_id and self.user_id not in CephX.list_clients(): + raise NFSException("cephx user '{}' does not exist" + .format(self.user_id)) + + def fill_keys(self): + if self.user_id: + self.cephx_key = CephX.get_client_key(self.user_id) + + def create_path(self, path): + cfs = CephFS() + if not cfs.dir_exists(path): + cfs.mkdirs(path) + + @classmethod + def from_fsal_block(cls, fsal_block): + return cls(fsal_block['name'], + fsal_block.get('user_id', None), + fsal_block.get('filesystem', None), + fsal_block.get('sec_label_xattr', None), + fsal_block.get('secret_access_key', None)) + + def to_fsal_block(self): + result = { + 'block_name': 'FSAL', + 'name': self.name, + } + if self.user_id: + result['user_id'] = self.user_id + if self.fs_name: + result['filesystem'] = self.fs_name + if self.sec_label_xattr: + result['sec_label_xattr'] = self.sec_label_xattr + if self.cephx_key: + result['secret_access_key'] = self.cephx_key + return result + + @classmethod + def from_dict(cls, fsal_dict): + return cls(fsal_dict['name'], fsal_dict['user_id'], + fsal_dict['fs_name'], fsal_dict['sec_label_xattr'], None) + + def to_dict(self): + return { + 'name': self.name, + 'user_id': self.user_id, + 'fs_name': self.fs_name, + 'sec_label_xattr': self.sec_label_xattr + } + + +class Client(object): + def __init__(self, addresses, access_type=None, squash=None): + self.addresses = addresses + self.access_type = access_type + self.squash = GaneshaConf.format_squash(squash) + + @classmethod + def from_client_block(cls, client_block): + addresses = client_block['clients'] + if not isinstance(addresses, list): + addresses = [addresses] + return cls(addresses, + client_block.get('access_type', None), + client_block.get('squash', None)) + + def to_client_block(self): + result = { + 'block_name': 'CLIENT', + 'clients': self.addresses, + } + if self.access_type: + result['access_type'] = self.access_type + if self.squash: + result['squash'] = self.squash + return result + + @classmethod + def from_dict(cls, client_dict): + return cls(client_dict['addresses'], client_dict['access_type'], + client_dict['squash']) + + def to_dict(self): + return { + 'addresses': self.addresses, + 'access_type': self.access_type, + 'squash': self.squash + } + + +class Export(object): + # pylint: disable=R0902 + def __init__(self, export_id, path, fsal, cluster_id, daemons, pseudo=None, + tag=None, access_type=None, squash=None, + attr_expiration_time=None, security_label=False, + protocols=None, transports=None, clients=None): + self.export_id = export_id + self.path = GaneshaConf.format_path(path) + self.fsal = fsal + self.cluster_id = cluster_id + self.daemons = set(daemons) + self.pseudo = GaneshaConf.format_path(pseudo) + self.tag = tag + self.access_type = access_type + self.squash = GaneshaConf.format_squash(squash) + if attr_expiration_time is None: + self.attr_expiration_time = 0 + else: + self.attr_expiration_time = attr_expiration_time + self.security_label = security_label + self.protocols = {GaneshaConf.format_protocol(p) for p in protocols} + self.transports = set(transports) + self.clients = clients + + def validate(self, daemons_list): + # pylint: disable=R0912 + for daemon_id in self.daemons: + if daemon_id not in daemons_list: + raise NFSException("Daemon '{}' does not exist" + .format(daemon_id)) + + if not self.fsal.validate_path(self.path): + raise NFSException("Export path ({}) is invalid.".format(self.path)) + + if not self.protocols: + raise NFSException( + "No NFS protocol version specified for the export.") + + if not self.transports: + raise NFSException( + "No network transport type specified for the export.") + + for t in self.transports: + match = re.match(r'^TCP$|^UDP$', t) + if not match: + raise NFSException( + "'{}' is an invalid network transport type identifier" + .format(t)) + + self.fsal.validate() + + if 4 in self.protocols: + if not self.pseudo: + raise NFSException( + "Pseudo path is required when NFSv4 protocol is used") + match = re.match(r'^/[^><|&()]*$', self.pseudo) + if not match: + raise NFSException( + "Export pseudo path ({}) is invalid".format(self.pseudo)) + + if self.tag: + match = re.match(r'^[^/><|:&()]+$', self.tag) + if not match: + raise NFSException( + "Export tag ({}) is invalid".format(self.tag)) + + if self.fsal.name == 'RGW' and 4 not in self.protocols and not self.tag: + raise NFSException( + "Tag is mandatory for RGW export when using only NFSv3") + + @classmethod + def from_export_block(cls, export_block, cluster_id, defaults): + logger.debug("[NFS] parsing export block: %s", export_block) + + fsal_block = [b for b in export_block['_blocks_'] + if b['block_name'] == "FSAL"] + + protocols = export_block.get('protocols', defaults['protocols']) + if not isinstance(protocols, list): + protocols = [protocols] + + transports = export_block.get('transports', defaults['transports']) + if not isinstance(transports, list): + transports = [transports] + + client_blocks = [b for b in export_block['_blocks_'] + if b['block_name'] == "CLIENT"] + + return cls(export_block['export_id'], + export_block['path'], + FSal.from_fsal_block(fsal_block[0]), + cluster_id, + [], + export_block.get('pseudo', None), + export_block.get('tag', None), + export_block.get('access_type', defaults['access_type']), + export_block.get('squash', defaults['squash']), + export_block.get('attr_expiration_time', None), + export_block.get('security_label', False), + protocols, + transports, + [Client.from_client_block(client) + for client in client_blocks]) + + def to_export_block(self, defaults): + # pylint: disable=too-many-branches + result = { + 'block_name': 'EXPORT', + 'export_id': self.export_id, + 'path': self.path + } + if self.pseudo: + result['pseudo'] = self.pseudo + if self.tag: + result['tag'] = self.tag + if 'access_type' not in defaults \ + or self.access_type != defaults['access_type']: + result['access_type'] = self.access_type + if 'squash' not in defaults or self.squash != defaults['squash']: + result['squash'] = self.squash + if self.fsal.name == 'CEPH': + result['attr_expiration_time'] = self.attr_expiration_time + result['security_label'] = self.security_label + if 'protocols' not in defaults: + result['protocols'] = [p for p in self.protocols] + else: + def_proto = defaults['protocols'] + if not isinstance(def_proto, list): + def_proto = set([def_proto]) + if self.protocols != def_proto: + result['protocols'] = [p for p in self.protocols] + if 'transports' not in defaults: + result['transports'] = [t for t in self.transports] + else: + def_transp = defaults['transports'] + if not isinstance(def_transp, list): + def_transp = set([def_transp]) + if self.transports != def_transp: + result['transports'] = [t for t in self.transports] + + result['_blocks_'] = [self.fsal.to_fsal_block()] + result['_blocks_'].extend([client.to_client_block() + for client in self.clients]) + return result + + @classmethod + def from_dict(cls, export_id, ex_dict, old_export=None): + return cls(export_id, + ex_dict['path'], + FSal.from_dict(ex_dict['fsal']), + ex_dict['cluster_id'], + ex_dict['daemons'], + ex_dict['pseudo'], + ex_dict['tag'], + ex_dict['access_type'], + ex_dict['squash'], + old_export.attr_expiration_time if old_export else None, + ex_dict['security_label'], + ex_dict['protocols'], + ex_dict['transports'], + [Client.from_dict(client) for client in ex_dict['clients']]) + + def to_dict(self): + return { + 'export_id': self.export_id, + 'path': self.path, + 'fsal': self.fsal.to_dict(), + 'cluster_id': self.cluster_id, + 'daemons': sorted([d for d in self.daemons]), + 'pseudo': self.pseudo, + 'tag': self.tag, + 'access_type': self.access_type, + 'squash': self.squash, + 'security_label': self.security_label, + 'protocols': sorted([p for p in self.protocols]), + 'transports': sorted([t for t in self.transports]), + 'clients': [client.to_dict() for client in self.clients] + } + + +class GaneshaConf(object): + # pylint: disable=R0902 + + def __init__(self, cluster_id, rados_pool, rados_namespace): + self.cluster_id = cluster_id + self.rados_pool = rados_pool + self.rados_namespace = rados_namespace + self.export_conf_blocks = [] + self.daemons_conf_blocks = {} + self._defaults = {} + self.exports = {} + + self._read_raw_config() + + # load defaults + def_block = [b for b in self.export_conf_blocks + if b['block_name'] == "EXPORT_DEFAULTS"] + self.export_defaults = def_block[0] if def_block else {} + self._defaults = self.ganesha_defaults(self.export_defaults) + + for export_block in [block for block in self.export_conf_blocks + if block['block_name'] == "EXPORT"]: + export = Export.from_export_block(export_block, cluster_id, + self._defaults) + self.exports[export.export_id] = export + + # link daemons to exports + for daemon_id, daemon_blocks in self.daemons_conf_blocks.items(): + for block in daemon_blocks: + if block['block_name'] == "%url": + rados_url = block['value'] + _, _, obj = Ganesha.parse_rados_url(rados_url) + if obj.startswith("export-"): + export_id = int(obj[obj.find('-')+1:]) + self.exports[export_id].daemons.add(daemon_id) + + @classmethod + def instance(cls, cluster_id): + pool, ns = Ganesha.get_pool_and_namespace(cluster_id) + return cls(cluster_id, pool, ns) + + def _read_raw_config(self): + with mgr.rados.open_ioctx(self.rados_pool) as ioctx: + if self.rados_namespace: + ioctx.set_namespace(self.rados_namespace) + objs = ioctx.list_objects() + for obj in objs: + if obj.key.startswith("export-"): + size, _ = obj.stat() + raw_config = obj.read(size) + raw_config = raw_config.decode("utf-8") + logger.debug("[NFS] read export configuration from rados " + "object %s/%s/%s:\n%s", self.rados_pool, + self.rados_namespace, obj.key, raw_config) + self.export_conf_blocks.extend( + GaneshaConfParser(raw_config).parse()) + elif obj.key.startswith("conf-"): + size, _ = obj.stat() + raw_config = obj.read(size) + raw_config = raw_config.decode("utf-8") + logger.debug("[NFS] read daemon configuration from rados " + "object %s/%s/%s:\n%s", self.rados_pool, + self.rados_namespace, obj.key, raw_config) + idx = obj.key.find('-') + self.daemons_conf_blocks[obj.key[idx+1:]] = \ + GaneshaConfParser(raw_config).parse() + + def _write_raw_config(self, conf_block, obj): + raw_config = GaneshaConfParser.write_conf(conf_block) + with mgr.rados.open_ioctx(self.rados_pool) as ioctx: + if self.rados_namespace: + ioctx.set_namespace(self.rados_namespace) + ioctx.write_full(obj, raw_config.encode('utf-8')) + logger.debug( + "[NFS] write configuration into rados object %s/%s/%s:\n%s", + self.rados_pool, self.rados_namespace, obj, raw_config) + + @classmethod + def ganesha_defaults(cls, export_defaults): + """ + According to + https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt + """ + return { + 'access_type': export_defaults.get('access_type', 'NONE'), + 'protocols': export_defaults.get('protocols', [3, 4]), + 'transports': export_defaults.get('transports', ['TCP', 'UDP']), + 'squash': export_defaults.get('squash', 'root_squash') + } + + @classmethod + def format_squash(cls, squash): + if squash is None: + return None + if squash.lower() in ["no_root_squash", "noidsquash", "none"]: + return "no_root_squash" + if squash.lower() in ["rootid", "root_id_squash", "rootidsquash"]: + return "root_id_squash" + if squash.lower() in ["root", "root_squash", "rootsquash"]: + return "root_squash" + if squash.lower() in ["all", "all_squash", "allsquash", + "all_anonymous", "allanonymous"]: + return "all_squash" + logger.error("[NFS] could not parse squash value: %s", squash) + raise NFSException("'{}' is an invalid squash option".format(squash)) + + @classmethod + def format_protocol(cls, protocol): + if str(protocol) in ["NFSV3", "3", "V3", "NFS3"]: + return 3 + if str(protocol) in ["NFSV4", "4", "V4", "NFS4"]: + return 4 + logger.error("[NFS] could not parse protocol value: %s", protocol) + raise NFSException("'{}' is an invalid NFS protocol version identifier" + .format(protocol)) + + @classmethod + def format_path(cls, path): + if path is not None: + path = path.strip() + if len(path) > 1 and path[-1] == '/': + path = path[:-1] + return path + + def validate(self, export): + export.validate(self.list_daemons()) + + if 4 in export.protocols: # NFSv4 protocol + len_prefix = 1 + parent_export = None + for ex in self.list_exports(): + if export.tag and ex.tag == export.tag: + raise NFSException( + "Another export exists with the same tag: {}" + .format(export.tag)) + + if export.pseudo and ex.pseudo == export.pseudo: + raise NFSException( + "Another export exists with the same pseudo path: {}" + .format(export.pseudo)) + + if not ex.pseudo: + continue + + if export.pseudo[:export.pseudo.rfind('/')+1].startswith(ex.pseudo): + if export.pseudo[len(ex.pseudo)] == '/': + if len(ex.pseudo) > len_prefix: + len_prefix = len(ex.pseudo) + parent_export = ex + + if len_prefix > 1: + # validate pseudo path + idx = len(parent_export.pseudo) + idx = idx + 1 if idx > 1 else idx + real_path = "{}/{}".format(parent_export.path + if len(parent_export.path) > 1 else "", + export.pseudo[idx:]) + if export.fsal.name == 'CEPH': + cfs = CephFS() + if export.path != real_path and not cfs.dir_exists(real_path): + raise NFSException( + "Pseudo path ({}) invalid, path {} does not exist." + .format(export.pseudo, real_path)) + + def _gen_export_id(self): + exports = sorted(self.exports) + nid = 1 + for e_id in exports: + if e_id == nid: + nid += 1 + else: + break + return nid + + def _persist_daemon_configuration(self): + daemon_map = {} + for daemon_id in self.list_daemons(): + daemon_map[daemon_id] = [] + + for _, ex in self.exports.items(): + for daemon in ex.daemons: + daemon_map[daemon].append({ + 'block_name': "%url", + 'value': Ganesha.make_rados_url( + self.rados_pool, self.rados_namespace, + "export-{}".format(ex.export_id)) + }) + for daemon_id, conf_blocks in daemon_map.items(): + self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id)) + + def _save_export(self, export): + self.validate(export) + export.fsal.create_path(export.path) + export.fsal.fill_keys() + self.exports[export.export_id] = export + conf_block = export.to_export_block(self.export_defaults) + self._write_raw_config(conf_block, "export-{}".format(export.export_id)) + self._persist_daemon_configuration() + + def _delete_export(self, export_id): + self._persist_daemon_configuration() + with mgr.rados.open_ioctx(self.rados_pool) as ioctx: + if self.rados_namespace: + ioctx.set_namespace(self.rados_namespace) + ioctx.remove_object("export-{}".format(export_id)) + + def list_exports(self): + return [ex for _, ex in self.exports.items()] + + def create_export(self, ex_dict): + ex_id = self._gen_export_id() + export = Export.from_dict(ex_id, ex_dict) + self._save_export(export) + return ex_id + + def has_export(self, export_id): + return export_id in self.exports + + def update_export(self, ex_dict): + if ex_dict['export_id'] not in self.exports: + return None + old_export = self.exports[ex_dict['export_id']] + del self.exports[ex_dict['export_id']] + export = Export.from_dict(ex_dict['export_id'], ex_dict, old_export) + self._save_export(export) + self.exports[export.export_id] = export + return old_export + + def remove_export(self, export_id): + if export_id not in self.exports: + return None + export = self.exports[export_id] + del self.exports[export_id] + self._delete_export(export_id) + return export + + def get_export(self, export_id): + if export_id in self.exports: + return self.exports[export_id] + return None + + def list_daemons(self): + return [daemon_id for daemon_id in self.daemons_conf_blocks] + + def reload_daemons(self, daemons): + with mgr.rados.open_ioctx(self.rados_pool) as ioctx: + if self.rados_namespace: + ioctx.set_namespace(self.rados_namespace) + for daemon_id in daemons: + ioctx.notify("conf-{}".format(daemon_id)) diff --git a/src/pybind/mgr/dashboard/services/iscsi_cli.py b/src/pybind/mgr/dashboard/services/iscsi_cli.py new file mode 100644 index 00000000..f962de59 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_cli.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import errno +import json + +from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand + +from .iscsi_client import IscsiClient +from .iscsi_config import IscsiGatewaysConfig, IscsiGatewayAlreadyExists, InvalidServiceUrl, \ + ManagedByOrchestratorException, IscsiGatewayDoesNotExist +from ..rest_client import RequestException + + +@CLIReadCommand('dashboard iscsi-gateway-list', desc='List iSCSI gateways') +def list_iscsi_gateways(_): + return 0, json.dumps(IscsiGatewaysConfig.get_gateways_config()), '' + + +@CLIWriteCommand('dashboard iscsi-gateway-add', + desc='Add iSCSI gateway configuration. Gateway URL read from -i <file>') +@CLICheckNonemptyFileInput +def add_iscsi_gateway(_, inbuf): + service_url = inbuf + try: + IscsiGatewaysConfig.validate_service_url(service_url) + name = IscsiClient.instance(service_url=service_url).get_hostname()['data'] + IscsiGatewaysConfig.add_gateway(name, service_url) + return 0, 'Success', '' + except IscsiGatewayAlreadyExists as ex: + return -errno.EEXIST, '', str(ex) + except InvalidServiceUrl as ex: + return -errno.EINVAL, '', str(ex) + except ManagedByOrchestratorException as ex: + return -errno.EINVAL, '', str(ex) + except RequestException as ex: + return -errno.EINVAL, '', str(ex) + + +@CLIWriteCommand('dashboard iscsi-gateway-rm', + 'name=name,type=CephString', + 'Remove iSCSI gateway configuration') +def remove_iscsi_gateway(_, name): + try: + IscsiGatewaysConfig.remove_gateway(name) + return 0, 'Success', '' + except IscsiGatewayDoesNotExist as ex: + return -errno.ENOENT, '', str(ex) + except ManagedByOrchestratorException as ex: + return -errno.EINVAL, '', str(ex) diff --git a/src/pybind/mgr/dashboard/services/iscsi_client.py b/src/pybind/mgr/dashboard/services/iscsi_client.py new file mode 100644 index 00000000..e66bd3cc --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_client.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-public-methods +from __future__ import absolute_import + +import json + +from requests.auth import HTTPBasicAuth + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + +from .iscsi_config import IscsiGatewaysConfig +from .. import logger +from ..settings import Settings +from ..rest_client import RestClient + + +class IscsiClient(RestClient): + _CLIENT_NAME = 'iscsi' + _instances = {} + + service_url = None + gateway_name = None + + @classmethod + def instance(cls, gateway_name=None, service_url=None): + if not service_url: + if not gateway_name: + gateway_name = list(IscsiGatewaysConfig.get_gateways_config()['gateways'].keys())[0] + gateways_config = IscsiGatewaysConfig.get_gateway_config(gateway_name) + service_url = gateways_config['service_url'] + + instance = cls._instances.get(gateway_name) + if not instance or service_url != instance.service_url or \ + instance.session.verify != Settings.ISCSI_API_SSL_VERIFICATION: + url = urlparse(service_url) + ssl = url.scheme == 'https' + host = url.hostname + port = url.port + username = url.username + password = url.password + if not port: + port = 443 if ssl else 80 + + auth = HTTPBasicAuth(username, password) + instance = IscsiClient(host, port, IscsiClient._CLIENT_NAME, ssl, + auth, Settings.ISCSI_API_SSL_VERIFICATION) + instance.service_url = service_url + instance.gateway_name = gateway_name + if gateway_name: + cls._instances[gateway_name] = instance + + return instance + + @RestClient.api_get('/api/_ping') + def ping(self, request=None): + return request() + + @RestClient.api_get('/api/settings') + def get_settings(self, request=None): + return request() + + @RestClient.api_get('/api/sysinfo/ip_addresses') + def get_ip_addresses(self, request=None): + return request() + + @RestClient.api_get('/api/sysinfo/hostname') + def get_hostname(self, request=None): + return request() + + @RestClient.api_get('/api/config') + def get_config(self, request=None): + return request({ + 'decrypt_passwords': True + }) + + @RestClient.api_put('/api/target/{target_iqn}') + def create_target(self, target_iqn, target_controls, request=None): + logger.debug("iSCSI[%s] Creating target: %s", self.gateway_name, target_iqn) + return request({ + 'controls': json.dumps(target_controls) + }) + + @RestClient.api_delete('/api/target/{target_iqn}') + def delete_target(self, target_iqn, request=None): + logger.debug("iSCSI[%s] Deleting target: %s", self.gateway_name, target_iqn) + return request() + + @RestClient.api_put('/api/target/{target_iqn}') + def reconfigure_target(self, target_iqn, target_controls, request=None): + logger.debug("iSCSI[%s] Reconfiguring target: %s", self.gateway_name, target_iqn) + return request({ + 'mode': 'reconfigure', + 'controls': json.dumps(target_controls) + }) + + @RestClient.api_put('/api/gateway/{target_iqn}/{gateway_name}') + def create_gateway(self, target_iqn, gateway_name, ip_address, request=None): + logger.debug("iSCSI[%s] Creating gateway: %s/%s", self.gateway_name, target_iqn, + gateway_name) + return request({ + 'ip_address': ','.join(ip_address), + 'skipchecks': 'true' + }) + + @RestClient.api_get('/api/gatewayinfo') + def get_gatewayinfo(self, request=None): + return request() + + @RestClient.api_delete('/api/gateway/{target_iqn}/{gateway_name}') + def delete_gateway(self, target_iqn, gateway_name, request=None): + logger.debug("iSCSI: Deleting gateway: %s/%s", target_iqn, gateway_name) + return request() + + @RestClient.api_put('/api/disk/{pool}/{image}') + def create_disk(self, pool, image, backstore, wwn, request=None): + logger.debug("iSCSI[%s] Creating disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'mode': 'create', + 'backstore': backstore, + 'wwn': wwn + }) + + @RestClient.api_delete('/api/disk/{pool}/{image}') + def delete_disk(self, pool, image, request=None): + logger.debug("iSCSI[%s] Deleting disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'preserve_image': 'true' + }) + + @RestClient.api_put('/api/disk/{pool}/{image}') + def reconfigure_disk(self, pool, image, controls, request=None): + logger.debug("iSCSI[%s] Reconfiguring disk: %s/%s", self.gateway_name, pool, image) + return request({ + 'controls': json.dumps(controls), + 'mode': 'reconfigure' + }) + + @RestClient.api_put('/api/targetlun/{target_iqn}') + def create_target_lun(self, target_iqn, image_id, lun, request=None): + logger.debug("iSCSI[%s] Creating target lun: %s/%s", self.gateway_name, target_iqn, + image_id) + return request({ + 'disk': image_id, + 'lun_id': lun + }) + + @RestClient.api_delete('/api/targetlun/{target_iqn}') + def delete_target_lun(self, target_iqn, image_id, request=None): + logger.debug("iSCSI[%s] Deleting target lun: %s/%s", self.gateway_name, target_iqn, + image_id) + return request({ + 'disk': image_id + }) + + @RestClient.api_put('/api/client/{target_iqn}/{client_iqn}') + def create_client(self, target_iqn, client_iqn, request=None): + logger.debug("iSCSI[%s] Creating client: %s/%s", self.gateway_name, target_iqn, client_iqn) + return request() + + @RestClient.api_delete('/api/client/{target_iqn}/{client_iqn}') + def delete_client(self, target_iqn, client_iqn, request=None): + logger.debug("iSCSI[%s] Deleting client: %s/%s", self.gateway_name, target_iqn, client_iqn) + return request() + + @RestClient.api_put('/api/clientlun/{target_iqn}/{client_iqn}') + def create_client_lun(self, target_iqn, client_iqn, image_id, request=None): + logger.debug("iSCSI[%s] Creating client lun: %s/%s", self.gateway_name, target_iqn, + client_iqn) + return request({ + 'disk': image_id + }) + + @RestClient.api_delete('/api/clientlun/{target_iqn}/{client_iqn}') + def delete_client_lun(self, target_iqn, client_iqn, image_id, request=None): + logger.debug("iSCSI[%s] Deleting client lun: %s/%s", self.gateway_name, target_iqn, + client_iqn) + return request({ + 'disk': image_id + }) + + @RestClient.api_put('/api/clientauth/{target_iqn}/{client_iqn}') + def create_client_auth(self, target_iqn, client_iqn, username, password, mutual_username, + mutual_password, request=None): + logger.debug("iSCSI[%s] Creating client auth: %s/%s/%s/%s/%s/%s", + self.gateway_name, target_iqn, client_iqn, username, password, mutual_username, + mutual_password) + return request({ + 'username': username, + 'password': password, + 'mutual_username': mutual_username, + 'mutual_password': mutual_password + }) + + @RestClient.api_put('/api/hostgroup/{target_iqn}/{group_name}') + def create_group(self, target_iqn, group_name, members, image_ids, request=None): + logger.debug("iSCSI[%s] Creating group: %s/%s", self.gateway_name, target_iqn, group_name) + return request({ + 'members': ','.join(members), + 'disks': ','.join(image_ids) + }) + + @RestClient.api_put('/api/hostgroup/{target_iqn}/{group_name}') + def update_group(self, target_iqn, group_name, members, image_ids, request=None): + logger.debug("iSCSI[%s] Updating group: %s/%s", self.gateway_name, target_iqn, group_name) + return request({ + 'action': 'remove', + 'members': ','.join(members), + 'disks': ','.join(image_ids) + }) + + @RestClient.api_delete('/api/hostgroup/{target_iqn}/{group_name}') + def delete_group(self, target_iqn, group_name, request=None): + logger.debug("iSCSI[%s] Deleting group: %s/%s", self.gateway_name, target_iqn, group_name) + return request() + + @RestClient.api_put('/api/discoveryauth') + def update_discoveryauth(self, user, password, mutual_user, mutual_password, request=None): + logger.debug("iSCSI[%s] Updating discoveryauth: %s/%s/%s/%s", self.gateway_name, user, + password, mutual_user, mutual_password) + return request({ + 'username': user, + 'password': password, + 'mutual_username': mutual_user, + 'mutual_password': mutual_password + }) + + @RestClient.api_put('/api/targetauth/{target_iqn}') + def update_targetacl(self, target_iqn, action, request=None): + logger.debug("iSCSI[%s] Updating targetacl: %s/%s", self.gateway_name, target_iqn, action) + return request({ + 'action': action + }) + + @RestClient.api_put('/api/targetauth/{target_iqn}') + def update_targetauth(self, target_iqn, user, password, mutual_user, mutual_password, + request=None): + logger.debug("iSCSI[%s] Updating targetauth: %s/%s/%s/%s/%s", self.gateway_name, + target_iqn, user, password, mutual_user, mutual_password) + return request({ + 'username': user, + 'password': password, + 'mutual_username': mutual_user, + 'mutual_password': mutual_password + }) + + @RestClient.api_get('/api/targetinfo/{target_iqn}') + def get_targetinfo(self, target_iqn, request=None): + # pylint: disable=unused-argument + return request() + + @RestClient.api_get('/api/clientinfo/{target_iqn}/{client_iqn}') + def get_clientinfo(self, target_iqn, client_iqn, request=None): + # pylint: disable=unused-argument + return request() diff --git a/src/pybind/mgr/dashboard/services/iscsi_config.py b/src/pybind/mgr/dashboard/services/iscsi_config.py new file mode 100644 index 00000000..2f119136 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/iscsi_config.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json + +from orchestrator import OrchestratorError + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + +from mgr_util import merge_dicts +from .orchestrator import OrchClient +from .. import mgr + + +class IscsiGatewayAlreadyExists(Exception): + def __init__(self, gateway_name): + super(IscsiGatewayAlreadyExists, self).__init__( + "iSCSI gateway '{}' already exists".format(gateway_name)) + + +class IscsiGatewayDoesNotExist(Exception): + def __init__(self, hostname): + super(IscsiGatewayDoesNotExist, self).__init__( + "iSCSI gateway '{}' does not exist".format(hostname)) + + +class InvalidServiceUrl(Exception): + def __init__(self, service_url): + super(InvalidServiceUrl, self).__init__( + "Invalid service URL '{}'. " + "Valid format: '<scheme>://<username>:<password>@<host>[:port]'.".format(service_url)) + + +class ManagedByOrchestratorException(Exception): + def __init__(self): + super(ManagedByOrchestratorException, self).__init__( + "iSCSI configuration is managed by the orchestrator") + + +_ISCSI_STORE_KEY = "_iscsi_config" + + +class IscsiGatewaysConfig(object): + @classmethod + def _load_config_from_store(cls): + json_db = mgr.get_store(_ISCSI_STORE_KEY, + '{"gateways": {}}') + config = json.loads(json_db) + cls.update_iscsi_config(config) + return config + + @classmethod + def update_iscsi_config(cls, config): + """ + Since `ceph-iscsi` config v10, gateway names were renamed from host short name to FQDN. + If Ceph Dashboard were configured before v10, we try to update our internal gateways + database automatically. + """ + for gateway_name, gateway_config in config['gateways'].items(): + if '.' not in gateway_name: + from .iscsi_client import IscsiClient + from ..rest_client import RequestException + try: + service_url = gateway_config['service_url'] + new_gateway_name = IscsiClient.instance( + service_url=service_url).get_hostname()['data'] + if gateway_name != new_gateway_name: + config['gateways'][new_gateway_name] = gateway_config + del config['gateways'][gateway_name] + cls._save_config(config) + except RequestException: + # If gateway is not acessible, it should be removed manually + # or we will try to update automatically next time + continue + + @staticmethod + def _load_config_from_orchestrator(): + config = {'gateways': {}} + try: + instances = OrchClient().list_service_info("iscsi") + for instance in instances: + config['gateways'][instance.nodename] = { + 'service_url': instance.service_url + } + except (RuntimeError, OrchestratorError, ImportError): + pass + return config + + @classmethod + def _save_config(cls, config): + mgr.set_store(_ISCSI_STORE_KEY, json.dumps(config)) + + @classmethod + def validate_service_url(cls, service_url): + url = urlparse(service_url) + if not url.scheme or not url.hostname or not url.username or not url.password: + raise InvalidServiceUrl(service_url) + + @classmethod + def add_gateway(cls, name, service_url): + config = cls.get_gateways_config() + if name in config: + raise IscsiGatewayAlreadyExists(name) + IscsiGatewaysConfig.validate_service_url(service_url) + config['gateways'][name] = {'service_url': service_url} + cls._save_config(config) + + @classmethod + def remove_gateway(cls, name): + if name in cls._load_config_from_orchestrator()['gateways']: + raise ManagedByOrchestratorException() + + config = cls._load_config_from_store() + if name not in config['gateways']: + raise IscsiGatewayDoesNotExist(name) + + del config['gateways'][name] + cls._save_config(config) + + @classmethod + def get_gateways_config(cls): + orch_config = cls._load_config_from_orchestrator() + local_config = cls._load_config_from_store() + + return {'gateways': merge_dicts(orch_config['gateways'], local_config['gateways'])} + + @classmethod + def get_gateway_config(cls, name): + config = IscsiGatewaysConfig.get_gateways_config() + if name not in config['gateways']: + raise IscsiGatewayDoesNotExist(name) + return config['gateways'][name] diff --git a/src/pybind/mgr/dashboard/services/orchestrator.py b/src/pybind/mgr/dashboard/services/orchestrator.py new file mode 100644 index 00000000..366ff7de --- /dev/null +++ b/src/pybind/mgr/dashboard/services/orchestrator.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from orchestrator import OrchestratorClientMixin, raise_if_exception, OrchestratorError +from .. import mgr, logger + + +# pylint: disable=abstract-method +class OrchClient(OrchestratorClientMixin): + def __init__(self): + super(OrchClient, self).__init__() + self.set_mgr(mgr) + + def list_service_info(self, service_type): + # type: (str) -> list + completion = self.describe_service(service_type, None, None) + self._orchestrator_wait([completion]) + raise_if_exception(completion) + return completion.result + + def available(self): + try: + status, desc = super(OrchClient, self).available() + logger.info("[ORCH] is orchestrator available: %s, %s", status, desc) + return status + except (RuntimeError, OrchestratorError, ImportError): + return False + + def reload_service(self, service_type, service_ids): + if not isinstance(service_ids, list): + service_ids = [service_ids] + + completion_list = [self.service_action('reload', service_type, + service_name, service_id) + for service_name, service_id in service_ids] + self._orchestrator_wait(completion_list) + for c in completion_list: + raise_if_exception(c) diff --git a/src/pybind/mgr/dashboard/services/rbd.py b/src/pybind/mgr/dashboard/services/rbd.py new file mode 100644 index 00000000..55c6f542 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/rbd.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import six + +import rbd + +from .. import mgr +from .ceph_service import CephService + + +RBD_FEATURES_NAME_MAPPING = { + rbd.RBD_FEATURE_LAYERING: "layering", + rbd.RBD_FEATURE_STRIPINGV2: "striping", + rbd.RBD_FEATURE_EXCLUSIVE_LOCK: "exclusive-lock", + rbd.RBD_FEATURE_OBJECT_MAP: "object-map", + rbd.RBD_FEATURE_FAST_DIFF: "fast-diff", + rbd.RBD_FEATURE_DEEP_FLATTEN: "deep-flatten", + rbd.RBD_FEATURE_JOURNALING: "journaling", + rbd.RBD_FEATURE_DATA_POOL: "data-pool", + rbd.RBD_FEATURE_OPERATIONS: "operations", +} + + +def format_bitmask(features): + """ + Formats the bitmask: + + >>> format_bitmask(45) + ['deep-flatten', 'exclusive-lock', 'layering', 'object-map'] + """ + names = [val for key, val in RBD_FEATURES_NAME_MAPPING.items() + if key & features == key] + return sorted(names) + + +def format_features(features): + """ + Converts the features list to bitmask: + + >>> format_features(['deep-flatten', 'exclusive-lock', 'layering', 'object-map']) + 45 + + >>> format_features(None) is None + True + + >>> format_features('deep-flatten, exclusive-lock') + 32 + """ + if isinstance(features, six.string_types): + features = features.split(',') + + if not isinstance(features, list): + return None + + res = 0 + for key, value in RBD_FEATURES_NAME_MAPPING.items(): + if value in features: + res = key | res + return res + + +class RbdConfiguration(object): + _rbd = rbd.RBD() + + def __init__(self, pool_name='', image_name='', pool_ioctx=None, image_ioctx=None): + # type: (str, str, object, object) -> None + assert bool(pool_name) != bool(pool_ioctx) # xor + self._pool_name = pool_name + self._image_name = image_name + self._pool_ioctx = pool_ioctx + self._image_ioctx = image_ioctx + + @staticmethod + def _ensure_prefix(option): + # type: (str) -> str + return option if option.startswith('conf_') else 'conf_' + option + + def list(self): + # type: () -> [dict] + def _list(ioctx): + if self._image_name: # image config + try: + with rbd.Image(ioctx, self._image_name) as image: + result = image.config_list() + except rbd.ImageNotFound: + result = [] + else: # pool config + pg_status = list(CephService.get_pool_pg_status(self._pool_name).keys()) + if len(pg_status) == 1 and 'incomplete' in pg_status[0]: + # If config_list would be called with ioctx if it's a bad pool, + # the dashboard would stop working, waiting for the response + # that would not happen. + # + # This is only a workaround for https://tracker.ceph.com/issues/43771 which + # already got rejected as not worth the effort. + # + # Are more complete workaround for the dashboard will be implemented with + # https://tracker.ceph.com/issues/44224 + # + # @TODO: If #44224 is addressed remove this workaround + return [] + result = self._rbd.config_list(ioctx) + return list(result) + + if self._pool_name: + ioctx = mgr.rados.open_ioctx(self._pool_name) + else: + ioctx = self._pool_ioctx + + return _list(ioctx) + + def get(self, option_name): + # type: (str) -> str + option_name = self._ensure_prefix(option_name) + with mgr.rados.open_ioctx(self._pool_name) as pool_ioctx: + if self._image_name: + with rbd.Image(pool_ioctx, self._image_name) as image: + return image.metadata_get(option_name) + return self._rbd.pool_metadata_get(pool_ioctx, option_name) + + def set(self, option_name, option_value): + # type: (str, str) -> None + + option_value = str(option_value) + option_name = self._ensure_prefix(option_name) + + pool_ioctx = self._pool_ioctx + if self._pool_name: # open ioctx + pool_ioctx = mgr.rados.open_ioctx(self._pool_name) + pool_ioctx.__enter__() + + image_ioctx = self._image_ioctx + if self._image_name: + image_ioctx = rbd.Image(pool_ioctx, self._image_name) + image_ioctx.__enter__() + + if image_ioctx: + image_ioctx.metadata_set(option_name, option_value) + else: + self._rbd.pool_metadata_set(pool_ioctx, option_name, option_value) + + if self._image_name: # Name provided, so we opened it and now have to close it + image_ioctx.__exit__(None, None, None) + if self._pool_name: + pool_ioctx.__exit__(None, None, None) + + def remove(self, option_name): + """ + Removes an option by name. Will not raise an error, if the option hasn't been found. + :type option_name str + """ + def _remove(ioctx): + try: + if self._image_name: + with rbd.Image(ioctx, self._image_name) as image: + image.metadata_remove(option_name) + else: + self._rbd.pool_metadata_remove(ioctx, option_name) + except KeyError: + pass + + option_name = self._ensure_prefix(option_name) + + if self._pool_name: + with mgr.rados.open_ioctx(self._pool_name) as pool_ioctx: + _remove(pool_ioctx) + else: + _remove(self._pool_ioctx) + + def set_configuration(self, configuration): + if configuration: + for option_name, option_value in configuration.items(): + if option_value is not None: + self.set(option_name, option_value) + else: + self.remove(option_name) diff --git a/src/pybind/mgr/dashboard/services/rgw_client.py b/src/pybind/mgr/dashboard/services/rgw_client.py new file mode 100644 index 00000000..eef11794 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/rgw_client.py @@ -0,0 +1,437 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import re +from distutils.util import strtobool +from ..awsauth import S3Auth +from ..settings import Settings, Options +from ..rest_client import RestClient, RequestException +from ..tools import build_url, dict_contains_path, is_valid_ip_address +from .. import mgr, logger + + +class NoCredentialsException(RequestException): + def __init__(self): + super(NoCredentialsException, self).__init__( + 'No RGW credentials found, ' + 'please consult the documentation on how to enable RGW for ' + 'the dashboard.') + + +def _determine_rgw_addr(): + """ + Get a RGW daemon to determine the configured host (IP address) and port. + Note, the service id of the RGW daemons may differ depending on the setup. + Example 1: + { + ... + 'services': { + 'rgw': { + 'daemons': { + 'summary': '', + '0': { + ... + 'addr': '[2001:db8:85a3::8a2e:370:7334]:49774/1534999298', + 'metadata': { + 'frontend_config#0': 'civetweb port=7280', + } + ... + } + } + } + } + } + Example 2: + { + ... + 'services': { + 'rgw': { + 'daemons': { + 'summary': '', + 'rgw': { + ... + 'addr': '192.168.178.3:49774/1534999298', + 'metadata': { + 'frontend_config#0': 'civetweb port=8000', + } + ... + } + } + } + } + } + """ + service_map = mgr.get('service_map') + if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']): + raise LookupError('No RGW found') + daemon = None + daemons = service_map['services']['rgw']['daemons'] + for key in daemons.keys(): + if dict_contains_path(daemons[key], ['metadata', 'frontend_config#0']): + daemon = daemons[key] + break + if daemon is None: + raise LookupError('No RGW daemon found') + + addr = _parse_addr(daemon['addr']) + port, ssl = _parse_frontend_config(daemon['metadata']['frontend_config#0']) + + logger.info('Auto-detected RGW configuration: addr=%s, port=%d, ssl=%s', + addr, port, str(ssl)) + + return addr, port, ssl + + +def _parse_addr(value): + """ + Get the IP address the RGW is running on. + + >>> _parse_addr('192.168.178.3:49774/1534999298') + '192.168.178.3' + + >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298') + '2001:db8:85a3::8a2e:370:7334' + + >>> _parse_addr('xyz') + Traceback (most recent call last): + ... + LookupError: Failed to determine RGW address + + >>> _parse_addr('192.168.178.a:8080/123456789') + Traceback (most recent call last): + ... + LookupError: Invalid RGW address '192.168.178.a' found + + >>> _parse_addr('[2001:0db8:1234]:443/123456789') + Traceback (most recent call last): + ... + LookupError: Invalid RGW address '2001:0db8:1234' found + + >>> _parse_addr('2001:0db8::1234:49774/1534999298') + Traceback (most recent call last): + ... + LookupError: Failed to determine RGW address + + :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'. + :type: str + :raises LookupError if parsing fails to determine the IP address. + :return: The IP address. + :rtype: str + """ + match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value) + if match: + # IPv4: + # Group 0: 192.168.178.3:49774/1534999298 + # Group 3: 192.168.178.3 + # IPv6: + # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298 + # Group 1: [ + # Group 2: 2001:db8:85a3::8a2e:370:7334 + addr = match.group(3) if match.group(3) else match.group(2) + if not is_valid_ip_address(addr): + raise LookupError('Invalid RGW address \'{}\' found'.format(addr)) + return addr + raise LookupError('Failed to determine RGW address') + + +def _parse_frontend_config(config): + """ + Get the port the RGW is running on. Due the complexity of the + syntax not all variations are supported. + + If there are multiple (ssl_)ports/(ssl_)endpoints options, then + the first found option will be returned. + + Get more details about the configuration syntax here: + http://docs.ceph.com/docs/master/radosgw/frontends/ + https://civetweb.github.io/civetweb/UserManual.html + + :param config: The configuration string to parse. + :type config: str + :raises LookupError if parsing fails to determine the port. + :return: A tuple containing the port number and the information + whether SSL is used. + :rtype: (int, boolean) + """ + match = re.search(r'^(beast|civetweb)\s+.+$', config) + if match: + if match.group(1) == 'beast': + match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)', + config) + if match: + option_name = match.group(1) + if option_name in ['port', 'ssl_port']: + match = re.search(r'(\d+)', match.group(2)) + if match: + port = int(match.group(1)) + ssl = option_name == 'ssl_port' + return port, ssl + if option_name in ['endpoint', 'ssl_endpoint']: + match = re.search(r'([\d.]+|\[.+\])(:(\d+))?', + match.group(2)) + if match: + port = int(match.group(3)) if \ + match.group(2) is not None else 443 if \ + option_name == 'ssl_endpoint' else \ + 80 + ssl = option_name == 'ssl_endpoint' + return port, ssl + if match.group(1) == 'civetweb': + match = re.search(r'port=(.*:)?(\d+)(s)?', config) + if match: + port = int(match.group(2)) + ssl = match.group(3) == 's' + return port, ssl + raise LookupError('Failed to determine RGW port from "{}"'.format(config)) + + +class RgwClient(RestClient): + _SYSTEM_USERID = None + _ADMIN_PATH = None + _host = None + _port = None + _ssl = None + _user_instances = {} + _rgw_settings_snapshot = None + + @staticmethod + def _load_settings(): + # The API access key and secret key are mandatory for a minimal configuration. + if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY): + logger.warning('No credentials found, please consult the ' + 'documentation about how to enable RGW for the ' + 'dashboard.') + raise NoCredentialsException() + + if Options.has_default_value('RGW_API_HOST') and \ + Options.has_default_value('RGW_API_PORT') and \ + Options.has_default_value('RGW_API_SCHEME'): + host, port, ssl = _determine_rgw_addr() + else: + host = Settings.RGW_API_HOST + port = Settings.RGW_API_PORT + ssl = Settings.RGW_API_SCHEME == 'https' + + RgwClient._host = host + RgwClient._port = port + RgwClient._ssl = ssl + RgwClient._ADMIN_PATH = Settings.RGW_API_ADMIN_RESOURCE + + # Create an instance using the configured settings. + instance = RgwClient(Settings.RGW_API_USER_ID, + Settings.RGW_API_ACCESS_KEY, + Settings.RGW_API_SECRET_KEY) + + RgwClient._SYSTEM_USERID = instance.userid + + # Append the instance to the internal map. + RgwClient._user_instances[RgwClient._SYSTEM_USERID] = instance + + @staticmethod + def _rgw_settings(): + return (Settings.RGW_API_HOST, + Settings.RGW_API_PORT, + Settings.RGW_API_ACCESS_KEY, + Settings.RGW_API_SECRET_KEY, + Settings.RGW_API_ADMIN_RESOURCE, + Settings.RGW_API_SCHEME, + Settings.RGW_API_USER_ID, + Settings.RGW_API_SSL_VERIFY) + + @staticmethod + def instance(userid): + # Discard all cached instances if any rgw setting has changed + if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings(): + RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings() + RgwClient.drop_instance() + + if not RgwClient._user_instances: + RgwClient._load_settings() + + if not userid: + userid = RgwClient._SYSTEM_USERID + + if userid not in RgwClient._user_instances: + # Get the access and secret keys for the specified user. + keys = RgwClient.admin_instance().get_user_keys(userid) + if not keys: + raise RequestException( + "User '{}' does not have any keys configured.".format( + userid)) + + # Create an instance and append it to the internal map. + RgwClient._user_instances[userid] = RgwClient(userid, + keys['access_key'], + keys['secret_key']) + + return RgwClient._user_instances[userid] + + @staticmethod + def admin_instance(): + return RgwClient.instance(RgwClient._SYSTEM_USERID) + + @staticmethod + def drop_instance(userid=None): + """ + Drop a cached instance by name or all. + """ + if userid: + RgwClient._user_instances.pop(userid, None) + else: + RgwClient._user_instances.clear() + + def _reset_login(self): + if self.userid != RgwClient._SYSTEM_USERID: + logger.info("Fetching new keys for user: %s", self.userid) + keys = RgwClient.admin_instance().get_user_keys(self.userid) + self.auth = S3Auth(keys['access_key'], keys['secret_key'], + service_url=self.service_url) + else: + raise RequestException('Authentication failed for the "{}" user: wrong credentials' + .format(self.userid), status_code=401) + + def __init__(self, # pylint: disable-msg=R0913 + userid, + access_key, + secret_key, + host=None, + port=None, + admin_path=None, + ssl=False): + + if not host and not RgwClient._host: + RgwClient._load_settings() + host = host if host else RgwClient._host + port = port if port else RgwClient._port + admin_path = admin_path if admin_path else RgwClient._ADMIN_PATH + ssl = ssl if ssl else RgwClient._ssl + ssl_verify = Settings.RGW_API_SSL_VERIFY + + self.service_url = build_url(host=host, port=port) + self.admin_path = admin_path + + s3auth = S3Auth(access_key, secret_key, service_url=self.service_url) + super(RgwClient, self).__init__(host, port, 'RGW', ssl, s3auth, ssl_verify=ssl_verify) + + # If user ID is not set, then try to get it via the RGW Admin Ops API. + self.userid = userid if userid else self._get_user_id(self.admin_path) + + logger.info("Created new connection: user=%s, host=%s, port=%s, ssl=%d, sslverify=%d", + self.userid, host, port, ssl, ssl_verify) + + @RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)') + def is_service_online(self, request=None): + """ + Consider the service as online if the response contains the + specified keys. Nothing more is checked here. + """ + _ = request({'format': 'json'}) + return True + + @RestClient.api_get('/{admin_path}/metadata/user?myself', + resp_structure='data > user_id') + def _get_user_id(self, admin_path, request=None): + # pylint: disable=unused-argument + """ + Get the user ID of the user that is used to communicate with the + RGW Admin Ops API. + :rtype: str + :return: The user ID of the user that is used to sign the + RGW Admin Ops API calls. + """ + response = request() + return response['data']['user_id'] + + @RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]') + def _user_exists(self, admin_path, user_id, request=None): + # pylint: disable=unused-argument + response = request() + if user_id: + return user_id in response + return self.userid in response + + def user_exists(self, user_id=None): + return self._user_exists(self.admin_path, user_id) + + @RestClient.api_get('/{admin_path}/metadata/user?key={userid}', + resp_structure='data > system') + def _is_system_user(self, admin_path, userid, request=None): + # pylint: disable=unused-argument + response = request() + return strtobool(response['data']['system']) + + def is_system_user(self): + return self._is_system_user(self.admin_path, self.userid) + + @RestClient.api_get( + '/{admin_path}/user', + resp_structure='tenant & user_id & email & keys[*] > ' + ' (user & access_key & secret_key)') + def _admin_get_user_keys(self, admin_path, userid, request=None): + # pylint: disable=unused-argument + colon_idx = userid.find(':') + user = userid if colon_idx == -1 else userid[:colon_idx] + response = request({'uid': user}) + for key in response['keys']: + if key['user'] == userid: + return { + 'access_key': key['access_key'], + 'secret_key': key['secret_key'] + } + return None + + def get_user_keys(self, userid): + return self._admin_get_user_keys(self.admin_path, userid) + + @RestClient.api('/{admin_path}/{path}') + def _proxy_request(self, # pylint: disable=too-many-arguments + admin_path, + path, + method, + params, + data, + request=None): + # pylint: disable=unused-argument + return request( + method=method, params=params, data=data, raw_content=True) + + def proxy(self, method, path, params, data): + logger.debug("proxying method=%s path=%s params=%s data=%s", method, + path, params, data) + return self._proxy_request(self.admin_path, path, method, params, data) + + @RestClient.api_get('/', resp_structure='[1][*] > Name') + def get_buckets(self, request=None): + """ + Get a list of names from all existing buckets of this user. + :return: Returns a list of bucket names. + """ + response = request({'format': 'json'}) + return [bucket['Name'] for bucket in response[1]] + + @RestClient.api_get('/{bucket_name}') + def bucket_exists(self, bucket_name, userid, request=None): + """ + Check if the specified bucket exists for this user. + :param bucket_name: The name of the bucket. + :return: Returns True if the bucket exists, otherwise False. + """ + # pylint: disable=unused-argument + try: + request() + my_buckets = self.get_buckets() + if bucket_name not in my_buckets: + raise RequestException( + 'Bucket "{}" belongs to other user'.format(bucket_name), + 403) + return True + except RequestException as e: + if e.status_code == 404: + return False + + raise e + + @RestClient.api_put('/{bucket_name}') + def create_bucket(self, bucket_name, request=None): + logger.info("Creating bucket: %s", bucket_name) + return request() diff --git a/src/pybind/mgr/dashboard/services/sso.py b/src/pybind/mgr/dashboard/services/sso.py new file mode 100644 index 00000000..007a5307 --- /dev/null +++ b/src/pybind/mgr/dashboard/services/sso.py @@ -0,0 +1,268 @@ +# -*- coding: utf-8 -*- +# pylint: disable=too-many-return-statements,too-many-branches +from __future__ import absolute_import + +import errno +import json +import sys +import threading +import six + +try: + from onelogin.saml2.settings import OneLogin_Saml2_Settings + from onelogin.saml2.errors import OneLogin_Saml2_Error + from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser + + python_saml_imported = True +except ImportError: + python_saml_imported = False + + +from .. import mgr, logger +from ..tools import prepare_url_prefix + + +class Saml2(object): + def __init__(self, onelogin_settings): + self.onelogin_settings = onelogin_settings + + def get_username_attribute(self): + return self.onelogin_settings['sp']['attributeConsumingService']['requestedAttributes'][0][ + 'name'] + + def to_dict(self): + return { + 'onelogin_settings': self.onelogin_settings + } + + @classmethod + def from_dict(cls, s_dict): + return Saml2(s_dict['onelogin_settings']) + + +class SsoDB(object): + VERSION = 1 + SSODB_CONFIG_KEY = "ssodb_v" + + def __init__(self, version, protocol, saml2): + self.version = version + self.protocol = protocol + self.saml2 = saml2 + self.lock = threading.RLock() + + def save(self): + with self.lock: + db = { + 'protocol': self.protocol, + 'saml2': self.saml2.to_dict(), + 'version': self.version + } + mgr.set_store(self.ssodb_config_key(), json.dumps(db)) + + @classmethod + def ssodb_config_key(cls, version=None): + if version is None: + version = cls.VERSION + return "{}{}".format(cls.SSODB_CONFIG_KEY, version) + + def check_and_update_db(self): + logger.debug("SSO: Checking for previews DB versions") + if self.VERSION != 1: + raise NotImplementedError() + + @classmethod + def load(cls): + logger.info("SSO: Loading SSO DB version=%s", cls.VERSION) + + json_db = mgr.get_store(cls.ssodb_config_key(), None) + if json_db is None: + logger.debug("SSO: No DB v%s found, creating new...", cls.VERSION) + db = cls(cls.VERSION, '', Saml2({})) + # check if we can update from a previous version database + db.check_and_update_db() + return db + + db = json.loads(json_db) + return cls(db['version'], db.get('protocol'), Saml2.from_dict(db.get('saml2'))) + + +def load_sso_db(): + mgr.SSO_DB = SsoDB.load() + + +SSO_COMMANDS = [ + { + 'cmd': 'dashboard sso enable saml2', + 'desc': 'Enable SAML2 Single Sign-On', + 'perm': 'w' + }, + { + 'cmd': 'dashboard sso disable', + 'desc': 'Disable Single Sign-On', + 'perm': 'w' + }, + { + 'cmd': 'dashboard sso status', + 'desc': 'Get Single Sign-On status', + 'perm': 'r' + }, + { + 'cmd': 'dashboard sso show saml2', + 'desc': 'Show SAML2 configuration', + 'perm': 'r' + }, + { + 'cmd': 'dashboard sso setup saml2 ' + 'name=ceph_dashboard_base_url,type=CephString ' + 'name=idp_metadata,type=CephString ' + 'name=idp_username_attribute,type=CephString,req=false ' + 'name=idp_entity_id,type=CephString,req=false ' + 'name=sp_x_509_cert,type=CephString,req=false ' + 'name=sp_private_key,type=CephString,req=false', + 'desc': 'Setup SAML2 Single Sign-On', + 'perm': 'w' + } +] + + +def _get_optional_attr(cmd, attr, default): + if attr in cmd: + if cmd[attr] != '': + return cmd[attr] + return default + + +def handle_sso_command(cmd): + if cmd['prefix'] not in ['dashboard sso enable saml2', + 'dashboard sso disable', + 'dashboard sso status', + 'dashboard sso show saml2', + 'dashboard sso setup saml2']: + return -errno.ENOSYS, '', '' + + if cmd['prefix'] == 'dashboard sso disable': + mgr.SSO_DB.protocol = '' + mgr.SSO_DB.save() + return 0, 'SSO is "disabled".', '' + + if not python_saml_imported: + python_saml_name = 'python3-saml' if sys.version_info >= (3, 0) else 'python-saml' + return -errno.EPERM, '', 'Required library not found: `{}`'.format(python_saml_name) + + if cmd['prefix'] == 'dashboard sso enable saml2': + try: + OneLogin_Saml2_Settings(mgr.SSO_DB.saml2.onelogin_settings) + except OneLogin_Saml2_Error: + return -errno.EPERM, '', 'Single Sign-On is not configured: ' \ + 'use `ceph dashboard sso setup saml2`' + mgr.SSO_DB.protocol = 'saml2' + mgr.SSO_DB.save() + return 0, 'SSO is "enabled" with "SAML2" protocol.', '' + + if cmd['prefix'] == 'dashboard sso status': + if mgr.SSO_DB.protocol == 'saml2': + return 0, 'SSO is "enabled" with "SAML2" protocol.', '' + + return 0, 'SSO is "disabled".', '' + + if cmd['prefix'] == 'dashboard sso show saml2': + return 0, json.dumps(mgr.SSO_DB.saml2.to_dict()), '' + + if cmd['prefix'] == 'dashboard sso setup saml2': + ceph_dashboard_base_url = cmd['ceph_dashboard_base_url'] + idp_metadata = cmd['idp_metadata'] + idp_username_attribute = _get_optional_attr(cmd, 'idp_username_attribute', 'uid') + idp_entity_id = _get_optional_attr(cmd, 'idp_entity_id', None) + sp_x_509_cert = _get_optional_attr(cmd, 'sp_x_509_cert', '') + sp_private_key = _get_optional_attr(cmd, 'sp_private_key', '') + if sp_x_509_cert and not sp_private_key: + return -errno.EINVAL, '', 'Missing parameter `sp_private_key`.' + if not sp_x_509_cert and sp_private_key: + return -errno.EINVAL, '', 'Missing parameter `sp_x_509_cert`.' + has_sp_cert = sp_x_509_cert != "" and sp_private_key != "" + try: + # pylint: disable=undefined-variable + FileNotFoundError + except NameError: + # pylint: disable=redefined-builtin + FileNotFoundError = IOError + try: + f = open(sp_x_509_cert, 'r', encoding='utf-8') if six.PY3 else \ + open(sp_x_509_cert, 'rb') + sp_x_509_cert = f.read() + f.close() + except FileNotFoundError: + pass + try: + f = open(sp_private_key, 'r', encoding='utf-8') if six.PY3 else \ + open(sp_private_key, 'rb') + sp_private_key = f.read() + f.close() + except FileNotFoundError: + pass + try: + idp_settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_metadata, + validate_cert=False, + entity_id=idp_entity_id) + # pylint: disable=broad-except + except Exception: + try: + f = open(idp_metadata, 'r', encoding='utf-8') if six.PY3 else \ + open(idp_metadata, 'rb') + idp_metadata = f.read() + f.close() + except FileNotFoundError: + pass + try: + idp_settings = OneLogin_Saml2_IdPMetadataParser.parse(idp_metadata, + entity_id=idp_entity_id) + # pylint: disable=broad-except + except Exception: + return -errno.EINVAL, '', 'Invalid parameter `idp_metadata`.' + + url_prefix = prepare_url_prefix(mgr.get_module_option('url_prefix', default='')) + settings = { + 'sp': { + 'entityId': '{}{}/auth/saml2/metadata'.format(ceph_dashboard_base_url, url_prefix), + 'assertionConsumerService': { + 'url': '{}{}/auth/saml2'.format(ceph_dashboard_base_url, url_prefix), + 'binding': "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" + }, + 'attributeConsumingService': { + 'serviceName': "Ceph Dashboard", + "serviceDescription": "Ceph Dashboard Service", + "requestedAttributes": [ + { + "name": idp_username_attribute, + "isRequired": True + } + ] + }, + 'singleLogoutService': { + 'url': '{}{}/auth/saml2/logout'.format(ceph_dashboard_base_url, url_prefix), + 'binding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect' + }, + "x509cert": sp_x_509_cert, + "privateKey": sp_private_key + }, + 'security': { + "nameIdEncrypted": has_sp_cert, + "authnRequestsSigned": has_sp_cert, + "logoutRequestSigned": has_sp_cert, + "logoutResponseSigned": has_sp_cert, + "signMetadata": has_sp_cert, + "wantMessagesSigned": has_sp_cert, + "wantAssertionsSigned": has_sp_cert, + "wantAssertionsEncrypted": has_sp_cert, + "wantNameIdEncrypted": False, # Not all Identity Providers support this. + "metadataValidUntil": '', + "wantAttributeStatement": False + } + } + settings = OneLogin_Saml2_IdPMetadataParser.merge_settings(settings, idp_settings) + mgr.SSO_DB.saml2.onelogin_settings = settings + mgr.SSO_DB.protocol = 'saml2' + mgr.SSO_DB.save() + return 0, json.dumps(mgr.SSO_DB.saml2.onelogin_settings), '' + + return -errno.ENOSYS, '', '' diff --git a/src/pybind/mgr/dashboard/services/tcmu_service.py b/src/pybind/mgr/dashboard/services/tcmu_service.py new file mode 100644 index 00000000..1f3e4d8d --- /dev/null +++ b/src/pybind/mgr/dashboard/services/tcmu_service.py @@ -0,0 +1,96 @@ +from mgr_util import get_most_recent_rate + +from dashboard.services.ceph_service import CephService +from .. import mgr + +SERVICE_TYPE = 'tcmu-runner' + + +class TcmuService(object): + # pylint: disable=too-many-nested-blocks + @staticmethod + def get_iscsi_info(): + daemons = {} + images = {} + for service in CephService.get_service_list(SERVICE_TYPE): + metadata = service['metadata'] + if metadata is None: + continue + status = service['status'] + hostname = service['hostname'] + + daemon = daemons.get(hostname, None) + if daemon is None: + daemon = { + 'server_hostname': hostname, + 'version': metadata['ceph_version'], + 'optimized_paths': 0, + 'non_optimized_paths': 0 + } + daemons[hostname] = daemon + + service_id = service['id'] + device_id = service_id.split(':')[-1] + image = images.get(device_id) + if image is None: + image = { + 'device_id': device_id, + 'pool_name': metadata['pool_name'], + 'name': metadata['image_name'], + 'id': metadata.get('image_id', None), + 'optimized_paths': [], + 'non_optimized_paths': [] + } + images[device_id] = image + + if status.get('lock_owner', 'false') == 'true': + daemon['optimized_paths'] += 1 + image['optimized_paths'].append(hostname) + + perf_key_prefix = "librbd-{id}-{pool}-{name}.".format( + id=metadata.get('image_id', ''), + pool=metadata['pool_name'], + name=metadata['image_name']) + perf_key = "{}lock_acquired_time".format(perf_key_prefix) + lock_acquired_time = (mgr.get_counter( + 'tcmu-runner', service_id, perf_key)[perf_key] + or [[0, 0]])[-1][1] / 1000000000 + if lock_acquired_time > image.get('optimized_since', 0): + image['optimized_daemon'] = hostname + image['optimized_since'] = lock_acquired_time + image['stats'] = {} + image['stats_history'] = {} + for s in ['rd', 'wr', 'rd_bytes', 'wr_bytes']: + perf_key = "{}{}".format(perf_key_prefix, s) + rates = CephService.get_rates('tcmu-runner', service_id, perf_key) + image['stats'][s] = get_most_recent_rate(rates) + image['stats_history'][s] = rates + else: + daemon['non_optimized_paths'] += 1 + image['non_optimized_paths'].append(hostname) + + # clear up races w/ tcmu-runner clients that haven't detected + # loss of optimized path + for image in images.values(): + optimized_daemon = image.get('optimized_daemon', None) + if optimized_daemon: + for daemon_name in image['optimized_paths']: + if daemon_name != optimized_daemon: + daemon = daemons[daemon_name] + daemon['optimized_paths'] -= 1 + daemon['non_optimized_paths'] += 1 + image['non_optimized_paths'].append(daemon_name) + image['optimized_paths'] = [optimized_daemon] + + return { + 'daemons': sorted(daemons.values(), + key=lambda d: d['server_hostname']), + 'images': sorted(images.values(), key=lambda i: ['id']), + } + + @staticmethod + def get_image_info(pool_name, image_name, get_iscsi_info): + for image in get_iscsi_info['images']: + if image['pool_name'] == pool_name and image['name'] == image_name: + return image + return None |