From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/rbd_support/schedule.py | 579 +++++++++++++++++++++++++++++++++ 1 file changed, 579 insertions(+) create mode 100644 src/pybind/mgr/rbd_support/schedule.py (limited to 'src/pybind/mgr/rbd_support/schedule.py') diff --git a/src/pybind/mgr/rbd_support/schedule.py b/src/pybind/mgr/rbd_support/schedule.py new file mode 100644 index 000000000..c6ce99182 --- /dev/null +++ b/src/pybind/mgr/rbd_support/schedule.py @@ -0,0 +1,579 @@ +import datetime +import json +import rados +import rbd +import re + +from dateutil.parser import parse +from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING + +from .common import get_rbd_pools +if TYPE_CHECKING: + from .module import Module + +SCHEDULE_INTERVAL = "interval" +SCHEDULE_START_TIME = "start_time" + + +class LevelSpec: + + def __init__(self, + name: str, + id: str, + pool_id: Optional[str], + namespace: Optional[str], + image_id: Optional[str] = None) -> None: + self.name = name + self.id = id + self.pool_id = pool_id + self.namespace = namespace + self.image_id = image_id + + def __eq__(self, level_spec: Any) -> bool: + return self.id == level_spec.id + + def is_child_of(self, level_spec: 'LevelSpec') -> bool: + if level_spec.is_global(): + return not self.is_global() + if level_spec.pool_id != self.pool_id: + return False + if level_spec.namespace is None: + return self.namespace is not None + if level_spec.namespace != self.namespace: + return False + if level_spec.image_id is None: + return self.image_id is not None + return False + + def is_global(self) -> bool: + return self.pool_id is None + + def get_pool_id(self) -> Optional[str]: + return self.pool_id + + def matches(self, + pool_id: str, + namespace: str, + image_id: Optional[str] = None) -> bool: + if self.pool_id and self.pool_id != pool_id: + return False + if self.namespace and self.namespace != namespace: + return False + if self.image_id and self.image_id != image_id: + return False + return True + + def intersects(self, level_spec: 'LevelSpec') -> bool: + if self.pool_id is None or level_spec.pool_id is None: + return True + if self.pool_id != level_spec.pool_id: + return False + if self.namespace is None or level_spec.namespace is None: + return True + if self.namespace != level_spec.namespace: + return False + if self.image_id is None or level_spec.image_id is None: + return True + if self.image_id != level_spec.image_id: + return False + return True + + @classmethod + def make_global(cls) -> 'LevelSpec': + return LevelSpec("", "", None, None, None) + + @classmethod + def from_pool_spec(cls, + pool_id: int, + pool_name: str, + namespace: Optional[str] = None) -> 'LevelSpec': + if namespace is None: + id = "{}".format(pool_id) + name = "{}/".format(pool_name) + else: + id = "{}/{}".format(pool_id, namespace) + name = "{}/{}/".format(pool_name, namespace) + return LevelSpec(name, id, str(pool_id), namespace, None) + + @classmethod + def from_name(cls, + module: 'Module', + name: str, + namespace_validator: Optional[Callable] = None, + image_validator: Optional[Callable] = None, + allow_image_level: bool = True) -> 'LevelSpec': + # parse names like: + # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image' + match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$', + name) + if not match: + raise ValueError("failed to parse {}".format(name)) + if match.group(3) and not allow_image_level: + raise ValueError( + "invalid name {}: image level is not allowed".format(name)) + + id = "" + pool_id = None + namespace = None + image_name = None + image_id = None + if match.group(1): + pool_name = match.group(1) + try: + pool_id = module.rados.pool_lookup(pool_name) + if pool_id is None: + raise ValueError("pool {} does not exist".format(pool_name)) + if pool_id not in get_rbd_pools(module): + raise ValueError("{} is not an RBD pool".format(pool_name)) + pool_id = str(pool_id) + id += pool_id + if match.group(2) is not None or match.group(3): + id += "/" + with module.rados.open_ioctx(pool_name) as ioctx: + namespace = match.group(2) or "" + if namespace: + namespaces = rbd.RBD().namespace_list(ioctx) + if namespace not in namespaces: + raise ValueError( + "namespace {} does not exist".format( + namespace)) + id += namespace + ioctx.set_namespace(namespace) + if namespace_validator: + namespace_validator(ioctx) + if match.group(3): + image_name = match.group(3) + try: + with rbd.Image(ioctx, image_name, + read_only=True) as image: + image_id = image.id() + id += "/" + image_id + if image_validator: + image_validator(image) + except rbd.ImageNotFound: + raise ValueError("image {} does not exist".format( + image_name)) + except rbd.InvalidArgument: + raise ValueError( + "image {} is not in snapshot mirror mode".format( + image_name)) + + except rados.ObjectNotFound: + raise ValueError("pool {} does not exist".format(pool_name)) + + # normalize possible input name like 'rbd//image' + if not namespace and image_name: + name = "{}/{}".format(pool_name, image_name) + + return LevelSpec(name, id, pool_id, namespace, image_id) + + @classmethod + def from_id(cls, + handler: Any, + id: str, + namespace_validator: Optional[Callable] = None, + image_validator: Optional[Callable] = None) -> 'LevelSpec': + # parse ids like: + # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id' + match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id) + if not match: + raise ValueError("failed to parse: {}".format(id)) + + name = "" + pool_id = None + namespace = None + image_id = None + if match.group(1): + pool_id = match.group(1) + try: + pool_name = handler.module.rados.pool_reverse_lookup( + int(pool_id)) + if pool_name is None: + raise ValueError("pool {} does not exist".format(pool_name)) + name += pool_name + "/" + if match.group(2) is not None or match.group(3): + with handler.module.rados.open_ioctx(pool_name) as ioctx: + namespace = match.group(2) or "" + if namespace: + namespaces = rbd.RBD().namespace_list(ioctx) + if namespace not in namespaces: + raise ValueError( + "namespace {} does not exist".format( + namespace)) + name += namespace + "/" + if namespace_validator: + ioctx.set_namespace(namespace) + elif not match.group(3): + name += "/" + if match.group(3): + image_id = match.group(3) + try: + with rbd.Image(ioctx, image_id=image_id, + read_only=True) as image: + image_name = image.get_name() + name += image_name + if image_validator: + image_validator(image) + except rbd.ImageNotFound: + raise ValueError("image {} does not exist".format( + image_id)) + except rbd.InvalidArgument: + raise ValueError( + "image {} is not in snapshot mirror mode".format( + image_id)) + + except rados.ObjectNotFound: + raise ValueError("pool {} does not exist".format(pool_id)) + + return LevelSpec(name, id, pool_id, namespace, image_id) + + +class Interval: + + def __init__(self, minutes: int) -> None: + self.minutes = minutes + + def __eq__(self, interval: Any) -> bool: + return self.minutes == interval.minutes + + def __hash__(self) -> int: + return hash(self.minutes) + + def to_string(self) -> str: + if self.minutes % (60 * 24) == 0: + interval = int(self.minutes / (60 * 24)) + units = 'd' + elif self.minutes % 60 == 0: + interval = int(self.minutes / 60) + units = 'h' + else: + interval = int(self.minutes) + units = 'm' + + return "{}{}".format(interval, units) + + @classmethod + def from_string(cls, interval: str) -> 'Interval': + match = re.match(r'^(\d+)(d|h|m)?$', interval) + if not match: + raise ValueError("Invalid interval ({})".format(interval)) + + minutes = int(match.group(1)) + if match.group(2) == 'd': + minutes *= 60 * 24 + elif match.group(2) == 'h': + minutes *= 60 + + return Interval(minutes) + + +class StartTime: + + def __init__(self, + hour: int, + minute: int, + tzinfo: Optional[datetime.tzinfo]) -> None: + self.time = datetime.time(hour, minute, tzinfo=tzinfo) + self.minutes = self.time.hour * 60 + self.time.minute + if self.time.tzinfo: + utcoffset = cast(datetime.timedelta, self.time.utcoffset()) + self.minutes += int(utcoffset.seconds / 60) + + def __eq__(self, start_time: Any) -> bool: + return self.minutes == start_time.minutes + + def __hash__(self) -> int: + return hash(self.minutes) + + def to_string(self) -> str: + return self.time.isoformat() + + @classmethod + def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']: + if not start_time: + return None + + try: + t = parse(start_time).timetz() + except ValueError as e: + raise ValueError("Invalid start time {}: {}".format(start_time, e)) + + return StartTime(t.hour, t.minute, tzinfo=t.tzinfo) + + +class Schedule: + + def __init__(self, name: str) -> None: + self.name = name + self.items: Set[Tuple[Interval, Optional[StartTime]]] = set() + + def __len__(self) -> int: + return len(self.items) + + def add(self, + interval: Interval, + start_time: Optional[StartTime] = None) -> None: + self.items.add((interval, start_time)) + + def remove(self, + interval: Interval, + start_time: Optional[StartTime] = None) -> None: + self.items.discard((interval, start_time)) + + def next_run(self, now: datetime.datetime) -> str: + schedule_time = None + for interval, opt_start in self.items: + period = datetime.timedelta(minutes=interval.minutes) + start_time = datetime.datetime(1970, 1, 1) + if opt_start: + start = cast(StartTime, opt_start) + start_time += datetime.timedelta(minutes=start.minutes) + time = start_time + \ + (int((now - start_time) / period) + 1) * period + if schedule_time is None or time < schedule_time: + schedule_time = time + if schedule_time is None: + raise ValueError('no items is added') + return datetime.datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00") + + def to_list(self) -> List[Dict[str, Optional[str]]]: + def item_to_dict(interval: Interval, + start_time: Optional[StartTime]) -> Dict[str, Optional[str]]: + if start_time: + schedule_start_time: Optional[str] = start_time.to_string() + else: + schedule_start_time = None + return {SCHEDULE_INTERVAL: interval.to_string(), + SCHEDULE_START_TIME: schedule_start_time} + return [item_to_dict(interval, start_time) + for interval, start_time in self.items] + + def to_json(self) -> str: + return json.dumps(self.to_list(), indent=4, sort_keys=True) + + @classmethod + def from_json(cls, name: str, val: str) -> 'Schedule': + try: + items = json.loads(val) + schedule = Schedule(name) + for item in items: + interval = Interval.from_string(item[SCHEDULE_INTERVAL]) + start_time = item[SCHEDULE_START_TIME] and \ + StartTime.from_string(item[SCHEDULE_START_TIME]) or None + schedule.add(interval, start_time) + return schedule + except json.JSONDecodeError as e: + raise ValueError("Invalid JSON ({})".format(str(e))) + except KeyError as e: + raise ValueError( + "Invalid schedule format (missing key {})".format(str(e))) + except TypeError as e: + raise ValueError("Invalid schedule format ({})".format(str(e))) + + +class Schedules: + + def __init__(self, handler: Any) -> None: + self.handler = handler + self.level_specs: Dict[str, LevelSpec] = {} + self.schedules: Dict[str, Schedule] = {} + + # Previous versions incorrectly stored the global config in + # the localized module option. Check the config is here and fix it. + schedule_cfg = self.handler.module.get_module_option( + self.handler.MODULE_OPTION_NAME, '') + if not schedule_cfg: + schedule_cfg = self.handler.module.get_localized_module_option( + self.handler.MODULE_OPTION_NAME, '') + if schedule_cfg: + self.handler.module.set_module_option( + self.handler.MODULE_OPTION_NAME, schedule_cfg) + self.handler.module.set_localized_module_option( + self.handler.MODULE_OPTION_NAME, None) + + def __len__(self) -> int: + return len(self.schedules) + + def load(self, + namespace_validator: Optional[Callable] = None, + image_validator: Optional[Callable] = None) -> None: + self.level_specs = {} + self.schedules = {} + + schedule_cfg = self.handler.module.get_module_option( + self.handler.MODULE_OPTION_NAME, '') + if schedule_cfg: + try: + level_spec = LevelSpec.make_global() + self.level_specs[level_spec.id] = level_spec + schedule = Schedule.from_json(level_spec.name, schedule_cfg) + self.schedules[level_spec.id] = schedule + except ValueError: + self.handler.log.error( + "Failed to decode configured schedule {}".format( + schedule_cfg)) + + for pool_id, pool_name in get_rbd_pools(self.handler.module).items(): + try: + with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx: + self.load_from_pool(ioctx, namespace_validator, + image_validator) + except rados.ConnectionShutdown: + raise + except rados.Error as e: + self.handler.log.error( + "Failed to load schedules for pool {}: {}".format( + pool_name, e)) + + def load_from_pool(self, + ioctx: rados.Ioctx, + namespace_validator: Optional[Callable], + image_validator: Optional[Callable]) -> None: + pool_name = ioctx.get_pool_name() + stale_keys = [] + start_after = '' + try: + while True: + with rados.ReadOpCtx() as read_op: + self.handler.log.info( + "load_schedules: {}, start_after={}".format( + pool_name, start_after)) + it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128) + ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID) + + it = list(it) + for k, v in it: + start_after = k + v = v.decode() + self.handler.log.info( + "load_schedule: {} {}".format(k, v)) + try: + try: + level_spec = LevelSpec.from_id( + self.handler, k, namespace_validator, + image_validator) + except ValueError: + self.handler.log.debug( + "Stale schedule key %s in pool %s", + k, pool_name) + stale_keys.append(k) + continue + + self.level_specs[level_spec.id] = level_spec + schedule = Schedule.from_json(level_spec.name, v) + self.schedules[level_spec.id] = schedule + except ValueError: + self.handler.log.error( + "Failed to decode schedule: pool={}, {} {}".format( + pool_name, k, v)) + if not it: + break + + except StopIteration: + pass + except rados.ObjectNotFound: + pass + + if stale_keys: + with rados.WriteOpCtx() as write_op: + ioctx.remove_omap_keys(write_op, stale_keys) + ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID) + + def save(self, level_spec: LevelSpec, schedule: Optional[Schedule]) -> None: + if level_spec.is_global(): + schedule_cfg = schedule and schedule.to_json() or None + self.handler.module.set_module_option( + self.handler.MODULE_OPTION_NAME, schedule_cfg) + return + + pool_id = level_spec.get_pool_id() + assert pool_id + with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx: + with rados.WriteOpCtx() as write_op: + if schedule: + ioctx.set_omap(write_op, (level_spec.id, ), + (schedule.to_json(), )) + else: + ioctx.remove_omap_keys(write_op, (level_spec.id, )) + ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID) + + def add(self, + level_spec: LevelSpec, + interval: str, + start_time: Optional[str]) -> None: + schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name)) + schedule.add(Interval.from_string(interval), + StartTime.from_string(start_time)) + self.schedules[level_spec.id] = schedule + self.level_specs[level_spec.id] = level_spec + self.save(level_spec, schedule) + + def remove(self, + level_spec: LevelSpec, + interval: Optional[str], + start_time: Optional[str]) -> None: + schedule = self.schedules.pop(level_spec.id, None) + if schedule: + if interval is None: + schedule = None + else: + try: + schedule.remove(Interval.from_string(interval), + StartTime.from_string(start_time)) + finally: + if schedule: + self.schedules[level_spec.id] = schedule + if not schedule: + del self.level_specs[level_spec.id] + self.save(level_spec, schedule) + + def find(self, + pool_id: str, + namespace: str, + image_id: Optional[str] = None) -> Optional['Schedule']: + levels = [pool_id, namespace] + if image_id: + levels.append(image_id) + nr_levels = len(levels) + while nr_levels >= 0: + # an empty spec id implies global schedule + level_spec_id = "/".join(levels[:nr_levels]) + found = self.schedules.get(level_spec_id) + if found is not None: + return found + nr_levels -= 1 + return None + + def intersects(self, level_spec: LevelSpec) -> bool: + for ls in self.level_specs.values(): + if ls.intersects(level_spec): + return True + return False + + def to_list(self, level_spec: LevelSpec) -> Dict[str, dict]: + if level_spec.id in self.schedules: + parent: Optional[LevelSpec] = level_spec + else: + # try to find existing parent + parent = None + for level_spec_id in self.schedules: + ls = self.level_specs[level_spec_id] + if ls == level_spec: + parent = ls + break + if level_spec.is_child_of(ls) and \ + (not parent or ls.is_child_of(parent)): + parent = ls + if not parent: + # set to non-existing parent so we still could list its children + parent = level_spec + + result = {} + for level_spec_id, schedule in self.schedules.items(): + ls = self.level_specs[level_spec_id] + if ls == parent or ls == level_spec or ls.is_child_of(level_spec): + result[level_spec_id] = { + 'name': schedule.name, + 'schedule': schedule.to_list(), + } + return result -- cgit v1.2.3