diff options
Diffstat (limited to 'src/pybind/mgr/rgw')
-rw-r--r-- | src/pybind/mgr/rgw/__init__.py | 2 | ||||
-rw-r--r-- | src/pybind/mgr/rgw/module.py | 383 |
2 files changed, 385 insertions, 0 deletions
diff --git a/src/pybind/mgr/rgw/__init__.py b/src/pybind/mgr/rgw/__init__.py new file mode 100644 index 000000000..ee85dc9d3 --- /dev/null +++ b/src/pybind/mgr/rgw/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from .module import Module diff --git a/src/pybind/mgr/rgw/module.py b/src/pybind/mgr/rgw/module.py new file mode 100644 index 000000000..f48e2e09f --- /dev/null +++ b/src/pybind/mgr/rgw/module.py @@ -0,0 +1,383 @@ +import json +import threading +import yaml +import errno +import base64 +import functools +import sys + +from mgr_module import MgrModule, CLICommand, HandleCommandResult, Option +import orchestrator + +from ceph.deployment.service_spec import RGWSpec, PlacementSpec, SpecValidationError +from typing import Any, Optional, Sequence, Iterator, List, Callable, TypeVar, cast, Dict, Tuple, Union, TYPE_CHECKING + +from ceph.rgw.types import RGWAMException, RGWAMEnvMgr, RealmToken +from ceph.rgw.rgwam_core import EnvArgs, RGWAM +from orchestrator import OrchestratorClientMixin, OrchestratorError, DaemonDescription, OrchResult + + +FuncT = TypeVar('FuncT', bound=Callable[..., Any]) + +if TYPE_CHECKING: + # this uses a version check as opposed to a try/except because this + # form makes mypy happy and try/except doesn't. + if sys.version_info >= (3, 8): + from typing import Protocol + else: + from typing_extensions import Protocol + + class MgrModuleProtocol(Protocol): + def tool_exec(self, args: List[str]) -> Tuple[int, str, str]: + ... + + def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: + ... + + def list_daemons(self, service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = False) -> OrchResult[List['DaemonDescription']]: + ... +else: + class MgrModuleProtocol: + pass + + +class RGWSpecParsingError(Exception): + pass + + +class OrchestratorAPI(OrchestratorClientMixin): + def __init__(self, mgr: MgrModule): + super(OrchestratorAPI, self).__init__() + self.set_mgr(mgr) + + def status(self) -> Dict[str, Union[str, bool]]: + try: + status, message, _module_details = super().available() + return dict(available=status, message=message) + except (RuntimeError, OrchestratorError, ImportError) as e: + return dict(available=False, message=f'Orchestrator is unavailable: {e}') + + +class RGWAMOrchMgr(RGWAMEnvMgr): + def __init__(self, mgr: MgrModuleProtocol): + self.mgr = mgr + + def tool_exec(self, prog: str, args: List[str]) -> Tuple[List[str], int, str, str]: + cmd = [prog] + args + rc, stdout, stderr = self.mgr.tool_exec(args=cmd) + return cmd, rc, stdout, stderr + + def apply_rgw(self, spec: RGWSpec) -> None: + completion = self.mgr.apply_rgw(spec) + orchestrator.raise_if_exception(completion) + + def list_daemons(self, service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = True) -> List['DaemonDescription']: + completion = self.mgr.list_daemons(service_name, + daemon_type, + daemon_id=daemon_id, + host=host, + refresh=refresh) + return orchestrator.raise_if_exception(completion) + + +def check_orchestrator(func: FuncT) -> FuncT: + @functools.wraps(func) + def wrapper(self: Any, *args: Any, **kwargs: Any) -> HandleCommandResult: + available = self.api.status()['available'] + if available: + return func(self, *args, **kwargs) + else: + err_msg = "Cephadm is not available. Please enable cephadm by 'ceph mgr module enable cephadm'." + return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg) + return cast(FuncT, wrapper) + + +class Module(orchestrator.OrchestratorClientMixin, MgrModule): + MODULE_OPTIONS: List[Option] = [] + + # These are "native" Ceph options that this module cares about. + NATIVE_OPTIONS: List[Option] = [] + + def __init__(self, *args: Any, **kwargs: Any): + self.inited = False + self.lock = threading.Lock() + super(Module, self).__init__(*args, **kwargs) + self.api = OrchestratorAPI(self) + + # ensure config options members are initialized; see config_notify() + self.config_notify() + + with self.lock: + self.inited = True + self.env = EnvArgs(RGWAMOrchMgr(self)) + + # set up some members to enable the serve() method and shutdown() + self.run = True + self.event = threading.Event() + + def config_notify(self) -> None: + """ + This method is called whenever one of our config options is changed. + """ + # This is some boilerplate that stores MODULE_OPTIONS in a class + # member, so that, for instance, the 'emphatic' option is always + # available as 'self.emphatic'. + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], + self.get_module_option(opt['name'])) + self.log.debug(' mgr option %s = %s', + opt['name'], getattr(self, opt['name'])) + # Do the same for the native options. + for opt in self.NATIVE_OPTIONS: + setattr(self, + opt, # type: ignore + self.get_ceph_option(opt)) + self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore + + @CLICommand('rgw admin', perm='rw') + def _cmd_rgw_admin(self, params: Sequence[str]) -> HandleCommandResult: + """rgw admin""" + cmd, returncode, out, err = self.env.mgr.tool_exec('radosgw-admin', params or []) + + self.log.error('retcode=%d' % returncode) + self.log.error('out=%s' % out) + self.log.error('err=%s' % err) + + return HandleCommandResult(retval=returncode, stdout=out, stderr=err) + + @CLICommand('rgw realm bootstrap', perm='rw') + @check_orchestrator + def _cmd_rgw_realm_bootstrap(self, + realm_name: Optional[str] = None, + zonegroup_name: Optional[str] = None, + zone_name: Optional[str] = None, + port: Optional[int] = None, + placement: Optional[str] = None, + zone_endpoints: Optional[str] = None, + start_radosgw: Optional[bool] = True, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Bootstrap new rgw realm, zonegroup, and zone""" + + if inbuf: + try: + rgw_specs = self._parse_rgw_specs(inbuf) + except RGWSpecParsingError as e: + return HandleCommandResult(retval=-errno.EINVAL, stderr=f'{e}') + elif (realm_name and zonegroup_name and zone_name): + placement_spec = PlacementSpec.from_string(placement) if placement else None + rgw_specs = [RGWSpec(rgw_realm=realm_name, + rgw_zonegroup=zonegroup_name, + rgw_zone=zone_name, + rgw_frontend_port=port, + placement=placement_spec, + zone_endpoints=zone_endpoints)] + else: + err_msg = 'Invalid arguments: either pass a spec with -i or provide the realm, zonegroup and zone.' + return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg) + + try: + for spec in rgw_specs: + RGWAM(self.env).realm_bootstrap(spec, start_radosgw) + except RGWAMException as e: + self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + + return HandleCommandResult(retval=0, stdout="Realm(s) created correctly. Please, use 'ceph rgw realm tokens' to get the token.", stderr='') + + def _parse_rgw_specs(self, inbuf: str) -> List[RGWSpec]: + """Parse RGW specs from a YAML file.""" + # YAML '---' document separator with no content generates + # None entries in the output. Let's skip them silently. + yaml_objs: Iterator = yaml.safe_load_all(inbuf) + specs = [o for o in yaml_objs if o is not None] + rgw_specs = [] + for spec in specs: + # A secondary zone spec normally contains only the zone and the reaml token + # since no rgw_realm is specified in this case we extract it from the token + if 'rgw_realm_token' in spec: + realm_token = RealmToken.from_base64_str(spec['rgw_realm_token']) + if realm_token is None: + raise RGWSpecParsingError(f"Invalid realm token: {spec['rgw_realm_token']}") + spec['rgw_realm'] = realm_token.realm_name + + try: + rgw_spec = RGWSpec.from_json(spec) + rgw_spec.validate() + except SpecValidationError as e: + raise RGWSpecParsingError(f'RGW Spec parsing/validation error: {e}') + else: + rgw_specs.append(rgw_spec) + + return rgw_specs + + @CLICommand('rgw realm zone-creds create', perm='rw') + def _cmd_rgw_realm_new_zone_creds(self, + realm_name: Optional[str] = None, + endpoints: Optional[str] = None, + sys_uid: Optional[str] = None) -> HandleCommandResult: + """Create credentials for new zone creation""" + + try: + retval, out, err = RGWAM(self.env).realm_new_zone_creds(realm_name, endpoints, sys_uid) + except RGWAMException as e: + self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + + return HandleCommandResult(retval=retval, stdout=out, stderr=err) + + @CLICommand('rgw realm zone-creds remove', perm='rw') + def _cmd_rgw_realm_rm_zone_creds(self, realm_token: Optional[str] = None) -> HandleCommandResult: + """Create credentials for new zone creation""" + + try: + retval, out, err = RGWAM(self.env).realm_rm_zone_creds(realm_token) + except RGWAMException as e: + self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + + return HandleCommandResult(retval=retval, stdout=out, stderr=err) + + @CLICommand('rgw realm tokens', perm='r') + def list_realm_tokens(self) -> HandleCommandResult: + try: + realms_info = self.get_realm_tokens() + except RGWAMException as e: + self.log.error(f'cmd run exception: ({e.retcode}) {e.message}') + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + + return HandleCommandResult(retval=0, stdout=json.dumps(realms_info, indent=4), stderr='') + + def get_realm_tokens(self) -> List[Dict]: + realms_info = [] + for realm_info in RGWAM(self.env).get_realms_info(): + if not realm_info['master_zone_id']: + realms_info.append({'realm': realm_info['realm_name'], 'token': 'realm has no master zone'}) + elif not realm_info['endpoint']: + realms_info.append({'realm': realm_info['realm_name'], 'token': 'master zone has no endpoint'}) + elif not (realm_info['access_key'] and realm_info['secret']): + realms_info.append({'realm': realm_info['realm_name'], 'token': 'master zone has no access/secret keys'}) + else: + keys = ['realm_name', 'realm_id', 'endpoint', 'access_key', 'secret'] + realm_token = RealmToken(**{k: realm_info[k] for k in keys}) + realm_token_b = realm_token.to_json().encode('utf-8') + realm_token_s = base64.b64encode(realm_token_b).decode('utf-8') + realms_info.append({'realm': realm_info['realm_name'], 'token': realm_token_s}) + return realms_info + + @CLICommand('rgw zone modify', perm='rw') + def update_zone_info(self, realm_name: str, zonegroup_name: str, zone_name: str, realm_token: str, zone_endpoints: List[str]) -> HandleCommandResult: + try: + retval, out, err = RGWAM(self.env).zone_modify(realm_name, + zonegroup_name, + zone_name, + zone_endpoints, + realm_token) + return HandleCommandResult(retval, 'Zone updated successfully', '') + except RGWAMException as e: + self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + + @CLICommand('rgw zone create', perm='rw') + @check_orchestrator + def _cmd_rgw_zone_create(self, + zone_name: Optional[str] = None, + realm_token: Optional[str] = None, + port: Optional[int] = None, + placement: Optional[str] = None, + start_radosgw: Optional[bool] = True, + zone_endpoints: Optional[str] = None, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Bootstrap new rgw zone that syncs with zone on another cluster in the same realm""" + + created_zones = self.rgw_zone_create(zone_name, realm_token, port, placement, + start_radosgw, zone_endpoints, inbuf) + + return HandleCommandResult(retval=0, stdout=f"Zones {', '.join(created_zones)} created successfully") + + def rgw_zone_create(self, + zone_name: Optional[str] = None, + realm_token: Optional[str] = None, + port: Optional[int] = None, + placement: Optional[Union[str, Dict[str, Any]]] = None, + start_radosgw: Optional[bool] = True, + zone_endpoints: Optional[str] = None, + inbuf: Optional[str] = None) -> Any: + + if inbuf: + try: + rgw_specs = self._parse_rgw_specs(inbuf) + except RGWSpecParsingError as e: + return HandleCommandResult(retval=-errno.EINVAL, stderr=f'{e}') + elif (zone_name and realm_token): + token = RealmToken.from_base64_str(realm_token) + if isinstance(placement, dict): + placement_spec = PlacementSpec.from_json(placement) if placement else None + elif isinstance(placement, str): + placement_spec = PlacementSpec.from_string(placement) if placement else None + rgw_specs = [RGWSpec(rgw_realm=token.realm_name, + rgw_zone=zone_name, + rgw_realm_token=realm_token, + rgw_frontend_port=port, + placement=placement_spec, + zone_endpoints=zone_endpoints)] + else: + err_msg = 'Invalid arguments: either pass a spec with -i or provide the zone_name and realm_token.' + return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg) + + try: + created_zones = [] + for rgw_spec in rgw_specs: + RGWAM(self.env).zone_create(rgw_spec, start_radosgw) + if rgw_spec.rgw_zone is not None: + created_zones.append(rgw_spec.rgw_zone) + return created_zones + except RGWAMException as e: + self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + return created_zones + + @CLICommand('rgw realm reconcile', perm='rw') + def _cmd_rgw_realm_reconcile(self, + realm_name: Optional[str] = None, + zonegroup_name: Optional[str] = None, + zone_name: Optional[str] = None, + update: Optional[bool] = False) -> HandleCommandResult: + """Bootstrap new rgw zone that syncs with existing zone""" + + try: + retval, out, err = RGWAM(self.env).realm_reconcile(realm_name, zonegroup_name, + zone_name, update) + except RGWAMException as e: + self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) + return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) + + return HandleCommandResult(retval=retval, stdout=out, stderr=err) + + def shutdown(self) -> None: + """ + This method is called by the mgr when the module needs to shut + down (i.e., when the serve() function needs to exit). + """ + self.log.info('Stopping') + self.run = False + self.event.set() + + def import_realm_token(self, + zone_name: Optional[str] = None, + realm_token: Optional[str] = None, + port: Optional[int] = None, + placement: Optional[dict] = None, + start_radosgw: Optional[bool] = True, + zone_endpoints: Optional[str] = None) -> None: + placement_spec = placement.get('placement') if placement else None + self.rgw_zone_create(zone_name, realm_token, port, placement_spec, start_radosgw, + zone_endpoints) |