# -*- coding: utf-8 -*- from __future__ import absolute_import import ipaddress import json import logging import re import xml.etree.ElementTree as ET # noqa: N814 from distutils.util import strtobool from subprocess import SubprocessError from mgr_util import build_url from .. import mgr from ..awsauth import S3Auth from ..exceptions import DashboardException from ..rest_client import RequestException, RestClient from ..settings import Settings from ..tools import dict_contains_path, dict_get, json_str_to_object try: from typing import Any, Dict, List, Optional, Tuple, Union except ImportError: pass # For typing only logger = logging.getLogger('rgw_client') class NoRgwDaemonsException(Exception): def __init__(self): super().__init__('No RGW service is running.') class NoCredentialsException(Exception): def __init__(self): super(NoCredentialsException, self).__init__( 'No RGW credentials found, ' 'please consult the documentation on how to enable RGW for ' 'the dashboard.') class RgwAdminException(Exception): pass class RgwDaemon: """Simple representation of a daemon.""" host: str name: str port: int ssl: bool realm_name: str zonegroup_name: str zone_name: str def _get_daemons() -> Dict[str, RgwDaemon]: """ Retrieve RGW daemon info from MGR. """ service_map = mgr.get('service_map') if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']): raise NoRgwDaemonsException daemons = {} daemon_map = service_map['services']['rgw']['daemons'] for key in daemon_map.keys(): if dict_contains_path(daemon_map[key], ['metadata', 'frontend_config#0']): daemon = _determine_rgw_addr(daemon_map[key]) daemon.name = daemon_map[key]['metadata']['id'] daemon.realm_name = daemon_map[key]['metadata']['realm_name'] daemon.zonegroup_name = daemon_map[key]['metadata']['zonegroup_name'] daemon.zone_name = daemon_map[key]['metadata']['zone_name'] daemons[daemon.name] = daemon logger.info('Found RGW daemon with configuration: host=%s, port=%d, ssl=%s', daemon.host, daemon.port, str(daemon.ssl)) if not daemons: raise NoRgwDaemonsException return daemons def _determine_rgw_addr(daemon_info: Dict[str, Any]) -> RgwDaemon: """ Parse RGW daemon info to determine the configured host (IP address) and port. """ daemon = RgwDaemon() daemon.host = daemon_info['metadata']['hostname'] daemon.port, daemon.ssl = _parse_frontend_config(daemon_info['metadata']['frontend_config#0']) return daemon def _parse_addr(value) -> str: """ Get the IP address the RGW is running on. >>> _parse_addr('192.168.178.3:49774/1534999298') '192.168.178.3' >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298') '2001:db8:85a3::8a2e:370:7334' >>> _parse_addr('xyz') Traceback (most recent call last): ... LookupError: Failed to determine RGW address >>> _parse_addr('192.168.178.a:8080/123456789') Traceback (most recent call last): ... LookupError: Invalid RGW address '192.168.178.a' found >>> _parse_addr('[2001:0db8:1234]:443/123456789') Traceback (most recent call last): ... LookupError: Invalid RGW address '2001:0db8:1234' found >>> _parse_addr('2001:0db8::1234:49774/1534999298') Traceback (most recent call last): ... LookupError: Failed to determine RGW address :param value: The string to process. The syntax is ':/'. :type: str :raises LookupError if parsing fails to determine the IP address. :return: The IP address. :rtype: str """ match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value) if match: # IPv4: # Group 0: 192.168.178.3:49774/1534999298 # Group 3: 192.168.178.3 # IPv6: # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298 # Group 1: [ # Group 2: 2001:db8:85a3::8a2e:370:7334 addr = match.group(3) if match.group(3) else match.group(2) try: ipaddress.ip_address(addr) return addr except ValueError: raise LookupError('Invalid RGW address \'{}\' found'.format(addr)) raise LookupError('Failed to determine RGW address') def _parse_frontend_config(config) -> Tuple[int, bool]: """ Get the port the RGW is running on. Due the complexity of the syntax not all variations are supported. If there are multiple (ssl_)ports/(ssl_)endpoints options, then the first found option will be returned. Get more details about the configuration syntax here: http://docs.ceph.com/en/latest/radosgw/frontends/ https://civetweb.github.io/civetweb/UserManual.html :param config: The configuration string to parse. :type config: str :raises LookupError if parsing fails to determine the port. :return: A tuple containing the port number and the information whether SSL is used. :rtype: (int, boolean) """ match = re.search(r'^(beast|civetweb)\s+.+$', config) if match: if match.group(1) == 'beast': match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)', config) if match: option_name = match.group(1) if option_name in ['port', 'ssl_port']: match = re.search(r'(\d+)', match.group(2)) if match: port = int(match.group(1)) ssl = option_name == 'ssl_port' return port, ssl if option_name in ['endpoint', 'ssl_endpoint']: match = re.search(r'([\d.]+|\[.+\])(:(\d+))?', match.group(2)) # type: ignore if match: port = int(match.group(3)) if \ match.group(2) is not None else 443 if \ option_name == 'ssl_endpoint' else \ 80 ssl = option_name == 'ssl_endpoint' return port, ssl if match.group(1) == 'civetweb': # type: ignore match = re.search(r'port=(.*:)?(\d+)(s)?', config) if match: port = int(match.group(2)) ssl = match.group(3) == 's' return port, ssl raise LookupError('Failed to determine RGW port from "{}"'.format(config)) def _parse_secrets(user: str, data: dict) -> Tuple[str, str]: for key in data.get('keys', []): if key.get('user') == user and data.get('system') in ['true', True]: access_key = key.get('access_key') secret_key = key.get('secret_key') return access_key, secret_key return '', '' def _get_user_keys(user: str, realm: Optional[str] = None) -> Tuple[str, str]: access_key = '' secret_key = '' rgw_user_info_cmd = ['user', 'info', '--uid', user] cmd_realm_option = ['--rgw-realm', realm] if realm else [] if realm: rgw_user_info_cmd += cmd_realm_option try: _, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd) if out: access_key, secret_key = _parse_secrets(user, out) if not access_key: rgw_create_user_cmd = [ 'user', 'create', '--uid', user, '--display-name', 'Ceph Dashboard', '--system', ] + cmd_realm_option _, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd) if out: access_key, secret_key = _parse_secrets(user, out) if not access_key: logger.error('Unable to create rgw user "%s": %s', user, err) except SubprocessError as error: logger.exception(error) return access_key, secret_key def configure_rgw_credentials(): logger.info('Configuring dashboard RGW credentials') user = 'dashboard' realms = [] access_key = '' secret_key = '' try: _, out, err = mgr.send_rgwadmin_command(['realm', 'list']) if out: realms = out.get('realms', []) if err: logger.error('Unable to list RGW realms: %s', err) if realms: realm_access_keys = {} realm_secret_keys = {} for realm in realms: realm_access_key, realm_secret_key = _get_user_keys(user, realm) if realm_access_key: realm_access_keys[realm] = realm_access_key realm_secret_keys[realm] = realm_secret_key if realm_access_keys: access_key = json.dumps(realm_access_keys) secret_key = json.dumps(realm_secret_keys) else: access_key, secret_key = _get_user_keys(user) assert access_key and secret_key Settings.RGW_API_ACCESS_KEY = access_key Settings.RGW_API_SECRET_KEY = secret_key except (AssertionError, SubprocessError) as error: logger.exception(error) raise NoCredentialsException class RgwClient(RestClient): _host = None _port = None _ssl = None _user_instances = {} # type: Dict[str, Dict[str, RgwClient]] _config_instances = {} # type: Dict[str, RgwClient] _rgw_settings_snapshot = None _daemons: Dict[str, RgwDaemon] = {} daemon: RgwDaemon got_keys_from_config: bool userid: str @staticmethod def _handle_response_status_code(status_code: int) -> int: # Do not return auth error codes (so they are not handled as ceph API user auth errors). return 404 if status_code in [401, 403] else status_code @staticmethod def _get_daemon_connection_info(daemon_name: str) -> dict: try: realm_name = RgwClient._daemons[daemon_name].realm_name access_key = Settings.RGW_API_ACCESS_KEY[realm_name] secret_key = Settings.RGW_API_SECRET_KEY[realm_name] except TypeError: # Legacy string values. access_key = Settings.RGW_API_ACCESS_KEY secret_key = Settings.RGW_API_SECRET_KEY except KeyError as error: raise DashboardException(msg='Credentials not found for RGW Daemon: {}'.format(error), http_status_code=404, component='rgw') return {'access_key': access_key, 'secret_key': secret_key} def _get_daemon_zone_info(self): # type: () -> dict return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None)) def _get_realms_info(self): # type: () -> dict return json_str_to_object(self.proxy('GET', 'realm?list', None, None)) def _get_realm_info(self, realm_id: str) -> Dict[str, Any]: return json_str_to_object(self.proxy('GET', f'realm?id={realm_id}', None, None)) @staticmethod def _rgw_settings(): return (Settings.RGW_API_ACCESS_KEY, Settings.RGW_API_SECRET_KEY, Settings.RGW_API_ADMIN_RESOURCE, Settings.RGW_API_SSL_VERIFY) @staticmethod def instance(userid: Optional[str] = None, daemon_name: Optional[str] = None) -> 'RgwClient': # pylint: disable=too-many-branches RgwClient._daemons = _get_daemons() # The API access key and secret key are mandatory for a minimal configuration. if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY): configure_rgw_credentials() if not daemon_name: # Select 1st daemon: daemon_name = next(iter(RgwClient._daemons.keys())) # Discard all cached instances if any rgw setting has changed if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings(): RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings() RgwClient.drop_instance() if daemon_name not in RgwClient._config_instances: connection_info = RgwClient._get_daemon_connection_info(daemon_name) RgwClient._config_instances[daemon_name] = RgwClient(connection_info['access_key'], connection_info['secret_key'], daemon_name) if not userid or userid == RgwClient._config_instances[daemon_name].userid: return RgwClient._config_instances[daemon_name] if daemon_name not in RgwClient._user_instances \ or userid not in RgwClient._user_instances[daemon_name]: # Get the access and secret keys for the specified user. keys = RgwClient._config_instances[daemon_name].get_user_keys(userid) if not keys: raise RequestException( "User '{}' does not have any keys configured.".format( userid)) instance = RgwClient(keys['access_key'], keys['secret_key'], daemon_name, userid) RgwClient._user_instances.update({daemon_name: {userid: instance}}) return RgwClient._user_instances[daemon_name][userid] @staticmethod def admin_instance(daemon_name: Optional[str] = None) -> 'RgwClient': return RgwClient.instance(daemon_name=daemon_name) @staticmethod def drop_instance(instance: Optional['RgwClient'] = None): """ Drop a cached instance or all. """ if instance: if instance.got_keys_from_config: del RgwClient._config_instances[instance.daemon.name] else: del RgwClient._user_instances[instance.daemon.name][instance.userid] else: RgwClient._config_instances.clear() RgwClient._user_instances.clear() def _reset_login(self): if self.got_keys_from_config: raise RequestException('Authentication failed for the "{}" user: wrong credentials' .format(self.userid), status_code=401) logger.info("Fetching new keys for user: %s", self.userid) keys = RgwClient.admin_instance(daemon_name=self.daemon.name).get_user_keys(self.userid) self.auth = S3Auth(keys['access_key'], keys['secret_key'], service_url=self.service_url) def __init__(self, access_key: str, secret_key: str, daemon_name: str, user_id: Optional[str] = None) -> None: try: daemon = RgwClient._daemons[daemon_name] except KeyError as error: raise DashboardException(msg='RGW Daemon not found: {}'.format(error), http_status_code=404, component='rgw') ssl_verify = Settings.RGW_API_SSL_VERIFY self.admin_path = Settings.RGW_API_ADMIN_RESOURCE self.service_url = build_url(host=daemon.host, port=daemon.port) self.auth = S3Auth(access_key, secret_key, service_url=self.service_url) super(RgwClient, self).__init__(daemon.host, daemon.port, 'RGW', daemon.ssl, self.auth, ssl_verify=ssl_verify) self.got_keys_from_config = not user_id try: self.userid = self._get_user_id(self.admin_path) if self.got_keys_from_config \ else user_id except RequestException as error: logger.exception(error) msg = 'Error connecting to Object Gateway' if error.status_code == 404: msg = '{}: {}'.format(msg, str(error)) raise DashboardException(msg=msg, http_status_code=error.status_code, component='rgw') self.daemon = daemon logger.info("Created new connection: daemon=%s, host=%s, port=%s, ssl=%d, sslverify=%d", daemon.name, daemon.host, daemon.port, daemon.ssl, ssl_verify) @RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)') def is_service_online(self, request=None) -> bool: """ Consider the service as online if the response contains the specified keys. Nothing more is checked here. """ _ = request({'format': 'json'}) return True @RestClient.api_get('/{admin_path}/metadata/user?myself', resp_structure='data > user_id') def _get_user_id(self, admin_path, request=None): # pylint: disable=unused-argument """ Get the user ID of the user that is used to communicate with the RGW Admin Ops API. :rtype: str :return: The user ID of the user that is used to sign the RGW Admin Ops API calls. """ response = request() return response['data']['user_id'] @RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]') def _user_exists(self, admin_path, user_id, request=None): # pylint: disable=unused-argument response = request() if user_id: return user_id in response return self.userid in response def user_exists(self, user_id=None): return self._user_exists(self.admin_path, user_id) @RestClient.api_get('/{admin_path}/metadata/user?key={userid}', resp_structure='data > system') def _is_system_user(self, admin_path, userid, request=None) -> bool: # pylint: disable=unused-argument response = request() return strtobool(response['data']['system']) def is_system_user(self) -> bool: return self._is_system_user(self.admin_path, self.userid) @RestClient.api_get( '/{admin_path}/user', resp_structure='tenant & user_id & email & keys[*] > ' ' (user & access_key & secret_key)') def _admin_get_user_keys(self, admin_path, userid, request=None): # pylint: disable=unused-argument colon_idx = userid.find(':') user = userid if colon_idx == -1 else userid[:colon_idx] response = request({'uid': user}) for key in response['keys']: if key['user'] == userid: return { 'access_key': key['access_key'], 'secret_key': key['secret_key'] } return None def get_user_keys(self, userid): return self._admin_get_user_keys(self.admin_path, userid) @RestClient.api('/{admin_path}/{path}') def _proxy_request( self, # pylint: disable=too-many-arguments admin_path, path, method, params, data, request=None): # pylint: disable=unused-argument return request(method=method, params=params, data=data, raw_content=True) def proxy(self, method, path, params, data): logger.debug("proxying method=%s path=%s params=%s data=%s", method, path, params, data) return self._proxy_request(self.admin_path, path, method, params, data) @RestClient.api_get('/', resp_structure='[1][*] > Name') def get_buckets(self, request=None): """ Get a list of names from all existing buckets of this user. :return: Returns a list of bucket names. """ response = request({'format': 'json'}) return [bucket['Name'] for bucket in response[1]] @RestClient.api_get('/{bucket_name}') def bucket_exists(self, bucket_name, userid, request=None): """ Check if the specified bucket exists for this user. :param bucket_name: The name of the bucket. :return: Returns True if the bucket exists, otherwise False. """ # pylint: disable=unused-argument try: request() my_buckets = self.get_buckets() if bucket_name not in my_buckets: raise RequestException( 'Bucket "{}" belongs to other user'.format(bucket_name), 403) return True except RequestException as e: if e.status_code == 404: return False raise e @RestClient.api_put('/{bucket_name}') def create_bucket(self, bucket_name, zonegroup=None, placement_target=None, lock_enabled=False, request=None): logger.info("Creating bucket: %s, zonegroup: %s, placement_target: %s", bucket_name, zonegroup, placement_target) data = None if zonegroup and placement_target: create_bucket_configuration = ET.Element('CreateBucketConfiguration') location_constraint = ET.SubElement(create_bucket_configuration, 'LocationConstraint') location_constraint.text = '{}:{}'.format(zonegroup, placement_target) data = ET.tostring(create_bucket_configuration, encoding='unicode') headers = None # type: Optional[dict] if lock_enabled: headers = {'x-amz-bucket-object-lock-enabled': 'true'} return request(data=data, headers=headers) def get_placement_targets(self): # type: () -> dict zone = self._get_daemon_zone_info() placement_targets = [] # type: List[Dict] for placement_pool in zone['placement_pools']: placement_targets.append( { 'name': placement_pool['key'], 'data_pool': placement_pool['val']['storage_classes']['STANDARD']['data_pool'] } ) return {'zonegroup': self.daemon.zonegroup_name, 'placement_targets': placement_targets} def get_realms(self): # type: () -> List realms_info = self._get_realms_info() if 'realms' in realms_info and realms_info['realms']: return realms_info['realms'] return [] def get_default_realm(self) -> str: realms_info = self._get_realms_info() if 'default_info' in realms_info and realms_info['default_info']: realm_info = self._get_realm_info(realms_info['default_info']) if 'name' in realm_info and realm_info['name']: return realm_info['name'] raise DashboardException(msg='Default realm not found.', http_status_code=404, component='rgw') @RestClient.api_get('/{bucket_name}?versioning') def get_bucket_versioning(self, bucket_name, request=None): """ Get bucket versioning. :param str bucket_name: the name of the bucket. :return: versioning info :rtype: Dict """ # pylint: disable=unused-argument result = request() if 'Status' not in result: result['Status'] = 'Suspended' if 'MfaDelete' not in result: result['MfaDelete'] = 'Disabled' return result @RestClient.api_put('/{bucket_name}?versioning') def set_bucket_versioning(self, bucket_name, versioning_state, mfa_delete, mfa_token_serial, mfa_token_pin, request=None): """ Set bucket versioning. :param str bucket_name: the name of the bucket. :param str versioning_state: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html :param str mfa_delete: MFA Delete state. :param str mfa_token_serial: https://docs.ceph.com/docs/master/radosgw/mfa/ :param str mfa_token_pin: value of a TOTP token at a certain time (auth code) :return: None """ # pylint: disable=unused-argument versioning_configuration = ET.Element('VersioningConfiguration') status_element = ET.SubElement(versioning_configuration, 'Status') status_element.text = versioning_state headers = {} if mfa_delete and mfa_token_serial and mfa_token_pin: headers['x-amz-mfa'] = '{} {}'.format(mfa_token_serial, mfa_token_pin) mfa_delete_element = ET.SubElement(versioning_configuration, 'MfaDelete') mfa_delete_element.text = mfa_delete data = ET.tostring(versioning_configuration, encoding='unicode') try: request(data=data, headers=headers) except RequestException as error: msg = str(error) if mfa_delete and mfa_token_serial and mfa_token_pin \ and 'AccessDenied' in error.content.decode(): msg = 'Bad MFA credentials: {}'.format(msg) raise DashboardException(msg=msg, http_status_code=error.status_code, component='rgw') @RestClient.api_get('/{bucket_name}?object-lock') def get_bucket_locking(self, bucket_name, request=None): # type: (str, Optional[object]) -> dict """ Gets the locking configuration for a bucket. The locking configuration will be applied by default to every new object placed in the specified bucket. :param bucket_name: The name of the bucket. :type bucket_name: str :return: The locking configuration. :rtype: Dict """ # pylint: disable=unused-argument # Try to get the Object Lock configuration. If there is none, # then return default values. try: result = request() # type: ignore return { 'lock_enabled': dict_get(result, 'ObjectLockEnabled') == 'Enabled', 'lock_mode': dict_get(result, 'Rule.DefaultRetention.Mode'), 'lock_retention_period_days': dict_get(result, 'Rule.DefaultRetention.Days', 0), 'lock_retention_period_years': dict_get(result, 'Rule.DefaultRetention.Years', 0) } except RequestException as e: if e.content: content = json_str_to_object(e.content) if content.get( 'Code') == 'ObjectLockConfigurationNotFoundError': return { 'lock_enabled': False, 'lock_mode': 'compliance', 'lock_retention_period_days': None, 'lock_retention_period_years': None } raise e @RestClient.api_put('/{bucket_name}?object-lock') def set_bucket_locking(self, bucket_name: str, mode: str, retention_period_days: Optional[Union[int, str]] = None, retention_period_years: Optional[Union[int, str]] = None, request: Optional[object] = None) -> None: """ Places the locking configuration on the specified bucket. The locking configuration will be applied by default to every new object placed in the specified bucket. :param bucket_name: The name of the bucket. :type bucket_name: str :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`. :type mode: str :param retention_period_days: :type retention_period_days: int :param retention_period_years: :type retention_period_years: int :rtype: None """ # pylint: disable=unused-argument # Do some validations. try: retention_period_days = int(retention_period_days) if retention_period_days else 0 retention_period_years = int(retention_period_years) if retention_period_years else 0 if retention_period_days < 0 or retention_period_years < 0: raise ValueError except (TypeError, ValueError): msg = "Retention period must be a positive integer." raise DashboardException(msg=msg, component='rgw') if retention_period_days and retention_period_years: # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html msg = "Retention period requires either Days or Years. "\ "You can't specify both at the same time." raise DashboardException(msg=msg, component='rgw') if not retention_period_days and not retention_period_years: msg = "Retention period requires either Days or Years. "\ "You must specify at least one." raise DashboardException(msg=msg, component='rgw') if not isinstance(mode, str) or mode.upper() not in ['COMPLIANCE', 'GOVERNANCE']: msg = "Retention mode must be either COMPLIANCE or GOVERNANCE." raise DashboardException(msg=msg, component='rgw') # Generate the XML data like this: # # string # # # integer # string # integer # # # locking_configuration = ET.Element('ObjectLockConfiguration') enabled_element = ET.SubElement(locking_configuration, 'ObjectLockEnabled') enabled_element.text = 'Enabled' # Locking can't be disabled. rule_element = ET.SubElement(locking_configuration, 'Rule') default_retention_element = ET.SubElement(rule_element, 'DefaultRetention') mode_element = ET.SubElement(default_retention_element, 'Mode') mode_element.text = mode.upper() if retention_period_days: days_element = ET.SubElement(default_retention_element, 'Days') days_element.text = str(retention_period_days) if retention_period_years: years_element = ET.SubElement(default_retention_element, 'Years') years_element.text = str(retention_period_years) data = ET.tostring(locking_configuration, encoding='unicode') try: _ = request(data=data) # type: ignore except RequestException as e: raise DashboardException(msg=str(e), component='rgw')