summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/services/rgw_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/dashboard/services/rgw_client.py')
-rw-r--r--src/pybind/mgr/dashboard/services/rgw_client.py764
1 files changed, 764 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/services/rgw_client.py b/src/pybind/mgr/dashboard/services/rgw_client.py
new file mode 100644
index 000000000..4c401f2a4
--- /dev/null
+++ b/src/pybind/mgr/dashboard/services/rgw_client.py
@@ -0,0 +1,764 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import ipaddress
+import json
+import logging
+import re
+import xml.etree.ElementTree as ET # noqa: N814
+from distutils.util import strtobool
+from subprocess import SubprocessError
+
+from mgr_util import build_url
+
+from .. import mgr
+from ..awsauth import S3Auth
+from ..exceptions import DashboardException
+from ..rest_client import RequestException, RestClient
+from ..settings import Settings
+from ..tools import dict_contains_path, dict_get, json_str_to_object
+
+try:
+ from typing import Any, Dict, List, Optional, Tuple, Union
+except ImportError:
+ pass # For typing only
+
+logger = logging.getLogger('rgw_client')
+
+
+class NoRgwDaemonsException(Exception):
+ def __init__(self):
+ super().__init__('No RGW service is running.')
+
+
+class NoCredentialsException(Exception):
+ def __init__(self):
+ super(NoCredentialsException, self).__init__(
+ 'No RGW credentials found, '
+ 'please consult the documentation on how to enable RGW for '
+ 'the dashboard.')
+
+
+class RgwAdminException(Exception):
+ pass
+
+
+class RgwDaemon:
+ """Simple representation of a daemon."""
+ host: str
+ name: str
+ port: int
+ ssl: bool
+ realm_name: str
+ zonegroup_name: str
+ zone_name: str
+
+
+def _get_daemons() -> Dict[str, RgwDaemon]:
+ """
+ Retrieve RGW daemon info from MGR.
+ """
+ service_map = mgr.get('service_map')
+ if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']):
+ raise NoRgwDaemonsException
+
+ daemons = {}
+ daemon_map = service_map['services']['rgw']['daemons']
+ for key in daemon_map.keys():
+ if dict_contains_path(daemon_map[key], ['metadata', 'frontend_config#0']):
+ daemon = _determine_rgw_addr(daemon_map[key])
+ daemon.name = daemon_map[key]['metadata']['id']
+ daemon.realm_name = daemon_map[key]['metadata']['realm_name']
+ daemon.zonegroup_name = daemon_map[key]['metadata']['zonegroup_name']
+ daemon.zone_name = daemon_map[key]['metadata']['zone_name']
+ daemons[daemon.name] = daemon
+ logger.info('Found RGW daemon with configuration: host=%s, port=%d, ssl=%s',
+ daemon.host, daemon.port, str(daemon.ssl))
+ if not daemons:
+ raise NoRgwDaemonsException
+
+ return daemons
+
+
+def _determine_rgw_addr(daemon_info: Dict[str, Any]) -> RgwDaemon:
+ """
+ Parse RGW daemon info to determine the configured host (IP address) and port.
+ """
+ daemon = RgwDaemon()
+ daemon.host = daemon_info['metadata']['hostname']
+ daemon.port, daemon.ssl = _parse_frontend_config(daemon_info['metadata']['frontend_config#0'])
+
+ return daemon
+
+
+def _parse_addr(value) -> str:
+ """
+ Get the IP address the RGW is running on.
+
+ >>> _parse_addr('192.168.178.3:49774/1534999298')
+ '192.168.178.3'
+
+ >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298')
+ '2001:db8:85a3::8a2e:370:7334'
+
+ >>> _parse_addr('xyz')
+ Traceback (most recent call last):
+ ...
+ LookupError: Failed to determine RGW address
+
+ >>> _parse_addr('192.168.178.a:8080/123456789')
+ Traceback (most recent call last):
+ ...
+ LookupError: Invalid RGW address '192.168.178.a' found
+
+ >>> _parse_addr('[2001:0db8:1234]:443/123456789')
+ Traceback (most recent call last):
+ ...
+ LookupError: Invalid RGW address '2001:0db8:1234' found
+
+ >>> _parse_addr('2001:0db8::1234:49774/1534999298')
+ Traceback (most recent call last):
+ ...
+ LookupError: Failed to determine RGW address
+
+ :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'.
+ :type: str
+ :raises LookupError if parsing fails to determine the IP address.
+ :return: The IP address.
+ :rtype: str
+ """
+ match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value)
+ if match:
+ # IPv4:
+ # Group 0: 192.168.178.3:49774/1534999298
+ # Group 3: 192.168.178.3
+ # IPv6:
+ # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298
+ # Group 1: [
+ # Group 2: 2001:db8:85a3::8a2e:370:7334
+ addr = match.group(3) if match.group(3) else match.group(2)
+ try:
+ ipaddress.ip_address(addr)
+ return addr
+ except ValueError:
+ raise LookupError('Invalid RGW address \'{}\' found'.format(addr))
+ raise LookupError('Failed to determine RGW address')
+
+
+def _parse_frontend_config(config) -> Tuple[int, bool]:
+ """
+ Get the port the RGW is running on. Due the complexity of the
+ syntax not all variations are supported.
+
+ If there are multiple (ssl_)ports/(ssl_)endpoints options, then
+ the first found option will be returned.
+
+ Get more details about the configuration syntax here:
+ http://docs.ceph.com/en/latest/radosgw/frontends/
+ https://civetweb.github.io/civetweb/UserManual.html
+
+ :param config: The configuration string to parse.
+ :type config: str
+ :raises LookupError if parsing fails to determine the port.
+ :return: A tuple containing the port number and the information
+ whether SSL is used.
+ :rtype: (int, boolean)
+ """
+ match = re.search(r'^(beast|civetweb)\s+.+$', config)
+ if match:
+ if match.group(1) == 'beast':
+ match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)',
+ config)
+ if match:
+ option_name = match.group(1)
+ if option_name in ['port', 'ssl_port']:
+ match = re.search(r'(\d+)', match.group(2))
+ if match:
+ port = int(match.group(1))
+ ssl = option_name == 'ssl_port'
+ return port, ssl
+ if option_name in ['endpoint', 'ssl_endpoint']:
+ match = re.search(r'([\d.]+|\[.+\])(:(\d+))?',
+ match.group(2)) # type: ignore
+ if match:
+ port = int(match.group(3)) if \
+ match.group(2) is not None else 443 if \
+ option_name == 'ssl_endpoint' else \
+ 80
+ ssl = option_name == 'ssl_endpoint'
+ return port, ssl
+ if match.group(1) == 'civetweb': # type: ignore
+ match = re.search(r'port=(.*:)?(\d+)(s)?', config)
+ if match:
+ port = int(match.group(2))
+ ssl = match.group(3) == 's'
+ return port, ssl
+ raise LookupError('Failed to determine RGW port from "{}"'.format(config))
+
+
+def _parse_secrets(user: str, data: dict) -> Tuple[str, str]:
+ for key in data.get('keys', []):
+ if key.get('user') == user and data.get('system') in ['true', True]:
+ access_key = key.get('access_key')
+ secret_key = key.get('secret_key')
+ return access_key, secret_key
+ return '', ''
+
+
+def _get_user_keys(user: str, realm: Optional[str] = None) -> Tuple[str, str]:
+ access_key = ''
+ secret_key = ''
+ rgw_user_info_cmd = ['user', 'info', '--uid', user]
+ cmd_realm_option = ['--rgw-realm', realm] if realm else []
+ if realm:
+ rgw_user_info_cmd += cmd_realm_option
+ try:
+ _, out, err = mgr.send_rgwadmin_command(rgw_user_info_cmd)
+ if out:
+ access_key, secret_key = _parse_secrets(user, out)
+ if not access_key:
+ rgw_create_user_cmd = [
+ 'user', 'create',
+ '--uid', user,
+ '--display-name', 'Ceph Dashboard',
+ '--system',
+ ] + cmd_realm_option
+ _, out, err = mgr.send_rgwadmin_command(rgw_create_user_cmd)
+ if out:
+ access_key, secret_key = _parse_secrets(user, out)
+ if not access_key:
+ logger.error('Unable to create rgw user "%s": %s', user, err)
+ except SubprocessError as error:
+ logger.exception(error)
+
+ return access_key, secret_key
+
+
+def configure_rgw_credentials():
+ logger.info('Configuring dashboard RGW credentials')
+ user = 'dashboard'
+ realms = []
+ access_key = ''
+ secret_key = ''
+ try:
+ _, out, err = mgr.send_rgwadmin_command(['realm', 'list'])
+ if out:
+ realms = out.get('realms', [])
+ if err:
+ logger.error('Unable to list RGW realms: %s', err)
+ if realms:
+ realm_access_keys = {}
+ realm_secret_keys = {}
+ for realm in realms:
+ realm_access_key, realm_secret_key = _get_user_keys(user, realm)
+ if realm_access_key:
+ realm_access_keys[realm] = realm_access_key
+ realm_secret_keys[realm] = realm_secret_key
+ if realm_access_keys:
+ access_key = json.dumps(realm_access_keys)
+ secret_key = json.dumps(realm_secret_keys)
+ else:
+ access_key, secret_key = _get_user_keys(user)
+
+ assert access_key and secret_key
+ Settings.RGW_API_ACCESS_KEY = access_key
+ Settings.RGW_API_SECRET_KEY = secret_key
+ except (AssertionError, SubprocessError) as error:
+ logger.exception(error)
+ raise NoCredentialsException
+
+
+class RgwClient(RestClient):
+ _host = None
+ _port = None
+ _ssl = None
+ _user_instances = {} # type: Dict[str, Dict[str, RgwClient]]
+ _config_instances = {} # type: Dict[str, RgwClient]
+ _rgw_settings_snapshot = None
+ _daemons: Dict[str, RgwDaemon] = {}
+ daemon: RgwDaemon
+ got_keys_from_config: bool
+ userid: str
+
+ @staticmethod
+ def _handle_response_status_code(status_code: int) -> int:
+ # Do not return auth error codes (so they are not handled as ceph API user auth errors).
+ return 404 if status_code in [401, 403] else status_code
+
+ @staticmethod
+ def _get_daemon_connection_info(daemon_name: str) -> dict:
+ try:
+ realm_name = RgwClient._daemons[daemon_name].realm_name
+ access_key = Settings.RGW_API_ACCESS_KEY[realm_name]
+ secret_key = Settings.RGW_API_SECRET_KEY[realm_name]
+ except TypeError:
+ # Legacy string values.
+ access_key = Settings.RGW_API_ACCESS_KEY
+ secret_key = Settings.RGW_API_SECRET_KEY
+ except KeyError as error:
+ raise DashboardException(msg='Credentials not found for RGW Daemon: {}'.format(error),
+ http_status_code=404,
+ component='rgw')
+
+ return {'access_key': access_key, 'secret_key': secret_key}
+
+ def _get_daemon_zone_info(self): # type: () -> dict
+ return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None))
+
+ def _get_realms_info(self): # type: () -> dict
+ return json_str_to_object(self.proxy('GET', 'realm?list', None, None))
+
+ def _get_realm_info(self, realm_id: str) -> Dict[str, Any]:
+ return json_str_to_object(self.proxy('GET', f'realm?id={realm_id}', None, None))
+
+ @staticmethod
+ def _rgw_settings():
+ return (Settings.RGW_API_ACCESS_KEY,
+ Settings.RGW_API_SECRET_KEY,
+ Settings.RGW_API_ADMIN_RESOURCE,
+ Settings.RGW_API_SSL_VERIFY)
+
+ @staticmethod
+ def instance(userid: Optional[str] = None,
+ daemon_name: Optional[str] = None) -> 'RgwClient':
+ # pylint: disable=too-many-branches
+
+ RgwClient._daemons = _get_daemons()
+
+ # The API access key and secret key are mandatory for a minimal configuration.
+ if not (Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY):
+ configure_rgw_credentials()
+
+ if not daemon_name:
+ # Select 1st daemon:
+ daemon_name = next(iter(RgwClient._daemons.keys()))
+
+ # Discard all cached instances if any rgw setting has changed
+ if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings():
+ RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings()
+ RgwClient.drop_instance()
+
+ if daemon_name not in RgwClient._config_instances:
+ connection_info = RgwClient._get_daemon_connection_info(daemon_name)
+ RgwClient._config_instances[daemon_name] = RgwClient(connection_info['access_key'],
+ connection_info['secret_key'],
+ daemon_name)
+
+ if not userid or userid == RgwClient._config_instances[daemon_name].userid:
+ return RgwClient._config_instances[daemon_name]
+
+ if daemon_name not in RgwClient._user_instances \
+ or userid not in RgwClient._user_instances[daemon_name]:
+ # Get the access and secret keys for the specified user.
+ keys = RgwClient._config_instances[daemon_name].get_user_keys(userid)
+ if not keys:
+ raise RequestException(
+ "User '{}' does not have any keys configured.".format(
+ userid))
+ instance = RgwClient(keys['access_key'],
+ keys['secret_key'],
+ daemon_name,
+ userid)
+ RgwClient._user_instances.update({daemon_name: {userid: instance}})
+
+ return RgwClient._user_instances[daemon_name][userid]
+
+ @staticmethod
+ def admin_instance(daemon_name: Optional[str] = None) -> 'RgwClient':
+ return RgwClient.instance(daemon_name=daemon_name)
+
+ @staticmethod
+ def drop_instance(instance: Optional['RgwClient'] = None):
+ """
+ Drop a cached instance or all.
+ """
+ if instance:
+ if instance.got_keys_from_config:
+ del RgwClient._config_instances[instance.daemon.name]
+ else:
+ del RgwClient._user_instances[instance.daemon.name][instance.userid]
+ else:
+ RgwClient._config_instances.clear()
+ RgwClient._user_instances.clear()
+
+ def _reset_login(self):
+ if self.got_keys_from_config:
+ raise RequestException('Authentication failed for the "{}" user: wrong credentials'
+ .format(self.userid), status_code=401)
+ logger.info("Fetching new keys for user: %s", self.userid)
+ keys = RgwClient.admin_instance(daemon_name=self.daemon.name).get_user_keys(self.userid)
+ self.auth = S3Auth(keys['access_key'], keys['secret_key'],
+ service_url=self.service_url)
+
+ def __init__(self,
+ access_key: str,
+ secret_key: str,
+ daemon_name: str,
+ user_id: Optional[str] = None) -> None:
+ try:
+ daemon = RgwClient._daemons[daemon_name]
+ except KeyError as error:
+ raise DashboardException(msg='RGW Daemon not found: {}'.format(error),
+ http_status_code=404,
+ component='rgw')
+ ssl_verify = Settings.RGW_API_SSL_VERIFY
+ self.admin_path = Settings.RGW_API_ADMIN_RESOURCE
+ self.service_url = build_url(host=daemon.host, port=daemon.port)
+
+ self.auth = S3Auth(access_key, secret_key, service_url=self.service_url)
+ super(RgwClient, self).__init__(daemon.host,
+ daemon.port,
+ 'RGW',
+ daemon.ssl,
+ self.auth,
+ ssl_verify=ssl_verify)
+ self.got_keys_from_config = not user_id
+ try:
+ self.userid = self._get_user_id(self.admin_path) if self.got_keys_from_config \
+ else user_id
+ except RequestException as error:
+ logger.exception(error)
+ msg = 'Error connecting to Object Gateway'
+ if error.status_code == 404:
+ msg = '{}: {}'.format(msg, str(error))
+ raise DashboardException(msg=msg,
+ http_status_code=error.status_code,
+ component='rgw')
+ self.daemon = daemon
+
+ logger.info("Created new connection: daemon=%s, host=%s, port=%s, ssl=%d, sslverify=%d",
+ daemon.name, daemon.host, daemon.port, daemon.ssl, ssl_verify)
+
+ @RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)')
+ def is_service_online(self, request=None) -> bool:
+ """
+ Consider the service as online if the response contains the
+ specified keys. Nothing more is checked here.
+ """
+ _ = request({'format': 'json'})
+ return True
+
+ @RestClient.api_get('/{admin_path}/metadata/user?myself',
+ resp_structure='data > user_id')
+ def _get_user_id(self, admin_path, request=None):
+ # pylint: disable=unused-argument
+ """
+ Get the user ID of the user that is used to communicate with the
+ RGW Admin Ops API.
+ :rtype: str
+ :return: The user ID of the user that is used to sign the
+ RGW Admin Ops API calls.
+ """
+ response = request()
+ return response['data']['user_id']
+
+ @RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]')
+ def _user_exists(self, admin_path, user_id, request=None):
+ # pylint: disable=unused-argument
+ response = request()
+ if user_id:
+ return user_id in response
+ return self.userid in response
+
+ def user_exists(self, user_id=None):
+ return self._user_exists(self.admin_path, user_id)
+
+ @RestClient.api_get('/{admin_path}/metadata/user?key={userid}',
+ resp_structure='data > system')
+ def _is_system_user(self, admin_path, userid, request=None) -> bool:
+ # pylint: disable=unused-argument
+ response = request()
+ return strtobool(response['data']['system'])
+
+ def is_system_user(self) -> bool:
+ return self._is_system_user(self.admin_path, self.userid)
+
+ @RestClient.api_get(
+ '/{admin_path}/user',
+ resp_structure='tenant & user_id & email & keys[*] > '
+ ' (user & access_key & secret_key)')
+ def _admin_get_user_keys(self, admin_path, userid, request=None):
+ # pylint: disable=unused-argument
+ colon_idx = userid.find(':')
+ user = userid if colon_idx == -1 else userid[:colon_idx]
+ response = request({'uid': user})
+ for key in response['keys']:
+ if key['user'] == userid:
+ return {
+ 'access_key': key['access_key'],
+ 'secret_key': key['secret_key']
+ }
+ return None
+
+ def get_user_keys(self, userid):
+ return self._admin_get_user_keys(self.admin_path, userid)
+
+ @RestClient.api('/{admin_path}/{path}')
+ def _proxy_request(
+ self, # pylint: disable=too-many-arguments
+ admin_path,
+ path,
+ method,
+ params,
+ data,
+ request=None):
+ # pylint: disable=unused-argument
+ return request(method=method, params=params, data=data,
+ raw_content=True)
+
+ def proxy(self, method, path, params, data):
+ logger.debug("proxying method=%s path=%s params=%s data=%s",
+ method, path, params, data)
+ return self._proxy_request(self.admin_path, path, method,
+ params, data)
+
+ @RestClient.api_get('/', resp_structure='[1][*] > Name')
+ def get_buckets(self, request=None):
+ """
+ Get a list of names from all existing buckets of this user.
+ :return: Returns a list of bucket names.
+ """
+ response = request({'format': 'json'})
+ return [bucket['Name'] for bucket in response[1]]
+
+ @RestClient.api_get('/{bucket_name}')
+ def bucket_exists(self, bucket_name, userid, request=None):
+ """
+ Check if the specified bucket exists for this user.
+ :param bucket_name: The name of the bucket.
+ :return: Returns True if the bucket exists, otherwise False.
+ """
+ # pylint: disable=unused-argument
+ try:
+ request()
+ my_buckets = self.get_buckets()
+ if bucket_name not in my_buckets:
+ raise RequestException(
+ 'Bucket "{}" belongs to other user'.format(bucket_name),
+ 403)
+ return True
+ except RequestException as e:
+ if e.status_code == 404:
+ return False
+
+ raise e
+
+ @RestClient.api_put('/{bucket_name}')
+ def create_bucket(self, bucket_name, zonegroup=None,
+ placement_target=None, lock_enabled=False,
+ request=None):
+ logger.info("Creating bucket: %s, zonegroup: %s, placement_target: %s",
+ bucket_name, zonegroup, placement_target)
+ data = None
+ if zonegroup and placement_target:
+ create_bucket_configuration = ET.Element('CreateBucketConfiguration')
+ location_constraint = ET.SubElement(create_bucket_configuration, 'LocationConstraint')
+ location_constraint.text = '{}:{}'.format(zonegroup, placement_target)
+ data = ET.tostring(create_bucket_configuration, encoding='unicode')
+
+ headers = None # type: Optional[dict]
+ if lock_enabled:
+ headers = {'x-amz-bucket-object-lock-enabled': 'true'}
+
+ return request(data=data, headers=headers)
+
+ def get_placement_targets(self): # type: () -> dict
+ zone = self._get_daemon_zone_info()
+ placement_targets = [] # type: List[Dict]
+ for placement_pool in zone['placement_pools']:
+ placement_targets.append(
+ {
+ 'name': placement_pool['key'],
+ 'data_pool': placement_pool['val']['storage_classes']['STANDARD']['data_pool']
+ }
+ )
+
+ return {'zonegroup': self.daemon.zonegroup_name,
+ 'placement_targets': placement_targets}
+
+ def get_realms(self): # type: () -> List
+ realms_info = self._get_realms_info()
+ if 'realms' in realms_info and realms_info['realms']:
+ return realms_info['realms']
+
+ return []
+
+ def get_default_realm(self) -> str:
+ realms_info = self._get_realms_info()
+ if 'default_info' in realms_info and realms_info['default_info']:
+ realm_info = self._get_realm_info(realms_info['default_info'])
+ if 'name' in realm_info and realm_info['name']:
+ return realm_info['name']
+ raise DashboardException(msg='Default realm not found.',
+ http_status_code=404,
+ component='rgw')
+
+ @RestClient.api_get('/{bucket_name}?versioning')
+ def get_bucket_versioning(self, bucket_name, request=None):
+ """
+ Get bucket versioning.
+ :param str bucket_name: the name of the bucket.
+ :return: versioning info
+ :rtype: Dict
+ """
+ # pylint: disable=unused-argument
+ result = request()
+ if 'Status' not in result:
+ result['Status'] = 'Suspended'
+ if 'MfaDelete' not in result:
+ result['MfaDelete'] = 'Disabled'
+ return result
+
+ @RestClient.api_put('/{bucket_name}?versioning')
+ def set_bucket_versioning(self, bucket_name, versioning_state, mfa_delete,
+ mfa_token_serial, mfa_token_pin, request=None):
+ """
+ Set bucket versioning.
+ :param str bucket_name: the name of the bucket.
+ :param str versioning_state:
+ https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html
+ :param str mfa_delete: MFA Delete state.
+ :param str mfa_token_serial:
+ https://docs.ceph.com/docs/master/radosgw/mfa/
+ :param str mfa_token_pin: value of a TOTP token at a certain time (auth code)
+ :return: None
+ """
+ # pylint: disable=unused-argument
+ versioning_configuration = ET.Element('VersioningConfiguration')
+ status_element = ET.SubElement(versioning_configuration, 'Status')
+ status_element.text = versioning_state
+
+ headers = {}
+ if mfa_delete and mfa_token_serial and mfa_token_pin:
+ headers['x-amz-mfa'] = '{} {}'.format(mfa_token_serial, mfa_token_pin)
+ mfa_delete_element = ET.SubElement(versioning_configuration, 'MfaDelete')
+ mfa_delete_element.text = mfa_delete
+
+ data = ET.tostring(versioning_configuration, encoding='unicode')
+
+ try:
+ request(data=data, headers=headers)
+ except RequestException as error:
+ msg = str(error)
+ if mfa_delete and mfa_token_serial and mfa_token_pin \
+ and 'AccessDenied' in error.content.decode():
+ msg = 'Bad MFA credentials: {}'.format(msg)
+ raise DashboardException(msg=msg,
+ http_status_code=error.status_code,
+ component='rgw')
+
+ @RestClient.api_get('/{bucket_name}?object-lock')
+ def get_bucket_locking(self, bucket_name, request=None):
+ # type: (str, Optional[object]) -> dict
+ """
+ Gets the locking configuration for a bucket. The locking
+ configuration will be applied by default to every new object
+ placed in the specified bucket.
+ :param bucket_name: The name of the bucket.
+ :type bucket_name: str
+ :return: The locking configuration.
+ :rtype: Dict
+ """
+ # pylint: disable=unused-argument
+
+ # Try to get the Object Lock configuration. If there is none,
+ # then return default values.
+ try:
+ result = request() # type: ignore
+ return {
+ 'lock_enabled': dict_get(result, 'ObjectLockEnabled') == 'Enabled',
+ 'lock_mode': dict_get(result, 'Rule.DefaultRetention.Mode'),
+ 'lock_retention_period_days': dict_get(result, 'Rule.DefaultRetention.Days', 0),
+ 'lock_retention_period_years': dict_get(result, 'Rule.DefaultRetention.Years', 0)
+ }
+ except RequestException as e:
+ if e.content:
+ content = json_str_to_object(e.content)
+ if content.get(
+ 'Code') == 'ObjectLockConfigurationNotFoundError':
+ return {
+ 'lock_enabled': False,
+ 'lock_mode': 'compliance',
+ 'lock_retention_period_days': None,
+ 'lock_retention_period_years': None
+ }
+ raise e
+
+ @RestClient.api_put('/{bucket_name}?object-lock')
+ def set_bucket_locking(self,
+ bucket_name: str,
+ mode: str,
+ retention_period_days: Optional[Union[int, str]] = None,
+ retention_period_years: Optional[Union[int, str]] = None,
+ request: Optional[object] = None) -> None:
+ """
+ Places the locking configuration on the specified bucket. The
+ locking configuration will be applied by default to every new
+ object placed in the specified bucket.
+ :param bucket_name: The name of the bucket.
+ :type bucket_name: str
+ :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`.
+ :type mode: str
+ :param retention_period_days:
+ :type retention_period_days: int
+ :param retention_period_years:
+ :type retention_period_years: int
+ :rtype: None
+ """
+ # pylint: disable=unused-argument
+
+ # Do some validations.
+ try:
+ retention_period_days = int(retention_period_days) if retention_period_days else 0
+ retention_period_years = int(retention_period_years) if retention_period_years else 0
+ if retention_period_days < 0 or retention_period_years < 0:
+ raise ValueError
+ except (TypeError, ValueError):
+ msg = "Retention period must be a positive integer."
+ raise DashboardException(msg=msg, component='rgw')
+ if retention_period_days and retention_period_years:
+ # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html
+ msg = "Retention period requires either Days or Years. "\
+ "You can't specify both at the same time."
+ raise DashboardException(msg=msg, component='rgw')
+ if not retention_period_days and not retention_period_years:
+ msg = "Retention period requires either Days or Years. "\
+ "You must specify at least one."
+ raise DashboardException(msg=msg, component='rgw')
+ if not isinstance(mode, str) or mode.upper() not in ['COMPLIANCE', 'GOVERNANCE']:
+ msg = "Retention mode must be either COMPLIANCE or GOVERNANCE."
+ raise DashboardException(msg=msg, component='rgw')
+
+ # Generate the XML data like this:
+ # <ObjectLockConfiguration>
+ # <ObjectLockEnabled>string</ObjectLockEnabled>
+ # <Rule>
+ # <DefaultRetention>
+ # <Days>integer</Days>
+ # <Mode>string</Mode>
+ # <Years>integer</Years>
+ # </DefaultRetention>
+ # </Rule>
+ # </ObjectLockConfiguration>
+ locking_configuration = ET.Element('ObjectLockConfiguration')
+ enabled_element = ET.SubElement(locking_configuration,
+ 'ObjectLockEnabled')
+ enabled_element.text = 'Enabled' # Locking can't be disabled.
+ rule_element = ET.SubElement(locking_configuration, 'Rule')
+ default_retention_element = ET.SubElement(rule_element,
+ 'DefaultRetention')
+ mode_element = ET.SubElement(default_retention_element, 'Mode')
+ mode_element.text = mode.upper()
+ if retention_period_days:
+ days_element = ET.SubElement(default_retention_element, 'Days')
+ days_element.text = str(retention_period_days)
+ if retention_period_years:
+ years_element = ET.SubElement(default_retention_element, 'Years')
+ years_element.text = str(retention_period_years)
+
+ data = ET.tostring(locking_configuration, encoding='unicode')
+
+ try:
+ _ = request(data=data) # type: ignore
+ except RequestException as e:
+ raise DashboardException(msg=str(e), component='rgw')