From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../mgr/rbd_support/mirror_snapshot_schedule.py | 595 +++++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py (limited to 'src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py') diff --git a/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py new file mode 100644 index 000000000..536ee3d16 --- /dev/null +++ b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py @@ -0,0 +1,595 @@ +import errno +import json +import rados +import rbd +import re +import traceback + +from datetime import datetime +from threading import Condition, Lock, Thread + +from .common import get_rbd_pools +from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules + +def namespace_validator(ioctx): + mode = rbd.RBD().mirror_mode_get(ioctx) + if mode != rbd.RBD_MIRROR_MODE_IMAGE: + raise ValueError("namespace {} is not in mirror image mode".format( + ioctx.get_namespace())) + +def image_validator(image): + mode = image.mirror_image_get_mode() + if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + raise rbd.InvalidArgument("Invalid mirror image mode") + + +class CreateSnapshotRequests: + + lock = Lock() + condition = Condition(lock) + + def __init__(self, handler): + self.handler = handler + self.rados = handler.module.rados + self.log = handler.log + self.pending = set() + self.queue = [] + self.ioctxs = {} + + def __del__(self): + self.wait_for_pending() + + def wait_for_pending(self): + with self.lock: + while self.pending: + self.condition.wait() + + def add(self, pool_id, namespace, image_id): + image_spec = (pool_id, namespace, image_id) + + self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format( + pool_id, namespace, image_id)) + + max_concurrent = self.handler.module.get_localized_module_option( + self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE) + + with self.lock: + if image_spec in self.pending: + self.log.info( + "CreateSnapshotRequests.add: {}/{}/{}: {}".format( + pool_id, namespace, image_id, + "previous request is still in progress")) + return + self.pending.add(image_spec) + + if len(self.pending) > max_concurrent: + self.queue.append(image_spec) + return + + self.open_image(image_spec) + + def open_image(self, image_spec): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format( + pool_id, namespace, image_id)) + + try: + ioctx = self.get_ioctx(image_spec) + + def cb(comp, image): + self.handle_open_image(image_spec, comp, image) + + rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id) + except Exception as e: + self.log.error( + "exception when opening {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.finish(image_spec) + + def handle_open_image(self, image_spec, comp, image): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + if comp.get_return_value() < 0: + if comp.get_return_value() != -errno.ENOENT: + self.log.error( + "error when opening {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + self.finish(image_spec) + return + + self.get_mirror_mode(image_spec, image) + + def get_mirror_mode(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp, mode): + self.handle_get_mirror_mode(image_spec, image, comp, mode) + + try: + image.aio_mirror_image_get_mode(cb) + except Exception as e: + self.log.error( + "exception when getting mirror mode for {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.close_image(image_spec, image) + + def handle_get_mirror_mode(self, image_spec, image, comp, mode): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format( + pool_id, namespace, image_id, comp.get_return_value(), mode)) + + if comp.get_return_value() < 0: + if comp.get_return_value() != -errno.ENOENT: + self.log.error( + "error when getting mirror mode for {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + self.close_image(image_spec, image) + return + + if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format( + pool_id, namespace, image_id, + "snapshot mirroring is not enabled")) + self.close_image(image_spec, image) + return + + self.get_mirror_info(image_spec, image) + + def get_mirror_info(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp, info): + self.handle_get_mirror_info(image_spec, image, comp, info) + + try: + image.aio_mirror_image_get_info(cb) + except Exception as e: + self.log.error( + "exception when getting mirror info for {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.close_image(image_spec, image) + + def handle_get_mirror_info(self, image_spec, image, comp, info): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format( + pool_id, namespace, image_id, comp.get_return_value(), info)) + + if comp.get_return_value() < 0: + if comp.get_return_value() != -errno.ENOENT: + self.log.error( + "error when getting mirror info for {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + self.close_image(image_spec, image) + return + + if not info['primary']: + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format( + pool_id, namespace, image_id, + "is not primary")) + self.close_image(image_spec, image) + return + + self.create_snapshot(image_spec, image) + + def create_snapshot(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp, snap_id): + self.handle_create_snapshot(image_spec, image, comp, snap_id) + + try: + image.aio_mirror_image_create_snapshot(0, cb) + except Exception as e: + self.log.error( + "exception when creating snapshot for {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.close_image(image_spec, image) + + + def handle_create_snapshot(self, image_spec, image, comp, snap_id): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format( + pool_id, namespace, image_id, comp.get_return_value(), snap_id)) + + if comp.get_return_value() < 0 and \ + comp.get_return_value() != -errno.ENOENT: + self.log.error( + "error when creating snapshot for {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + self.close_image(image_spec, image) + + def close_image(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.close_image {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp): + self.handle_close_image(image_spec, comp) + + try: + image.aio_close(cb) + except Exception as e: + self.log.error( + "exception when closing {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.finish(image_spec) + + def handle_close_image(self, image_spec, comp): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + if comp.get_return_value() < 0: + self.log.error( + "error when closing {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + self.finish(image_spec) + + def finish(self, image_spec): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format( + pool_id, namespace, image_id)) + + self.put_ioctx(image_spec) + + with self.lock: + self.pending.remove(image_spec) + if not self.queue: + return + image_spec = self.queue.pop(0) + + self.open_image(image_spec) + + def get_ioctx(self, image_spec): + pool_id, namespace, image_id = image_spec + nspec = (pool_id, namespace) + + with self.lock: + ioctx, images = self.ioctxs.get(nspec, (None, None)) + if not ioctx: + ioctx = self.rados.open_ioctx2(int(pool_id)) + ioctx.set_namespace(namespace) + images = set() + self.ioctxs[nspec] = (ioctx, images) + images.add(image_spec) + + return ioctx + + def put_ioctx(self, image_spec): + pool_id, namespace, image_id = image_spec + nspec = (pool_id, namespace) + + with self.lock: + ioctx, images = self.ioctxs[nspec] + images.remove(image_spec) + if not images: + del self.ioctxs[nspec] + + +class MirrorSnapshotScheduleHandler: + MODULE_OPTION_NAME = "mirror_snapshot_schedule" + MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create" + SCHEDULE_OID = "rbd_mirror_snapshot_schedule" + REFRESH_DELAY_SECONDS = 60.0 + + lock = Lock() + condition = Condition(lock) + thread = None + + def __init__(self, module): + self.module = module + self.log = module.log + self.last_refresh_images = datetime(1970, 1, 1) + self.create_snapshot_requests = CreateSnapshotRequests(self) + + self.init_schedule_queue() + + self.thread = Thread(target=self.run) + self.thread.start() + + def _cleanup(self): + self.create_snapshot_requests.wait_for_pending() + + def run(self): + try: + self.log.info("MirrorSnapshotScheduleHandler: starting") + while True: + refresh_delay = self.refresh_images() + with self.lock: + (image_spec, wait_time) = self.dequeue() + if not image_spec: + self.condition.wait(min(wait_time, refresh_delay)) + continue + pool_id, namespace, image_id = image_spec + self.create_snapshot_requests.add(pool_id, namespace, image_id) + with self.lock: + self.enqueue(datetime.now(), pool_id, namespace, image_id) + + except Exception as ex: + self.log.fatal("Fatal runtime error: {}\n{}".format( + ex, traceback.format_exc())) + + def init_schedule_queue(self): + self.queue = {} + self.images = {} + self.refresh_images() + self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized") + + def load_schedules(self): + self.log.info("MirrorSnapshotScheduleHandler: load_schedules") + + schedules = Schedules(self) + schedules.load(namespace_validator, image_validator) + self.schedules = schedules + + def refresh_images(self): + elapsed = (datetime.now() - self.last_refresh_images).total_seconds() + if elapsed < self.REFRESH_DELAY_SECONDS: + return self.REFRESH_DELAY_SECONDS - elapsed + + self.log.debug("MirrorSnapshotScheduleHandler: refresh_images") + + with self.lock: + self.load_schedules() + if not self.schedules: + self.log.debug("MirrorSnapshotScheduleHandler: no schedules") + self.images = {} + self.queue = {} + self.last_refresh_images = datetime.now() + return self.REFRESH_DELAY_SECONDS + + images = {} + + for pool_id, pool_name in get_rbd_pools(self.module).items(): + if not self.schedules.intersects( + LevelSpec.from_pool_spec(pool_id, pool_name)): + continue + with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: + self.load_pool_images(ioctx, images) + + with self.lock: + self.refresh_queue(images) + self.images = images + + self.last_refresh_images = datetime.now() + return self.REFRESH_DELAY_SECONDS + + def load_pool_images(self, ioctx, images): + pool_id = str(ioctx.get_pool_id()) + pool_name = ioctx.get_pool_name() + images[pool_id] = {} + + self.log.debug("load_pool_images: pool={}".format(pool_name)) + + try: + namespaces = [''] + rbd.RBD().namespace_list(ioctx) + for namespace in namespaces: + if not self.schedules.intersects( + LevelSpec.from_pool_spec(pool_id, pool_name, namespace)): + continue + self.log.debug("load_pool_images: pool={}, namespace={}".format( + pool_name, namespace)) + images[pool_id][namespace] = {} + ioctx.set_namespace(namespace) + mirror_images = dict(rbd.RBD().mirror_image_info_list( + ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT)) + if not mirror_images: + continue + image_names = dict( + [(x['id'], x['name']) for x in filter( + lambda x: x['id'] in mirror_images, + rbd.RBD().list2(ioctx))]) + for image_id, info in mirror_images.items(): + if not info['primary']: + continue + image_name = image_names.get(image_id) + if not image_name: + continue + if namespace: + name = "{}/{}/{}".format(pool_name, namespace, + image_name) + else: + name = "{}/{}".format(pool_name, image_name) + self.log.debug( + "load_pool_images: adding image {}".format(name)) + images[pool_id][namespace][image_id] = name + except Exception as e: + self.log.error( + "load_pool_images: exception when scanning pool {}: {}".format( + pool_name, e)) + + def rebuild_queue(self): + now = datetime.now() + + # don't remove from queue "due" images + now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00") + + for schedule_time in list(self.queue): + if schedule_time > now_string: + del self.queue[schedule_time] + + if not self.schedules: + return + + for pool_id in self.images: + for namespace in self.images[pool_id]: + for image_id in self.images[pool_id][namespace]: + self.enqueue(now, pool_id, namespace, image_id) + + self.condition.notify() + + def refresh_queue(self, current_images): + now = datetime.now() + + for pool_id in self.images: + for namespace in self.images[pool_id]: + for image_id in self.images[pool_id][namespace]: + if pool_id not in current_images or \ + namespace not in current_images[pool_id] or \ + image_id not in current_images[pool_id][namespace]: + self.remove_from_queue(pool_id, namespace, image_id) + + for pool_id in current_images: + for namespace in current_images[pool_id]: + for image_id in current_images[pool_id][namespace]: + if pool_id not in self.images or \ + namespace not in self.images[pool_id] or \ + image_id not in self.images[pool_id][namespace]: + self.enqueue(now, pool_id, namespace, image_id) + + self.condition.notify() + + def enqueue(self, now, pool_id, namespace, image_id): + schedule = self.schedules.find(pool_id, namespace, image_id) + if not schedule: + self.log.debug( + "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format( + pool_id, namespace, image_id)) + return + + schedule_time = schedule.next_run(now) + if schedule_time not in self.queue: + self.queue[schedule_time] = [] + self.log.debug( + "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format( + pool_id, namespace, image_id, schedule_time)) + image_spec = (pool_id, namespace, image_id) + if image_spec not in self.queue[schedule_time]: + self.queue[schedule_time].append((pool_id, namespace, image_id)) + + def dequeue(self): + if not self.queue: + return None, 1000 + + now = datetime.now() + schedule_time = sorted(self.queue)[0] + + if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time: + wait_time = (datetime.strptime(schedule_time, + "%Y-%m-%d %H:%M:%S") - now) + return None, wait_time.total_seconds() + + images = self.queue[schedule_time] + image = images.pop(0) + if not images: + del self.queue[schedule_time] + return image, 0 + + def remove_from_queue(self, pool_id, namespace, image_id): + self.log.debug( + "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format( + pool_id, namespace, image_id)) + + empty_slots = [] + for schedule_time, images in self.queue.items(): + if (pool_id, namespace, image_id) in images: + images.remove((pool_id, namespace, image_id)) + if not images: + empty_slots.append(schedule_time) + for schedule_time in empty_slots: + del self.queue[schedule_time] + + def add_schedule(self, level_spec, interval, start_time): + self.log.debug( + "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) + + # TODO: optimize to rebuild only affected part of the queue + with self.lock: + self.schedules.add(level_spec, interval, start_time) + self.rebuild_queue() + return 0, "", "" + + def remove_schedule(self, level_spec, interval, start_time): + self.log.debug( + "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) + + # TODO: optimize to rebuild only affected part of the queue + with self.lock: + self.schedules.remove(level_spec, interval, start_time) + self.rebuild_queue() + return 0, "", "" + + def list(self, level_spec): + self.log.debug( + "MirrorSnapshotScheduleHandler: list: level_spec={}".format( + level_spec.name)) + + with self.lock: + result = self.schedules.to_list(level_spec) + + return 0, json.dumps(result, indent=4, sort_keys=True), "" + + def status(self, level_spec): + self.log.debug( + "MirrorSnapshotScheduleHandler: status: level_spec={}".format( + level_spec.name)) + + scheduled_images = [] + with self.lock: + for schedule_time in sorted(self.queue): + for pool_id, namespace, image_id in self.queue[schedule_time]: + if not level_spec.matches(pool_id, namespace, image_id): + continue + image_name = self.images[pool_id][namespace][image_id] + scheduled_images.append({ + 'schedule_time' : schedule_time, + 'image' : image_name + }) + return 0, json.dumps({'scheduled_images' : scheduled_images}, + indent=4, sort_keys=True), "" + + def handle_command(self, inbuf, prefix, cmd): + level_spec_name = cmd.get('level_spec', "") + + try: + level_spec = LevelSpec.from_name(self, level_spec_name, + namespace_validator, + image_validator) + except ValueError as e: + return -errno.EINVAL, '', "Invalid level spec {}: {}".format( + level_spec_name, e) + + if prefix == 'add': + return self.add_schedule(level_spec, cmd['interval'], + cmd.get('start_time')) + elif prefix == 'remove': + return self.remove_schedule(level_spec, cmd.get('interval'), + cmd.get('start_time')) + elif prefix == 'list': + return self.list(level_spec) + elif prefix == 'status': + return self.status(level_spec) + + raise NotImplementedError(cmd['prefix']) -- cgit v1.2.3