import errno
import json
import rados
import rbd
import re
import traceback

from datetime import datetime
from threading import Condition, Lock, Thread

from .common import get_rbd_pools
from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules


def namespace_validator(ioctx):
    """Raise ValueError unless the ioctx's namespace is in image mirror mode."""
    mode = rbd.RBD().mirror_mode_get(ioctx)
    if mode != rbd.RBD_MIRROR_MODE_IMAGE:
        raise ValueError("namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))


def image_validator(image):
    """Raise rbd.InvalidArgument unless the image uses snapshot mirroring."""
    mode = image.mirror_image_get_mode()
    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        raise rbd.InvalidArgument("Invalid mirror image mode")


class CreateSnapshotRequests:
    """Drive asynchronous mirror-snapshot creation for individual images.

    Each request is a chain of librbd aio calls:
    open image -> get mirror mode -> get mirror info -> create mirror
    snapshot -> close image -> finish().  Any failure short-circuits to
    close_image()/finish().

    ``pending`` holds every accepted image spec (running or queued).  When
    accepting a spec would push ``len(pending)`` above the module option
    ``max_concurrent_snap_create``, the spec is parked in ``queue`` and
    started later by finish() in FIFO order.
    """

    # NOTE: class-level, so shared by every instance of this class.
    lock = Lock()
    condition = Condition(lock)

    def __init__(self, handler):
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log
        self.pending = set()
        self.queue = []
        # (pool_id, namespace) -> (ioctx, set of image specs using it)
        self.ioctxs = {}

    def __del__(self):
        self.wait_for_pending()

    def wait_for_pending(self):
        """Block until every accepted request has called finish()."""
        with self.lock:
            while self.pending:
                self.condition.wait()

    def add(self, pool_id, namespace, image_id):
        """Accept a snapshot request for an image, starting or queueing it.

        A request for an image that is already pending is ignored.
        """
        image_spec = (pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            # pending includes queued specs, so this caps the number of
            # requests actually started at max_concurrent
            if len(self.pending) > max_concurrent:
                self.queue.append(image_spec)
                return

        self.open_image(image_spec)

    def open_image(self, image_spec):
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp, image):
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_open_image(self, image_spec, comp, image):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            # a vanished image (-ENOENT) is not worth an error log
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec, image):
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp, mode):
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self, image_spec, image, comp, mode):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), mode))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec, image):
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp, info):
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self, image_spec, image, comp, info):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), info))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        # only the primary copy of a mirrored image gets snapshots
        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec, image):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp, snap_id):
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self, image_spec, image, comp, snap_id):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), snap_id))

        if comp.get_return_value() < 0 and \
                comp.get_return_value() != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.close_image(image_spec, image)

    def close_image(self, image_spec, image):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp):
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self, image_spec, comp):
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.finish(image_spec)

    def finish(self, image_spec):
        """Retire a request and start the next queued one, if any."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            # wake wait_for_pending(); without this it never returns
            self.condition.notify_all()
            if not self.queue:
                return
            image_spec = self.queue.pop(0)

        # started outside the lock: open_image() may re-enter finish()
        self.open_image(image_spec)

    def get_ioctx(self, image_spec):
        """Return a shared ioctx for the spec's pool/namespace, registering
        the image as a user of it (released by put_ioctx())."""
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if not ioctx:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                images = set()
                self.ioctxs[nspec] = (ioctx, images)
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec):
        """Release the image's use of its ioctx; drop the ioctx when unused.

        Tolerates specs that were never registered: get_ioctx() can raise
        (e.g. the pool was removed) before registering, and finish() still
        calls this afterwards.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if images is None:
                return
            images.discard(image_spec)
            if not images:
                del self.ioctxs[nspec]


class MirrorSnapshotScheduleHandler:
    """Schedule periodic mirror snapshots of primary snapshot-mirrored images.

    A background thread (run()) rescans pools at most every
    REFRESH_DELAY_SECONDS, keeps a time-ordered queue of images, and when an
    image is due hands it to CreateSnapshotRequests and re-enqueues it for
    its next scheduled run.

    State (guarded by ``lock``):
      * ``queue``:  schedule time string -> list of
        (pool_id, namespace, image_id)
      * ``images``: pool_id -> namespace -> image_id -> display name
    """

    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module):
        self.module = module
        self.log = module.log
        # epoch start, so the first refresh_images() call always rescans
        self.last_refresh_images = datetime(1970, 1, 1)
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def _cleanup(self):
        self.create_snapshot_requests.wait_for_pending()

    def run(self):
        """Scheduler thread main loop; logs and exits on any exception."""
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while True:
                refresh_delay = self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace, image_id = image_spec
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self):
        self.queue = {}
        self.images = {}
        self.refresh_images()
        self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")

    def load_schedules(self):
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load(namespace_validator, image_validator)
        self.schedules = schedules

    def refresh_images(self):
        """Rescan pools if the refresh interval elapsed.

        Returns the number of seconds until the next rescan is due.
        """
        elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        images = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        self.last_refresh_images = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool_images(self, ioctx, images):
        """Fill images[pool_id][namespace][image_id] with the names of
        primary snapshot-mirrored images covered by a schedule."""
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            # '' is the default namespace
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(pool_id, pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                image_names = {x['id']: x['name']
                               for x in rbd.RBD().list2(ioctx)
                               if x['id'] in mirror_images}
                for image_id, info in mirror_images.items():
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except Exception as e:
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self):
        """Re-enqueue all known images; caller must hold self.lock."""
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def refresh_queue(self, current_images):
        """Deschedule vanished images and schedule new ones; caller must
        hold self.lock."""
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                       namespace not in current_images[pool_id] or \
                       image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                       namespace not in self.images[pool_id] or \
                       image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now, pool_id, namespace, image_id):
        """Queue an image at its schedule's next run time (no duplicates
        within the same time slot)."""
        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            self.log.debug(
                "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
                    pool_id, namespace, image_id))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
                pool_id, namespace, image_id, schedule_time))
        image_spec = (pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self):
        """Pop the earliest due image.

        Returns (image_spec, 0) when one is due, otherwise (None, seconds
        to wait).  Schedule times are compared as strings, which assumes
        Schedule.next_run() yields "%Y-%m-%d %H:%M:%S" strings (the format
        parsed below).
        """
        if not self.queue:
            return None, 1000

        now = datetime.now()
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0

    def remove_from_queue(self, pool_id, namespace, image_id):
        self.log.debug(
            "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
                pool_id, namespace, image_id))

        image_spec = (pool_id, namespace, image_id)
        empty_slots = []
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self, level_spec, interval, start_time):
        self.log.debug(
            "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self, level_spec, interval, start_time):
        self.log.debug(
            "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec):
        self.log.debug(
            "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec):
        self.log.debug(
            "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time' : schedule_time,
                        'image' : image_name
                    })
        return 0, json.dumps({'scheduled_images' : scheduled_images},
                             indent=4, sort_keys=True), ""

    def handle_command(self, inbuf, prefix, cmd):
        """Dispatch an add/remove/list/status command.

        Returns an (retval, stdout, stderr) tuple; raises
        NotImplementedError for an unknown prefix.
        """
        level_spec_name = cmd.get('level_spec', "")

        try:
            level_spec = LevelSpec.from_name(self, level_spec_name,
                                             namespace_validator,
                                             image_validator)
        except ValueError as e:
            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
                level_spec_name, e)

        if prefix == 'add':
            return self.add_schedule(level_spec, cmd['interval'],
                                     cmd.get('start_time'))
        elif prefix == 'remove':
            return self.remove_schedule(level_spec, cmd.get('interval'),
                                        cmd.get('start_time'))
        elif prefix == 'list':
            return self.list(level_spec)
        elif prefix == 'status':
            return self.status(level_spec)

        raise NotImplementedError(cmd['prefix'])