Diffstat (limited to 'src/pybind/mgr/rbd_support')
-rw-r--r--  src/pybind/mgr/rbd_support/__init__.py                   1
-rw-r--r--  src/pybind/mgr/rbd_support/common.py                    34
-rw-r--r--  src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py  595
-rw-r--r--  src/pybind/mgr/rbd_support/module.py                    204
-rw-r--r--  src/pybind/mgr/rbd_support/perf.py                      457
-rw-r--r--  src/pybind/mgr/rbd_support/schedule.py                  520
-rw-r--r--  src/pybind/mgr/rbd_support/task.py                      832
-rw-r--r--  src/pybind/mgr/rbd_support/trash_purge_schedule.py      287
8 files changed, 2930 insertions, 0 deletions
diff --git a/src/pybind/mgr/rbd_support/__init__.py b/src/pybind/mgr/rbd_support/__init__.py
new file mode 100644
index 000000000..8f210ac92
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/__init__.py
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/rbd_support/common.py b/src/pybind/mgr/rbd_support/common.py
new file mode 100644
index 000000000..f6bac8f39
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/common.py
@@ -0,0 +1,34 @@
+import re
+
+GLOBAL_POOL_KEY = (None, None)
+
+
+class NotAuthorizedError(Exception):
+ pass
+
+
+def is_authorized(module, pool, namespace):
+ return module.is_authorized({"pool": pool or '',
+ "namespace": namespace or ''})
+
+
+def authorize_request(module, pool, namespace):
+ if not is_authorized(module, pool, namespace):
+ raise NotAuthorizedError("not authorized on pool={}, namespace={}".format(
+ pool, namespace))
+
+
+def extract_pool_key(pool_spec):
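+    """Split a 'pool[/namespace]' spec into a (pool, namespace) key.
+
+    For example, 'rbd/ns1' yields ('rbd', 'ns1') and 'rbd' yields
+    ('rbd', ''); an empty spec yields GLOBAL_POOL_KEY.
+    """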
+ if not pool_spec:
+ return GLOBAL_POOL_KEY
+
+ match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
+ if not match:
+ raise ValueError("Invalid pool spec: {}".format(pool_spec))
+ return (match.group(1), match.group(2) or '')
+
+
+def get_rbd_pools(module):
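+    """Return a {pool_id: pool_name} map of pools tagged with the 'rbd'
+    application in the mgr's cached OSD map."""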
+ osd_map = module.get('osd_map')
+ return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
+ if 'rbd' in pool.get('application_metadata', {})}
+
diff --git a/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
new file mode 100644
index 000000000..536ee3d16
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
@@ -0,0 +1,595 @@
+import errno
+import json
+import rados
+import rbd
+import re
+import traceback
+
+from datetime import datetime
+from threading import Condition, Lock, Thread
+
+from .common import get_rbd_pools
+from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+
+
+def namespace_validator(ioctx):
+ mode = rbd.RBD().mirror_mode_get(ioctx)
+ if mode != rbd.RBD_MIRROR_MODE_IMAGE:
+ raise ValueError("namespace {} is not in mirror image mode".format(
+ ioctx.get_namespace()))
+
+
+def image_validator(image):
+ mode = image.mirror_image_get_mode()
+ if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ raise rbd.InvalidArgument("Invalid mirror image mode")
+
+
+class CreateSnapshotRequests:
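+    """Tracks asynchronous mirror snapshot creation requests, keeping at
+    most max_concurrent_snap_create requests in flight and queueing the
+    rest (see add() and finish())."""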
+
+ lock = Lock()
+ condition = Condition(lock)
+
+ def __init__(self, handler):
+ self.handler = handler
+ self.rados = handler.module.rados
+ self.log = handler.log
+ self.pending = set()
+ self.queue = []
+ self.ioctxs = {}
+
+ def __del__(self):
+ self.wait_for_pending()
+
+ def wait_for_pending(self):
+ with self.lock:
+ while self.pending:
+ self.condition.wait()
+
+ def add(self, pool_id, namespace, image_id):
+ image_spec = (pool_id, namespace, image_id)
+
+ self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ max_concurrent = self.handler.module.get_localized_module_option(
+ self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)
+
+ with self.lock:
+ if image_spec in self.pending:
+ self.log.info(
+ "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
+ pool_id, namespace, image_id,
+ "previous request is still in progress"))
+ return
+ self.pending.add(image_spec)
+
+ if len(self.pending) > max_concurrent:
+ self.queue.append(image_spec)
+ return
+
+ self.open_image(image_spec)
+
+ def open_image(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ try:
+ ioctx = self.get_ioctx(image_spec)
+
+ def cb(comp, image):
+ self.handle_open_image(image_spec, comp, image)
+
+ rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
+ except Exception as e:
+ self.log.error(
+ "exception when opening {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.finish(image_spec)
+
+ def handle_open_image(self, image_spec, comp, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ if comp.get_return_value() < 0:
+ if comp.get_return_value() != -errno.ENOENT:
+ self.log.error(
+ "error when opening {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+ self.finish(image_spec)
+ return
+
+ self.get_mirror_mode(image_spec, image)
+
+ def get_mirror_mode(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp, mode):
+ self.handle_get_mirror_mode(image_spec, image, comp, mode)
+
+ try:
+ image.aio_mirror_image_get_mode(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when getting mirror mode for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.close_image(image_spec, image)
+
+ def handle_get_mirror_mode(self, image_spec, image, comp, mode):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
+ pool_id, namespace, image_id, comp.get_return_value(), mode))
+
+ if comp.get_return_value() < 0:
+ if comp.get_return_value() != -errno.ENOENT:
+ self.log.error(
+ "error when getting mirror mode for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+ self.close_image(image_spec, image)
+ return
+
+ if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
+ pool_id, namespace, image_id,
+ "snapshot mirroring is not enabled"))
+ self.close_image(image_spec, image)
+ return
+
+ self.get_mirror_info(image_spec, image)
+
+ def get_mirror_info(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp, info):
+ self.handle_get_mirror_info(image_spec, image, comp, info)
+
+ try:
+ image.aio_mirror_image_get_info(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when getting mirror info for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.close_image(image_spec, image)
+
+ def handle_get_mirror_info(self, image_spec, image, comp, info):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
+ pool_id, namespace, image_id, comp.get_return_value(), info))
+
+ if comp.get_return_value() < 0:
+ if comp.get_return_value() != -errno.ENOENT:
+ self.log.error(
+ "error when getting mirror info for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+ self.close_image(image_spec, image)
+ return
+
+ if not info['primary']:
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
+ pool_id, namespace, image_id,
+ "is not primary"))
+ self.close_image(image_spec, image)
+ return
+
+ self.create_snapshot(image_spec, image)
+
+ def create_snapshot(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp, snap_id):
+ self.handle_create_snapshot(image_spec, image, comp, snap_id)
+
+ try:
+ image.aio_mirror_image_create_snapshot(0, cb)
+ except Exception as e:
+ self.log.error(
+ "exception when creating snapshot for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.close_image(image_spec, image)
+
+ def handle_create_snapshot(self, image_spec, image, comp, snap_id):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
+ pool_id, namespace, image_id, comp.get_return_value(), snap_id))
+
+ if comp.get_return_value() < 0 and \
+ comp.get_return_value() != -errno.ENOENT:
+ self.log.error(
+ "error when creating snapshot for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ self.close_image(image_spec, image)
+
+ def close_image(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.close_image {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp):
+ self.handle_close_image(image_spec, comp)
+
+ try:
+ image.aio_close(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when closing {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.finish(image_spec)
+
+ def handle_close_image(self, image_spec, comp):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ if comp.get_return_value() < 0:
+ self.log.error(
+ "error when closing {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ self.finish(image_spec)
+
+ def finish(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ self.put_ioctx(image_spec)
+
+ with self.lock:
+ self.pending.remove(image_spec)
+ if not self.queue:
+ return
+ image_spec = self.queue.pop(0)
+
+ self.open_image(image_spec)
+
+ def get_ioctx(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+ nspec = (pool_id, namespace)
+
+ with self.lock:
+ ioctx, images = self.ioctxs.get(nspec, (None, None))
+ if not ioctx:
+ ioctx = self.rados.open_ioctx2(int(pool_id))
+ ioctx.set_namespace(namespace)
+ images = set()
+ self.ioctxs[nspec] = (ioctx, images)
+ images.add(image_spec)
+
+ return ioctx
+
+ def put_ioctx(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+ nspec = (pool_id, namespace)
+
+ with self.lock:
+ ioctx, images = self.ioctxs[nspec]
+ images.remove(image_spec)
+ if not images:
+ del self.ioctxs[nspec]
+
+
+class MirrorSnapshotScheduleHandler:
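+    """Runs a background thread that periodically refreshes the set of
+    mirrored images and creates mirror snapshots when their schedules
+    come due."""
+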
+ MODULE_OPTION_NAME = "mirror_snapshot_schedule"
+ MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
+ SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
+ REFRESH_DELAY_SECONDS = 60.0
+
+ lock = Lock()
+ condition = Condition(lock)
+ thread = None
+
+ def __init__(self, module):
+ self.module = module
+ self.log = module.log
+ self.last_refresh_images = datetime(1970, 1, 1)
+ self.create_snapshot_requests = CreateSnapshotRequests(self)
+
+ self.init_schedule_queue()
+
+ self.thread = Thread(target=self.run)
+ self.thread.start()
+
+ def _cleanup(self):
+ self.create_snapshot_requests.wait_for_pending()
+
+ def run(self):
+ try:
+ self.log.info("MirrorSnapshotScheduleHandler: starting")
+ while True:
+ refresh_delay = self.refresh_images()
+ with self.lock:
+ (image_spec, wait_time) = self.dequeue()
+ if not image_spec:
+ self.condition.wait(min(wait_time, refresh_delay))
+ continue
+ pool_id, namespace, image_id = image_spec
+ self.create_snapshot_requests.add(pool_id, namespace, image_id)
+ with self.lock:
+ self.enqueue(datetime.now(), pool_id, namespace, image_id)
+
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ def init_schedule_queue(self):
+ self.queue = {}
+ self.images = {}
+ self.refresh_images()
+ self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")
+
+ def load_schedules(self):
+ self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
+
+ schedules = Schedules(self)
+ schedules.load(namespace_validator, image_validator)
+ self.schedules = schedules
+
+ def refresh_images(self):
+ elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
+ if elapsed < self.REFRESH_DELAY_SECONDS:
+ return self.REFRESH_DELAY_SECONDS - elapsed
+
+ self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")
+
+ with self.lock:
+ self.load_schedules()
+ if not self.schedules:
+ self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
+ self.images = {}
+ self.queue = {}
+ self.last_refresh_images = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ images = {}
+
+ for pool_id, pool_name in get_rbd_pools(self.module).items():
+ if not self.schedules.intersects(
+ LevelSpec.from_pool_spec(pool_id, pool_name)):
+ continue
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_pool_images(ioctx, images)
+
+ with self.lock:
+ self.refresh_queue(images)
+ self.images = images
+
+ self.last_refresh_images = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ def load_pool_images(self, ioctx, images):
+ pool_id = str(ioctx.get_pool_id())
+ pool_name = ioctx.get_pool_name()
+ images[pool_id] = {}
+
+ self.log.debug("load_pool_images: pool={}".format(pool_name))
+
+ try:
+ namespaces = [''] + rbd.RBD().namespace_list(ioctx)
+ for namespace in namespaces:
+ if not self.schedules.intersects(
+ LevelSpec.from_pool_spec(pool_id, pool_name, namespace)):
+ continue
+ self.log.debug("load_pool_images: pool={}, namespace={}".format(
+ pool_name, namespace))
+ images[pool_id][namespace] = {}
+ ioctx.set_namespace(namespace)
+ mirror_images = dict(rbd.RBD().mirror_image_info_list(
+ ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
+ if not mirror_images:
+ continue
+ image_names = dict(
+ [(x['id'], x['name']) for x in filter(
+ lambda x: x['id'] in mirror_images,
+ rbd.RBD().list2(ioctx))])
+ for image_id, info in mirror_images.items():
+ if not info['primary']:
+ continue
+ image_name = image_names.get(image_id)
+ if not image_name:
+ continue
+ if namespace:
+ name = "{}/{}/{}".format(pool_name, namespace,
+ image_name)
+ else:
+ name = "{}/{}".format(pool_name, image_name)
+ self.log.debug(
+ "load_pool_images: adding image {}".format(name))
+ images[pool_id][namespace][image_id] = name
+ except Exception as e:
+ self.log.error(
+ "load_pool_images: exception when scanning pool {}: {}".format(
+ pool_name, e))
+
+ def rebuild_queue(self):
+ now = datetime.now()
+
+        # don't remove images that are already "due" from the queue
+ now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+
+ for schedule_time in list(self.queue):
+ if schedule_time > now_string:
+ del self.queue[schedule_time]
+
+ if not self.schedules:
+ return
+
+ for pool_id in self.images:
+ for namespace in self.images[pool_id]:
+ for image_id in self.images[pool_id][namespace]:
+ self.enqueue(now, pool_id, namespace, image_id)
+
+ self.condition.notify()
+
+ def refresh_queue(self, current_images):
+ now = datetime.now()
+
+ for pool_id in self.images:
+ for namespace in self.images[pool_id]:
+ for image_id in self.images[pool_id][namespace]:
+ if pool_id not in current_images or \
+ namespace not in current_images[pool_id] or \
+ image_id not in current_images[pool_id][namespace]:
+ self.remove_from_queue(pool_id, namespace, image_id)
+
+ for pool_id in current_images:
+ for namespace in current_images[pool_id]:
+ for image_id in current_images[pool_id][namespace]:
+ if pool_id not in self.images or \
+ namespace not in self.images[pool_id] or \
+ image_id not in self.images[pool_id][namespace]:
+ self.enqueue(now, pool_id, namespace, image_id)
+
+ self.condition.notify()
+
+ def enqueue(self, now, pool_id, namespace, image_id):
+ schedule = self.schedules.find(pool_id, namespace, image_id)
+ if not schedule:
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
+ pool_id, namespace, image_id))
+ return
+
+ schedule_time = schedule.next_run(now)
+ if schedule_time not in self.queue:
+ self.queue[schedule_time] = []
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
+ pool_id, namespace, image_id, schedule_time))
+ image_spec = (pool_id, namespace, image_id)
+        if image_spec not in self.queue[schedule_time]:
+            self.queue[schedule_time].append(image_spec)
+
+ def dequeue(self):
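+        # return the next due image spec with a zero wait time, or
+        # (None, seconds_to_wait) when nothing is due yet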
+ if not self.queue:
+ return None, 1000
+
+ now = datetime.now()
+ schedule_time = sorted(self.queue)[0]
+
+ if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
+ wait_time = (datetime.strptime(schedule_time,
+ "%Y-%m-%d %H:%M:%S") - now)
+ return None, wait_time.total_seconds()
+
+ images = self.queue[schedule_time]
+ image = images.pop(0)
+ if not images:
+ del self.queue[schedule_time]
+ return image, 0
+
+ def remove_from_queue(self, pool_id, namespace, image_id):
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ empty_slots = []
+ for schedule_time, images in self.queue.items():
+ if (pool_id, namespace, image_id) in images:
+ images.remove((pool_id, namespace, image_id))
+ if not images:
+ empty_slots.append(schedule_time)
+ for schedule_time in empty_slots:
+ del self.queue[schedule_time]
+
+ def add_schedule(self, level_spec, interval, start_time):
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.add(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def remove_schedule(self, level_spec, interval, start_time):
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.remove(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def list(self, level_spec):
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
+ level_spec.name))
+
+ with self.lock:
+ result = self.schedules.to_list(level_spec)
+
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+ def status(self, level_spec):
+ self.log.debug(
+ "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
+ level_spec.name))
+
+ scheduled_images = []
+ with self.lock:
+ for schedule_time in sorted(self.queue):
+ for pool_id, namespace, image_id in self.queue[schedule_time]:
+ if not level_spec.matches(pool_id, namespace, image_id):
+ continue
+ image_name = self.images[pool_id][namespace][image_id]
+ scheduled_images.append({
+ 'schedule_time' : schedule_time,
+ 'image' : image_name
+ })
+ return 0, json.dumps({'scheduled_images' : scheduled_images},
+ indent=4, sort_keys=True), ""
+
+ def handle_command(self, inbuf, prefix, cmd):
+ level_spec_name = cmd.get('level_spec', "")
+
+ try:
+ level_spec = LevelSpec.from_name(self, level_spec_name,
+ namespace_validator,
+ image_validator)
+ except ValueError as e:
+ return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
+ level_spec_name, e)
+
+ if prefix == 'add':
+ return self.add_schedule(level_spec, cmd['interval'],
+ cmd.get('start_time'))
+ elif prefix == 'remove':
+ return self.remove_schedule(level_spec, cmd.get('interval'),
+ cmd.get('start_time'))
+ elif prefix == 'list':
+ return self.list(level_spec)
+ elif prefix == 'status':
+ return self.status(level_spec)
+
+ raise NotImplementedError(cmd['prefix'])
diff --git a/src/pybind/mgr/rbd_support/module.py b/src/pybind/mgr/rbd_support/module.py
new file mode 100644
index 000000000..82bd06e62
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/module.py
@@ -0,0 +1,204 @@
+"""
+RBD support module
+"""
+
+import errno
+import rados
+import rbd
+import traceback
+
+from mgr_module import MgrModule
+
+from .common import NotAuthorizedError
+from .mirror_snapshot_schedule import MirrorSnapshotScheduleHandler
+from .perf import PerfHandler
+from .task import TaskHandler
+from .trash_purge_schedule import TrashPurgeScheduleHandler
+
+
+class Module(MgrModule):
+ COMMANDS = [
+ {
+ "cmd": "rbd mirror snapshot schedule add "
+ "name=level_spec,type=CephString "
+ "name=interval,type=CephString "
+ "name=start_time,type=CephString,req=false ",
+ "desc": "Add rbd mirror snapshot schedule",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd mirror snapshot schedule remove "
+ "name=level_spec,type=CephString "
+ "name=interval,type=CephString,req=false "
+ "name=start_time,type=CephString,req=false ",
+ "desc": "Remove rbd mirror snapshot schedule",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd mirror snapshot schedule list "
+ "name=level_spec,type=CephString,req=false ",
+ "desc": "List rbd mirror snapshot schedule",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd mirror snapshot schedule status "
+ "name=level_spec,type=CephString,req=false ",
+ "desc": "Show rbd mirror snapshot schedule status",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd perf image stats "
+ "name=pool_spec,type=CephString,req=false "
+ "name=sort_by,type=CephChoices,strings="
+ "write_ops|write_bytes|write_latency|"
+ "read_ops|read_bytes|read_latency,"
+ "req=false ",
+ "desc": "Retrieve current RBD IO performance stats",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd perf image counters "
+ "name=pool_spec,type=CephString,req=false "
+ "name=sort_by,type=CephChoices,strings="
+ "write_ops|write_bytes|write_latency|"
+ "read_ops|read_bytes|read_latency,"
+ "req=false ",
+ "desc": "Retrieve current RBD IO performance counters",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd task add flatten "
+ "name=image_spec,type=CephString",
+ "desc": "Flatten a cloned image asynchronously in the background",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd task add remove "
+ "name=image_spec,type=CephString",
+ "desc": "Remove an image asynchronously in the background",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd task add trash remove "
+ "name=image_id_spec,type=CephString",
+ "desc": "Remove an image from the trash asynchronously in the background",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd task add migration execute "
+ "name=image_spec,type=CephString",
+ "desc": "Execute an image migration asynchronously in the background",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd task add migration commit "
+ "name=image_spec,type=CephString",
+ "desc": "Commit an executed migration asynchronously in the background",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd task add migration abort "
+ "name=image_spec,type=CephString",
+ "desc": "Abort a prepared migration asynchronously in the background",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd task cancel "
+ "name=task_id,type=CephString ",
+ "desc": "Cancel a pending or running asynchronous task",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd task list "
+ "name=task_id,type=CephString,req=false ",
+ "desc": "List pending or running asynchronous tasks",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd trash purge schedule add "
+ "name=level_spec,type=CephString "
+ "name=interval,type=CephString "
+ "name=start_time,type=CephString,req=false ",
+ "desc": "Add rbd trash purge schedule",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd trash purge schedule remove "
+ "name=level_spec,type=CephString "
+ "name=interval,type=CephString,req=false "
+ "name=start_time,type=CephString,req=false ",
+ "desc": "Remove rbd trash purge schedule",
+ "perm": "w"
+ },
+ {
+ "cmd": "rbd trash purge schedule list "
+ "name=level_spec,type=CephString,req=false ",
+ "desc": "List rbd trash purge schedule",
+ "perm": "r"
+ },
+ {
+ "cmd": "rbd trash purge schedule status "
+ "name=level_spec,type=CephString,req=false ",
+ "desc": "Show rbd trash purge schedule status",
+ "perm": "r"
+ }
+ ]
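+
+    # Example CLI invocations (pool/image names are hypothetical):
+    #   ceph rbd mirror snapshot schedule add rbd/image1 1h 14:00
+    #   ceph rbd perf image stats rbd write_bytes
+    #   ceph rbd task add flatten rbd/child1
+    #   ceph rbd trash purge schedule add rbd 1d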
+ MODULE_OPTIONS = [
+ {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME},
+ {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE, 'type': 'int', 'default': 10},
+ {'name': TrashPurgeScheduleHandler.MODULE_OPTION_NAME},
+ ]
+
+ mirror_snapshot_schedule = None
+ perf = None
+ task = None
+ trash_purge_schedule = None
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.rados.wait_for_latest_osdmap()
+ self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
+ self.perf = PerfHandler(self)
+ self.task = TaskHandler(self)
+ self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
+
+ def handle_command(self, inbuf, cmd):
+        # ensure we have the latest pools available
+ self.rados.wait_for_latest_osdmap()
+
+ prefix = cmd['prefix']
+ try:
+ try:
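+                # dispatch on the command prefix, stripping the handler
+                # part (e.g. 'rbd mirror snapshot schedule add' -> 'add')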
+ if prefix.startswith('rbd mirror snapshot schedule '):
+ return self.mirror_snapshot_schedule.handle_command(
+ inbuf, prefix[29:], cmd)
+ elif prefix.startswith('rbd perf '):
+ return self.perf.handle_command(inbuf, prefix[9:], cmd)
+ elif prefix.startswith('rbd task '):
+ return self.task.handle_command(inbuf, prefix[9:], cmd)
+ elif prefix.startswith('rbd trash purge schedule '):
+ return self.trash_purge_schedule.handle_command(
+ inbuf, prefix[25:], cmd)
+
+ except NotAuthorizedError:
+ raise
+ except Exception as ex:
+ # log the full traceback but don't send it to the CLI user
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+ raise
+
+ except rados.Error as ex:
+ return -ex.errno, "", str(ex)
+ except rbd.OSError as ex:
+ return -ex.errno, "", str(ex)
+ except rbd.Error as ex:
+ return -errno.EINVAL, "", str(ex)
+ except KeyError as ex:
+ return -errno.ENOENT, "", str(ex)
+ except ValueError as ex:
+ return -errno.EINVAL, "", str(ex)
+ except NotAuthorizedError as ex:
+ return -errno.EACCES, "", str(ex)
+
+ raise NotImplementedError(cmd['prefix'])
diff --git a/src/pybind/mgr/rbd_support/perf.py b/src/pybind/mgr/rbd_support/perf.py
new file mode 100644
index 000000000..c5accf114
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/perf.py
@@ -0,0 +1,457 @@
+import errno
+import json
+import rados
+import rbd
+import time
+import traceback
+
+from datetime import datetime, timedelta
+from threading import Condition, Lock, Thread
+
+from .common import (GLOBAL_POOL_KEY, authorize_request, extract_pool_key,
+ get_rbd_pools)
+
+QUERY_POOL_ID = "pool_id"
+QUERY_POOL_ID_MAP = "pool_id_map"
+QUERY_IDS = "query_ids"
+QUERY_SUM_POOL_COUNTERS = "pool_counters"
+QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
+QUERY_LAST_REQUEST = "last_request"
+
+OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
+OSD_PERF_QUERY_COUNTERS = ['write_ops',
+ 'read_ops',
+ 'write_bytes',
+ 'read_bytes',
+ 'write_latency',
+ 'read_latency']
+OSD_PERF_QUERY_COUNTERS_INDICES = {
+ OSD_PERF_QUERY_COUNTERS[i]: i for i in range(len(OSD_PERF_QUERY_COUNTERS))}
+
+OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
+OSD_PERF_QUERY_MAX_RESULTS = 256
+
+POOL_REFRESH_INTERVAL = timedelta(minutes=5)
+QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+STATS_RATE_INTERVAL = timedelta(minutes=1)
+
+REPORT_MAX_RESULTS = 64
+
+
+class PerfHandler:
+ user_queries = {}
+ image_cache = {}
+
+ lock = Lock()
+ query_condition = Condition(lock)
+ refresh_condition = Condition(lock)
+ thread = None
+
+ image_name_cache = {}
+ image_name_refresh_time = datetime.fromtimestamp(0)
+
+ @classmethod
+ def prepare_regex(cls, value):
+ return '^({})$'.format(value)
+
+ @classmethod
+ def prepare_osd_perf_query(cls, pool_id, namespace, counter_type):
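+        # build an OSD perf query for RBD data/journal objects, optionally
+        # restricted to a pool and namespace; the object_name regex
+        # captures an optional pool id and the image id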
+ pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
+ namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
+ if pool_id:
+ pool_id_regex = cls.prepare_regex(pool_id)
+ if namespace:
+ namespace_regex = cls.prepare_regex(namespace)
+
+ return {
+ 'key_descriptor': [
+ {'type': 'pool_id', 'regex': pool_id_regex},
+ {'type': 'namespace', 'regex': namespace_regex},
+ {'type': 'object_name',
+ 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
+ ],
+ 'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
+ 'limit': {'order_by': counter_type,
+ 'max_count': OSD_PERF_QUERY_MAX_RESULTS},
+ }
+
+ @classmethod
+ def pool_spec_search_keys(cls, pool_key):
+ return [pool_key[0:len(pool_key) - x]
+ for x in range(0, len(pool_key) + 1)]
+
+ @classmethod
+ def submatch_pool_key(cls, pool_key, search_key):
+ return ((pool_key[1] == search_key[1] or not search_key[1])
+ and (pool_key[0] == search_key[0] or not search_key[0]))
+
+ def __init__(self, module):
+ self.module = module
+ self.log = module.log
+
+ self.thread = Thread(target=self.run)
+ self.thread.start()
+
+ def run(self):
+ try:
+ self.log.info("PerfHandler: starting")
+ while True:
+ with self.lock:
+ self.scrub_expired_queries()
+ self.process_raw_osd_perf_counters()
+ self.refresh_condition.notify()
+
+ stats_period = self.module.get_ceph_option("mgr_stats_period")
+ self.query_condition.wait(stats_period)
+
+ self.log.debug("PerfHandler: tick")
+
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ def merge_raw_osd_perf_counters(self, pool_key, query, now_ts,
+ resolve_image_names):
+ pool_id_map = query[QUERY_POOL_ID_MAP]
+
+ # collect and combine the raw counters from all sort orders
+ raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
+ for query_id in query[QUERY_IDS]:
+ res = self.module.get_osd_perf_counters(query_id)
+ for counter in res['counters']:
+                # prefer the pool id embedded in the object name, if any
+ k = counter['k']
+ pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
+ namespace = k[1][0]
+ image_id = k[2][1]
+
+ # ignore metrics from non-matching pools/namespaces
+ if pool_id not in pool_id_map:
+ continue
+ if pool_key[1] is not None and pool_key[1] != namespace:
+ continue
+
+                # flag the pool (and namespace) for refresh if we cannot
+                # find the image name in the cache
+ resolve_image_key = (pool_id, namespace)
+ if image_id not in self.image_name_cache.get(resolve_image_key, {}):
+ resolve_image_names.add(resolve_image_key)
+
+ # copy the 'sum' counter values for each image (ignore count)
+ # if we haven't already processed it for this round
+ raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
+ raw_images = raw_namespaces.setdefault(namespace, {})
+ raw_image = raw_images.setdefault(image_id, [None, None])
+
+ # save the last two perf counters for each image
+ if raw_image[0] and raw_image[0][0] < now_ts:
+ raw_image[1] = raw_image[0]
+ raw_image[0] = None
+ if not raw_image[0]:
+ raw_image[0] = [now_ts, [int(x[0]) for x in counter['c']]]
+
+ self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
+ return raw_pool_counters
+
+ def sum_osd_perf_counters(self, query, raw_pool_counters, now_ts):
+ # update the cumulative counters for each image
+ sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
+ for pool_id, raw_namespaces in raw_pool_counters.items():
+ sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
+ for namespace, raw_images in raw_namespaces.items():
+ sum_namespace = sum_namespaces.setdefault(namespace, {})
+ for image_id, raw_image in raw_images.items():
+ # zero-out non-updated raw counters
+ if not raw_image[0]:
+ continue
+ elif raw_image[0][0] < now_ts:
+ raw_image[1] = raw_image[0]
+ raw_image[0] = [now_ts, [0 for x in raw_image[1][1]]]
+ continue
+
+ counters = raw_image[0][1]
+
+ # copy raw counters if this is a newly discovered image or
+ # increment existing counters
+ sum_image = sum_namespace.setdefault(image_id, None)
+ if sum_image:
+ for i in range(len(counters)):
+ sum_image[i] += counters[i]
+ else:
+ sum_namespace[image_id] = [x for x in counters]
+
+ self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
+ return sum_pool_counters
+
+ def refresh_image_names(self, resolve_image_names):
+ for pool_id, namespace in resolve_image_names:
+ image_key = (pool_id, namespace)
+ images = self.image_name_cache.setdefault(image_key, {})
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ ioctx.set_namespace(namespace)
+ for image_meta in rbd.RBD().list2(ioctx):
+ images[image_meta['id']] = image_meta['name']
+ self.log.debug("resolve_image_names: {}={}".format(image_key, images))
+
+ def scrub_missing_images(self):
+ for pool_key, query in self.user_queries.items():
+ raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
+ sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
+ for pool_id, sum_namespaces in sum_pool_counters.items():
+ raw_namespaces = raw_pool_counters.get(pool_id, {})
+ for namespace, sum_images in sum_namespaces.items():
+ raw_images = raw_namespaces.get(namespace, {})
+
+ image_key = (pool_id, namespace)
+ image_names = self.image_name_cache.get(image_key, {})
+ for image_id in list(sum_images.keys()):
+ # scrub image counters if we failed to resolve image name
+ if image_id not in image_names:
+ self.log.debug("scrub_missing_images: dropping {}/{}".format(
+ image_key, image_id))
+ del sum_images[image_id]
+ if image_id in raw_images:
+ del raw_images[image_id]
+
+ def process_raw_osd_perf_counters(self):
+ now = datetime.now()
+ now_ts = int(now.strftime("%s"))
+
+ # clear the image name cache if we need to refresh all active pools
+ if self.image_name_cache and \
+ self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
+ self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
+ self.image_name_cache = {}
+
+ resolve_image_names = set()
+ for pool_key, query in self.user_queries.items():
+ if not query[QUERY_IDS]:
+ continue
+
+ raw_pool_counters = self.merge_raw_osd_perf_counters(
+ pool_key, query, now_ts, resolve_image_names)
+ self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)
+
+ if resolve_image_names:
+ self.image_name_refresh_time = now
+ self.refresh_image_names(resolve_image_names)
+ self.scrub_missing_images()
+ elif not self.image_name_cache:
+ self.scrub_missing_images()
+
+ def resolve_pool_id(self, pool_name):
+ pool_id = self.module.rados.pool_lookup(pool_name)
+ if not pool_id:
+ raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name),
+ errno.ENOENT)
+ return pool_id
+
+ def scrub_expired_queries(self):
+ # perf counters need to be periodically refreshed to continue
+ # to be registered
+ expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
+ for pool_key in list(self.user_queries.keys()):
+ user_query = self.user_queries[pool_key]
+ if user_query[QUERY_LAST_REQUEST] < expire_time:
+ self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
+ del self.user_queries[pool_key]
+
+ def register_osd_perf_queries(self, pool_id, namespace):
+ query_ids = []
+ try:
+ for counter in OSD_PERF_QUERY_COUNTERS:
+ query = self.prepare_osd_perf_query(pool_id, namespace, counter)
+ self.log.debug("register_osd_perf_queries: {}".format(query))
+
+ query_id = self.module.add_osd_perf_query(query)
+ if query_id is None:
+ raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
+ query_ids.append(query_id)
+
+ except Exception:
+ for query_id in query_ids:
+ self.module.remove_osd_perf_query(query_id)
+ raise
+
+ return query_ids
+
+ def unregister_osd_perf_queries(self, pool_key, query_ids):
+ self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
+ pool_key, query_ids))
+ for query_id in query_ids:
+ self.module.remove_osd_perf_query(query_id)
+ query_ids[:] = []
+
+ def register_query(self, pool_key):
+ if pool_key not in self.user_queries:
+ pool_id = None
+ if pool_key[0]:
+ pool_id = self.resolve_pool_id(pool_key[0])
+
+ user_query = {
+ QUERY_POOL_ID: pool_id,
+ QUERY_POOL_ID_MAP: {pool_id: pool_key[0]},
+ QUERY_IDS: self.register_osd_perf_queries(pool_id, pool_key[1]),
+ QUERY_LAST_REQUEST: datetime.now()
+ }
+
+ self.user_queries[pool_key] = user_query
+
+ # force an immediate stat pull if this is a new query
+ self.query_condition.notify()
+ self.refresh_condition.wait(5)
+
+ else:
+ user_query = self.user_queries[pool_key]
+
+ # ensure query doesn't expire
+ user_query[QUERY_LAST_REQUEST] = datetime.now()
+
+ if pool_key == GLOBAL_POOL_KEY:
+ # refresh the global pool id -> name map upon each
+ # processing period
+ user_query[QUERY_POOL_ID_MAP] = {
+ pool_id: pool_name for pool_id, pool_name
+ in get_rbd_pools(self.module).items()}
+
+ self.log.debug("register_query: pool_key={}, query_ids={}".format(
+ pool_key, user_query[QUERY_IDS]))
+
+ return user_query
+
+ def extract_stat(self, index, raw_image, sum_image):
+ # require two raw counters between a fixed time window
+ if not raw_image or not raw_image[0] or not raw_image[1]:
+ return 0
+
+ current_time = raw_image[0][0]
+ previous_time = raw_image[1][0]
+ if current_time <= previous_time or \
+ current_time - previous_time > STATS_RATE_INTERVAL.total_seconds():
+ return 0
+
+ current_value = raw_image[0][1][index]
+ instant_rate = float(current_value) / (current_time - previous_time)
+
+ # convert latencies from sum to average per op
+ ops_index = None
+ if OSD_PERF_QUERY_COUNTERS[index] == 'write_latency':
+ ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
+ elif OSD_PERF_QUERY_COUNTERS[index] == 'read_latency':
+ ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']
+
+ if ops_index is not None:
+ ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
+ instant_rate /= ops
+
+ return instant_rate
+
+ def extract_counter(self, index, raw_image, sum_image):
+ if sum_image:
+ return sum_image[index]
+ return 0
+
+ def generate_report(self, query, sort_by, extract_data):
+ pool_id_map = query[QUERY_POOL_ID_MAP]
+ sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
+ raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
+
+ sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)
+
+ # pre-sort and limit the response
+ results = []
+ for pool_id, sum_namespaces in sum_pool_counters.items():
+ if pool_id not in pool_id_map:
+ continue
+ raw_namespaces = raw_pool_counters.get(pool_id, {})
+ for namespace, sum_images in sum_namespaces.items():
+ raw_images = raw_namespaces.get(namespace, {})
+ for image_id, sum_image in sum_images.items():
+ raw_image = raw_images.get(image_id, [])
+
+ # always sort by recent IO activity
+ results.append([(pool_id, namespace, image_id),
+ self.extract_stat(sort_by_index, raw_image,
+ sum_image)])
+ results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]
+
+ # build the report in sorted order
+ pool_descriptors = {}
+ counters = []
+ for key, _ in results:
+ pool_id = key[0]
+ pool_name = pool_id_map[pool_id]
+
+ namespace = key[1]
+ image_id = key[2]
+ image_names = self.image_name_cache.get((pool_id, namespace), {})
+ image_name = image_names[image_id]
+
+ raw_namespaces = raw_pool_counters.get(pool_id, {})
+ raw_images = raw_namespaces.get(namespace, {})
+ raw_image = raw_images.get(image_id, [])
+
+ sum_namespaces = sum_pool_counters[pool_id]
+ sum_images = sum_namespaces[namespace]
+ sum_image = sum_images.get(image_id, [])
+
+ pool_descriptor = pool_name
+ if namespace:
+ pool_descriptor += "/{}".format(namespace)
+ pool_index = pool_descriptors.setdefault(pool_descriptor,
+ len(pool_descriptors))
+ image_descriptor = "{}/{}".format(pool_index, image_name)
+ data = [extract_data(i, raw_image, sum_image)
+ for i in range(len(OSD_PERF_QUERY_COUNTERS))]
+
+ # skip if no data to report
+ if data == [0 for i in range(len(OSD_PERF_QUERY_COUNTERS))]:
+ continue
+
+ counters.append({image_descriptor: data})
+
+ return {idx: descriptor for descriptor, idx
+ in pool_descriptors.items()}, \
+ counters
+
+ def get_perf_data(self, report, pool_spec, sort_by, extract_data):
+ self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
+ report, pool_spec, sort_by))
+ self.scrub_expired_queries()
+
+ pool_key = extract_pool_key(pool_spec)
+ authorize_request(self.module, pool_key[0], pool_key[1])
+
+ user_query = self.register_query(pool_key)
+
+ now = datetime.now()
+ pool_descriptors, counters = self.generate_report(
+ user_query, sort_by, extract_data)
+
+ report = {
+ 'timestamp': time.mktime(now.timetuple()),
+ '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
+ 'pool_descriptors': pool_descriptors,
+ '{}s'.format(report): counters
+ }
+
+ return 0, json.dumps(report), ""
+
+ def get_perf_stats(self, pool_spec, sort_by):
+ return self.get_perf_data(
+ "stat", pool_spec, sort_by, self.extract_stat)
+
+ def get_perf_counters(self, pool_spec, sort_by):
+ return self.get_perf_data(
+ "counter", pool_spec, sort_by, self.extract_counter)
+
+ def handle_command(self, inbuf, prefix, cmd):
+ with self.lock:
+ if prefix == 'image stats':
+ return self.get_perf_stats(cmd.get('pool_spec', None),
+ cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
+ elif prefix == 'image counters':
+ return self.get_perf_counters(cmd.get('pool_spec', None),
+ cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
+
+ raise NotImplementedError(cmd['prefix'])
diff --git a/src/pybind/mgr/rbd_support/schedule.py b/src/pybind/mgr/rbd_support/schedule.py
new file mode 100644
index 000000000..3063a086a
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/schedule.py
@@ -0,0 +1,520 @@
+import json
+import rados
+import rbd
+import re
+
+from datetime import datetime, timedelta, time
+from dateutil.parser import parse
+
+from .common import get_rbd_pools
+
+SCHEDULE_INTERVAL = "interval"
+SCHEDULE_START_TIME = "start_time"
+
+
+class LevelSpec:
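+    """Identifies the level a schedule applies to: global (''),
+    pool ('pool/'), namespace ('pool/ns/') or image ('pool/ns/image')."""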
+
+ def __init__(self, name, id, pool_id, namespace, image_id=None):
+ self.name = name
+ self.id = id
+ self.pool_id = pool_id
+ self.namespace = namespace
+ self.image_id = image_id
+
+ def __eq__(self, level_spec):
+ return self.id == level_spec.id
+
+ def is_child_of(self, level_spec):
+ if level_spec.is_global():
+ return not self.is_global()
+ if level_spec.pool_id != self.pool_id:
+ return False
+ if level_spec.namespace is None:
+ return self.namespace is not None
+ if level_spec.namespace != self.namespace:
+ return False
+ if level_spec.image_id is None:
+ return self.image_id is not None
+ return False
+
+ def is_global(self):
+ return self.pool_id is None
+
+ def get_pool_id(self):
+ return self.pool_id
+
+ def matches(self, pool_id, namespace, image_id=None):
+ if self.pool_id and self.pool_id != pool_id:
+ return False
+ if self.namespace and self.namespace != namespace:
+ return False
+ if self.image_id and self.image_id != image_id:
+ return False
+ return True
+
+ def intersects(self, level_spec):
+ if self.pool_id is None or level_spec.pool_id is None:
+ return True
+ if self.pool_id != level_spec.pool_id:
+ return False
+ if self.namespace is None or level_spec.namespace is None:
+ return True
+ if self.namespace != level_spec.namespace:
+ return False
+ if self.image_id is None or level_spec.image_id is None:
+ return True
+ if self.image_id != level_spec.image_id:
+ return False
+ return True
+
+ @classmethod
+ def make_global(cls):
+ return LevelSpec("", "", None, None, None)
+
+ @classmethod
+ def from_pool_spec(cls, pool_id, pool_name, namespace=None):
+ if namespace is None:
+ id = "{}".format(pool_id)
+ name = "{}/".format(pool_name)
+ else:
+ id = "{}/{}".format(pool_id, namespace)
+ name = "{}/{}/".format(pool_name, namespace)
+ return LevelSpec(name, id, str(pool_id), namespace, None)
+
+ @classmethod
+ def from_name(cls, handler, name, namespace_validator=None,
+ image_validator=None, allow_image_level=True):
+ # parse names like:
+ # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
+ match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
+ name)
+ if not match:
+ raise ValueError("failed to parse {}".format(name))
+ if match.group(3) and not allow_image_level:
+ raise ValueError(
+ "invalid name {}: image level is not allowed".format(name))
+
+ id = ""
+ pool_id = None
+ namespace = None
+ image_name = None
+ image_id = None
+ if match.group(1):
+ pool_name = match.group(1)
+ try:
+ pool_id = handler.module.rados.pool_lookup(pool_name)
+ if pool_id is None:
+ raise ValueError("pool {} does not exist".format(pool_name))
+ if pool_id not in get_rbd_pools(handler.module):
+ raise ValueError("{} is not an RBD pool".format(pool_name))
+ pool_id = str(pool_id)
+ id += pool_id
+ if match.group(2) is not None or match.group(3):
+ id += "/"
+ with handler.module.rados.open_ioctx(pool_name) as ioctx:
+ namespace = match.group(2) or ""
+ if namespace:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ if namespace not in namespaces:
+ raise ValueError(
+ "namespace {} does not exist".format(
+ namespace))
+ id += namespace
+ ioctx.set_namespace(namespace)
+ if namespace_validator:
+ namespace_validator(ioctx)
+ if match.group(3):
+ image_name = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_name,
+ read_only=True) as image:
+ image_id = image.id()
+ id += "/" + image_id
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
+ image_name))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_name))
+
+ except rados.ObjectNotFound:
+ raise ValueError("pool {} does not exist".format(pool_name))
+
+ # normalize possible input name like 'rbd//image'
+ if not namespace and image_name:
+ name = "{}/{}".format(pool_name, image_name)
+
+ return LevelSpec(name, id, pool_id, namespace, image_id)
+
+ @classmethod
+ def from_id(cls, handler, id, namespace_validator=None,
+ image_validator=None):
+ # parse ids like:
+ # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
+ match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
+ if not match:
+ raise ValueError("failed to parse: {}".format(id))
+
+ name = ""
+ pool_id = None
+ namespace = None
+ image_id = None
+ if match.group(1):
+ pool_id = match.group(1)
+ try:
+ pool_name = handler.module.rados.pool_reverse_lookup(
+ int(pool_id))
+ if pool_name is None:
+ raise ValueError("pool {} does not exist".format(pool_name))
+ name += pool_name + "/"
+ if match.group(2) is not None or match.group(3):
+ with handler.module.rados.open_ioctx(pool_name) as ioctx:
+ namespace = match.group(2) or ""
+ if namespace:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ if namespace not in namespaces:
+ raise ValueError(
+ "namespace {} does not exist".format(
+ namespace))
+ name += namespace + "/"
+ if namespace_validator:
+ ioctx.set_namespace(namespace)
+ elif not match.group(3):
+ name += "/"
+ if match.group(3):
+ image_id = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_id=image_id,
+ read_only=True) as image:
+ image_name = image.get_name()
+ name += image_name
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
+ image_id))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_id))
+
+ except rados.ObjectNotFound:
+ raise ValueError("pool {} does not exist".format(pool_id))
+
+ return LevelSpec(name, id, pool_id, namespace, image_id)
+
+
+class Interval:
+
+ def __init__(self, minutes):
+ self.minutes = minutes
+
+ def __eq__(self, interval):
+ return self.minutes == interval.minutes
+
+ def __hash__(self):
+ return hash(self.minutes)
+
+ def to_string(self):
+ if self.minutes % (60 * 24) == 0:
+ interval = int(self.minutes / (60 * 24))
+ units = 'd'
+ elif self.minutes % 60 == 0:
+ interval = int(self.minutes / 60)
+ units = 'h'
+ else:
+ interval = int(self.minutes)
+ units = 'm'
+
+ return "{}{}".format(interval, units)
+
+ @classmethod
+ def from_string(cls, interval):
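+        """Parse an interval like '30m', '2h' or '1d' into minutes; a
+        bare number is interpreted as minutes."""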
+ match = re.match(r'^(\d+)(d|h|m)?$', interval)
+ if not match:
+ raise ValueError("Invalid interval ({})".format(interval))
+
+ minutes = int(match.group(1))
+ if match.group(2) == 'd':
+ minutes *= 60 * 24
+ elif match.group(2) == 'h':
+ minutes *= 60
+
+ return Interval(minutes)
+
+
+class StartTime:
+
+ def __init__(self, hour, minute, tzinfo):
+ self.time = time(hour, minute, tzinfo=tzinfo)
+ self.minutes = self.time.hour * 60 + self.time.minute
+ if self.time.tzinfo:
+ self.minutes += int(self.time.utcoffset().seconds / 60)
+
+ def __eq__(self, start_time):
+ return self.minutes == start_time.minutes
+
+ def __hash__(self):
+ return hash(self.minutes)
+
+ def to_string(self):
+ return self.time.isoformat()
+
+ @classmethod
+ def from_string(cls, start_time):
+ if not start_time:
+ return None
+
+ try:
+ t = parse(start_time).timetz()
+ except ValueError as e:
+ raise ValueError("Invalid start time {}: {}".format(start_time, e))
+
+ return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
+
+
+class Schedule:
+
+ def __init__(self, name):
+ self.name = name
+ self.items = set()
+
+ def __len__(self):
+ return len(self.items)
+
+ def add(self, interval, start_time=None):
+ self.items.add((interval, start_time))
+
+ def remove(self, interval, start_time=None):
+ self.items.discard((interval, start_time))
+
+ def next_run(self, now):
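+        # for each item, the next run is the first period boundary
+        # (offset by the optional start time) strictly after 'now';
+        # the earliest such time across all items wins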
+ schedule_time = None
+ for item in self.items:
+ period = timedelta(minutes=item[0].minutes)
+ start_time = datetime(1970, 1, 1)
+ if item[1]:
+ start_time += timedelta(minutes=item[1].minutes)
+ time = start_time + \
+ (int((now - start_time) / period) + 1) * period
+ if schedule_time is None or time < schedule_time:
+ schedule_time = time
+ return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")
+
+ def to_list(self):
+ return [{SCHEDULE_INTERVAL: i[0].to_string(),
+ SCHEDULE_START_TIME: i[1] and i[1].to_string() or None}
+ for i in self.items]
+
+ def to_json(self):
+ return json.dumps(self.to_list(), indent=4, sort_keys=True)
+
+ @classmethod
+ def from_json(cls, name, val):
+ try:
+ items = json.loads(val)
+ schedule = Schedule(name)
+ for item in items:
+ interval = Interval.from_string(item[SCHEDULE_INTERVAL])
+ start_time = item[SCHEDULE_START_TIME] and \
+ StartTime.from_string(item[SCHEDULE_START_TIME]) or None
+ schedule.add(interval, start_time)
+ return schedule
+ except json.JSONDecodeError as e:
+ raise ValueError("Invalid JSON ({})".format(str(e)))
+ except KeyError as e:
+ raise ValueError(
+ "Invalid schedule format (missing key {})".format(str(e)))
+ except TypeError as e:
+ raise ValueError("Invalid schedule format ({})".format(str(e)))
+
+
+class Schedules:
+
+ def __init__(self, handler):
+ self.handler = handler
+ self.level_specs = {}
+ self.schedules = {}
+
+ def __len__(self):
+ return len(self.schedules)
+
+ def load(self, namespace_validator=None, image_validator=None):
+
+ schedule_cfg = self.handler.module.get_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
+
+ # Previous versions incorrectly stored the global config in
+        # the localized module option. Check for it there and migrate it.
+ if not schedule_cfg:
+ schedule_cfg = self.handler.module.get_localized_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
+ if schedule_cfg:
+ self.handler.module.set_module_option(
+ self.handler.MODULE_OPTION_NAME, schedule_cfg)
+ self.handler.module.set_localized_module_option(
+ self.handler.MODULE_OPTION_NAME, None)
+
+ if schedule_cfg:
+ try:
+ level_spec = LevelSpec.make_global()
+ self.level_specs[level_spec.id] = level_spec
+ schedule = Schedule.from_json(level_spec.name, schedule_cfg)
+ self.schedules[level_spec.id] = schedule
+ except ValueError:
+ self.handler.log.error(
+ "Failed to decode configured schedule {}".format(
+ schedule_cfg))
+
+ for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
+ try:
+ with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_from_pool(ioctx, namespace_validator,
+ image_validator)
+ except rados.Error as e:
+ self.handler.log.error(
+ "Failed to load schedules for pool {}: {}".format(
+ pool_name, e))
+
+ def load_from_pool(self, ioctx, namespace_validator, image_validator):
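+        # page through the schedule object's omap 128 keys at a time;
+        # keys whose level spec no longer resolves are collected and
+        # removed as stale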
+ pool_id = ioctx.get_pool_id()
+ pool_name = ioctx.get_pool_name()
+ stale_keys = ()
+ start_after = ''
+ try:
+ while True:
+ with rados.ReadOpCtx() as read_op:
+ self.handler.log.info(
+ "load_schedules: {}, start_after={}".format(
+ pool_name, start_after))
+ it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+ ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)
+
+ it = list(it)
+ for k, v in it:
+ start_after = k
+ v = v.decode()
+ self.handler.log.info(
+ "load_schedule: {} {}".format(k, v))
+ try:
+ try:
+ level_spec = LevelSpec.from_id(
+ self.handler, k, namespace_validator,
+ image_validator)
+ except ValueError:
+ self.handler.log.debug(
+                                "Stale schedule key {} in pool {}".format(
+                                    k, pool_name))
+ stale_keys += (k,)
+ continue
+
+ self.level_specs[level_spec.id] = level_spec
+ schedule = Schedule.from_json(level_spec.name, v)
+ self.schedules[level_spec.id] = schedule
+ except ValueError:
+ self.handler.log.error(
+ "Failed to decode schedule: pool={}, {} {}".format(
+ pool_name, k, v))
+ if not it:
+ break
+
+ except StopIteration:
+ pass
+ except rados.ObjectNotFound:
+ pass
+
+ if stale_keys:
+ with rados.WriteOpCtx() as write_op:
+ ioctx.remove_omap_keys(write_op, stale_keys)
+ ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+ def save(self, level_spec, schedule):
+ if level_spec.is_global():
+ schedule_cfg = schedule and schedule.to_json() or None
+ self.handler.module.set_module_option(
+ self.handler.MODULE_OPTION_NAME, schedule_cfg)
+ return
+
+ pool_id = level_spec.get_pool_id()
+ with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ with rados.WriteOpCtx() as write_op:
+ if schedule:
+ ioctx.set_omap(write_op, (level_spec.id, ),
+ (schedule.to_json(), ))
+ else:
+ ioctx.remove_omap_keys(write_op, (level_spec.id, ))
+ ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+ def add(self, level_spec, interval, start_time):
+ schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
+ schedule.add(Interval.from_string(interval),
+ StartTime.from_string(start_time))
+ self.schedules[level_spec.id] = schedule
+ self.level_specs[level_spec.id] = level_spec
+ self.save(level_spec, schedule)
+
+ def remove(self, level_spec, interval, start_time):
+ schedule = self.schedules.pop(level_spec.id, None)
+ if schedule:
+ if interval is None:
+ schedule = None
+ else:
+ try:
+ schedule.remove(Interval.from_string(interval),
+ StartTime.from_string(start_time))
+ finally:
+ if schedule:
+ self.schedules[level_spec.id] = schedule
+ if not schedule:
+ del self.level_specs[level_spec.id]
+ self.save(level_spec, schedule)
+
+ def find(self, pool_id, namespace, image_id=None):
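+        # look up the most specific matching schedule: image, then
+        # namespace, then pool, then global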
+ levels = [None, pool_id, namespace]
+ if image_id:
+ levels.append(image_id)
+
+ while levels:
+ level_spec_id = "/".join(levels[1:])
+ if level_spec_id in self.schedules:
+ return self.schedules[level_spec_id]
+ del levels[-1]
+ return None
+
+ def intersects(self, level_spec):
+ for ls in self.level_specs.values():
+ if ls.intersects(level_spec):
+ return True
+ return False
+
+ def to_list(self, level_spec):
+ if level_spec.id in self.schedules:
+ parent = level_spec
+ else:
+ # try to find existing parent
+ parent = None
+ for level_spec_id in self.schedules:
+ ls = self.level_specs[level_spec_id]
+ if ls == level_spec:
+ parent = ls
+ break
+ if level_spec.is_child_of(ls) and \
+ (not parent or ls.is_child_of(parent)):
+ parent = ls
+ if not parent:
+            # fall back to a non-existent parent so we can still list its children
+ parent = level_spec
+
+ result = {}
+ for level_spec_id, schedule in self.schedules.items():
+ ls = self.level_specs[level_spec_id]
+ if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
+ result[level_spec_id] = {
+ 'name' : schedule.name,
+ 'schedule' : schedule.to_list(),
+ }
+ return result
+
diff --git a/src/pybind/mgr/rbd_support/task.py b/src/pybind/mgr/rbd_support/task.py
new file mode 100644
index 000000000..ff096fd9b
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/task.py
@@ -0,0 +1,832 @@
+import errno
+import json
+import rados
+import rbd
+import re
+import traceback
+import uuid
+
+from contextlib import contextmanager
+from datetime import datetime, timedelta
+from functools import partial, wraps
+from threading import Condition, Lock, Thread
+
+from .common import (GLOBAL_POOL_KEY, authorize_request, extract_pool_key,
+                     get_rbd_pools, is_authorized)
+
+
+RBD_TASK_OID = "rbd_task"
+
+TASK_SEQUENCE = "sequence"
+TASK_ID = "id"
+TASK_REFS = "refs"
+TASK_MESSAGE = "message"
+TASK_RETRY_ATTEMPTS = "retry_attempts"
+TASK_RETRY_TIME = "retry_time"
+TASK_RETRY_MESSAGE = "retry_message"
+TASK_IN_PROGRESS = "in_progress"
+TASK_PROGRESS = "progress"
+TASK_CANCELED = "canceled"
+
+TASK_REF_POOL_NAME = "pool_name"
+TASK_REF_POOL_NAMESPACE = "pool_namespace"
+TASK_REF_IMAGE_NAME = "image_name"
+TASK_REF_IMAGE_ID = "image_id"
+TASK_REF_ACTION = "action"
+
+TASK_REF_ACTION_FLATTEN = "flatten"
+TASK_REF_ACTION_REMOVE = "remove"
+TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
+TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
+TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
+TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"
+
+VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
+ TASK_REF_ACTION_REMOVE,
+ TASK_REF_ACTION_TRASH_REMOVE,
+ TASK_REF_ACTION_MIGRATION_EXECUTE,
+ TASK_REF_ACTION_MIGRATION_COMMIT,
+ TASK_REF_ACTION_MIGRATION_ABORT]
+
+TASK_RETRY_INTERVAL = timedelta(seconds=30)
+TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
+MAX_COMPLETED_TASKS = 50
+
+
+class Throttle:
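+    # decorator that rate-limits calls to the wrapped function: calls
+    # made within throttle_period of the last executed call are silently
+    # dropped (used below to cap progress updates at one per second)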
+ def __init__(self, throttle_period):
+ self.throttle_period = throttle_period
+ self.time_of_last_call = datetime.min
+
+ def __call__(self, fn):
+ @wraps(fn)
+ def wrapper(*args, **kwargs):
+ now = datetime.now()
+ if self.time_of_last_call + self.throttle_period <= now:
+ self.time_of_last_call = now
+ return fn(*args, **kwargs)
+ return wrapper
+
+
+class Task:
+ def __init__(self, sequence, task_id, message, refs):
+ self.sequence = sequence
+ self.task_id = task_id
+ self.message = message
+ self.refs = refs
+ self.retry_message = None
+ self.retry_attempts = 0
+ self.retry_time = None
+ self.in_progress = False
+ self.progress = 0.0
+ self.canceled = False
+ self.failed = False
+ self.progress_posted = False
+
+ def __str__(self):
+ return self.to_json()
+
+ @property
+ def sequence_key(self):
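+        # fixed-width hex encoding so that omap keys sort
+        # lexicographically in numeric sequence order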
+ return "{0:016X}".format(self.sequence)
+
+ def cancel(self):
+ self.canceled = True
+ self.fail("Operation canceled")
+
+ def fail(self, message):
+ self.failed = True
+ self.failure_message = message
+
+ def to_dict(self):
+ d = {TASK_SEQUENCE: self.sequence,
+ TASK_ID: self.task_id,
+ TASK_MESSAGE: self.message,
+ TASK_REFS: self.refs
+ }
+ if self.retry_message:
+ d[TASK_RETRY_MESSAGE] = self.retry_message
+ if self.retry_attempts:
+ d[TASK_RETRY_ATTEMPTS] = self.retry_attempts
+ if self.retry_time:
+ d[TASK_RETRY_TIME] = self.retry_time.isoformat()
+ if self.in_progress:
+ d[TASK_IN_PROGRESS] = True
+ d[TASK_PROGRESS] = self.progress
+ if self.canceled:
+ d[TASK_CANCELED] = True
+ return d
+
+ def to_json(self):
+ return str(json.dumps(self.to_dict()))
+
+ @classmethod
+ def from_json(cls, val):
+ try:
+ d = json.loads(val)
+ action = d.get(TASK_REFS, {}).get(TASK_REF_ACTION)
+ if action not in VALID_TASK_ACTIONS:
+ raise ValueError("Invalid task action: {}".format(action))
+
+            return cls(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
+ except json.JSONDecodeError as e:
+ raise ValueError("Invalid JSON ({})".format(str(e)))
+ except KeyError as e:
+ raise ValueError("Invalid task format (missing key {})".format(str(e)))
+
+
+class TaskHandler:
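+    # note: the queue state below is kept in class-level attributes; a
+    # single TaskHandler instance is assumed, with all access guarded by
+    # 'lock'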
+ lock = Lock()
+ condition = Condition(lock)
+ thread = None
+
+ in_progress_task = None
+ tasks_by_sequence = dict()
+ tasks_by_id = dict()
+
+ completed_tasks = []
+
+ sequence = 0
+
+ def __init__(self, module):
+ self.module = module
+ self.log = module.log
+
+ with self.lock:
+ self.init_task_queue()
+
+ self.thread = Thread(target=self.run)
+ self.thread.start()
+
+ @property
+ def default_pool_name(self):
+ return self.module.get_ceph_option("rbd_default_pool")
+
+ def extract_pool_spec(self, pool_spec):
+ pool_spec = extract_pool_key(pool_spec)
+ if pool_spec == GLOBAL_POOL_KEY:
+ pool_spec = (self.default_pool_name, '')
+ return pool_spec
+
+ def extract_image_spec(self, image_spec):
+ match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
+ image_spec or '')
+ if not match:
+ raise ValueError("Invalid image spec: {}".format(image_spec))
+ return (match.group(1) or self.default_pool_name, match.group(2) or '',
+ match.group(3))
+
+ def run(self):
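+        # background worker: every 5 seconds, or sooner when notified,
+        # execute all queued tasks whose retry_time has passed, in
+        # sequence order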
+ try:
+ self.log.info("TaskHandler: starting")
+ while True:
+ with self.lock:
+ now = datetime.now()
+ for sequence in sorted([sequence for sequence, task
+ in self.tasks_by_sequence.items()
+ if not task.retry_time or task.retry_time <= now]):
+ self.execute_task(sequence)
+
+ self.condition.wait(5)
+ self.log.debug("TaskHandler: tick")
+
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ @contextmanager
+ def open_ioctx(self, spec):
+ try:
+ with self.module.rados.open_ioctx(spec[0]) as ioctx:
+ ioctx.set_namespace(spec[1])
+ yield ioctx
+ except rados.ObjectNotFound:
+ self.log.error("Failed to locate pool {}".format(spec[0]))
+ raise
+
+ @classmethod
+ def format_image_spec(cls, image_spec):
+ image = image_spec[2]
+ if image_spec[1]:
+ image = "{}/{}".format(image_spec[1], image)
+ if image_spec[0]:
+ image = "{}/{}".format(image_spec[0], image)
+ return image
+
+ def init_task_queue(self):
+ for pool_id, pool_name in get_rbd_pools(self.module).items():
+ try:
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_task_queue(ioctx, pool_name)
+
+ try:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ except rbd.OperationNotSupported:
+ self.log.debug("Namespaces not supported")
+ continue
+
+ for namespace in namespaces:
+ ioctx.set_namespace(namespace)
+ self.load_task_queue(ioctx, pool_name)
+
+ except rados.ObjectNotFound:
+ # pool DNE
+ pass
+
+ if self.tasks_by_sequence:
+ self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]
+
+ self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
+ self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
+
+ def load_task_queue(self, ioctx, pool_name):
+ pool_spec = pool_name
+ if ioctx.nspace:
+ pool_spec += "/{}".format(ioctx.nspace)
+
+ start_after = ''
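+        # page through the task omap in batches of 128 entries, resuming
+        # each read after the last key seen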
+ try:
+ while True:
+ with rados.ReadOpCtx() as read_op:
+                    self.log.info("load_task_queue: {}, start_after={}".format(
+                        pool_spec, start_after))
+ it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+ ioctx.operate_read_op(read_op, RBD_TASK_OID)
+
+ it = list(it)
+ for k, v in it:
+ start_after = k
+ v = v.decode()
+                        self.log.info("load_task_queue: task={}".format(v))
+
+ try:
+ task = Task.from_json(v)
+ self.append_task(task)
+ except ValueError:
+ self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))
+
+ if not it:
+ break
+
+ except StopIteration:
+ pass
+ except rados.ObjectNotFound:
+ # rbd_task DNE
+ pass
+
+ def append_task(self, task):
+ self.tasks_by_sequence[task.sequence] = task
+ self.tasks_by_id[task.task_id] = task
+
+ def task_refs_match(self, task_refs, refs):
+ if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
+ task_refs = task_refs.copy()
+ del task_refs[TASK_REF_IMAGE_ID]
+
+ self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
+ return task_refs == refs
+
+ def find_task(self, refs):
+ self.log.debug("find_task: refs={}".format(refs))
+
+ # search for dups and return the original
+ for task_id in reversed(sorted(self.tasks_by_id.keys())):
+ task = self.tasks_by_id[task_id]
+ if self.task_refs_match(task.refs, refs):
+ return task
+
+ # search for a completed task (message replay)
+ for task in reversed(self.completed_tasks):
+ if self.task_refs_match(task.refs, refs):
+ return task
+
+ def add_task(self, ioctx, message, refs):
+ self.log.debug("add_task: message={}, refs={}".format(message, refs))
+
+ # ensure unique uuid across all pools
+ while True:
+ task_id = str(uuid.uuid4())
+ if task_id not in self.tasks_by_id:
+ break
+
+ self.sequence += 1
+ task = Task(self.sequence, task_id, message, refs)
+
+ # add the task to the rbd_task omap
+ task_json = task.to_json()
+ omap_keys = (task.sequence_key, )
+ omap_vals = (str.encode(task_json), )
+ self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0]))
+
+ with rados.WriteOpCtx() as write_op:
+ ioctx.set_omap(write_op, omap_keys, omap_vals)
+ ioctx.operate_write_op(write_op, RBD_TASK_OID)
+ self.append_task(task)
+
+ self.condition.notify()
+ return task_json
+
+ def remove_task(self, ioctx, task, remove_in_memory=True):
+ self.log.info("remove_task: task={}".format(str(task)))
+ if ioctx:
+ try:
+ with rados.WriteOpCtx() as write_op:
+ omap_keys = (task.sequence_key, )
+ ioctx.remove_omap_keys(write_op, omap_keys)
+ ioctx.operate_write_op(write_op, RBD_TASK_OID)
+ except rados.ObjectNotFound:
+ pass
+
+ if remove_in_memory:
+ try:
+ del self.tasks_by_id[task.task_id]
+ del self.tasks_by_sequence[task.sequence]
+
+ # keep a record of the last N tasks to help avoid command replay
+ # races
+ if not task.failed and not task.canceled:
+ self.log.debug("remove_task: moving to completed tasks")
+ self.completed_tasks.append(task)
+ self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
+
+ except KeyError:
+ pass
+
+ def execute_task(self, sequence):
+ task = self.tasks_by_sequence[sequence]
+ self.log.info("execute_task: task={}".format(str(task)))
+
+ pool_valid = False
+ try:
+ with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
+ pool_valid = True
+
+ action = task.refs[TASK_REF_ACTION]
+ execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
+ TASK_REF_ACTION_REMOVE: self.execute_remove,
+ TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
+ TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
+ TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
+ TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
+ }.get(action)
+ if not execute_fn:
+ self.log.error("Invalid task action: {}".format(action))
+ else:
+ task.in_progress = True
+ self.in_progress_task = task
+
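+                    # drop the handler lock for the duration of the
+                    # long-running librbd call so that incoming commands
+                    # and progress callbacks are not blocked behind it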
+ self.lock.release()
+ try:
+ execute_fn(ioctx, task)
+
+ except rbd.OperationCanceled:
+ self.log.info("Operation canceled: task={}".format(
+ str(task)))
+
+ finally:
+ self.lock.acquire()
+
+ task.in_progress = False
+ self.in_progress_task = None
+
+ self.complete_progress(task)
+ self.remove_task(ioctx, task)
+
+ except rados.ObjectNotFound as e:
+ self.log.error("execute_task: {}".format(e))
+ if pool_valid:
+ task.retry_message = "{}".format(e)
+ self.update_progress(task, 0)
+ else:
+ # pool DNE -- remove in-memory task
+ self.complete_progress(task)
+ self.remove_task(None, task)
+
+ except (rados.Error, rbd.Error) as e:
+ self.log.error("execute_task: {}".format(e))
+ task.retry_message = "{}".format(e)
+ self.update_progress(task, 0)
+
+ finally:
+ task.in_progress = False
+ task.retry_attempts += 1
+ task.retry_time = datetime.now() + min(
+ TASK_RETRY_INTERVAL * task.retry_attempts,
+ TASK_MAX_RETRY_INTERVAL)
+
+ def progress_callback(self, task, current, total):
+ progress = float(current) / float(total)
+ self.log.debug("progress_callback: task={}, progress={}".format(
+ str(task), progress))
+
+ # avoid deadlocking when a new command comes in during a progress callback
+ if not self.lock.acquire(False):
+ return 0
+
+ try:
+ if not self.in_progress_task or self.in_progress_task.canceled:
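+                # a negative return value from the progress callback
+                # instructs librbd to cancel the in-flight operation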
+ return -rbd.ECANCELED
+ self.in_progress_task.progress = progress
+ finally:
+ self.lock.release()
+
+ if not task.progress_posted:
+ # delayed creation of progress event until first callback
+ self.post_progress(task, progress)
+ else:
+ self.throttled_update_progress(task, progress)
+
+ return 0
+
+ def execute_flatten(self, ioctx, task):
+ self.log.info("execute_flatten: task={}".format(str(task)))
+
+ try:
+ with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
+ image.flatten(on_progress=partial(self.progress_callback, task))
+ except rbd.InvalidArgument:
+ task.fail("Image does not have parent")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_remove(self, ioctx, task):
+ self.log.info("execute_remove: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_trash_remove(self, ioctx, task):
+ self.log.info("execute_trash_remove: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_migration_execute(self, ioctx, task):
+ self.log.info("execute_migration_execute: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.InvalidArgument:
+ task.fail("Image is not migrating")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_migration_commit(self, ioctx, task):
+ self.log.info("execute_migration_commit: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.InvalidArgument:
+ task.fail("Image is not migrating or migration not executed")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_migration_abort(self, ioctx, task):
+ self.log.info("execute_migration_abort: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.InvalidArgument:
+ task.fail("Image is not migrating")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def complete_progress(self, task):
+ if not task.progress_posted:
+ # ensure progress event exists before we complete/fail it
+ self.post_progress(task, 0)
+
+ self.log.debug("complete_progress: task={}".format(str(task)))
+ try:
+ if task.failed:
+ self.module.remote("progress", "fail", task.task_id,
+ task.failure_message)
+ else:
+ self.module.remote("progress", "complete", task.task_id)
+ except ImportError:
+ # progress module is disabled
+ pass
+
+ def _update_progress(self, task, progress):
+ self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
+ try:
+ refs = {"origin": "rbd_support"}
+ refs.update(task.refs)
+
+ self.module.remote("progress", "update", task.task_id,
+ task.message, progress, refs)
+ except ImportError:
+ # progress module is disabled
+ pass
+
+ def post_progress(self, task, progress):
+ self._update_progress(task, progress)
+ task.progress_posted = True
+
+ def update_progress(self, task, progress):
+ if task.progress_posted:
+ self._update_progress(task, progress)
+
+ @Throttle(timedelta(seconds=1))
+ def throttled_update_progress(self, task, progress):
+ self.update_progress(task, progress)
+
+ def queue_flatten(self, image_spec):
+ image_spec = self.extract_image_spec(image_spec)
+
+ authorize_request(self.module, image_spec[0], image_spec[1])
+ self.log.info("queue_flatten: {}".format(image_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+
+ with self.open_ioctx(image_spec) as ioctx:
+ try:
+ with rbd.Image(ioctx, image_spec[2]) as image:
+ refs[TASK_REF_IMAGE_ID] = image.id()
+
+ try:
+ parent_image_id = image.parent_id()
+ except rbd.ImageNotFound:
+ parent_image_id = None
+
+ except rbd.ImageNotFound:
+ pass
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ if TASK_REF_IMAGE_ID not in refs:
+ raise rbd.ImageNotFound("Image {} does not exist".format(
+ self.format_image_spec(image_spec)), errno=errno.ENOENT)
+ if not parent_image_id:
+ raise rbd.ImageNotFound("Image {} does not have a parent".format(
+ self.format_image_spec(image_spec)), errno=errno.ENOENT)
+
+ return 0, self.add_task(ioctx,
+ "Flattening image {}".format(
+ self.format_image_spec(image_spec)),
+ refs), ""
+
+ def queue_remove(self, image_spec):
+ image_spec = self.extract_image_spec(image_spec)
+
+ authorize_request(self.module, image_spec[0], image_spec[1])
+ self.log.info("queue_remove: {}".format(image_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+
+ with self.open_ioctx(image_spec) as ioctx:
+ try:
+ with rbd.Image(ioctx, image_spec[2]) as image:
+ refs[TASK_REF_IMAGE_ID] = image.id()
+ snaps = list(image.list_snaps())
+
+ except rbd.ImageNotFound:
+ pass
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ if TASK_REF_IMAGE_ID not in refs:
+ raise rbd.ImageNotFound("Image {} does not exist".format(
+ self.format_image_spec(image_spec)), errno=errno.ENOENT)
+ if snaps:
+ raise rbd.ImageBusy("Image {} has snapshots".format(
+ self.format_image_spec(image_spec)), errno=errno.EBUSY)
+
+ return 0, self.add_task(ioctx,
+ "Removing image {}".format(
+ self.format_image_spec(image_spec)),
+ refs), ''
+
+ def queue_trash_remove(self, image_id_spec):
+ image_id_spec = self.extract_image_spec(image_id_spec)
+
+ authorize_request(self.module, image_id_spec[0], image_id_spec[1])
+ self.log.info("queue_trash_remove: {}".format(image_id_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
+ TASK_REF_POOL_NAME: image_id_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_id_spec[1],
+ TASK_REF_IMAGE_ID: image_id_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ # verify that image exists in trash
+ with self.open_ioctx(image_id_spec) as ioctx:
+ rbd.RBD().trash_get(ioctx, image_id_spec[2])
+
+ return 0, self.add_task(ioctx,
+ "Removing image {} from trash".format(
+ self.format_image_spec(image_id_spec)),
+ refs), ''
+
+ def get_migration_status(self, ioctx, image_spec):
+ try:
+ return rbd.RBD().migration_status(ioctx, image_spec[2])
+ except (rbd.InvalidArgument, rbd.ImageNotFound):
+ return None
+
+ def validate_image_migrating(self, image_spec, migration_status):
+ if not migration_status:
+ raise rbd.InvalidArgument("Image {} is not migrating".format(
+ self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+ def resolve_pool_name(self, pool_id):
+ osd_map = self.module.get('osd_map')
+ for pool in osd_map['pools']:
+ if pool['pool'] == pool_id:
+ return pool['pool_name']
+ return '<unknown>'
+
+ def queue_migration_execute(self, image_spec):
+ image_spec = self.extract_image_spec(image_spec)
+
+ authorize_request(self.module, image_spec[0], image_spec[1])
+ self.log.info("queue_migration_execute: {}".format(image_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+
+ with self.open_ioctx(image_spec) as ioctx:
+ status = self.get_migration_status(ioctx, image_spec)
+ if status:
+ refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ self.validate_image_migrating(image_spec, status)
+ if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
+ rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
+ raise rbd.InvalidArgument("Image {} is not in ready state".format(
+ self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+ source_pool = self.resolve_pool_name(status['source_pool_id'])
+ dest_pool = self.resolve_pool_name(status['dest_pool_id'])
+ return 0, self.add_task(ioctx,
+ "Migrating image {} to {}".format(
+ self.format_image_spec((source_pool,
+ status['source_pool_namespace'],
+ status['source_image_name'])),
+ self.format_image_spec((dest_pool,
+ status['dest_pool_namespace'],
+ status['dest_image_name']))),
+ refs), ''
+
+ def queue_migration_commit(self, image_spec):
+ image_spec = self.extract_image_spec(image_spec)
+
+ authorize_request(self.module, image_spec[0], image_spec[1])
+ self.log.info("queue_migration_commit: {}".format(image_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+
+ with self.open_ioctx(image_spec) as ioctx:
+ status = self.get_migration_status(ioctx, image_spec)
+ if status:
+ refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ self.validate_image_migrating(image_spec, status)
+ if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
+ raise rbd.InvalidArgument("Image {} has not completed migration".format(
+ self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+ return 0, self.add_task(ioctx,
+ "Committing image migration for {}".format(
+ self.format_image_spec(image_spec)),
+ refs), ''
+
+ def queue_migration_abort(self, image_spec):
+ image_spec = self.extract_image_spec(image_spec)
+
+ authorize_request(self.module, image_spec[0], image_spec[1])
+ self.log.info("queue_migration_abort: {}".format(image_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+
+ with self.open_ioctx(image_spec) as ioctx:
+ status = self.get_migration_status(ioctx, image_spec)
+ if status:
+ refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ self.validate_image_migrating(image_spec, status)
+ return 0, self.add_task(ioctx,
+ "Aborting image migration for {}".format(
+ self.format_image_spec(image_spec)),
+ refs), ''
+
+ def task_cancel(self, task_id):
+ self.log.info("task_cancel: {}".format(task_id))
+
+ task = self.tasks_by_id.get(task_id)
+ if not task or not is_authorized(self.module,
+ task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE]):
+ return -errno.ENOENT, '', "No such task {}".format(task_id)
+
+ task.cancel()
+
+ remove_in_memory = True
+ if self.in_progress_task and self.in_progress_task.task_id == task_id:
+ self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
+ remove_in_memory = False
+
+ # complete any associated event in the progress module
+ self.complete_progress(task)
+
+ # remove from rbd_task omap
+ with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
+ self.remove_task(ioctx, task, remove_in_memory)
+
+ return 0, "", ""
+
+ def task_list(self, task_id):
+ self.log.info("task_list: {}".format(task_id))
+
+ if task_id:
+ task = self.tasks_by_id.get(task_id)
+ if not task or not is_authorized(self.module,
+ task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE]):
+ return -errno.ENOENT, '', "No such task {}".format(task_id)
+
+ result = task.to_dict()
+ else:
+ result = []
+ for sequence in sorted(self.tasks_by_sequence.keys()):
+ task = self.tasks_by_sequence[sequence]
+ if is_authorized(self.module,
+ task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE]):
+ result.append(task.to_dict())
+
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+ def handle_command(self, inbuf, prefix, cmd):
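+        # dispatch on the already-parsed command prefix; cmd holds the
+        # parsed arguments, e.g. cmd={'image_spec': 'mypool/myimage'}
+        # for an 'add flatten' request (illustrative values)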
+ with self.lock:
+ if prefix == 'add flatten':
+ return self.queue_flatten(cmd['image_spec'])
+ elif prefix == 'add remove':
+ return self.queue_remove(cmd['image_spec'])
+ elif prefix == 'add trash remove':
+ return self.queue_trash_remove(cmd['image_id_spec'])
+ elif prefix == 'add migration execute':
+ return self.queue_migration_execute(cmd['image_spec'])
+ elif prefix == 'add migration commit':
+ return self.queue_migration_commit(cmd['image_spec'])
+ elif prefix == 'add migration abort':
+ return self.queue_migration_abort(cmd['image_spec'])
+ elif prefix == 'cancel':
+ return self.task_cancel(cmd['task_id'])
+ elif prefix == 'list':
+ return self.task_list(cmd.get('task_id'))
+
+ raise NotImplementedError(cmd['prefix'])
diff --git a/src/pybind/mgr/rbd_support/trash_purge_schedule.py b/src/pybind/mgr/rbd_support/trash_purge_schedule.py
new file mode 100644
index 000000000..bf5d8ae70
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/trash_purge_schedule.py
@@ -0,0 +1,287 @@
+import errno
+import json
+import rados
+import rbd
+import re
+import traceback
+
+from datetime import datetime
+from threading import Condition, Lock, Thread
+
+from .common import get_rbd_pools
+from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+
+
+class TrashPurgeScheduleHandler:
+ MODULE_OPTION_NAME = "trash_purge_schedule"
+ SCHEDULE_OID = "rbd_trash_purge_schedule"
+ REFRESH_DELAY_SECONDS = 60.0
+
+ lock = Lock()
+ condition = Condition(lock)
+ thread = None
+
+ def __init__(self, module):
+ self.module = module
+ self.log = module.log
+ self.last_refresh_pools = datetime(1970, 1, 1)
+
+ self.init_schedule_queue()
+
+ self.thread = Thread(target=self.run)
+ self.thread.start()
+
+ def run(self):
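+        # background worker: periodically refresh the pool and schedule
+        # state, purge the next due namespace, then re-enqueue it for
+        # its following scheduled run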
+ try:
+ self.log.info("TrashPurgeScheduleHandler: starting")
+ while True:
+ refresh_delay = self.refresh_pools()
+ with self.lock:
+ (ns_spec, wait_time) = self.dequeue()
+ if not ns_spec:
+ self.condition.wait(min(wait_time, refresh_delay))
+ continue
+ pool_id, namespace = ns_spec
+ self.trash_purge(pool_id, namespace)
+ with self.lock:
+ self.enqueue(datetime.now(), pool_id, namespace)
+
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ def trash_purge(self, pool_id, namespace):
+ try:
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ ioctx.set_namespace(namespace)
+ rbd.RBD().trash_purge(ioctx, datetime.now())
+ except Exception as e:
+ self.log.error("exception when purgin {}/{}: {}".format(
+ pool_id, namespace, e))
+
+ def init_schedule_queue(self):
+ self.queue = {}
+ self.pools = {}
+ self.refresh_pools()
+ self.log.debug("TrashPurgeScheduleHandler: queue is initialized")
+
+ def load_schedules(self):
+ self.log.info("TrashPurgeScheduleHandler: load_schedules")
+
+ schedules = Schedules(self)
+ schedules.load()
+ self.schedules = schedules
+
+ def refresh_pools(self):
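+        # rate-limited to one rescan per REFRESH_DELAY_SECONDS; returns
+        # the number of seconds until the next refresh is due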
+ elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
+ if elapsed < self.REFRESH_DELAY_SECONDS:
+ return self.REFRESH_DELAY_SECONDS - elapsed
+
+ self.log.debug("TrashPurgeScheduleHandler: refresh_pools")
+
+ with self.lock:
+ self.load_schedules()
+ if not self.schedules:
+ self.log.debug("TrashPurgeScheduleHandler: no schedules")
+ self.pools = {}
+ self.queue = {}
+ self.last_refresh_pools = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ pools = {}
+
+ for pool_id, pool_name in get_rbd_pools(self.module).items():
+ if not self.schedules.intersects(
+ LevelSpec.from_pool_spec(pool_id, pool_name)):
+ continue
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_pool(ioctx, pools)
+
+ with self.lock:
+ self.refresh_queue(pools)
+ self.pools = pools
+
+ self.last_refresh_pools = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ def load_pool(self, ioctx, pools):
+ pool_id = str(ioctx.get_pool_id())
+ pool_name = ioctx.get_pool_name()
+ pools[pool_id] = {}
+ pool_namespaces = ['']
+
+ self.log.debug("load_pool: {}".format(pool_name))
+
+ try:
+ pool_namespaces += rbd.RBD().namespace_list(ioctx)
+ except rbd.OperationNotSupported:
+ self.log.debug("namespaces not supported")
+ except Exception as e:
+ self.log.error("exception when scanning pool {}: {}".format(
+ pool_name, e))
+
+ for namespace in pool_namespaces:
+ pools[pool_id][namespace] = pool_name
+
+ def rebuild_queue(self):
+ now = datetime.now()
+
+        # drop future entries and rebuild them below; entries that are
+        # already due stay queued (times compare at minute resolution)
+ now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+
+ for schedule_time in list(self.queue):
+ if schedule_time > now_string:
+ del self.queue[schedule_time]
+
+ if not self.schedules:
+ return
+
+ for pool_id, namespaces in self.pools.items():
+ for namespace in namespaces:
+ self.enqueue(now, pool_id, namespace)
+
+ self.condition.notify()
+
+ def refresh_queue(self, current_pools):
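+        # reconcile the queue with the freshly scanned pools: deschedule
+        # namespaces that disappeared and enqueue newly discovered ones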
+ now = datetime.now()
+
+ for pool_id, namespaces in self.pools.items():
+ for namespace in namespaces:
+ if pool_id not in current_pools or \
+ namespace not in current_pools[pool_id]:
+ self.remove_from_queue(pool_id, namespace)
+
+ for pool_id, namespaces in current_pools.items():
+ for namespace in namespaces:
+ if pool_id not in self.pools or \
+ namespace not in self.pools[pool_id]:
+ self.enqueue(now, pool_id, namespace)
+
+ self.condition.notify()
+
+ def enqueue(self, now, pool_id, namespace):
+ schedule = self.schedules.find(pool_id, namespace)
+ if not schedule:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
+ pool_id, namespace))
+ return
+
+ schedule_time = schedule.next_run(now)
+ if schedule_time not in self.queue:
+ self.queue[schedule_time] = []
+ self.log.debug(
+ "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
+ pool_id, namespace, schedule_time))
+        ns_spec = (pool_id, namespace)
+        if ns_spec not in self.queue[schedule_time]:
+            self.queue[schedule_time].append(ns_spec)
+
+ def dequeue(self):
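+        # return the next due (pool_id, namespace) tuple and a zero wait
+        # time, or (None, seconds_to_wait); 1000 is just an arbitrarily
+        # long wait for when the queue is empty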
+ if not self.queue:
+ return None, 1000
+
+ now = datetime.now()
+ schedule_time = sorted(self.queue)[0]
+
+ if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
+ wait_time = (datetime.strptime(schedule_time,
+ "%Y-%m-%d %H:%M:%S") - now)
+ return None, wait_time.total_seconds()
+
+ namespaces = self.queue[schedule_time]
+ namespace = namespaces.pop(0)
+ if not namespaces:
+ del self.queue[schedule_time]
+ return namespace, 0
+
+ def remove_from_queue(self, pool_id, namespace):
+ self.log.debug(
+ "TrashPurgeScheduleHandler: descheduling {}/{}".format(
+ pool_id, namespace))
+
+ empty_slots = []
+ for schedule_time, namespaces in self.queue.items():
+ if (pool_id, namespace) in namespaces:
+ namespaces.remove((pool_id, namespace))
+ if not namespaces:
+ empty_slots.append(schedule_time)
+ for schedule_time in empty_slots:
+ del self.queue[schedule_time]
+
+ def add_schedule(self, level_spec, interval, start_time):
+ self.log.debug(
+ "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.add(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def remove_schedule(self, level_spec, interval, start_time):
+ self.log.debug(
+ "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.remove(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def list(self, level_spec):
+ self.log.debug(
+ "TrashPurgeScheduleHandler: list: level_spec={}".format(
+ level_spec.name))
+
+ with self.lock:
+ result = self.schedules.to_list(level_spec)
+
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+ def status(self, level_spec):
+ self.log.debug(
+ "TrashPurgeScheduleHandler: status: level_spec={}".format(
+ level_spec.name))
+
+ scheduled = []
+ with self.lock:
+ for schedule_time in sorted(self.queue):
+ for pool_id, namespace in self.queue[schedule_time]:
+ if not level_spec.matches(pool_id, namespace):
+ continue
+ pool_name = self.pools[pool_id][namespace]
+ scheduled.append({
+ 'schedule_time' : schedule_time,
+ 'pool_id' : pool_id,
+ 'pool_name' : pool_name,
+ 'namespace' : namespace
+ })
+ return 0, json.dumps({'scheduled' : scheduled}, indent=4,
+ sort_keys=True), ""
+
+ def handle_command(self, inbuf, prefix, cmd):
+ level_spec_name = cmd.get('level_spec', "")
+
+ try:
+ level_spec = LevelSpec.from_name(self, level_spec_name,
+ allow_image_level=False)
+ except ValueError as e:
+ return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
+ level_spec_name, e)
+
+ if prefix == 'add':
+ return self.add_schedule(level_spec, cmd['interval'],
+ cmd.get('start_time'))
+ elif prefix == 'remove':
+ return self.remove_schedule(level_spec, cmd.get('interval'),
+ cmd.get('start_time'))
+ elif prefix == 'list':
+ return self.list(level_spec)
+ elif prefix == 'status':
+ return self.status(level_spec)
+
+ raise NotImplementedError(cmd['prefix'])