summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/rbd_support/schedule.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/rbd_support/schedule.py')
-rw-r--r--src/pybind/mgr/rbd_support/schedule.py579
1 files changed, 579 insertions, 0 deletions
diff --git a/src/pybind/mgr/rbd_support/schedule.py b/src/pybind/mgr/rbd_support/schedule.py
new file mode 100644
index 000000000..c6ce99182
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/schedule.py
@@ -0,0 +1,579 @@
+import datetime
+import json
+import rados
+import rbd
+import re
+
+from dateutil.parser import parse
+from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
+
+from .common import get_rbd_pools
+if TYPE_CHECKING:
+ from .module import Module
+
+SCHEDULE_INTERVAL = "interval"
+SCHEDULE_START_TIME = "start_time"
+
+
+class LevelSpec:
+
+ def __init__(self,
+ name: str,
+ id: str,
+ pool_id: Optional[str],
+ namespace: Optional[str],
+ image_id: Optional[str] = None) -> None:
+ self.name = name
+ self.id = id
+ self.pool_id = pool_id
+ self.namespace = namespace
+ self.image_id = image_id
+
+ def __eq__(self, level_spec: Any) -> bool:
+ return self.id == level_spec.id
+
+ def is_child_of(self, level_spec: 'LevelSpec') -> bool:
+ if level_spec.is_global():
+ return not self.is_global()
+ if level_spec.pool_id != self.pool_id:
+ return False
+ if level_spec.namespace is None:
+ return self.namespace is not None
+ if level_spec.namespace != self.namespace:
+ return False
+ if level_spec.image_id is None:
+ return self.image_id is not None
+ return False
+
+ def is_global(self) -> bool:
+ return self.pool_id is None
+
+ def get_pool_id(self) -> Optional[str]:
+ return self.pool_id
+
+ def matches(self,
+ pool_id: str,
+ namespace: str,
+ image_id: Optional[str] = None) -> bool:
+ if self.pool_id and self.pool_id != pool_id:
+ return False
+ if self.namespace and self.namespace != namespace:
+ return False
+ if self.image_id and self.image_id != image_id:
+ return False
+ return True
+
+ def intersects(self, level_spec: 'LevelSpec') -> bool:
+ if self.pool_id is None or level_spec.pool_id is None:
+ return True
+ if self.pool_id != level_spec.pool_id:
+ return False
+ if self.namespace is None or level_spec.namespace is None:
+ return True
+ if self.namespace != level_spec.namespace:
+ return False
+ if self.image_id is None or level_spec.image_id is None:
+ return True
+ if self.image_id != level_spec.image_id:
+ return False
+ return True
+
+ @classmethod
+ def make_global(cls) -> 'LevelSpec':
+ return LevelSpec("", "", None, None, None)
+
+ @classmethod
+ def from_pool_spec(cls,
+ pool_id: int,
+ pool_name: str,
+ namespace: Optional[str] = None) -> 'LevelSpec':
+ if namespace is None:
+ id = "{}".format(pool_id)
+ name = "{}/".format(pool_name)
+ else:
+ id = "{}/{}".format(pool_id, namespace)
+ name = "{}/{}/".format(pool_name, namespace)
+ return LevelSpec(name, id, str(pool_id), namespace, None)
+
+ @classmethod
+ def from_name(cls,
+ module: 'Module',
+ name: str,
+ namespace_validator: Optional[Callable] = None,
+ image_validator: Optional[Callable] = None,
+ allow_image_level: bool = True) -> 'LevelSpec':
+ # parse names like:
+ # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
+ match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
+ name)
+ if not match:
+ raise ValueError("failed to parse {}".format(name))
+ if match.group(3) and not allow_image_level:
+ raise ValueError(
+ "invalid name {}: image level is not allowed".format(name))
+
+ id = ""
+ pool_id = None
+ namespace = None
+ image_name = None
+ image_id = None
+ if match.group(1):
+ pool_name = match.group(1)
+ try:
+ pool_id = module.rados.pool_lookup(pool_name)
+ if pool_id is None:
+ raise ValueError("pool {} does not exist".format(pool_name))
+ if pool_id not in get_rbd_pools(module):
+ raise ValueError("{} is not an RBD pool".format(pool_name))
+ pool_id = str(pool_id)
+ id += pool_id
+ if match.group(2) is not None or match.group(3):
+ id += "/"
+ with module.rados.open_ioctx(pool_name) as ioctx:
+ namespace = match.group(2) or ""
+ if namespace:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ if namespace not in namespaces:
+ raise ValueError(
+ "namespace {} does not exist".format(
+ namespace))
+ id += namespace
+ ioctx.set_namespace(namespace)
+ if namespace_validator:
+ namespace_validator(ioctx)
+ if match.group(3):
+ image_name = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_name,
+ read_only=True) as image:
+ image_id = image.id()
+ id += "/" + image_id
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
+ image_name))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_name))
+
+ except rados.ObjectNotFound:
+ raise ValueError("pool {} does not exist".format(pool_name))
+
+ # normalize possible input name like 'rbd//image'
+ if not namespace and image_name:
+ name = "{}/{}".format(pool_name, image_name)
+
+ return LevelSpec(name, id, pool_id, namespace, image_id)
+
+ @classmethod
+ def from_id(cls,
+ handler: Any,
+ id: str,
+ namespace_validator: Optional[Callable] = None,
+ image_validator: Optional[Callable] = None) -> 'LevelSpec':
+ # parse ids like:
+ # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
+ match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
+ if not match:
+ raise ValueError("failed to parse: {}".format(id))
+
+ name = ""
+ pool_id = None
+ namespace = None
+ image_id = None
+ if match.group(1):
+ pool_id = match.group(1)
+ try:
+ pool_name = handler.module.rados.pool_reverse_lookup(
+ int(pool_id))
+ if pool_name is None:
+ raise ValueError("pool {} does not exist".format(pool_name))
+ name += pool_name + "/"
+ if match.group(2) is not None or match.group(3):
+ with handler.module.rados.open_ioctx(pool_name) as ioctx:
+ namespace = match.group(2) or ""
+ if namespace:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ if namespace not in namespaces:
+ raise ValueError(
+ "namespace {} does not exist".format(
+ namespace))
+ name += namespace + "/"
+ if namespace_validator:
+ ioctx.set_namespace(namespace)
+ elif not match.group(3):
+ name += "/"
+ if match.group(3):
+ image_id = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_id=image_id,
+ read_only=True) as image:
+ image_name = image.get_name()
+ name += image_name
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
+ image_id))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_id))
+
+ except rados.ObjectNotFound:
+ raise ValueError("pool {} does not exist".format(pool_id))
+
+ return LevelSpec(name, id, pool_id, namespace, image_id)
+
+
+class Interval:
+
+ def __init__(self, minutes: int) -> None:
+ self.minutes = minutes
+
+ def __eq__(self, interval: Any) -> bool:
+ return self.minutes == interval.minutes
+
+ def __hash__(self) -> int:
+ return hash(self.minutes)
+
+ def to_string(self) -> str:
+ if self.minutes % (60 * 24) == 0:
+ interval = int(self.minutes / (60 * 24))
+ units = 'd'
+ elif self.minutes % 60 == 0:
+ interval = int(self.minutes / 60)
+ units = 'h'
+ else:
+ interval = int(self.minutes)
+ units = 'm'
+
+ return "{}{}".format(interval, units)
+
+ @classmethod
+ def from_string(cls, interval: str) -> 'Interval':
+ match = re.match(r'^(\d+)(d|h|m)?$', interval)
+ if not match:
+ raise ValueError("Invalid interval ({})".format(interval))
+
+ minutes = int(match.group(1))
+ if match.group(2) == 'd':
+ minutes *= 60 * 24
+ elif match.group(2) == 'h':
+ minutes *= 60
+
+ return Interval(minutes)
+
+
+class StartTime:
+
+ def __init__(self,
+ hour: int,
+ minute: int,
+ tzinfo: Optional[datetime.tzinfo]) -> None:
+ self.time = datetime.time(hour, minute, tzinfo=tzinfo)
+ self.minutes = self.time.hour * 60 + self.time.minute
+ if self.time.tzinfo:
+ utcoffset = cast(datetime.timedelta, self.time.utcoffset())
+ self.minutes += int(utcoffset.seconds / 60)
+
+ def __eq__(self, start_time: Any) -> bool:
+ return self.minutes == start_time.minutes
+
+ def __hash__(self) -> int:
+ return hash(self.minutes)
+
+ def to_string(self) -> str:
+ return self.time.isoformat()
+
+ @classmethod
+ def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']:
+ if not start_time:
+ return None
+
+ try:
+ t = parse(start_time).timetz()
+ except ValueError as e:
+ raise ValueError("Invalid start time {}: {}".format(start_time, e))
+
+ return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
+
+
+class Schedule:
+
+ def __init__(self, name: str) -> None:
+ self.name = name
+ self.items: Set[Tuple[Interval, Optional[StartTime]]] = set()
+
+ def __len__(self) -> int:
+ return len(self.items)
+
+ def add(self,
+ interval: Interval,
+ start_time: Optional[StartTime] = None) -> None:
+ self.items.add((interval, start_time))
+
+ def remove(self,
+ interval: Interval,
+ start_time: Optional[StartTime] = None) -> None:
+ self.items.discard((interval, start_time))
+
+ def next_run(self, now: datetime.datetime) -> str:
+ schedule_time = None
+ for interval, opt_start in self.items:
+ period = datetime.timedelta(minutes=interval.minutes)
+ start_time = datetime.datetime(1970, 1, 1)
+ if opt_start:
+ start = cast(StartTime, opt_start)
+ start_time += datetime.timedelta(minutes=start.minutes)
+ time = start_time + \
+ (int((now - start_time) / period) + 1) * period
+ if schedule_time is None or time < schedule_time:
+ schedule_time = time
+ if schedule_time is None:
+ raise ValueError('no items is added')
+ return datetime.datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")
+
+ def to_list(self) -> List[Dict[str, Optional[str]]]:
+ def item_to_dict(interval: Interval,
+ start_time: Optional[StartTime]) -> Dict[str, Optional[str]]:
+ if start_time:
+ schedule_start_time: Optional[str] = start_time.to_string()
+ else:
+ schedule_start_time = None
+ return {SCHEDULE_INTERVAL: interval.to_string(),
+ SCHEDULE_START_TIME: schedule_start_time}
+ return [item_to_dict(interval, start_time)
+ for interval, start_time in self.items]
+
+ def to_json(self) -> str:
+ return json.dumps(self.to_list(), indent=4, sort_keys=True)
+
+ @classmethod
+ def from_json(cls, name: str, val: str) -> 'Schedule':
+ try:
+ items = json.loads(val)
+ schedule = Schedule(name)
+ for item in items:
+ interval = Interval.from_string(item[SCHEDULE_INTERVAL])
+ start_time = item[SCHEDULE_START_TIME] and \
+ StartTime.from_string(item[SCHEDULE_START_TIME]) or None
+ schedule.add(interval, start_time)
+ return schedule
+ except json.JSONDecodeError as e:
+ raise ValueError("Invalid JSON ({})".format(str(e)))
+ except KeyError as e:
+ raise ValueError(
+ "Invalid schedule format (missing key {})".format(str(e)))
+ except TypeError as e:
+ raise ValueError("Invalid schedule format ({})".format(str(e)))
+
+
+class Schedules:
+
+ def __init__(self, handler: Any) -> None:
+ self.handler = handler
+ self.level_specs: Dict[str, LevelSpec] = {}
+ self.schedules: Dict[str, Schedule] = {}
+
+ # Previous versions incorrectly stored the global config in
+ # the localized module option. Check the config is here and fix it.
+ schedule_cfg = self.handler.module.get_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
+ if not schedule_cfg:
+ schedule_cfg = self.handler.module.get_localized_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
+ if schedule_cfg:
+ self.handler.module.set_module_option(
+ self.handler.MODULE_OPTION_NAME, schedule_cfg)
+ self.handler.module.set_localized_module_option(
+ self.handler.MODULE_OPTION_NAME, None)
+
+ def __len__(self) -> int:
+ return len(self.schedules)
+
+ def load(self,
+ namespace_validator: Optional[Callable] = None,
+ image_validator: Optional[Callable] = None) -> None:
+ self.level_specs = {}
+ self.schedules = {}
+
+ schedule_cfg = self.handler.module.get_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
+ if schedule_cfg:
+ try:
+ level_spec = LevelSpec.make_global()
+ self.level_specs[level_spec.id] = level_spec
+ schedule = Schedule.from_json(level_spec.name, schedule_cfg)
+ self.schedules[level_spec.id] = schedule
+ except ValueError:
+ self.handler.log.error(
+ "Failed to decode configured schedule {}".format(
+ schedule_cfg))
+
+ for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
+ try:
+ with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_from_pool(ioctx, namespace_validator,
+ image_validator)
+ except rados.ConnectionShutdown:
+ raise
+ except rados.Error as e:
+ self.handler.log.error(
+ "Failed to load schedules for pool {}: {}".format(
+ pool_name, e))
+
+ def load_from_pool(self,
+ ioctx: rados.Ioctx,
+ namespace_validator: Optional[Callable],
+ image_validator: Optional[Callable]) -> None:
+ pool_name = ioctx.get_pool_name()
+ stale_keys = []
+ start_after = ''
+ try:
+ while True:
+ with rados.ReadOpCtx() as read_op:
+ self.handler.log.info(
+ "load_schedules: {}, start_after={}".format(
+ pool_name, start_after))
+ it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+ ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)
+
+ it = list(it)
+ for k, v in it:
+ start_after = k
+ v = v.decode()
+ self.handler.log.info(
+ "load_schedule: {} {}".format(k, v))
+ try:
+ try:
+ level_spec = LevelSpec.from_id(
+ self.handler, k, namespace_validator,
+ image_validator)
+ except ValueError:
+ self.handler.log.debug(
+ "Stale schedule key %s in pool %s",
+ k, pool_name)
+ stale_keys.append(k)
+ continue
+
+ self.level_specs[level_spec.id] = level_spec
+ schedule = Schedule.from_json(level_spec.name, v)
+ self.schedules[level_spec.id] = schedule
+ except ValueError:
+ self.handler.log.error(
+ "Failed to decode schedule: pool={}, {} {}".format(
+ pool_name, k, v))
+ if not it:
+ break
+
+ except StopIteration:
+ pass
+ except rados.ObjectNotFound:
+ pass
+
+ if stale_keys:
+ with rados.WriteOpCtx() as write_op:
+ ioctx.remove_omap_keys(write_op, stale_keys)
+ ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+ def save(self, level_spec: LevelSpec, schedule: Optional[Schedule]) -> None:
+ if level_spec.is_global():
+ schedule_cfg = schedule and schedule.to_json() or None
+ self.handler.module.set_module_option(
+ self.handler.MODULE_OPTION_NAME, schedule_cfg)
+ return
+
+ pool_id = level_spec.get_pool_id()
+ assert pool_id
+ with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ with rados.WriteOpCtx() as write_op:
+ if schedule:
+ ioctx.set_omap(write_op, (level_spec.id, ),
+ (schedule.to_json(), ))
+ else:
+ ioctx.remove_omap_keys(write_op, (level_spec.id, ))
+ ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+ def add(self,
+ level_spec: LevelSpec,
+ interval: str,
+ start_time: Optional[str]) -> None:
+ schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
+ schedule.add(Interval.from_string(interval),
+ StartTime.from_string(start_time))
+ self.schedules[level_spec.id] = schedule
+ self.level_specs[level_spec.id] = level_spec
+ self.save(level_spec, schedule)
+
+ def remove(self,
+ level_spec: LevelSpec,
+ interval: Optional[str],
+ start_time: Optional[str]) -> None:
+ schedule = self.schedules.pop(level_spec.id, None)
+ if schedule:
+ if interval is None:
+ schedule = None
+ else:
+ try:
+ schedule.remove(Interval.from_string(interval),
+ StartTime.from_string(start_time))
+ finally:
+ if schedule:
+ self.schedules[level_spec.id] = schedule
+ if not schedule:
+ del self.level_specs[level_spec.id]
+ self.save(level_spec, schedule)
+
+ def find(self,
+ pool_id: str,
+ namespace: str,
+ image_id: Optional[str] = None) -> Optional['Schedule']:
+ levels = [pool_id, namespace]
+ if image_id:
+ levels.append(image_id)
+ nr_levels = len(levels)
+ while nr_levels >= 0:
+ # an empty spec id implies global schedule
+ level_spec_id = "/".join(levels[:nr_levels])
+ found = self.schedules.get(level_spec_id)
+ if found is not None:
+ return found
+ nr_levels -= 1
+ return None
+
+ def intersects(self, level_spec: LevelSpec) -> bool:
+ for ls in self.level_specs.values():
+ if ls.intersects(level_spec):
+ return True
+ return False
+
+ def to_list(self, level_spec: LevelSpec) -> Dict[str, dict]:
+ if level_spec.id in self.schedules:
+ parent: Optional[LevelSpec] = level_spec
+ else:
+ # try to find existing parent
+ parent = None
+ for level_spec_id in self.schedules:
+ ls = self.level_specs[level_spec_id]
+ if ls == level_spec:
+ parent = ls
+ break
+ if level_spec.is_child_of(ls) and \
+ (not parent or ls.is_child_of(parent)):
+ parent = ls
+ if not parent:
+ # set to non-existing parent so we still could list its children
+ parent = level_spec
+
+ result = {}
+ for level_spec_id, schedule in self.schedules.items():
+ ls = self.level_specs[level_spec_id]
+ if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
+ result[level_spec_id] = {
+ 'name': schedule.name,
+ 'schedule': schedule.to_list(),
+ }
+ return result