# -*- coding: utf-8 -*- from __future__ import absolute_import import json import logging import re from functools import partial from typing import NamedTuple, Optional import cherrypy import rbd from .. import mgr from ..controllers.pool import RBDPool from ..controllers.service import Service from ..security import Scope from ..services.ceph_service import CephService from ..services.exception import handle_rados_error, handle_rbd_error, serialize_dashboard_exception from ..services.orchestrator import OrchClient from ..services.rbd import rbd_call from ..tools import ViewCache from . import APIDoc, APIRouter, BaseController, CreatePermission, Endpoint, \ EndpointDoc, ReadPermission, RESTController, Task, UIRouter, \ UpdatePermission, allow_empty_body try: from typing import no_type_check except ImportError: no_type_check = object() # Just for type checking logger = logging.getLogger('controllers.rbd_mirror') # pylint: disable=not-callable def handle_rbd_mirror_error(): def composed_decorator(func): func = handle_rados_error('rbd-mirroring')(func) return handle_rbd_error()(func) return composed_decorator # pylint: disable=not-callable def RbdMirroringTask(name, metadata, wait_for): # noqa: N802 def composed_decorator(func): func = handle_rbd_mirror_error()(func) return Task("rbd/mirroring/{}".format(name), metadata, wait_for, partial(serialize_dashboard_exception, include_http_status=True))(func) return composed_decorator @ViewCache() def get_daemons_and_pools(): # pylint: disable=R0915 def get_daemons(): daemons = [] for hostname, server in CephService.get_service_map('rbd-mirror').items(): for service in server['services']: id = service['id'] # pylint: disable=W0622 metadata = service['metadata'] status = service['status'] or {} try: status = json.loads(status['json']) except (ValueError, KeyError): status = {} instance_id = metadata['instance_id'] if id == instance_id: # new version that supports per-cluster leader elections id = metadata['id'] # extract 
per-daemon service data and health daemon = { 'id': id, 'instance_id': instance_id, 'version': metadata['ceph_version'], 'server_hostname': hostname, 'service': service, 'server': server, 'metadata': metadata, 'status': status } daemon = dict(daemon, **get_daemon_health(daemon)) daemons.append(daemon) return sorted(daemons, key=lambda k: k['instance_id']) def get_daemon_health(daemon): health = { 'health_color': 'info', 'health': 'Unknown' } for _, pool_data in daemon['status'].items(): if (health['health'] != 'error' and [k for k, v in pool_data.get('callouts', {}).items() if v['level'] == 'error']): health = { 'health_color': 'error', 'health': 'Error' } elif (health['health'] != 'error' and [k for k, v in pool_data.get('callouts', {}).items() if v['level'] == 'warning']): health = { 'health_color': 'warning', 'health': 'Warning' } elif health['health_color'] == 'info': health = { 'health_color': 'success', 'health': 'OK' } return health def get_pools(daemons): # pylint: disable=R0912, R0915 pool_names = [pool['pool_name'] for pool in CephService.get_pool_list('rbd') if pool.get('type', 1) == 1] pool_stats = {} rbdctx = rbd.RBD() for pool_name in pool_names: logger.debug("Constructing IOCtx %s", pool_name) try: ioctx = mgr.rados.open_ioctx(pool_name) except TypeError: logger.exception("Failed to open pool %s", pool_name) continue try: mirror_mode = rbdctx.mirror_mode_get(ioctx) peer_uuids = [x['uuid'] for x in rbdctx.mirror_peer_list(ioctx)] except: # noqa pylint: disable=W0702 logger.exception("Failed to query mirror settings %s", pool_name) mirror_mode = None peer_uuids = [] stats = {} if mirror_mode == rbd.RBD_MIRROR_MODE_DISABLED: mirror_mode = "disabled" stats['health_color'] = "info" stats['health'] = "Disabled" elif mirror_mode == rbd.RBD_MIRROR_MODE_IMAGE: mirror_mode = "image" elif mirror_mode == rbd.RBD_MIRROR_MODE_POOL: mirror_mode = "pool" else: mirror_mode = "unknown" stats['health_color'] = "warning" stats['health'] = "Warning" pool_stats[pool_name] 
= dict(stats, **{ 'mirror_mode': mirror_mode, 'peer_uuids': peer_uuids }) for daemon in daemons: for _, pool_data in daemon['status'].items(): stats = pool_stats.get(pool_data['name'], None) # type: ignore if stats is None: continue if pool_data.get('leader', False): # leader instance stores image counts stats['leader_id'] = daemon['metadata']['instance_id'] stats['image_local_count'] = pool_data.get('image_local_count', 0) stats['image_remote_count'] = pool_data.get('image_remote_count', 0) if (stats.get('health_color', '') != 'error' and pool_data.get('image_error_count', 0) > 0): stats['health_color'] = 'error' stats['health'] = 'Error' elif (stats.get('health_color', '') != 'error' and pool_data.get('image_warning_count', 0) > 0): stats['health_color'] = 'warning' stats['health'] = 'Warning' elif stats.get('health', None) is None: stats['health_color'] = 'success' stats['health'] = 'OK' for _, stats in pool_stats.items(): if stats['mirror_mode'] == 'disabled': continue if stats.get('health', None) is None: # daemon doesn't know about pool stats['health_color'] = 'error' stats['health'] = 'Error' elif stats.get('leader_id', None) is None: # no daemons are managing the pool as leader instance stats['health_color'] = 'warning' stats['health'] = 'Warning' return pool_stats daemons = get_daemons() return { 'daemons': daemons, 'pools': get_pools(daemons) } class ReplayingData(NamedTuple): bytes_per_second: Optional[int] = None seconds_until_synced: Optional[int] = None syncing_percent: Optional[float] = None entries_behind_primary: Optional[int] = None @ViewCache() @no_type_check def _get_pool_datum(pool_name): data = {} logger.debug("Constructing IOCtx %s", pool_name) try: ioctx = mgr.rados.open_ioctx(pool_name) except TypeError: logger.exception("Failed to open pool %s", pool_name) return None mirror_state = { 'down': { 'health': 'issue', 'state_color': 'warning', 'state': 'Unknown', 'description': None }, rbd.MIRROR_IMAGE_STATUS_STATE_UNKNOWN: { 'health': 'issue', 
'state_color': 'warning', 'state': 'Unknown' }, rbd.MIRROR_IMAGE_STATUS_STATE_ERROR: { 'health': 'issue', 'state_color': 'error', 'state': 'Error' }, rbd.MIRROR_IMAGE_STATUS_STATE_SYNCING: { 'health': 'syncing', 'state_color': 'success', 'state': 'Syncing' }, rbd.MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY: { 'health': 'syncing', 'state_color': 'success', 'state': 'Starting' }, rbd.MIRROR_IMAGE_STATUS_STATE_REPLAYING: { 'health': 'syncing', 'state_color': 'success', 'state': 'Replaying' }, rbd.MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY: { 'health': 'ok', 'state_color': 'success', 'state': 'Stopping' }, rbd.MIRROR_IMAGE_STATUS_STATE_STOPPED: { 'health': 'ok', 'state_color': 'info', 'state': 'Stopped' } } rbdctx = rbd.RBD() try: mirror_image_status = rbdctx.mirror_image_status_list(ioctx) data['mirror_images'] = sorted([ dict({ 'name': image['name'], 'description': image['description'] }, **mirror_state['down' if not image['up'] else image['state']]) for image in mirror_image_status ], key=lambda k: k['name']) except rbd.ImageNotFound: pass except: # noqa pylint: disable=W0702 logger.exception("Failed to list mirror image status %s", pool_name) raise return data def _update_syncing_image_data(mirror_image, image): if mirror_image['state'] == 'Replaying': p = re.compile("replaying, ({.*})") replaying_data = p.findall(mirror_image['description']) assert len(replaying_data) == 1 replaying_data = json.loads(replaying_data[0]) if 'replay_state' in replaying_data and replaying_data['replay_state'] == 'idle': image.update({ 'state_color': 'info', 'state': 'Idle' }) for field in ReplayingData._fields: try: image[field] = replaying_data[field] except KeyError: pass else: p = re.compile("bootstrapping, IMAGE_COPY/COPY_OBJECT (.*)%") image.update({ 'progress': (p.findall(mirror_image['description']) or [0])[0] }) @ViewCache() def _get_content_data(): # pylint: disable=R0914 pool_names = [pool['pool_name'] for pool in CephService.get_pool_list('rbd') if pool.get('type', 1) == 1] _, 
data = get_daemons_and_pools() daemons = data.get('daemons', []) pool_stats = data.get('pools', {}) pools = [] image_error = [] image_syncing = [] image_ready = [] for pool_name in pool_names: _, pool = _get_pool_datum(pool_name) if not pool: pool = {} stats = pool_stats.get(pool_name, {}) if stats.get('mirror_mode', None) is None: continue mirror_images = pool.get('mirror_images', []) for mirror_image in mirror_images: image = { 'pool_name': pool_name, 'name': mirror_image['name'], 'state_color': mirror_image['state_color'], 'state': mirror_image['state'] } if mirror_image['health'] == 'ok': image.update({ 'description': mirror_image['description'] }) image_ready.append(image) elif mirror_image['health'] == 'syncing': _update_syncing_image_data(mirror_image, image) image_syncing.append(image) else: image.update({ 'description': mirror_image['description'] }) image_error.append(image) pools.append(dict({ 'name': pool_name }, **stats)) return { 'daemons': daemons, 'pools': pools, 'image_error': image_error, 'image_syncing': image_syncing, 'image_ready': image_ready } def _reset_view_cache(): get_daemons_and_pools.reset() _get_pool_datum.reset() _get_content_data.reset() RBD_MIRROR_SCHEMA = { "site_name": (str, "Site Name") } RBDM_POOL_SCHEMA = { "mirror_mode": (str, "Mirror Mode") } RBDM_SUMMARY_SCHEMA = { "site_name": (str, "site name"), "status": (int, ""), "content_data": ({ "daemons": ([str], ""), "pools": ([{ "name": (str, "Pool name"), "health_color": (str, ""), "health": (str, "pool health"), "mirror_mode": (str, "status"), "peer_uuids": ([str], "") }], "Pools"), "image_error": ([str], ""), "image_syncing": ([str], ""), "image_ready": ([str], "") }, "") } @APIRouter('/block/mirroring', Scope.RBD_MIRRORING) @APIDoc("RBD Mirroring Management API", "RbdMirroring") class RbdMirroring(BaseController): @Endpoint(method='GET', path='site_name') @handle_rbd_mirror_error() @ReadPermission @EndpointDoc("Display Rbd Mirroring sitename", responses={200: 
RBD_MIRROR_SCHEMA}) def get(self): return self._get_site_name() @Endpoint(method='PUT', path='site_name') @handle_rbd_mirror_error() @UpdatePermission def set(self, site_name): rbd.RBD().mirror_site_name_set(mgr.rados, site_name) return self._get_site_name() def _get_site_name(self): return {'site_name': rbd.RBD().mirror_site_name_get(mgr.rados)} @APIRouter('/block/mirroring/summary', Scope.RBD_MIRRORING) @APIDoc("RBD Mirroring Summary Management API", "RbdMirroringSummary") class RbdMirroringSummary(BaseController): @Endpoint() @handle_rbd_mirror_error() @ReadPermission @EndpointDoc("Display Rbd Mirroring Summary", responses={200: RBDM_SUMMARY_SCHEMA}) def __call__(self): site_name = rbd.RBD().mirror_site_name_get(mgr.rados) status, content_data = _get_content_data() return {'site_name': site_name, 'status': status, 'content_data': content_data} @APIRouter('/block/mirroring/pool', Scope.RBD_MIRRORING) @APIDoc("RBD Mirroring Pool Mode Management API", "RbdMirroringPoolMode") class RbdMirroringPoolMode(RESTController): RESOURCE_ID = "pool_name" MIRROR_MODES = { rbd.RBD_MIRROR_MODE_DISABLED: 'disabled', rbd.RBD_MIRROR_MODE_IMAGE: 'image', rbd.RBD_MIRROR_MODE_POOL: 'pool' } @handle_rbd_mirror_error() @EndpointDoc("Display Rbd Mirroring Summary", parameters={ 'pool_name': (str, 'Pool Name'), }, responses={200: RBDM_POOL_SCHEMA}) def get(self, pool_name): ioctx = mgr.rados.open_ioctx(pool_name) mode = rbd.RBD().mirror_mode_get(ioctx) data = { 'mirror_mode': self.MIRROR_MODES.get(mode, 'unknown') } return data @RbdMirroringTask('pool/edit', {'pool_name': '{pool_name}'}, 5.0) def set(self, pool_name, mirror_mode=None): def _edit(ioctx, mirror_mode=None): if mirror_mode: mode_enum = {x[1]: x[0] for x in self.MIRROR_MODES.items()}.get(mirror_mode, None) if mode_enum is None: raise rbd.Error('invalid mirror mode "{}"'.format(mirror_mode)) current_mode_enum = rbd.RBD().mirror_mode_get(ioctx) if mode_enum != current_mode_enum: rbd.RBD().mirror_mode_set(ioctx, mode_enum) 
_reset_view_cache() return rbd_call(pool_name, None, _edit, mirror_mode) @APIRouter('/block/mirroring/pool/{pool_name}/bootstrap', Scope.RBD_MIRRORING) @APIDoc("RBD Mirroring Pool Bootstrap Management API", "RbdMirroringPoolBootstrap") class RbdMirroringPoolBootstrap(BaseController): @Endpoint(method='POST', path='token') @handle_rbd_mirror_error() @UpdatePermission @allow_empty_body def create_token(self, pool_name): ioctx = mgr.rados.open_ioctx(pool_name) token = rbd.RBD().mirror_peer_bootstrap_create(ioctx) return {'token': token} @Endpoint(method='POST', path='peer') @handle_rbd_mirror_error() @UpdatePermission @allow_empty_body def import_token(self, pool_name, direction, token): ioctx = mgr.rados.open_ioctx(pool_name) directions = { 'rx': rbd.RBD_MIRROR_PEER_DIRECTION_RX, 'rx-tx': rbd.RBD_MIRROR_PEER_DIRECTION_RX_TX } direction_enum = directions.get(direction) if direction_enum is None: raise rbd.Error('invalid direction "{}"'.format(direction)) rbd.RBD().mirror_peer_bootstrap_import(ioctx, direction_enum, token) return {} @APIRouter('/block/mirroring/pool/{pool_name}/peer', Scope.RBD_MIRRORING) @APIDoc("RBD Mirroring Pool Peer Management API", "RbdMirroringPoolPeer") class RbdMirroringPoolPeer(RESTController): RESOURCE_ID = "peer_uuid" @handle_rbd_mirror_error() def list(self, pool_name): ioctx = mgr.rados.open_ioctx(pool_name) peer_list = rbd.RBD().mirror_peer_list(ioctx) return [x['uuid'] for x in peer_list] @handle_rbd_mirror_error() def create(self, pool_name, cluster_name, client_id, mon_host=None, key=None): ioctx = mgr.rados.open_ioctx(pool_name) mode = rbd.RBD().mirror_mode_get(ioctx) if mode == rbd.RBD_MIRROR_MODE_DISABLED: raise rbd.Error('mirroring must be enabled') uuid = rbd.RBD().mirror_peer_add(ioctx, cluster_name, 'client.{}'.format(client_id)) attributes = {} if mon_host is not None: attributes[rbd.RBD_MIRROR_PEER_ATTRIBUTE_NAME_MON_HOST] = mon_host if key is not None: attributes[rbd.RBD_MIRROR_PEER_ATTRIBUTE_NAME_KEY] = key if attributes: 
rbd.RBD().mirror_peer_set_attributes(ioctx, uuid, attributes) _reset_view_cache() return {'uuid': uuid} @handle_rbd_mirror_error() def get(self, pool_name, peer_uuid): ioctx = mgr.rados.open_ioctx(pool_name) peer_list = rbd.RBD().mirror_peer_list(ioctx) peer = next((x for x in peer_list if x['uuid'] == peer_uuid), None) if not peer: raise cherrypy.HTTPError(404) # convert full client name to just the client id peer['client_id'] = peer['client_name'].split('.', 1)[-1] del peer['client_name'] # convert direction enum to string directions = { rbd.RBD_MIRROR_PEER_DIRECTION_RX: 'rx', rbd.RBD_MIRROR_PEER_DIRECTION_TX: 'tx', rbd.RBD_MIRROR_PEER_DIRECTION_RX_TX: 'rx-tx' } peer['direction'] = directions[peer.get('direction', rbd.RBD_MIRROR_PEER_DIRECTION_RX)] try: attributes = rbd.RBD().mirror_peer_get_attributes(ioctx, peer_uuid) except rbd.ImageNotFound: attributes = {} peer['mon_host'] = attributes.get(rbd.RBD_MIRROR_PEER_ATTRIBUTE_NAME_MON_HOST, '') peer['key'] = attributes.get(rbd.RBD_MIRROR_PEER_ATTRIBUTE_NAME_KEY, '') return peer @handle_rbd_mirror_error() def delete(self, pool_name, peer_uuid): ioctx = mgr.rados.open_ioctx(pool_name) rbd.RBD().mirror_peer_remove(ioctx, peer_uuid) _reset_view_cache() @handle_rbd_mirror_error() def set(self, pool_name, peer_uuid, cluster_name=None, client_id=None, mon_host=None, key=None): ioctx = mgr.rados.open_ioctx(pool_name) if cluster_name: rbd.RBD().mirror_peer_set_cluster(ioctx, peer_uuid, cluster_name) if client_id: rbd.RBD().mirror_peer_set_client(ioctx, peer_uuid, 'client.{}'.format(client_id)) if mon_host is not None or key is not None: try: attributes = rbd.RBD().mirror_peer_get_attributes(ioctx, peer_uuid) except rbd.ImageNotFound: attributes = {} if mon_host is not None: attributes[rbd.RBD_MIRROR_PEER_ATTRIBUTE_NAME_MON_HOST] = mon_host if key is not None: attributes[rbd.RBD_MIRROR_PEER_ATTRIBUTE_NAME_KEY] = key rbd.RBD().mirror_peer_set_attributes(ioctx, peer_uuid, attributes) _reset_view_cache() 
@UIRouter('/block/mirroring', Scope.RBD_MIRRORING)
class RbdMirroringStatus(BaseController):
    """UI endpoints to check and set up RBD mirroring."""

    @EndpointDoc('Display RBD Mirroring Status')
    @Endpoint()
    @ReadPermission
    def status(self):
        """Report whether RBD mirroring is usable.

        Returns ``{'available': bool, 'message': str or None}``.  When the
        orchestrator is not available the result is always "available".
        Otherwise it is "not configured" if the 'rbd-mirror' service list or
        the 'rbd' pool list is empty.
        """
        result = {'available': True, 'message': None}

        # if the orch is not available we can't create the service
        # using dashboard.
        if OrchClient.instance().status()['available']:
            configured = (CephService.get_service_list('rbd-mirror')
                          and CephService.get_pool_list('rbd'))
            if not configured:
                result['available'] = False
                result['message'] = 'RBD mirroring is not configured'  # type: ignore
        return result

    @Endpoint('POST')
    @EndpointDoc('Configure RBD Mirroring')
    @CreatePermission
    def configure(self):
        """Create whichever is missing: the 'rbd-mirror' service and an 'rbd' pool."""
        pool_controller = RBDPool()
        service_controller = Service()

        if not CephService.get_service_list('rbd-mirror'):
            spec = {
                'service_type': 'rbd-mirror',
                'placement': {},
                'unmanaged': False
            }
            service_controller.create(spec, 'rbd-mirror')

        if not CephService.get_pool_list('rbd'):
            pool_controller.create()