summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/rbd_support/trash_purge_schedule.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/rbd_support/trash_purge_schedule.py')
-rw-r--r--src/pybind/mgr/rbd_support/trash_purge_schedule.py282
1 files changed, 282 insertions, 0 deletions
diff --git a/src/pybind/mgr/rbd_support/trash_purge_schedule.py b/src/pybind/mgr/rbd_support/trash_purge_schedule.py
new file mode 100644
index 000000000..abc50ec39
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/trash_purge_schedule.py
@@ -0,0 +1,282 @@
+import json
+import rados
+import rbd
+import traceback
+
+from datetime import datetime
+from threading import Condition, Lock, Thread
+from typing import Any, Dict, List, Optional, Tuple
+
+from .common import get_rbd_pools
+from .schedule import LevelSpec, Schedules
+
+
+class TrashPurgeScheduleHandler:
+ MODULE_OPTION_NAME = "trash_purge_schedule"
+ SCHEDULE_OID = "rbd_trash_purge_schedule"
+ REFRESH_DELAY_SECONDS = 60.0
+
+ def __init__(self, module: Any) -> None:
+ self.lock = Lock()
+ self.condition = Condition(self.lock)
+ self.module = module
+ self.log = module.log
+ self.last_refresh_pools = datetime(1970, 1, 1)
+
+ self.stop_thread = False
+ self.thread = Thread(target=self.run)
+
+ def setup(self) -> None:
+ self.init_schedule_queue()
+ self.thread.start()
+
+ def shutdown(self) -> None:
+ self.log.info("TrashPurgeScheduleHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("TrashPurgeScheduleHandler: joining thread")
+ self.thread.join()
+ self.log.info("TrashPurgeScheduleHandler: shut down")
+
+ def run(self) -> None:
+ try:
+ self.log.info("TrashPurgeScheduleHandler: starting")
+ while not self.stop_thread:
+ refresh_delay = self.refresh_pools()
+ with self.lock:
+ (ns_spec, wait_time) = self.dequeue()
+ if not ns_spec:
+ self.condition.wait(min(wait_time, refresh_delay))
+ continue
+ pool_id, namespace = ns_spec
+ self.trash_purge(pool_id, namespace)
+ with self.lock:
+ self.enqueue(datetime.now(), pool_id, namespace)
+
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
+ self.module.client_blocklisted.set()
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ def trash_purge(self, pool_id: str, namespace: str) -> None:
+ try:
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ ioctx.set_namespace(namespace)
+ rbd.RBD().trash_purge(ioctx, datetime.now())
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ raise
+ except Exception as e:
+ self.log.error("exception when purging {}/{}: {}".format(
+ pool_id, namespace, e))
+
+ def init_schedule_queue(self) -> None:
+ self.queue: Dict[str, List[Tuple[str, str]]] = {}
+ # pool_id => {namespace => pool_name}
+ self.pools: Dict[str, Dict[str, str]] = {}
+ self.schedules = Schedules(self)
+ self.refresh_pools()
+ self.log.debug("TrashPurgeScheduleHandler: queue is initialized")
+
+ def load_schedules(self) -> None:
+ self.log.info("TrashPurgeScheduleHandler: load_schedules")
+ self.schedules.load()
+
+ def refresh_pools(self) -> float:
+ elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
+ if elapsed < self.REFRESH_DELAY_SECONDS:
+ return self.REFRESH_DELAY_SECONDS - elapsed
+
+ self.log.debug("TrashPurgeScheduleHandler: refresh_pools")
+
+ with self.lock:
+ self.load_schedules()
+ if not self.schedules:
+ self.log.debug("TrashPurgeScheduleHandler: no schedules")
+ self.pools = {}
+ self.queue = {}
+ self.last_refresh_pools = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ pools: Dict[str, Dict[str, str]] = {}
+
+ for pool_id, pool_name in get_rbd_pools(self.module).items():
+ if not self.schedules.intersects(
+ LevelSpec.from_pool_spec(pool_id, pool_name)):
+ continue
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_pool(ioctx, pools)
+
+ with self.lock:
+ self.refresh_queue(pools)
+ self.pools = pools
+
+ self.last_refresh_pools = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None:
+ pool_id = str(ioctx.get_pool_id())
+ pool_name = ioctx.get_pool_name()
+ pools[pool_id] = {}
+ pool_namespaces = ['']
+
+ self.log.debug("load_pool: {}".format(pool_name))
+
+ try:
+ pool_namespaces += rbd.RBD().namespace_list(ioctx)
+ except rbd.OperationNotSupported:
+ self.log.debug("namespaces not supported")
+ except rbd.ConnectionShutdown:
+ raise
+ except Exception as e:
+ self.log.error("exception when scanning pool {}: {}".format(
+ pool_name, e))
+
+ for namespace in pool_namespaces:
+ pools[pool_id][namespace] = pool_name
+
+ def rebuild_queue(self) -> None:
+ now = datetime.now()
+
+ # don't remove from queue "due" images
+ now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+
+ for schedule_time in list(self.queue):
+ if schedule_time > now_string:
+ del self.queue[schedule_time]
+
+ if not self.schedules:
+ return
+
+ for pool_id, namespaces in self.pools.items():
+ for namespace in namespaces:
+ self.enqueue(now, pool_id, namespace)
+
+ self.condition.notify()
+
+ def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None:
+ now = datetime.now()
+
+ for pool_id, namespaces in self.pools.items():
+ for namespace in namespaces:
+ if pool_id not in current_pools or \
+ namespace not in current_pools[pool_id]:
+ self.remove_from_queue(pool_id, namespace)
+
+ for pool_id, namespaces in current_pools.items():
+ for namespace in namespaces:
+ if pool_id not in self.pools or \
+ namespace not in self.pools[pool_id]:
+ self.enqueue(now, pool_id, namespace)
+
+ self.condition.notify()
+
+ def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
+ schedule = self.schedules.find(pool_id, namespace)
+ if not schedule:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
+ pool_id, namespace))
+ return
+
+ schedule_time = schedule.next_run(now)
+ if schedule_time not in self.queue:
+ self.queue[schedule_time] = []
+ self.log.debug(
+ "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
+ pool_id, namespace, schedule_time))
+ ns_spec = (pool_id, namespace)
+ if ns_spec not in self.queue[schedule_time]:
+ self.queue[schedule_time].append((pool_id, namespace))
+
+ def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
+ if not self.queue:
+ return None, 1000.0
+
+ now = datetime.now()
+ schedule_time = sorted(self.queue)[0]
+
+ if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
+ wait_time = (datetime.strptime(schedule_time,
+ "%Y-%m-%d %H:%M:%S") - now)
+ return None, wait_time.total_seconds()
+
+ namespaces = self.queue[schedule_time]
+ namespace = namespaces.pop(0)
+ if not namespaces:
+ del self.queue[schedule_time]
+ return namespace, 0.0
+
+ def remove_from_queue(self, pool_id: str, namespace: str) -> None:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: descheduling {}/{}".format(
+ pool_id, namespace))
+
+ empty_slots = []
+ for schedule_time, namespaces in self.queue.items():
+ if (pool_id, namespace) in namespaces:
+ namespaces.remove((pool_id, namespace))
+ if not namespaces:
+ empty_slots.append(schedule_time)
+ for schedule_time in empty_slots:
+ del self.queue[schedule_time]
+
+ def add_schedule(self,
+ level_spec: LevelSpec,
+ interval: str,
+ start_time: Optional[str]) -> Tuple[int, str, str]:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.add(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def remove_schedule(self,
+ level_spec: LevelSpec,
+ interval: Optional[str],
+ start_time: Optional[str]) -> Tuple[int, str, str]:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.remove(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: list: level_spec={}".format(
+ level_spec.name))
+
+ with self.lock:
+ result = self.schedules.to_list(level_spec)
+
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+ def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: status: level_spec={}".format(
+ level_spec.name))
+
+ scheduled = []
+ with self.lock:
+ for schedule_time in sorted(self.queue):
+ for pool_id, namespace in self.queue[schedule_time]:
+ if not level_spec.matches(pool_id, namespace):
+ continue
+ pool_name = self.pools[pool_id][namespace]
+ scheduled.append({
+ 'schedule_time': schedule_time,
+ 'pool_id': pool_id,
+ 'pool_name': pool_name,
+ 'namespace': namespace
+ })
+ return 0, json.dumps({'scheduled': scheduled}, indent=4,
+ sort_keys=True), ""