diff options
Diffstat (limited to 'src/pybind/mgr/rbd_support')
-rw-r--r-- | src/pybind/mgr/rbd_support/__init__.py | 2 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/common.py | 48 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py | 617 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/module.py | 321 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/perf.py | 524 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/schedule.py | 579 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/task.py | 857 | ||||
-rw-r--r-- | src/pybind/mgr/rbd_support/trash_purge_schedule.py | 282 |
8 files changed, 3230 insertions, 0 deletions
import re

from typing import Dict, Optional, Tuple, TYPE_CHECKING, Union


# Key used for queries/schedules that are not tied to one pool/namespace.
GLOBAL_POOL_KEY = (None, None)


class NotAuthorizedError(Exception):
    """Raised by authorize_request() when the module denies access."""


if TYPE_CHECKING:
    from rbd_support.module import Module


def is_authorized(module: 'Module',
                  pool: Optional[str],
                  namespace: Optional[str]) -> bool:
    """Ask *module* whether the caller may act on pool/namespace.

    ``None`` (or any falsy value) is passed on as an empty string.
    """
    scope = {"pool": pool if pool else '',
             "namespace": namespace if namespace else ''}
    return module.is_authorized(scope)


def authorize_request(module: 'Module',
                      pool: Optional[str],
                      namespace: Optional[str]) -> None:
    """Raise NotAuthorizedError unless is_authorized() approves the scope."""
    if is_authorized(module, pool, namespace):
        return
    message = "not authorized on pool={}, namespace={}".format(
        pool, namespace)
    raise NotAuthorizedError(message)


PoolKeyT = Union[Tuple[str, str], Tuple[None, None]]


def extract_pool_key(pool_spec: Optional[str]) -> PoolKeyT:
    """Split ``pool[/namespace]`` into a ``(pool, namespace)`` tuple.

    An empty or missing spec yields GLOBAL_POOL_KEY; a spec without a
    namespace part yields an empty-string namespace.

    Raises:
        ValueError: if the spec does not have the ``pool[/namespace]`` shape.
    """
    if not pool_spec:
        return GLOBAL_POOL_KEY

    parsed = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
    if parsed is None:
        raise ValueError("Invalid pool spec: {}".format(pool_spec))
    pool_name, namespace = parsed.groups()
    return (pool_name, namespace or '')


def get_rbd_pools(module: 'Module') -> Dict[int, str]:
    """Map pool id to pool name for OSD-map pools tagged with 'rbd'."""
    rbd_pools = {}
    for pool in module.get('osd_map')['pools']:
        # pools without application metadata are treated as non-rbd
        if 'rbd' in pool.get('application_metadata', {}):
            rbd_pools[pool['pool']] = pool['pool_name']
    return rbd_pools
def wait_for_pending(self) -> None:
    """Block until the set of pending snapshot requests is empty.

    Waits on ``self.condition`` while holding ``self.lock``, logging how
    many requests are still outstanding before each wait.
    """
    with self.lock:
        while True:
            outstanding = len(self.pending)
            if outstanding == 0:
                break
            self.log.debug(
                "CreateSnapshotRequests.wait_for_pending: "
                "{} images".format(outstanding))
            # woken by a notify on the same condition
            self.condition.wait()
    self.log.debug("CreateSnapshotRequests.wait_for_pending: done")
def handle_open_image(self,
                      image_spec: ImageSpec,
                      comp: rados.Completion,
                      image: rbd.Image) -> None:
    """Completion callback for the asynchronous image open.

    On success the request moves on to get_mirror_mode(). On failure the
    request is finished; the error is logged unless the return value is
    -ENOENT.
    """
    self.log.debug(
        "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
            *image_spec, comp.get_return_value()))

    if comp.get_return_value() >= 0:
        self.get_mirror_mode(image_spec, image)
        return

    # -ENOENT is not reported as an error
    if comp.get_return_value() != -errno.ENOENT:
        self.log.error(
            "error when opening {}/{}/{}: {}".format(
                *image_spec, comp.get_return_value()))
    self.finish(image_spec)
def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
    """Start the asynchronous mirror-info query for an opened image.

    The result is delivered to handle_get_mirror_info(). If submitting the
    request raises, the error is logged and the image is closed instead.
    """
    pool, ns, image_id = image_spec

    self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
        pool, ns, image_id))

    def on_info(comp: rados.Completion,
                info: Optional[Dict[str, Union[str, int]]]) -> None:
        # bind this request's spec and image into the completion callback
        self.handle_get_mirror_info(image_spec, image, comp, info)

    try:
        image.aio_mirror_image_get_info(on_info)
    except Exception as err:
        message = "exception when getting mirror info for {}/{}/{}: {}".format(
            pool, ns, image_id, err)
        self.log.error(message)
        self.close_image(image_spec, image)
def handle_create_snapshot(self,
                           image_spec: ImageSpec,
                           image: rbd.Image,
                           comp: rados.Completion,
                           snap_id: Optional[int]) -> None:
    """Completion callback for the asynchronous mirror snapshot creation.

    Logs an error when no snapshot id was returned and the return value is
    anything other than -ENOENT, then always proceeds to close the image.
    """
    pool, ns, image_id = image_spec

    self.log.debug(
        "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
            pool, ns, image_id, comp.get_return_value(), snap_id))

    if snap_id is None:
        ret = comp.get_return_value()
        # -ENOENT is not reported as an error
        if ret != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool, ns, image_id, comp.get_return_value()))

    self.close_image(image_spec, image)
def finish(self, image_spec: ImageSpec) -> None:
    """Retire a completed request and start the next queued one, if any.

    Releases the request's ioctx reference, removes it from ``pending``,
    notifies ``condition``, and opens the image at the head of ``queue``
    when the queue is not empty.
    """
    self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
        *image_spec))

    self.put_ioctx(image_spec)

    have_next = False
    with self.lock:
        self.pending.remove(image_spec)
        self.condition.notify()
        if self.queue:
            next_spec = self.queue.pop(0)
            have_next = True

    # open_image() is called only after the lock has been released
    if have_next:
        self.open_image(next_spec)
def shutdown(self) -> None:
    """Stop the scheduler thread and wait for in-flight snapshot requests.

    The stop flag is set and ``self.condition`` is notified while holding
    ``self.lock``, so a worker sleeping in ``condition.wait(timeout)`` wakes
    up at once instead of only when its timeout expires. Without the
    notification ``join()`` could block for the whole wait timeout.

    A worker that has already tested ``stop_thread`` but has not yet started
    waiting can still miss the notification; in that case it exits when its
    wait times out, as before.
    """
    self.log.info("MirrorSnapshotScheduleHandler: shutting down")
    with self.lock:
        self.stop_thread = True
        # wake the run() loop so it re-checks stop_thread promptly
        self.condition.notify()
    if self.thread.is_alive():
        self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
        self.thread.join()
    self.create_snapshot_requests.wait_for_pending()
    self.log.info("MirrorSnapshotScheduleHandler: shut down")
def load_pool_images(self,
                     ioctx: rados.Ioctx,
                     images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
    """Collect the primary snapshot-mirrored images of one pool.

    Fills ``images[pool_id][namespace][image_id]`` with a display name
    (``pool/image`` or ``pool/namespace/image``) for every namespace of the
    pool that intersects a schedule. ``ioctx`` is switched to each scanned
    namespace in turn and is not reset afterwards.

    ``rbd.ConnectionShutdown`` is re-raised to the caller; any other
    exception is logged and ends the scan of this pool, keeping whatever was
    collected so far.
    """
    pool_id = str(ioctx.get_pool_id())
    pool_name = ioctx.get_pool_name()
    images[pool_id] = {}

    self.log.debug("load_pool_images: pool={}".format(pool_name))

    try:
        # '' is the default namespace, scanned in addition to the listed ones
        namespaces = [''] + rbd.RBD().namespace_list(ioctx)
        for namespace in namespaces:
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                continue
            self.log.debug("load_pool_images: pool={}, namespace={}".format(
                pool_name, namespace))
            images[pool_id][namespace] = {}
            ioctx.set_namespace(namespace)
            mirror_images = dict(rbd.RBD().mirror_image_info_list(
                ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
            if not mirror_images:
                continue
            # id -> name, restricted to the images reported as mirrored
            image_names = {
                entry['id']: entry['name']
                for entry in rbd.RBD().list2(ioctx)
                if entry['id'] in mirror_images
            }
            for image_id, info in mirror_images.items():
                if not info['primary']:
                    continue
                image_name = image_names.get(image_id)
                if not image_name:
                    continue
                if namespace:
                    name = "{}/{}/{}".format(pool_name, namespace,
                                             image_name)
                else:
                    name = "{}/{}".format(pool_name, image_name)
                self.log.debug(
                    "load_pool_images: adding image {}".format(name))
                images[pool_id][namespace][image_id] = name
    except rbd.ConnectionShutdown:
        raise
    except Exception as e:
        self.log.error(
            "load_pool_images: exception when scanning pool {}: {}".format(
                pool_name, e))
def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
    """Pop the next due image from the schedule queue.

    Returns:
        ``(image_spec, 0.0)`` when the earliest slot is due, otherwise
        ``(None, seconds)`` where ``seconds`` is the time until the earliest
        slot, or 1000.0 when the queue is empty.
    """
    if not self.queue:
        return None, 1000.0

    now = datetime.now()
    # Keys are "%Y-%m-%d %H:%M:%S" strings, so the lexicographic minimum is
    # the earliest slot; min() avoids sorting every key to read just one.
    schedule_time = min(self.queue)

    if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
        wait_time = (datetime.strptime(schedule_time,
                                       "%Y-%m-%d %H:%M:%S") - now)
        return None, wait_time.total_seconds()

    images = self.queue[schedule_time]
    image = images.pop(0)
    if not images:
        # drop the slot once its last image has been handed out
        del self.queue[schedule_time]
    return image, 0.0
def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
    """Report the queued images that match *level_spec*.

    Returns:
        ``(0, json, "")`` where ``json`` holds a ``scheduled_images`` list of
        ``{schedule_time, image}`` entries ordered by schedule time.
    """
    self.log.debug(
        "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
            level_spec.name))

    with self.lock:
        # snapshot of the queue and the image names taken under the lock
        scheduled_images = [
            {
                'schedule_time': slot,
                'image': self.images[pool_id][namespace][image_id],
            }
            for slot in sorted(self.queue)
            for pool_id, namespace, image_id in self.queue[slot]
            if level_spec.matches(pool_id, namespace, image_id)
        ]

    report = {'scheduled_images': scheduled_images}
    return 0, json.dumps(report, indent=4, sort_keys=True), ""
def with_latest_osdmap(func: FuncT) -> FuncT:
    """Wrap a Module command handler with readiness and error handling.

    The wrapper returns ``(-EAGAIN, "", msg)`` while ``module_ready`` is
    false, waits for the latest OSD map before calling *func*, and converts
    the exceptions listed below into ``(negative errno, "", message)``
    tuples instead of letting them propagate.
    """
    @functools.wraps(func)
    def wrapper(self: 'Module', *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
        if not self.module_ready:
            return (-errno.EAGAIN, "",
                    "rbd_support module is not ready, try again")
        # ensure we have latest pools available
        # (called outside the try below, so its exceptions are not mapped)
        self.rados.wait_for_latest_osdmap()
        try:
            try:
                return func(self, *args, **kwargs)
            except NotAuthorizedError:
                # authorization failures skip the traceback logging below
                raise
            except Exception:
                # log the full traceback but don't send it to the CLI user
                self.log.exception("Fatal runtime error: ")
                raise
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown) as ex:
            # flag the blocklisting so the recovery thread can react
            self.log.debug("with_latest_osdmap: client blocklisted")
            self.client_blocklisted.set()
            return -errno.EAGAIN, "", str(ex)
        except rados.Error as ex:
            return -ex.errno, "", str(ex)
        except rbd.OSError as ex:
            return -ex.errno, "", str(ex)
        except rbd.Error as ex:
            return -errno.EINVAL, "", str(ex)
        except KeyError as ex:
            return -errno.ENOENT, "", str(ex)
        except ValueError as ex:
            return -errno.EINVAL, "", str(ex)
        except NotAuthorizedError as ex:
            return -errno.EACCES, "", str(ex)

    # expose the wrapped handler's signature on the wrapper
    # NOTE(review): presumably needed for CLI argument introspection — confirm
    wrapper.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
    return cast(FuncT, wrapper)
def run(self) -> None:
    """Recovery loop: set up the handlers and rebuild them after blocklisting.

    Each iteration sets the handlers up, blocks until ``client_blocklisted``
    is set (skipped when setup itself failed with ConnectionShutdown), then
    shuts everything down, clears the event and re-creates the handlers.
    Any other exception ends the loop and is logged as fatal.
    """
    self.log.info("recovery thread starting")
    try:
        while True:
            setup_ok = True
            try:
                self.setup_handlers()
            except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
                setup_ok = False
                self.log.exception("setup_handlers: client blocklisted")
                self.log.info("recovering from double blocklisting")

            if setup_ok:
                # block until RADOS client is blocklisted
                self.client_blocklisted.wait()
                self.log.info("recovering from blocklisting")

            self.shutdown()
            self.client_blocklisted.clear()
            self.init_handlers()
    except Exception as ex:
        details = traceback.format_exc()
        self.log.fatal("Fatal runtime error: {}\n{}".format(ex, details))
@CLIReadCommand('rbd perf image counters')
@with_latest_osdmap
def perf_image_counters(self,
                        pool_spec: Optional[str] = None,
                        sort_by: Optional[ImageSortBy] = None) -> Tuple[int, str, str]:
    """
    Retrieve current RBD IO performance counters
    """
    # NOTE(review): docstring left verbatim — presumably surfaced as the CLI
    # help text by CLIReadCommand; confirm before rewording.
    with self.perf.lock:
        # fall back to the first known counter when no sort key was given
        if sort_by:
            sort_key = sort_by.name
        else:
            sort_key = OSD_PERF_QUERY_COUNTERS[0]
        result = self.perf.get_perf_counters(pool_spec, sort_key)
    return result
@CLIReadCommand('rbd task list')
@with_latest_osdmap
def task_list(self, task_id: Optional[str] = None) -> Tuple[int, str, str]:
    """
    List pending or running asynchronous tasks
    """
    # NOTE(review): docstring left verbatim — presumably surfaced as the CLI
    # help text by CLIReadCommand; confirm before rewording.
    # The task handler is queried while holding its lock.
    with self.task.lock:
        listing = self.task.task_list(task_id)
    return listing
b/src/pybind/mgr/rbd_support/perf.py new file mode 100644 index 000000000..20815721d --- /dev/null +++ b/src/pybind/mgr/rbd_support/perf.py @@ -0,0 +1,524 @@ +import errno +import json +import rados +import rbd +import time +import traceback + +from datetime import datetime, timedelta +from threading import Condition, Lock, Thread +from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, Union + +from .common import (GLOBAL_POOL_KEY, authorize_request, extract_pool_key, + get_rbd_pools, PoolKeyT) + +QUERY_POOL_ID = "pool_id" +QUERY_POOL_ID_MAP = "pool_id_map" +QUERY_IDS = "query_ids" +QUERY_SUM_POOL_COUNTERS = "pool_counters" +QUERY_RAW_POOL_COUNTERS = "raw_pool_counters" +QUERY_LAST_REQUEST = "last_request" + +OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$' +OSD_PERF_QUERY_COUNTERS = ['write_ops', + 'read_ops', + 'write_bytes', + 'read_bytes', + 'write_latency', + 'read_latency'] +OSD_PERF_QUERY_COUNTERS_INDICES = { + OSD_PERF_QUERY_COUNTERS[i]: i for i in range(len(OSD_PERF_QUERY_COUNTERS))} + +OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5] +OSD_PERF_QUERY_MAX_RESULTS = 256 + +POOL_REFRESH_INTERVAL = timedelta(minutes=5) +QUERY_EXPIRE_INTERVAL = timedelta(minutes=1) +STATS_RATE_INTERVAL = timedelta(minutes=1) + +REPORT_MAX_RESULTS = 64 + + +# {(pool_id, namespace)...} +ResolveImageNamesT = Set[Tuple[int, str]] + +# (time, [value,...]) +PerfCounterT = Tuple[int, List[int]] +# current, previous +RawImageCounterT = Tuple[PerfCounterT, Optional[PerfCounterT]] +# image_id => perf_counter +RawImagesCounterT = Dict[str, RawImageCounterT] +# namespace_counters => raw_images +RawNamespacesCountersT = Dict[str, RawImagesCounterT] +# pool_id => namespaces_counters +RawPoolCountersT = Dict[int, RawNamespacesCountersT] + +SumImageCounterT = List[int] +# image_id => sum_image +SumImagesCounterT = Dict[str, SumImageCounterT] +# namespace => sum_images +SumNamespacesCountersT = Dict[str, SumImagesCounterT] +# pool_id, sum_namespaces +SumPoolCountersT = Dict[int, 
ExtractDataFuncT = Callable[[int, Optional[RawImageCounterT], SumImageCounterT], float]


class PerfHandler:
    """Collect, aggregate and report per-image RBD performance counters.

    A background thread (``run``) periodically pulls the raw OSD perf query
    results for every registered user query, folds them into cumulative
    per-image sums and maintains an image id -> image name cache.

    ``get_perf_stats`` / ``get_perf_counters`` must be called with
    ``self.lock`` held: ``register_query`` waits on a condition variable that
    is bound to that lock.
    """

    @classmethod
    def prepare_regex(cls, value: Any) -> str:
        """Return an anchored, capturing regex that matches *value* exactly."""
        return '^({})$'.format(value)

    @classmethod
    def prepare_osd_perf_query(cls,
                               pool_id: Optional[int],
                               namespace: Optional[str],
                               counter_type: str) -> Dict[str, Any]:
        """Build the OSD perf query description ordered by *counter_type*.

        A falsy *pool_id* / *namespace* matches every pool / namespace.
        """
        pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        if pool_id:
            pool_id_regex = cls.prepare_regex(pool_id)
        if namespace:
            namespace_regex = cls.prepare_regex(namespace)

        return {
            'key_descriptor': [
                {'type': 'pool_id', 'regex': pool_id_regex},
                {'type': 'namespace', 'regex': namespace_regex},
                {'type': 'object_name',
                 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
            ],
            'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
            'limit': {'order_by': counter_type,
                      'max_count': OSD_PERF_QUERY_MAX_RESULTS},
        }

    @classmethod
    def pool_spec_search_keys(cls, pool_key: str) -> List[str]:
        """Return every prefix of *pool_key*, longest first (ends with '')."""
        return [pool_key[0:len(pool_key) - x]
                for x in range(0, len(pool_key) + 1)]

    @classmethod
    def submatch_pool_key(cls, pool_key: PoolKeyT, search_key: str) -> bool:
        """Return True if each element of *search_key* is empty or equal to
        the corresponding element of *pool_key*.

        NOTE(review): *search_key* is indexed like a (pool, namespace) pair
        although it is annotated as ``str`` -- confirm the intended type.
        """
        return ((pool_key[1] == search_key[1] or not search_key[1])
                and (pool_key[0] == search_key[0] or not search_key[0]))

    def __init__(self, module: Any) -> None:
        self.user_queries: Dict[PoolKeyT, Dict[str, Any]] = {}
        self.image_cache: Dict[str, str] = {}

        # both conditions share one lock so waiters and the worker serialize
        self.lock = Lock()
        self.query_condition = Condition(self.lock)
        self.refresh_condition = Condition(self.lock)

        # (pool_id, namespace) => {image_id => image_name}
        self.image_name_cache: Dict[Tuple[int, str], Dict[str, str]] = {}
        self.image_name_refresh_time = datetime.fromtimestamp(0)

        self.module = module
        self.log = module.log

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Start the background collection thread."""
        self.thread.start()

    def shutdown(self) -> None:
        """Ask the background thread to stop and wait for it to exit."""
        self.log.info("PerfHandler: shutting down")
        self.stop_thread = True
        if self.thread.is_alive():
            self.log.debug("PerfHandler: joining thread")
            self.thread.join()
        self.log.info("PerfHandler: shut down")

    def run(self) -> None:
        """Worker loop: expire idle queries, pull counters, then sleep until
        the next stats period or until a new query is registered."""
        try:
            self.log.info("PerfHandler: starting")
            while not self.stop_thread:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_raw_osd_perf_counters()
                    self.refresh_condition.notify()

                    stats_period = self.module.get_ceph_option("mgr_stats_period")
                    self.query_condition.wait(stats_period)

                self.log.debug("PerfHandler: tick")

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("PerfHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def merge_raw_osd_perf_counters(self,
                                    pool_key: PoolKeyT,
                                    query: Dict[str, Any],
                                    now_ts: int,
                                    resolve_image_names: ResolveImageNamesT) -> RawPoolCountersT:
        """Merge the latest OSD results of *query* into its raw counters.

        Keeps the current and the previous sample for each image and adds
        every (pool_id, namespace) with an unknown image id to
        *resolve_image_names*.
        """
        pool_id_map = query[QUERY_POOL_ID_MAP]

        # collect and combine the raw counters from all sort orders
        raw_pool_counters: Dict[int, Dict[str, Dict[str, Any]]] = query.setdefault(
            QUERY_RAW_POOL_COUNTERS, {})
        for query_id in query[QUERY_IDS]:
            res = self.module.get_osd_perf_counters(query_id)
            for counter in res['counters']:
                # replace pool id from object name if it exists
                k = counter['k']
                pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
                namespace = k[1][0]
                image_id = k[2][1]

                # ignore metrics from non-matching pools/namespaces
                if pool_id not in pool_id_map:
                    continue
                if pool_key[1] is not None and pool_key[1] != namespace:
                    continue

                # flag the pool (and namespace) for refresh if we cannot find
                # image name in the cache
                resolve_image_key = (pool_id, namespace)
                if image_id not in self.image_name_cache.get(resolve_image_key, {}):
                    resolve_image_names.add(resolve_image_key)

                # copy the 'sum' counter values for each image (ignore count)
                # if we haven't already processed it for this round
                raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
                raw_images = raw_namespaces.setdefault(namespace, {})
                raw_image = raw_images.get(image_id)
                # save the last two perf counters for each image
                new_current = (now_ts, [int(x[0]) for x in counter['c']])
                if raw_image:
                    old_current, _ = raw_image
                    if old_current[0] < now_ts:
                        raw_images[image_id] = (new_current, old_current)
                else:
                    raw_images[image_id] = (new_current, None)

        self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
        return raw_pool_counters

    def sum_osd_perf_counters(self,
                              query: Dict[str, dict],
                              raw_pool_counters: RawPoolCountersT,
                              now_ts: int) -> SumPoolCountersT:
        """Add this round's raw samples to the cumulative per-image sums.

        Images that received no sample at *now_ts* get a zeroed current
        sample instead of being summed again.
        """
        # update the cumulative counters for each image
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        for pool_id, raw_namespaces in raw_pool_counters.items():
            sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
            for namespace, raw_images in raw_namespaces.items():
                sum_namespace = sum_namespaces.setdefault(namespace, {})
                for image_id, raw_image in raw_images.items():
                    # zero-out non-updated raw counters
                    if not raw_image[0]:
                        continue
                    old_current, _ = raw_image
                    if old_current[0] < now_ts:
                        # replacing the value of an existing key is safe
                        # while iterating over the dict
                        new_current = (now_ts, [0] * len(old_current[1]))
                        raw_images[image_id] = (new_current, old_current)
                        continue

                    counters = old_current[1]

                    # copy raw counters if this is a newly discovered image or
                    # increment existing counters
                    sum_image = sum_namespace.setdefault(image_id, None)
                    if sum_image:
                        for i, value in enumerate(counters):
                            sum_image[i] += value
                    else:
                        sum_namespace[image_id] = list(counters)

        self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
        return sum_pool_counters

    def refresh_image_names(self, resolve_image_names: ResolveImageNamesT) -> None:
        """List the images of every flagged (pool_id, namespace) and record
        their id -> name mapping in the image name cache."""
        for pool_id, namespace in resolve_image_names:
            image_key = (pool_id, namespace)
            images = self.image_name_cache.setdefault(image_key, {})
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                for image_meta in rbd.RBD().list2(ioctx):
                    images[image_meta['id']] = image_meta['name']
            self.log.debug("resolve_image_names: {}={}".format(image_key, images))

    def scrub_missing_images(self) -> None:
        """Drop the counters of every image whose name is not in the cache."""
        for pool_key, query in self.user_queries.items():
            raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
            sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
            for pool_id, sum_namespaces in sum_pool_counters.items():
                raw_namespaces = raw_pool_counters.get(pool_id, {})
                for namespace, sum_images in sum_namespaces.items():
                    raw_images = raw_namespaces.get(namespace, {})

                    image_key = (pool_id, namespace)
                    image_names = self.image_name_cache.get(image_key, {})
                    # iterate over a snapshot of the keys: entries are deleted
                    for image_id in list(sum_images.keys()):
                        # scrub image counters if we failed to resolve image name
                        if image_id not in image_names:
                            self.log.debug("scrub_missing_images: dropping {}/{}".format(
                                image_key, image_id))
                            del sum_images[image_id]
                            if image_id in raw_images:
                                del raw_images[image_id]

    def process_raw_osd_perf_counters(self) -> None:
        """Run one collection round for every registered user query."""
        now = datetime.now()
        # datetime.timestamp() is the portable spelling of strftime("%s"),
        # which is a platform-specific extension
        now_ts = int(now.timestamp())

        # clear the image name cache if we need to refresh all active pools
        if self.image_name_cache and \
                self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
            self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
            self.image_name_cache = {}

        resolve_image_names: Set[Tuple[int, str]] = set()
        for pool_key, query in self.user_queries.items():
            if not query[QUERY_IDS]:
                continue

            raw_pool_counters = self.merge_raw_osd_perf_counters(
                pool_key, query, now_ts, resolve_image_names)
            self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)

        if resolve_image_names:
            self.image_name_refresh_time = now
            self.refresh_image_names(resolve_image_names)
            self.scrub_missing_images()
        elif not self.image_name_cache:
            self.scrub_missing_images()

    def resolve_pool_id(self, pool_name: str) -> int:
        """Return the id of *pool_name*.

        Raises:
            rados.ObjectNotFound: if the lookup returns a falsy id.
        """
        pool_id = self.module.rados.pool_lookup(pool_name)
        if not pool_id:
            raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name),
                                       errno.ENOENT)
        return pool_id

    def scrub_expired_queries(self) -> None:
        """Unregister user queries not requested within QUERY_EXPIRE_INTERVAL."""
        # perf counters need to be periodically refreshed to continue
        # to be registered
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for pool_key in list(self.user_queries.keys()):
            user_query = self.user_queries[pool_key]
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
                del self.user_queries[pool_key]

    def register_osd_perf_queries(self,
                                  pool_id: Optional[int],
                                  namespace: Optional[str]) -> List[int]:
        """Register one OSD perf query per counter and return the query ids.

        If any registration fails, the queries added so far are removed and
        the exception is re-raised.
        """
        query_ids = []
        try:
            for counter in OSD_PERF_QUERY_COUNTERS:
                query = self.prepare_osd_perf_query(pool_id, namespace, counter)
                self.log.debug("register_osd_perf_queries: {}".format(query))

                query_id = self.module.add_osd_perf_query(query)
                if query_id is None:
                    raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
                query_ids.append(query_id)

        except Exception:
            for query_id in query_ids:
                self.module.remove_osd_perf_query(query_id)
            raise

        return query_ids

    def unregister_osd_perf_queries(self, pool_key: PoolKeyT, query_ids: List[int]) -> None:
        """Remove the given OSD perf queries and empty *query_ids* in place."""
        self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
            pool_key, query_ids))
        for query_id in query_ids:
            self.module.remove_osd_perf_query(query_id)
        query_ids[:] = []

    def register_query(self, pool_key: PoolKeyT) -> Dict[str, Any]:
        """Return the user query for *pool_key*, creating it if necessary.

        A new query wakes the worker and waits up to 5 seconds for one
        refresh; an existing query gets its expiry timestamp renewed.
        Must be called with ``self.lock`` held.
        """
        if pool_key not in self.user_queries:
            pool_name, namespace = pool_key
            pool_id = None
            if pool_name:
                pool_id = self.resolve_pool_id(cast(str, pool_name))

            user_query = {
                QUERY_POOL_ID: pool_id,
                QUERY_POOL_ID_MAP: {pool_id: pool_name},
                QUERY_IDS: self.register_osd_perf_queries(pool_id, namespace),
                QUERY_LAST_REQUEST: datetime.now()
            }

            self.user_queries[pool_key] = user_query

            # force an immediate stat pull if this is a new query
            self.query_condition.notify()
            self.refresh_condition.wait(5)

        else:
            user_query = self.user_queries[pool_key]

            # ensure query doesn't expire
            user_query[QUERY_LAST_REQUEST] = datetime.now()

            if pool_key == GLOBAL_POOL_KEY:
                # refresh the global pool id -> name map upon each
                # processing period
                user_query[QUERY_POOL_ID_MAP] = dict(
                    get_rbd_pools(self.module).items())

        self.log.debug("register_query: pool_key={}, query_ids={}".format(
            pool_key, user_query[QUERY_IDS]))

        return user_query

    def extract_stat(self,
                     index: int,
                     raw_image: Optional[RawImageCounterT],
                     sum_image: Any) -> float:
        """Return the per-second rate of counter *index* for one image.

        Returns 0 unless two samples exist that are at most
        STATS_RATE_INTERVAL apart. Latency counters are additionally divided
        by the matching ops rate (clamped to at least 1).
        """
        # require two raw counters between a fixed time window
        if not raw_image or not raw_image[0] or not raw_image[1]:
            return 0

        current_counter, previous_counter = cast(Tuple[PerfCounterT, PerfCounterT], raw_image)
        current_time = current_counter[0]
        previous_time = previous_counter[0]
        if current_time <= previous_time or \
                current_time - previous_time > STATS_RATE_INTERVAL.total_seconds():
            return 0

        current_value = current_counter[1][index]
        instant_rate = float(current_value) / (current_time - previous_time)

        # convert latencies from sum to average per op
        ops_index = None
        if OSD_PERF_QUERY_COUNTERS[index] == 'write_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
        elif OSD_PERF_QUERY_COUNTERS[index] == 'read_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']

        if ops_index is not None:
            ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
            instant_rate /= ops

        return instant_rate

    def extract_counter(self,
                        index: int,
                        raw_image: Optional[RawImageCounterT],
                        sum_image: List[int]) -> int:
        """Return the cumulative value of counter *index* (0 if no sums)."""
        if sum_image:
            return sum_image[index]
        return 0

    def generate_report(self,
                        query: Dict[str, Union[Dict[str, str],
                                               Dict[int, Dict[str, dict]]]],
                        sort_by: str,
                        extract_data: ExtractDataFuncT) -> Tuple[Dict[int, str],
                                                                 List[Dict[str, List[float]]]]:
        """Build the report for *query*.

        Images are ranked by the recent rate of the *sort_by* counter and
        limited to REPORT_MAX_RESULTS; *extract_data* produces the reported
        values. Returns ``(pool index => pool descriptor, counters)``.
        """
        pool_id_map = cast(Dict[int, str], query[QUERY_POOL_ID_MAP])
        sum_pool_counters = cast(SumPoolCountersT,
                                 query.setdefault(QUERY_SUM_POOL_COUNTERS,
                                                  cast(SumPoolCountersT, {})))
        # pool_id => {namespace => {image_id => [counter..] }
        raw_pool_counters = cast(RawPoolCountersT,
                                 query.setdefault(QUERY_RAW_POOL_COUNTERS,
                                                  cast(RawPoolCountersT, {})))

        sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)

        # pre-sort and limit the response
        results = []
        for pool_id, sum_namespaces in sum_pool_counters.items():
            if pool_id not in pool_id_map:
                continue
            raw_namespaces: RawNamespacesCountersT = raw_pool_counters.get(pool_id, {})
            for namespace, sum_images in sum_namespaces.items():
                raw_images = raw_namespaces.get(namespace, {})
                for image_id, sum_image in sum_images.items():
                    raw_image = raw_images.get(image_id)

                    # always sort by recent IO activity
                    results.append(((pool_id, namespace, image_id),
                                    self.extract_stat(sort_by_index, raw_image,
                                                      sum_image)))
        results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]

        # build the report in sorted order
        pool_descriptors: Dict[str, int] = {}
        counters = []
        for key, _ in results:
            pool_id, namespace, image_id = key
            pool_name = pool_id_map[pool_id]

            image_names = self.image_name_cache.get((pool_id, namespace), {})
            image_name = image_names.get(image_id)
            if image_name is None:
                # name not resolved (yet): skip the image rather than abort
                # the whole report with a KeyError
                continue

            raw_image = raw_pool_counters.get(pool_id, {}).get(
                namespace, {}).get(image_id)
            sum_image = sum_pool_counters[pool_id][namespace].get(image_id, [])

            pool_descriptor = pool_name
            if namespace:
                pool_descriptor += "/{}".format(namespace)
            pool_index = pool_descriptors.setdefault(pool_descriptor,
                                                     len(pool_descriptors))
            image_descriptor = "{}/{}".format(pool_index, image_name)
            data = [extract_data(i, raw_image, sum_image)
                    for i in range(len(OSD_PERF_QUERY_COUNTERS))]

            # skip if no data to report
            if not any(data):
                continue

            counters.append({image_descriptor: data})

        return {idx: descriptor for descriptor, idx
                in pool_descriptors.items()}, \
            counters

    def get_perf_data(self,
                      report: str,
                      pool_spec: Optional[str],
                      sort_by: str,
                      extract_data: ExtractDataFuncT) -> Tuple[int, str, str]:
        """Authorize, register and render a report as JSON.

        *report* ("stat" or "counter" for the callers in this class) only
        selects the key names of the JSON document. Returns
        ``(0, json_document, "")``.
        """
        self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
            report, pool_spec, sort_by))
        self.scrub_expired_queries()

        pool_key = extract_pool_key(pool_spec)
        authorize_request(self.module, pool_key[0], pool_key[1])
        user_query = self.register_query(pool_key)

        now = datetime.now()
        pool_descriptors, counters = self.generate_report(
            user_query, sort_by, extract_data)

        # keep the str parameter intact instead of rebinding it to the dict
        result = {
            'timestamp': time.mktime(now.timetuple()),
            '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
            'pool_descriptors': pool_descriptors,
            '{}s'.format(report): counters
        }

        return 0, json.dumps(result), ""

    def get_perf_stats(self,
                       pool_spec: Optional[str],
                       sort_by: str) -> Tuple[int, str, str]:
        """Report recent per-image rates (see ``extract_stat``)."""
        return self.get_perf_data(
            "stat", pool_spec, sort_by, self.extract_stat)

    def get_perf_counters(self,
                          pool_spec: Optional[str],
                          sort_by: str) -> Tuple[int, str, str]:
        """Report cumulative per-image counters (see ``extract_counter``)."""
        return self.get_perf_data(
            "counter", pool_spec, sort_by, self.extract_counter)
if TYPE_CHECKING:
    from .module import Module

SCHEDULE_INTERVAL = "interval"
SCHEDULE_START_TIME = "start_time"


class LevelSpec:
    """Identify the level a schedule applies to.

    The level is one of: global (no pool), a pool, a pool namespace, or an
    image. ``name`` is the human readable form and ``id`` the id based form
    ('<pool_id>[/<namespace>[/<image_id>]]', '' for global).
    """

    def __init__(self,
                 name: str,
                 id: str,
                 pool_id: Optional[str],
                 namespace: Optional[str],
                 image_id: Optional[str] = None) -> None:
        self.name = name
        self.id = id
        self.pool_id = pool_id
        self.namespace = namespace
        self.image_id = image_id

    def __eq__(self, level_spec: Any) -> bool:
        # two specs are the same level when their ids match; defer to the
        # other operand for objects that carry no id
        try:
            return self.id == level_spec.id
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        # defined together with __eq__ so instances stay hashable
        return hash(self.id)

    def is_child_of(self, level_spec: 'LevelSpec') -> bool:
        """Return True if this level is strictly below *level_spec*."""
        if level_spec.is_global():
            return not self.is_global()
        if level_spec.pool_id != self.pool_id:
            return False
        if level_spec.namespace is None:
            return self.namespace is not None
        if level_spec.namespace != self.namespace:
            return False
        if level_spec.image_id is None:
            return self.image_id is not None
        return False

    def is_global(self) -> bool:
        """Return True if this is the global (pool-less) level."""
        return self.pool_id is None

    def get_pool_id(self) -> Optional[str]:
        """Return the pool id (None for the global level)."""
        return self.pool_id

    def matches(self,
                pool_id: str,
                namespace: str,
                image_id: Optional[str] = None) -> bool:
        """Return True if the given location falls under this level.

        Unset (falsy) components of this spec match anything.
        """
        if self.pool_id and self.pool_id != pool_id:
            return False
        if self.namespace and self.namespace != namespace:
            return False
        if self.image_id and self.image_id != image_id:
            return False
        return True

    def intersects(self, level_spec: 'LevelSpec') -> bool:
        """Return True if one of the two levels contains or equals the other."""
        if self.pool_id is None or level_spec.pool_id is None:
            return True
        if self.pool_id != level_spec.pool_id:
            return False
        if self.namespace is None or level_spec.namespace is None:
            return True
        if self.namespace != level_spec.namespace:
            return False
        if self.image_id is None or level_spec.image_id is None:
            return True
        if self.image_id != level_spec.image_id:
            return False
        return True

    @classmethod
    def make_global(cls) -> 'LevelSpec':
        """Return the global level spec (empty name and id)."""
        return LevelSpec("", "", None, None, None)

    @classmethod
    def from_pool_spec(cls,
                       pool_id: int,
                       pool_name: str,
                       namespace: Optional[str] = None) -> 'LevelSpec':
        """Build a pool level (or, with *namespace*, namespace level) spec."""
        if namespace is None:
            id = "{}".format(pool_id)
            name = "{}/".format(pool_name)
        else:
            id = "{}/{}".format(pool_id, namespace)
            name = "{}/{}/".format(pool_name, namespace)
        return LevelSpec(name, id, str(pool_id), namespace, None)

    @classmethod
    def from_name(cls,
                  module: 'Module',
                  name: str,
                  namespace_validator: Optional[Callable] = None,
                  image_validator: Optional[Callable] = None,
                  allow_image_level: bool = True) -> 'LevelSpec':
        """Resolve a name based spec into a LevelSpec.

        *namespace_validator* is called with the ioctx (namespace set) and
        *image_validator* with the opened image, when given.

        Raises:
            ValueError: if *name* cannot be parsed, addresses an image while
                *allow_image_level* is False, or references a missing or
                unsuitable pool, namespace or image.
        """
        # imported here so the block does not rely on a module-level import
        from .common import get_rbd_pools

        # parse names like:
        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
                         name)
        if not match:
            raise ValueError("failed to parse {}".format(name))
        if match.group(3) and not allow_image_level:
            raise ValueError(
                "invalid name {}: image level is not allowed".format(name))

        id = ""
        pool_id = None
        namespace = None
        image_name = None
        image_id = None
        if match.group(1):
            pool_name = match.group(1)
            try:
                pool_id = module.rados.pool_lookup(pool_name)
                if pool_id is None:
                    raise ValueError("pool {} does not exist".format(pool_name))
                if pool_id not in get_rbd_pools(module):
                    raise ValueError("{} is not an RBD pool".format(pool_name))
                pool_id = str(pool_id)
                id += pool_id
                if match.group(2) is not None or match.group(3):
                    id += "/"
                    with module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            id += namespace
                        ioctx.set_namespace(namespace)
                        if namespace_validator:
                            namespace_validator(ioctx)
                        if match.group(3):
                            image_name = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_name,
                                               read_only=True) as image:
                                    image_id = image.id()
                                    id += "/" + image_id
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_name))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_name))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_name))

        # normalize possible input name like 'rbd//image'
        if not namespace and image_name:
            name = "{}/{}".format(pool_name, image_name)

        return LevelSpec(name, id, pool_id, namespace, image_id)

    @classmethod
    def from_id(cls,
                handler: Any,
                id: str,
                namespace_validator: Optional[Callable] = None,
                image_validator: Optional[Callable] = None) -> 'LevelSpec':
        """Resolve an id based spec into a LevelSpec.

        Raises:
            ValueError: if *id* cannot be parsed or references a missing
                pool, namespace or image, or *image_validator* rejects the
                image with ``rbd.InvalidArgument``.
        """
        # parse ids like:
        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
        if not match:
            raise ValueError("failed to parse: {}".format(id))

        name = ""
        pool_id = None
        namespace = None
        image_id = None
        if match.group(1):
            pool_id = match.group(1)
            try:
                pool_name = handler.module.rados.pool_reverse_lookup(
                    int(pool_id))
                if pool_name is None:
                    # report the id that was looked up (pool_name is None)
                    raise ValueError("pool {} does not exist".format(pool_id))
                name += pool_name + "/"
                if match.group(2) is not None or match.group(3):
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            name += namespace + "/"
                            # always select the namespace so the image lookup
                            # below runs in it, validator or not
                            # NOTE(review): namespace_validator is accepted
                            # but, as before, never invoked here -- confirm
                            # whether that is intended.
                            ioctx.set_namespace(namespace)
                        elif not match.group(3):
                            name += "/"
                        if match.group(3):
                            image_id = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_id=image_id,
                                               read_only=True) as image:
                                    image_name = image.get_name()
                                    name += image_name
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_id))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_id))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_id))

        return LevelSpec(name, id, pool_id, namespace, image_id)


class Interval:
    """A schedule period expressed in minutes."""

    def __init__(self, minutes: int) -> None:
        self.minutes = minutes

    def __eq__(self, interval: Any) -> bool:
        try:
            return self.minutes == interval.minutes
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Format with the largest unit (d, h, m) that divides the period."""
        if self.minutes % (60 * 24) == 0:
            interval = self.minutes // (60 * 24)
            units = 'd'
        elif self.minutes % 60 == 0:
            interval = self.minutes // 60
            units = 'h'
        else:
            interval = int(self.minutes)
            units = 'm'

        return "{}{}".format(interval, units)

    @classmethod
    def from_string(cls, interval: str) -> 'Interval':
        """Parse '<number>[d|h|m]' (minutes when the unit is omitted).

        Raises:
            ValueError: if *interval* is malformed or zero.
        """
        match = re.match(r'^(\d+)(d|h|m)?$', interval)
        if not match:
            raise ValueError("Invalid interval ({})".format(interval))

        minutes = int(match.group(1))
        if minutes == 0:
            # a zero period would make Schedule.next_run divide by zero
            raise ValueError("Invalid interval ({})".format(interval))
        if match.group(2) == 'd':
            minutes *= 60 * 24
        elif match.group(2) == 'h':
            minutes *= 60

        return Interval(minutes)


class StartTime:
    """A time of day that anchors a schedule period."""

    def __init__(self,
                 hour: int,
                 minute: int,
                 tzinfo: Optional[datetime.tzinfo]) -> None:
        self.time = datetime.time(hour, minute, tzinfo=tzinfo)
        self.minutes = self.time.hour * 60 + self.time.minute
        if self.time.tzinfo:
            utcoffset = cast(datetime.timedelta, self.time.utcoffset())
            # NOTE(review): the offset is added and only its ``seconds``
            # field is used, so negative offsets wrap by a day -- kept as is
            # because existing schedules depend on it; confirm the intent.
            self.minutes += int(utcoffset.seconds / 60)

    def __eq__(self, start_time: Any) -> bool:
        # Schedule keeps (Interval, Optional[StartTime]) tuples in a set, so
        # comparison against None must not raise
        try:
            return self.minutes == start_time.minutes
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Return the time in ISO 8601 format."""
        return self.time.isoformat()

    @classmethod
    def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']:
        """Parse *start_time*; return None when it is empty or None.

        Raises:
            ValueError: if the string cannot be parsed.
        """
        if not start_time:
            return None

        try:
            t = parse(start_time).timetz()
        except ValueError as e:
            raise ValueError("Invalid start time {}: {}".format(start_time, e))

        return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)


class Schedule:
    """The set of (interval, optional start time) items of one level."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.items: Set[Tuple[Interval, Optional[StartTime]]] = set()

    def __len__(self) -> int:
        # also makes an empty schedule falsy, which Schedules relies on
        return len(self.items)

    def add(self,
            interval: Interval,
            start_time: Optional[StartTime] = None) -> None:
        """Add an item (no-op if it is already present)."""
        self.items.add((interval, start_time))

    def remove(self,
               interval: Interval,
               start_time: Optional[StartTime] = None) -> None:
        """Remove an item (no-op if it is not present)."""
        self.items.discard((interval, start_time))

    def next_run(self, now: datetime.datetime) -> str:
        """Return the earliest run time after *now* over all items,
        formatted as 'YYYY-MM-DD HH:MM:00'.

        Raises:
            ValueError: if the schedule has no items.
        """
        schedule_time = None
        for interval, opt_start in self.items:
            period = datetime.timedelta(minutes=interval.minutes)
            start_time = datetime.datetime(1970, 1, 1)
            if opt_start:
                start = cast(StartTime, opt_start)
                start_time += datetime.timedelta(minutes=start.minutes)
            # first multiple of the period, counted from start_time, that
            # lies after now
            time = start_time + \
                (int((now - start_time) / period) + 1) * period
            if schedule_time is None or time < schedule_time:
                schedule_time = time
        if schedule_time is None:
            raise ValueError('no items is added')
        return datetime.datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")

    def to_list(self) -> List[Dict[str, Optional[str]]]:
        """Return the items as dicts with string (or None) values."""
        def item_to_dict(interval: Interval,
                         start_time: Optional[StartTime]) -> Dict[str, Optional[str]]:
            schedule_start_time: Optional[str] = (
                start_time.to_string() if start_time else None)
            return {SCHEDULE_INTERVAL: interval.to_string(),
                    SCHEDULE_START_TIME: schedule_start_time}
        return [item_to_dict(interval, start_time)
                for interval, start_time in self.items]

    def to_json(self) -> str:
        """Serialize ``to_list()`` as indented JSON."""
        return json.dumps(self.to_list(), indent=4, sort_keys=True)

    @classmethod
    def from_json(cls, name: str, val: str) -> 'Schedule':
        """Build a schedule named *name* from the output of ``to_json``.

        Raises:
            ValueError: if *val* is not valid JSON or lacks the expected
                structure or values.
        """
        try:
            items = json.loads(val)
            schedule = Schedule(name)
            for item in items:
                interval = Interval.from_string(item[SCHEDULE_INTERVAL])
                # StartTime.from_string maps None and '' to None itself
                start_time = StartTime.from_string(item[SCHEDULE_START_TIME])
                schedule.add(interval, start_time)
            return schedule
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError(
                "Invalid schedule format (missing key {})".format(str(e)))
        except TypeError as e:
            raise ValueError("Invalid schedule format ({})".format(str(e)))


class Schedules:
    """All schedules of one handler, keyed by level spec id.

    The global schedule is kept in a module option; per-pool schedules are
    kept as omap values of the handler's SCHEDULE_OID object in each pool.
    """

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.level_specs: Dict[str, LevelSpec] = {}
        self.schedules: Dict[str, Schedule] = {}

        # Previous versions incorrectly stored the global config in
        # the localized module option. Check the config is here and fix it.
        schedule_cfg = self.handler.module.get_module_option(
            self.handler.MODULE_OPTION_NAME, '')
        if not schedule_cfg:
            schedule_cfg = self.handler.module.get_localized_module_option(
                self.handler.MODULE_OPTION_NAME, '')
            if schedule_cfg:
                self.handler.module.set_module_option(
                    self.handler.MODULE_OPTION_NAME, schedule_cfg)
        self.handler.module.set_localized_module_option(
            self.handler.MODULE_OPTION_NAME, None)

    def __len__(self) -> int:
        return len(self.schedules)

    def load(self,
             namespace_validator: Optional[Callable] = None,
             image_validator: Optional[Callable] = None) -> None:
        """Reload the global schedule and the schedules of every RBD pool."""
        # imported here so the block does not rely on a module-level import
        from .common import get_rbd_pools

        self.level_specs = {}
        self.schedules = {}

        schedule_cfg = self.handler.module.get_module_option(
            self.handler.MODULE_OPTION_NAME, '')
        if schedule_cfg:
            try:
                level_spec = LevelSpec.make_global()
                # decode first so a failure leaves no orphan level spec
                schedule = Schedule.from_json(level_spec.name, schedule_cfg)
                self.level_specs[level_spec.id] = level_spec
                self.schedules[level_spec.id] = schedule
            except ValueError:
                self.handler.log.error(
                    "Failed to decode configured schedule {}".format(
                        schedule_cfg))

        for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
            try:
                with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_from_pool(ioctx, namespace_validator,
                                        image_validator)
            except rados.ConnectionShutdown:
                raise
            except rados.Error as e:
                self.handler.log.error(
                    "Failed to load schedules for pool {}: {}".format(
                        pool_name, e))

    def load_from_pool(self,
                       ioctx: 'rados.Ioctx',
                       namespace_validator: Optional[Callable],
                       image_validator: Optional[Callable]) -> None:
        """Load the schedules stored in one pool, 128 omap entries at a time.

        Keys that no longer resolve to a level are removed from the omap.
        """
        pool_name = ioctx.get_pool_name()
        stale_keys = []
        start_after = ''
        try:
            while True:
                with rados.ReadOpCtx() as read_op:
                    self.handler.log.info(
                        "load_schedules: {}, start_after={}".format(
                            pool_name, start_after))
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)

                    it = list(it)
                    for k, v in it:
                        start_after = k
                        v = v.decode()
                        self.handler.log.info(
                            "load_schedule: {} {}".format(k, v))
                        try:
                            level_spec = LevelSpec.from_id(
                                self.handler, k, namespace_validator,
                                image_validator)
                        except ValueError:
                            self.handler.log.debug(
                                "Stale schedule key %s in pool %s",
                                k, pool_name)
                            stale_keys.append(k)
                            continue

                        try:
                            schedule = Schedule.from_json(level_spec.name, v)
                        except ValueError:
                            self.handler.log.error(
                                "Failed to decode schedule: pool={}, {} {}".format(
                                    pool_name, k, v))
                            continue

                        # register both together so level_specs never holds
                        # a level without a schedule
                        self.level_specs[level_spec.id] = level_spec
                        self.schedules[level_spec.id] = schedule
                    if not it:
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            pass

        if stale_keys:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, stale_keys)
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def save(self, level_spec: LevelSpec, schedule: Optional[Schedule]) -> None:
        """Persist *schedule* for *level_spec*; a None or empty schedule
        removes the stored entry."""
        if level_spec.is_global():
            schedule_cfg = schedule.to_json() if schedule else None
            self.handler.module.set_module_option(
                self.handler.MODULE_OPTION_NAME, schedule_cfg)
            return

        pool_id = level_spec.get_pool_id()
        assert pool_id
        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            with rados.WriteOpCtx() as write_op:
                if schedule:
                    ioctx.set_omap(write_op, (level_spec.id, ),
                                   (schedule.to_json(), ))
                else:
                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def add(self,
            level_spec: LevelSpec,
            interval: str,
            start_time: Optional[str]) -> None:
        """Add an item to the schedule of *level_spec* and persist it.

        Raises:
            ValueError: if *interval* or *start_time* cannot be parsed.
        """
        schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
        schedule.add(Interval.from_string(interval),
                     StartTime.from_string(start_time))
        self.schedules[level_spec.id] = schedule
        self.level_specs[level_spec.id] = level_spec
        self.save(level_spec, schedule)

    def remove(self,
               level_spec: LevelSpec,
               interval: Optional[str],
               start_time: Optional[str]) -> None:
        """Remove one item, or the whole schedule when *interval* is None,
        and persist the result.

        Raises:
            ValueError: if *interval* or *start_time* cannot be parsed.
        """
        schedule = self.schedules.pop(level_spec.id, None)
        if schedule:
            if interval is None:
                schedule = None
            else:
                try:
                    schedule.remove(Interval.from_string(interval),
                                    StartTime.from_string(start_time))
                finally:
                    # put a still non-empty schedule back, also on error
                    if schedule:
                        self.schedules[level_spec.id] = schedule
            if not schedule:
                self.level_specs.pop(level_spec.id, None)
        self.save(level_spec, schedule)

    def find(self,
             pool_id: str,
             namespace: str,
             image_id: Optional[str] = None) -> Optional['Schedule']:
        """Return the most specific schedule that applies, or None."""
        levels = [pool_id, namespace]
        if image_id:
            levels.append(image_id)
        nr_levels = len(levels)
        while nr_levels >= 0:
            # an empty spec id implies global schedule
            level_spec_id = "/".join(levels[:nr_levels])
            found = self.schedules.get(level_spec_id)
            if found is not None:
                return found
            nr_levels -= 1
        return None

    def intersects(self, level_spec: LevelSpec) -> bool:
        """Return True if any known level intersects *level_spec*."""
        return any(ls.intersects(level_spec)
                   for ls in self.level_specs.values())

    def to_list(self, level_spec: LevelSpec) -> Dict[str, dict]:
        """Return the schedules of *level_spec*, of its closest scheduled
        parent and of its children, keyed by level spec id."""
        if level_spec.id in self.schedules:
            parent: Optional[LevelSpec] = level_spec
        else:
            # try to find existing parent
            parent = None
            for level_spec_id in self.schedules:
                ls = self.level_specs[level_spec_id]
                if ls == level_spec:
                    parent = ls
                    break
                if level_spec.is_child_of(ls) and \
                        (not parent or ls.is_child_of(parent)):
                    parent = ls
            if not parent:
                # set to non-existing parent so we still could list its children
                parent = level_spec

        result = {}
        for level_spec_id, schedule in self.schedules.items():
            ls = self.level_specs[level_spec_id]
            if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
                result[level_spec_id] = {
                    'name': schedule.name,
                    'schedule': schedule.to_list(),
                }
        return result
class Throttle:
    """Decorator that drops calls arriving too soon after the last one.

    A wrapped callable runs only when at least ``throttle_period`` has
    passed since the previous call that was let through.  Calls inside
    the period are skipped and return ``None``.  The timestamp lives on
    the ``Throttle`` instance, so everything wrapped by one instance
    shares the same budget.
    """

    def __init__(self, throttle_period: timedelta) -> None:
        self.throttle_period = throttle_period
        # datetime.min guarantees the very first call is let through
        self.time_of_last_call = datetime.min

    def __call__(self, fn: 'FuncT') -> 'FuncT':
        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            current_time = datetime.now()
            next_allowed = self.time_of_last_call + self.throttle_period
            if current_time < next_allowed:
                # still inside the throttle period: skip the call
                return None
            self.time_of_last_call = current_time
            return fn(*args, **kwargs)
        return cast('FuncT', wrapper)
@classmethod
def from_json(cls, val: str) -> 'Task':
    """Build a task from its JSON representation.

    Only the sequence, id, message and refs are read from the document.

    :param val: JSON document describing the task
    :returns: a new task instance
    :raises ValueError: if ``val`` is not valid JSON, is not a JSON
        object, carries refs that are not a JSON object, names an action
        outside ``VALID_TASK_ACTIONS``, or lacks a required key
    """
    try:
        d = json.loads(val)
    except json.JSONDecodeError as e:
        raise ValueError("Invalid JSON ({})".format(str(e)))

    # json.loads() also accepts lists, strings, numbers, ...; reject
    # those here so callers only ever have to handle ValueError.
    if not isinstance(d, dict):
        raise ValueError("Invalid task format (expected a JSON object)")
    refs = d.get(TASK_REFS, {})
    if not isinstance(refs, dict):
        raise ValueError("Invalid task format (refs is not a JSON object)")

    action = refs.get(TASK_REF_ACTION)
    if action not in VALID_TASK_ACTIONS:
        raise ValueError("Invalid task action: {}".format(action))

    try:
        return cls(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
    except KeyError as e:
        raise ValueError("Invalid task format (missing key {})".format(str(e)))
@classmethod
def format_image_spec(cls, image_spec: 'ImageSpecT') -> str:
    """Render an image spec tuple as ``[pool/][namespace/]image``.

    :param image_spec: sequence whose items 0, 1 and 2 are read as pool
        name, namespace and image name respectively
    :returns: the image name, prefixed by the namespace and then by the
        pool name when those items are non-empty
    """
    formatted = image_spec[2]
    # innermost prefix first: namespace, then pool
    for prefix in (image_spec[1], image_spec[0]):
        if prefix:
            formatted = "{}/{}".format(prefix, formatted)
    return formatted
def find_task(self, refs: 'TaskRefsT') -> Optional['Task']:
    """Look up a queued or recently completed task matching ``refs``.

    Queued tasks are checked first, in descending task-id order.  The
    completed-task history is checked next, most recent entry first.
    Matching is delegated to ``self.task_refs_match``.

    :param refs: task references to look for
    :returns: the first matching task, or ``None`` when nothing matches
    """
    self.log.debug("find_task: refs={}".format(refs))

    # search for dups and return the original
    for task_id in sorted(self.tasks_by_id, reverse=True):
        queued = self.tasks_by_id[task_id]
        if self.task_refs_match(queued.refs, refs):
            return queued

    # search for a completed task (message replay)
    for finished in reversed(self.completed_tasks):
        if self.task_refs_match(finished.refs, refs):
            return finished

    return None
def remove_task(self,
                ioctx: Optional[rados.Ioctx],
                task: Task,
                remove_in_memory: bool = True) -> None:
    """Drop a task from the rbd_task omap and, optionally, from memory.

    :param ioctx: I/O context used to remove the task's omap key; the
        omap is left untouched when this is falsy
    :param task: the task to remove
    :param remove_in_memory: when true, also remove the task from the
        in-memory maps and, unless it failed or was canceled, remember
        it in the completed-task history
    """
    self.log.info("remove_task: task={}".format(str(task)))

    if ioctx:
        try:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, (task.sequence_key, ))
                ioctx.operate_write_op(write_op, RBD_TASK_OID)
        except rados.ObjectNotFound:
            # the rbd_task object is gone, so there is no key to remove
            pass

    if not remove_in_memory:
        return

    try:
        del self.tasks_by_id[task.task_id]
        del self.tasks_by_sequence[task.sequence]
    except KeyError:
        # the task is not tracked in memory, so nothing more to do
        return

    if task.failed or task.canceled:
        return

    # keep a record of the last N tasks to help avoid command replay
    # races
    self.log.debug("remove_task: moving to completed tasks")
    self.completed_tasks.append(task)
    self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
def progress_callback(self, task: 'Task', current: int, total: int) -> int:
    """Record and publish progress reported for a running operation.

    :param task: the task the progress report belongs to
    :param current: amount of work done so far
    :param total: total amount of work; a total of zero is reported as
        a progress of 0.0 instead of raising ``ZeroDivisionError``
    :returns: 0 to let the operation continue, or ``-rbd.ECANCELED``
        when there is no in-progress task or it has been canceled
    """
    # A zero total would make the division below raise.
    progress = float(current) / float(total) if total else 0.0
    self.log.debug("progress_callback: task={}, progress={}".format(
        str(task), progress))

    # avoid deadlocking when a new command comes in during a progress callback
    if not self.lock.acquire(False):
        return 0

    try:
        if not self.in_progress_task or self.in_progress_task.canceled:
            return -rbd.ECANCELED
        self.in_progress_task.progress = progress
    finally:
        self.lock.release()

    if not task.progress_posted:
        # delayed creation of progress event until first callback
        self.post_progress(task, progress)
    else:
        self.throttled_update_progress(task, progress)

    return 0
def complete_progress(self, task: 'Task') -> None:
    """Close the task's event in the progress module.

    A progress event is posted first if none has been posted for the
    task yet.  The event is then marked as failed (with the task's
    failure message) or as complete, depending on ``task.failed``.

    :param task: the task whose progress event should be closed
    """
    if not task.progress_posted:
        # ensure progress event exists before we complete/fail it
        self.post_progress(task, 0)

    self.log.debug("complete_progress: task={}".format(str(task)))

    if task.failed:
        event_args = ("fail", task.task_id, task.failure_message)
    else:
        event_args = ("complete", task.task_id)

    try:
        self.module.remote("progress", *event_args)
    except ImportError:
        # progress module is disabled
        pass
def queue_flatten(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue a task that flattens the given image.

    If a matching task is already known (queued or recently completed)
    it is returned instead of queueing a new one.

    :param image_spec: image spec string, parsed by
        ``extract_image_spec``
    :returns: ``(0, task_json, "")``
    :raises rbd.ImageNotFound: if the image does not exist or has no
        parent
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_flatten: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        try:
            with rbd.Image(ioctx, image_name) as image:
                refs[TASK_REF_IMAGE_ID] = image.id()

                try:
                    parent_image_id = image.parent_id()
                except rbd.ImageNotFound:
                    parent_image_id = None

        except rbd.ImageNotFound:
            # missing image: handled below, after the duplicate lookup
            pass

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        display_name = self.format_image_spec(spec)
        if TASK_REF_IMAGE_ID not in refs:
            raise rbd.ImageNotFound(
                "Image {} does not exist".format(display_name),
                errno=errno.ENOENT)
        if not parent_image_id:
            raise rbd.ImageNotFound(
                "Image {} does not have a parent".format(display_name),
                errno=errno.ENOENT)

        message = "Flattening image {}".format(display_name)
        return 0, self.add_task(ioctx, message, refs), ""
def queue_trash_remove(self, image_id_spec: str) -> Tuple[int, str, str]:
    """Queue a task that removes an image from the trash.

    If a matching task is already known it is returned instead of
    queueing a new one.

    :param image_id_spec: spec string parsed by ``extract_image_spec``;
        its last component is used as the image id
    :returns: ``(0, task_json, '')``
    """
    spec = self.extract_image_spec(image_id_spec)
    pool_name, namespace, image_id = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_trash_remove: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_ID: image_id}

    existing = self.find_task(refs)
    if existing:
        return 0, existing.to_json(), ''

    with self.open_ioctx(spec[:2]) as ioctx:
        # verify that image exists in trash
        rbd.RBD().trash_get(ioctx, image_id)

        message = "Removing image {} from trash".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, message, refs), ''
def resolve_pool_name(self, pool_id: str) -> str:
    """Translate a pool id into the pool's name using the OSD map.

    Ids are compared in their string form, so the lookup works whether
    the caller passes the id as a string or as an integer.

    :param pool_id: id of the pool to resolve
    :returns: the pool name, or ``'<unknown>'`` if no pool has this id
    """
    wanted = str(pool_id)
    osd_map = self.module.get('osd_map')
    for pool in osd_map['pools']:
        # normalise both sides: a plain == between an int and a str id
        # would never match
        if str(pool['pool']) == wanted:
            return pool['pool_name']
    return '<unknown>'
def queue_migration_abort(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue a task that aborts the migration of the given image.

    If a matching task is already known it is returned instead of
    queueing a new one.  Otherwise the image is validated with
    ``validate_image_migrating`` before the task is added.

    :param image_spec: image spec string, parsed by
        ``extract_image_spec``
    :returns: ``(0, task_json, '')``
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_migration_abort: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        status = self.get_migration_status(ioctx, spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        self.validate_image_migrating(spec, status)
        message = "Aborting image migration for {}".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, message, refs), ''
def task_list(self, task_id: Optional[str]) -> Tuple[int, str, str]:
    """List one task, or every queued task the caller may see.

    :param task_id: id of a single task to show; when empty or ``None``
        all authorized queued tasks are listed in sequence order
    :returns: ``(0, json_text, "")`` on success, or
        ``(-errno.ENOENT, '', message)`` when ``task_id`` is unknown or
        the caller is not authorized for that task's pool/namespace
    """
    self.log.info("task_list: {}".format(task_id))

    def visible(candidate: 'Task') -> bool:
        # authorization is decided by the task's pool and namespace
        return is_authorized(self.module,
                             candidate.refs[TASK_REF_POOL_NAME],
                             candidate.refs[TASK_REF_POOL_NAMESPACE])

    if not task_id:
        in_order = (self.tasks_by_sequence[sequence]
                    for sequence in sorted(self.tasks_by_sequence))
        listing = [queued.to_dict() for queued in in_order if visible(queued)]
        return 0, json.dumps(listing, indent=4, sort_keys=True), ""

    task = self.tasks_by_id.get(task_id)
    if not task or not visible(task):
        return -errno.ENOENT, '', "No such task {}".format(task_id)

    return 0, json.dumps(task.to_dict(), indent=4, sort_keys=True), ""
def shutdown(self) -> None:
    """Ask the handler thread to stop and wait for it to exit.

    The stop flag is set first and the condition is then notified, so a
    thread sleeping in ``self.condition.wait()`` wakes up right away
    instead of sleeping out its timeout before ``join()`` can return.
    """
    self.log.info("TrashPurgeScheduleHandler: shutting down")
    self.stop_thread = True
    # Wake the worker if it is waiting on the condition.  The flag is
    # already set, so it sees the stop request as soon as it rechecks.
    with self.lock:
        self.condition.notify()
    if self.thread.is_alive():
        self.log.debug("TrashPurgeScheduleHandler: joining thread")
        self.thread.join()
    self.log.info("TrashPurgeScheduleHandler: shut down")
def refresh_pools(self) -> float:
    """Reload schedules and the pool/namespace map, then refresh the queue.

    The refresh is skipped when the previous one happened less than
    ``REFRESH_DELAY_SECONDS`` ago.

    :returns: the number of seconds until the next refresh is due
    """
    elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
    if elapsed < self.REFRESH_DELAY_SECONDS:
        # too early: report the remaining time instead of refreshing
        return self.REFRESH_DELAY_SECONDS - elapsed

    self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

    with self.lock:
        self.load_schedules()
        if not self.schedules:
            # nothing is scheduled: drop all cached pools and queue entries
            self.log.debug("TrashPurgeScheduleHandler: no schedules")
            self.pools = {}
            self.queue = {}
            self.last_refresh_pools = datetime.now()
            return self.REFRESH_DELAY_SECONDS

    # pool_id => {namespace => pool_name}, filled in by load_pool()
    pools: Dict[str, Dict[str, str]] = {}

    # the pools are scanned without holding self.lock
    for pool_id, pool_name in get_rbd_pools(self.module).items():
        if not self.schedules.intersects(
                LevelSpec.from_pool_spec(pool_id, pool_name)):
            # no schedule intersects this pool, so skip scanning it
            continue
        with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            self.load_pool(ioctx, pools)

    with self.lock:
        # refresh_queue() compares the new map with self.pools, so it
        # has to run before self.pools is replaced
        self.refresh_queue(pools)
        self.pools = pools

    self.last_refresh_pools = datetime.now()
    return self.REFRESH_DELAY_SECONDS
def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
    """Schedule the next run for a pool/namespace.

    Nothing is queued when ``self.schedules.find`` returns no schedule
    for the pool/namespace.  Otherwise the pair is added to the queue
    slot for the schedule's next run time, unless it is already there.

    :param now: reference time passed to the schedule's ``next_run``
    :param pool_id: id of the pool
    :param namespace: namespace within the pool
    """
    schedule = self.schedules.find(pool_id, namespace)
    if not schedule:
        self.log.debug(
            "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                pool_id, namespace))
        return

    schedule_time = schedule.next_run(now)
    # create the slot for this run time on first use
    slot = self.queue.setdefault(schedule_time, [])
    self.log.debug(
        "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
            pool_id, namespace, schedule_time))

    ns_spec = (pool_id, namespace)
    if ns_spec not in slot:
        slot.append(ns_spec)
return namespace, 0.0 + + def remove_from_queue(self, pool_id: str, namespace: str) -> None: + self.log.debug( + "TrashPurgeScheduleHandler: descheduling {}/{}".format( + pool_id, namespace)) + + empty_slots = [] + for schedule_time, namespaces in self.queue.items(): + if (pool_id, namespace) in namespaces: + namespaces.remove((pool_id, namespace)) + if not namespaces: + empty_slots.append(schedule_time) + for schedule_time in empty_slots: + del self.queue[schedule_time] + + def add_schedule(self, + level_spec: LevelSpec, + interval: str, + start_time: Optional[str]) -> Tuple[int, str, str]: + self.log.debug( + "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) + + # TODO: optimize to rebuild only affected part of the queue + with self.lock: + self.schedules.add(level_spec, interval, start_time) + self.rebuild_queue() + return 0, "", "" + + def remove_schedule(self, + level_spec: LevelSpec, + interval: Optional[str], + start_time: Optional[str]) -> Tuple[int, str, str]: + self.log.debug( + "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) + + # TODO: optimize to rebuild only affected part of the queue + with self.lock: + self.schedules.remove(level_spec, interval, start_time) + self.rebuild_queue() + return 0, "", "" + + def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]: + self.log.debug( + "TrashPurgeScheduleHandler: list: level_spec={}".format( + level_spec.name)) + + with self.lock: + result = self.schedules.to_list(level_spec) + + return 0, json.dumps(result, indent=4, sort_keys=True), "" + + def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]: + self.log.debug( + "TrashPurgeScheduleHandler: status: level_spec={}".format( + level_spec.name)) + + scheduled = [] + with self.lock: + for schedule_time in sorted(self.queue): + for pool_id, namespace in 
self.queue[schedule_time]: + if not level_spec.matches(pool_id, namespace): + continue + pool_name = self.pools[pool_id][namespace] + scheduled.append({ + 'schedule_time': schedule_time, + 'pool_id': pool_id, + 'pool_name': pool_name, + 'namespace': namespace + }) + return 0, json.dumps({'scheduled': scheduled}, indent=4, + sort_keys=True), "" |