diff options
Diffstat (limited to 'src/pybind/mgr/progress/module.py')
-rw-r--r-- | src/pybind/mgr/progress/module.py | 873 |
1 files changed, 873 insertions, 0 deletions
try:
    from typing import List, Dict, Union, Any, Optional
    from typing import TYPE_CHECKING
except ImportError:
    TYPE_CHECKING = False

from mgr_module import MgrModule, OSDMap, Option
from mgr_util import to_pretty_timedelta
from datetime import timedelta
import os
import threading
import datetime
import uuid
import time
import logging
import json


ENCODING_VERSION = 2

# keep a global reference to the module so we can use it from Event methods
_module = None  # type: Optional["Module"]


class Event(object):
    """
    A generic "event" that has a start time, completion percentage,
    and a list of "refs" that are (type, id) tuples describing which
    objects (osds, pools) this relates to.
    """
    def __init__(self, id: str,
                 message: str,
                 refs: List[str],
                 add_to_ceph_s: bool,
                 started_at: Optional[float] = None):
        self._message = message
        self._refs = refs
        self.started_at = started_at if started_at else time.time()
        self.id = id
        self._add_to_ceph_s = add_to_ceph_s

    def _refresh(self):
        # Push our current state out through the mgr so that
        # "ceph -s" / dashboards can render it.
        global _module
        assert _module
        _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
                                                                self.progress))
        _module.update_progress_event(
            self.id, self.twoline_progress(6), self.progress, self._add_to_ceph_s)

    @property
    def message(self):
        # type: () -> str
        return self._message

    @property
    def refs(self):
        # type: () -> List[str]
        return self._refs

    @property
    def add_to_ceph_s(self):
        # type: () -> bool
        return self._add_to_ceph_s

    @property
    def progress(self):
        # type: () -> float
        raise NotImplementedError()

    @property
    def duration_str(self):
        # type: () -> str
        duration = time.time() - self.started_at
        return "(%s)" % (
            to_pretty_timedelta(timedelta(seconds=duration)))

    @property
    def failed(self):
        # Base events never fail; subclasses override.
        return False

    @property
    def failure_message(self):
        return None

    def summary(self):
        # type: () -> str
        return "{0} {1} {2}".format(self.progress, self.message,
                                    self.duration_str)

    def _progress_str(self, width):
        # Render an ASCII progress bar of the given total width,
        # e.g. "[=====....]"
        inner_width = width - 2
        out = "["
        done_chars = int(self.progress * inner_width)
        out += done_chars * '='
        out += (inner_width - done_chars) * '.'
        out += "]"

        return out

    def twoline_progress(self, indent=4):
        """
        e.g.

        - Eating my delicious strudel  (since: 30s)
            [===============..............] (remaining: 04m)

        """
        time_remaining = self.estimated_time_remaining()
        if time_remaining:
            remaining = "(remaining: %s)" % (
                to_pretty_timedelta(timedelta(seconds=time_remaining)))
        else:
            remaining = ''
        return "{0} {1}\n{2}{3} {4}".format(self._message,
                                            self.duration_str,
                                            " " * indent,
                                            self._progress_str(30),
                                            remaining)

    def to_json(self):
        # type: () -> Dict[str, Any]
        return {
            "id": self.id,
            "message": self.message,
            "duration": self.duration_str,
            "refs": self._refs,
            "progress": self.progress,
            "started_at": self.started_at,
            "time_remaining": self.estimated_time_remaining()
        }

    def estimated_time_remaining(self):
        # Naive linear extrapolation from elapsed time and fraction done.
        # Returns None until any progress has been made (avoids div by zero).
        elapsed = time.time() - self.started_at
        progress = self.progress
        if progress == 0.0:
            return None
        return int(elapsed * (1 - progress) / progress)


class GhostEvent(Event):
    """
    The ghost of a completed event: these are the fields that we persist
    after the event is complete.
    """

    def __init__(self, my_id, message, refs, add_to_ceph_s, started_at, finished_at=None,
                 failed=False, failure_message=None):
        super().__init__(my_id, message, refs, add_to_ceph_s, started_at)
        self.finished_at = finished_at if finished_at else time.time()

        if failed:
            self._failed = True
            self._failure_message = failure_message
        else:
            self._failed = False

    @property
    def progress(self):
        # A completed event is by definition 100% done.
        return 1.0

    @property
    def failed(self):
        return self._failed

    @property
    def failure_message(self):
        return self._failure_message if self._failed else None

    def to_json(self):
        d = {
            "id": self.id,
            "message": self.message,
            "refs": self._refs,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            # Key was previously misspelled "add_to_ceph_s:" (stray colon);
            # _load() accepts both spellings for old persisted records.
            "add_to_ceph_s": self.add_to_ceph_s
        }
        if self._failed:
            d["failed"] = True
            d["failure_message"] = self._failure_message
        return d


class GlobalRecoveryEvent(Event):
    """
    An event whose completion is determined by active+clean/total_pg_num
    """

    def __init__(self, message, refs, add_to_ceph_s, start_epoch, active_clean_num):
        # type: (str, List[Any], bool, int, int) -> None
        super().__init__(str(uuid.uuid4()), message, refs, add_to_ceph_s)
        self._add_to_ceph_s = add_to_ceph_s
        self._progress = 0.0
        self._start_epoch = start_epoch
        self._active_clean_num = active_clean_num
        self._refresh()

    def global_event_update_progress(self, log):
        # type: (logging.Logger) -> None
        "Update progress of Global Recovery Event"
        global _module
        assert _module
        skipped_pgs = 0
        active_clean_pgs = _module.get("active_clean_pgs")
        total_pg_num = active_clean_pgs["total_num_pgs"]
        new_active_clean_pgs = active_clean_pgs["pg_stats"]
        new_active_clean_num = len(new_active_clean_pgs)
        for pg in new_active_clean_pgs:
            # Disregard PGs that are not being reported
            # if the states are active+clean. Since it is
            # possible that some pgs might not have any movement
            # even before the start of the event.
            if pg['reported_epoch'] < self._start_epoch:
                log.debug("Skipping pg {0} since reported_epoch {1} < start_epoch {2}"
                          .format(pg['pgid'], pg['reported_epoch'], self._start_epoch))
                skipped_pgs += 1
                continue

        if self._active_clean_num != new_active_clean_num:
            # Have this case to know when need to update
            # the progress
            denominator = total_pg_num - skipped_pgs
            if denominator > 0:
                # Clamp: with many skipped PGs the ratio could
                # otherwise exceed 1.0
                self._progress = min(float(new_active_clean_num) / denominator, 1.0)
            else:
                # total_pg_num may be 0, or every PG was skipped;
                # previously a negative denominator produced a
                # nonsensical negative "progress"
                self._progress = 0.0
        else:
            # No need to update since there is no change
            return

        log.debug("Updated progress to %s", self.summary())
        self._refresh()

    @property
    def progress(self):
        return self._progress


class RemoteEvent(Event):
    """
    An event that was published by another module: we know nothing about
    this, rely on the other module to continuously update us with
    progress information as it emerges.
    """

    def __init__(self, my_id, message, refs, add_to_ceph_s):
        # type: (str, str, List[str], bool) -> None
        super().__init__(my_id, message, refs, add_to_ceph_s)
        self._progress = 0.0
        self._failed = False
        self._refresh()

    def set_progress(self, progress):
        # type: (float) -> None
        self._progress = progress
        self._refresh()

    def set_failed(self, message):
        self._progress = 1.0
        self._failed = True
        self._failure_message = message
        self._refresh()

    def set_message(self, message):
        self._message = message
        self._refresh()

    @property
    def progress(self):
        return self._progress

    @property
    def failed(self):
        return self._failed

    @property
    def failure_message(self):
        return self._failure_message if self._failed else None


class PgRecoveryEvent(Event):
    """
    An event whose completion is determined by the recovery of a set of
    PGs to a healthy state.

    Always call update() immediately after construction.
    """

    def __init__(self, message, refs, which_pgs, which_osds, start_epoch, add_to_ceph_s):
        # type: (str, List[Any], List[PgId], List[str], int, bool) -> None
        super().__init__(str(uuid.uuid4()), message, refs, add_to_ceph_s)
        self._pgs = which_pgs
        self._which_osds = which_osds
        self._original_pg_count = len(self._pgs)
        self._original_bytes_recovered = None  # type: Optional[Dict[PgId, float]]
        self._progress = 0.0

        self._start_epoch = start_epoch
        self._refresh()

    @property
    def which_osds(self):
        return self._which_osds

    def pg_update(self, pg_progress: Dict, log: Any) -> None:
        # FIXME: O(pg_num) in python
        # Sanity check to see if there are any missing PGs and to assign
        # empty array and dictionary if there hasn't been any recovery
        pg_to_state: Dict[str, Any] = pg_progress["pgs"]
        pg_ready: bool = pg_progress["pg_ready"]

        if self._original_bytes_recovered is None:
            # First update: snapshot the starting byte counts so later
            # deltas measure only recovery done during this event.
            self._original_bytes_recovered = {}
            missing_pgs = []
            for pg in self._pgs:
                pg_str = str(pg)
                if pg_str in pg_to_state:
                    self._original_bytes_recovered[pg] = \
                        pg_to_state[pg_str]['num_bytes_recovered']
                else:
                    missing_pgs.append(pg)
            if pg_ready:
                for pg in missing_pgs:
                    self._pgs.remove(pg)

        complete_accumulate = 0.0

        # Calculating progress as the number of PGs recovered divided by the
        # original where partially completed PGs count for something
        # between 0.0-1.0. This is perhaps less faithful than looking at the
        # total number of bytes recovered, but it does a better job of
        # representing the work still to do if there are a number of very
        # few-bytes PGs that still need the housekeeping of their recovery
        # to be done. This is subjective...

        complete = set()
        for pg in self._pgs:
            pg_str = str(pg)
            try:
                info = pg_to_state[pg_str]
            except KeyError:
                # The PG is gone!  Probably a pool was deleted. Drop it.
                complete.add(pg)
                continue
            # Only checks the state of each PGs when it's epoch >= the OSDMap's epoch
            if info['reported_epoch'] < self._start_epoch:
                continue

            state = info['state']

            states = state.split("+")

            if "active" in states and "clean" in states:
                complete.add(pg)
            else:
                if info['num_bytes'] == 0:
                    # Empty PGs are considered 0% done until they are
                    # in the correct state.
                    pass
                else:
                    recovered = info['num_bytes_recovered']
                    total_bytes = info['num_bytes']
                    if total_bytes > 0:
                        # .get() guards against a PG that was absent at
                        # snapshot time (pg_ready was false) appearing
                        # later; a plain [] raised KeyError here.
                        ratio = float(recovered -
                                      self._original_bytes_recovered.get(pg, 0)) / \
                            total_bytes
                        # Since the recovered bytes (over time) could perhaps
                        # exceed the contents of the PG (moment in time), we
                        # must clamp this
                        ratio = min(ratio, 1.0)
                        ratio = max(ratio, 0.0)

                    else:
                        # Dataless PGs (e.g. containing only OMAPs) count
                        # as half done.
                        ratio = 0.5
                    complete_accumulate += ratio

        self._pgs = list(set(self._pgs) ^ complete)
        completed_pgs = self._original_pg_count - len(self._pgs)
        completed_pgs = max(completed_pgs, 0)
        try:
            prog = (completed_pgs + complete_accumulate)\
                / self._original_pg_count
        except ZeroDivisionError:
            prog = 0.0

        self._progress = min(max(prog, 0.0), 1.0)

        self._refresh()
        log.info("Updated progress to %s", self.summary())

    @property
    def progress(self):
        # type: () -> float
        return self._progress


class PgId(object):
    """
    Identity of a placement group (pool id + placement seed).  Instances
    are used as set members and dict keys, so they are value-hashable.
    """
    def __init__(self, pool_id, ps):
        # type: (str, int) -> None
        self.pool_id = pool_id
        self.ps = ps

    # Note: the original defined __cmp__, which Python 3 never calls;
    # __eq__/__hash__ give the value semantics the sets/dicts rely on.
    def __eq__(self, other):
        return (self.pool_id, self.ps) == (other.pool_id, other.ps)

    def __hash__(self):
        return hash((self.pool_id, self.ps))

    def __lt__(self, other):
        return (self.pool_id, self.ps) < (other.pool_id, other.ps)

    def __str__(self):
        return "{0}.{1:x}".format(self.pool_id, self.ps)


class Module(MgrModule):
    COMMANDS = [
        {"cmd": "progress",
         "desc": "Show progress of recovery operations",
         "perm": "r"},
        {"cmd": "progress json",
         "desc": "Show machine readable progress information",
         "perm": "r"},
        {"cmd": "progress clear",
         "desc": "Reset progress tracking",
         "perm": "rw"},
        {"cmd": "progress on",
         "desc": "Enable progress tracking",
         "perm": "rw"},
        {"cmd": "progress off",
         "desc": "Disable progress tracking",
         "perm": "rw"}

    ]

    MODULE_OPTIONS = [
        Option(
            'max_completed_events',
            default=50,
            type='int',
            desc='number of past completed events to remember',
            runtime=True
        ),
        Option(
            'sleep_interval',
            default=5,
            type='secs',
            desc='how long the module is going to sleep',
            runtime=True
        ),
        Option(
            'enabled',
            default=True,
            type='bool',
        )
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # Live events keyed by event id
        self._events = {}  # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent, GlobalRecoveryEvent]]
        # Persisted, finished events (bounded by max_completed_events)
        self._completed_events = []  # type: List[GhostEvent]

        self._old_osd_map = None  # type: Optional[OSDMap]

        self._ready = threading.Event()
        self._shutdown = threading.Event()

        self._latest_osdmap = None  # type: Optional[OSDMap]

        # True when _completed_events needs writing back to the store
        self._dirty = False

        global _module
        _module = self

        # only for mypy
        if TYPE_CHECKING:
            self.max_completed_events = 0
            self.sleep_interval = 0
            self.enabled = True

    def config_notify(self):
        # Mirror every module option onto an attribute of self
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def _osd_in_out(self, old_map, old_dump, new_map, osd_id, marked):
        # type: (OSDMap, Dict, OSDMap, str, str) -> None
        # A function that will create or complete an event when an
        # OSD is marked in or out according to the affected PGs
        affected_pgs = []
        for pool in old_dump['pools']:
            pool_id = pool['pool']  # type: str
            for ps in range(0, pool['pg_num']):

                # Was this OSD affected by the OSD coming in/out?
                # Compare old and new osds using
                # data from the json dump
                old_up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
                old_osds = set(old_up_acting['acting'])
                new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
                new_osds = set(new_up_acting['acting'])

                # Check the osd_id being in the acting set for both old
                # and new maps to cover both out and in cases
                was_on_out_or_in_osd = osd_id in old_osds or osd_id in new_osds
                if not was_on_out_or_in_osd:
                    continue

                self.log.debug("pool_id, ps = {0}, {1}".format(
                    pool_id, ps
                ))

                self.log.debug(
                    "old_up_acting: {0}".format(json.dumps(old_up_acting, indent=4, sort_keys=True)))

                # Has this OSD been assigned a new location?
                # (it might not be if there is no suitable place to move
                #  after an OSD is marked in/out)

                is_relocated = old_osds != new_osds

                self.log.debug(
                    "new_up_acting: {0}".format(json.dumps(new_up_acting,
                                                           indent=4,
                                                           sort_keys=True)))

                if was_on_out_or_in_osd and is_relocated:
                    # This PG is now in motion, track its progress
                    affected_pgs.append(PgId(pool_id, ps))

        # In the case that we ignored some PGs, log the reason why (we may
        # not end up creating a progress event)

        self.log.warning("{0} PGs affected by osd.{1} being marked {2}".format(
            len(affected_pgs), osd_id, marked))

        # In the case of the osd coming back in, we might need to cancel
        # previous recovery event for that osd
        if marked == "in":
            for ev_id in list(self._events):
                try:
                    ev = self._events[ev_id]
                    if isinstance(ev, PgRecoveryEvent) and osd_id in ev.which_osds:
                        self.log.info("osd.{0} came back in, cancelling event".format(
                            osd_id
                        ))
                        self._complete(ev)
                except KeyError:
                    self.log.warning("_osd_in_out: ev {0} does not exist".format(ev_id))

        if len(affected_pgs) > 0:
            r_ev = PgRecoveryEvent(
                "Rebalancing after osd.{0} marked {1}".format(osd_id, marked),
                refs=[("osd", osd_id)],
                which_pgs=affected_pgs,
                which_osds=[osd_id],
                start_epoch=self.get_osdmap().get_epoch(),
                add_to_ceph_s=False
            )
            r_ev.pg_update(self.get("pg_progress"), self.log)
            self._events[r_ev.id] = r_ev

    def _osdmap_changed(self, old_osdmap, new_osdmap):
        # type: (OSDMap, OSDMap) -> None
        # Compare in/out weights between two OSDMap epochs and spawn
        # recovery events for any OSD that transitioned.
        old_dump = old_osdmap.dump()
        new_dump = new_osdmap.dump()

        old_osds = dict([(o['osd'], o) for o in old_dump['osds']])

        for osd in new_dump['osds']:
            osd_id = osd['osd']
            new_weight = osd['in']
            if osd_id in old_osds:
                old_weight = old_osds[osd_id]['in']

                if new_weight == 0.0 and old_weight > new_weight:
                    self.log.warning("osd.{0} marked out".format(osd_id))
                    self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "out")
                elif new_weight >= 1.0 and old_weight == 0.0:
                    # Only consider weight>=1.0 as "in" to avoid spawning
                    # individual recovery events on every adjustment
                    # in a gradual weight-in
                    self.log.warning("osd.{0} marked in".format(osd_id))
                    self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in")

    def _pg_state_changed(self):

        # This function both constructs and updates
        # the global recovery event if one of the
        # PGs is not at active+clean state
        active_clean_pgs = self.get("active_clean_pgs")
        total_pg_num = active_clean_pgs["total_num_pgs"]
        active_clean_num = len(active_clean_pgs["pg_stats"])
        try:
            # There might be a case where there is no pg_num
            progress = float(active_clean_num) / total_pg_num
        except ZeroDivisionError:
            return
        if progress < 1.0:
            self.log.warning(("Starting Global Recovery Event,"
                              "%d pgs not in active + clean state"),
                             total_pg_num - active_clean_num)
            ev = GlobalRecoveryEvent("Global Recovery Event",
                                     refs=[("global", "")],
                                     add_to_ceph_s=True,
                                     start_epoch=self.get_osdmap().get_epoch(),
                                     active_clean_num=active_clean_num)
            ev.global_event_update_progress(self.log)
            self._events[ev.id] = ev

    def _process_osdmap(self):
        # Fetch the latest OSDMap and react to any change since the one
        # we saw last time around the serve() loop.
        old_osdmap = self._latest_osdmap
        self._latest_osdmap = self.get_osdmap()
        assert old_osdmap
        assert self._latest_osdmap
        self.log.info(("Processing OSDMap change %d..%d"),
                      old_osdmap.get_epoch(), self._latest_osdmap.get_epoch())

        self._osdmap_changed(old_osdmap, self._latest_osdmap)

    def _process_pg_summary(self):
        # if there are no events we will skip this here to avoid
        # expensive get calls
        if len(self._events) == 0:
            return

        global_event = False
        data = self.get("pg_progress")
        for ev_id in list(self._events):
            try:
                ev = self._events[ev_id]
                # Check for types of events
                # we have to update
                if isinstance(ev, PgRecoveryEvent):
                    ev.pg_update(data, self.log)
                    self.maybe_complete(ev)
                elif isinstance(ev, GlobalRecoveryEvent):
                    global_event = True
                    ev.global_event_update_progress(self.log)
                    self.maybe_complete(ev)
            except KeyError:
                self.log.warning("_process_pg_summary: ev {0} does not exist".format(ev_id))
                continue

        if not global_event:
            # If there is no global event
            # we create one
            self._pg_state_changed()

    def maybe_complete(self, event):
        # type: (Event) -> None
        # Promote a finished (>=100%) event to the completed list.
        if event.progress >= 1.0:
            self._complete(event)

    def _save(self):
        # Persist completed events to the mgr key/value store.
        self.log.info("Writing back {0} completed events".format(
            len(self._completed_events)
        ))
        # TODO: bound the number we store.
        encoded = json.dumps({
            "events": [ev.to_json() for ev in self._completed_events],
            "version": ENCODING_VERSION,
            "compat_version": ENCODING_VERSION
        })
        self.set_store("completed", encoded)

    def _load(self):
        # Restore completed events from the mgr key/value store.
        stored = self.get_store("completed")

        if stored is None:
            self.log.info("No stored events to load")
            return

        decoded = json.loads(stored)
        if decoded['compat_version'] > ENCODING_VERSION:
            raise RuntimeError("Cannot decode version {0}".format(
                decoded['compat_version']))

        if decoded['compat_version'] < ENCODING_VERSION:
            # we need to add the "started_at" and "finished_at" attributes to the events
            for ev in decoded['events']:
                ev['started_at'] = None
                ev['finished_at'] = None

        for ev in decoded['events']:
            # Previous versions omitted add_to_ceph_s here, shifting
            # every following positional argument by one and corrupting
            # restored events.  Accept both the fixed key and the old
            # misspelled "add_to_ceph_s:" key written by earlier to_json().
            add_to_ceph_s = ev.get('add_to_ceph_s',
                                   ev.get('add_to_ceph_s:', False))
            self._completed_events.append(GhostEvent(ev['id'], ev['message'],
                                                     ev['refs'],
                                                     add_to_ceph_s,
                                                     ev['started_at'],
                                                     ev['finished_at'],
                                                     ev.get('failed', False),
                                                     ev.get('failure_message')))

        self._prune_completed_events()

    def _prune_completed_events(self):
        # Keep only the newest max_completed_events entries.
        length = len(self._completed_events)
        if length > self.max_completed_events:
            self._completed_events = self._completed_events[length - self.max_completed_events : length]
            self._dirty = True

    def serve(self):
        # Main module loop: load persisted state, then periodically
        # process OSDMap/PG changes until shutdown is requested.
        self.config_notify()
        self.clear_all_progress_events()
        self.log.info("Loading...")

        self._load()
        # Log the count, not the list repr, of restored events
        self.log.info("Loaded {0} historic events".format(len(self._completed_events)))

        self._latest_osdmap = self.get_osdmap()
        self.log.info("Loaded OSDMap, ready.")

        self._ready.set()

        while not self._shutdown.is_set():
            # Lazy periodic write back of completed events
            if self._dirty:
                self._save()
                self._dirty = False

            if self.enabled:
                self._process_osdmap()
                self._process_pg_summary()

            self._shutdown.wait(timeout=self.sleep_interval)

        self._shutdown.wait()

    def shutdown(self):
        self._shutdown.set()
        self.clear_all_progress_events()

    def update(self, ev_id, ev_msg, ev_progress, refs=None, add_to_ceph_s=False):
        # type: (str, str, float, Optional[list], bool) -> None
        """
        For calling from other mgr modules
        """
        if not self.enabled:
            return

        if refs is None:
            refs = []
        try:
            ev = self._events[ev_id]
            assert isinstance(ev, RemoteEvent)
        except KeyError:
            # if key doesn't exist we create an event
            ev = RemoteEvent(ev_id, ev_msg, refs, add_to_ceph_s)
            self._events[ev_id] = ev
            self.log.info("update: starting ev {0} ({1})".format(
                ev_id, ev_msg))
        else:
            self.log.debug("update: {0} on {1}".format(
                ev_progress, ev_msg))

        ev.set_progress(ev_progress)
        ev.set_message(ev_msg)

    def _complete(self, ev):
        # type: (Event) -> None
        # Move a live event to the completed list and persist it.
        duration = (time.time() - ev.started_at)
        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
            ev.id, ev.message, int(round(duration))
        ))
        self.complete_progress_event(ev.id)

        self._completed_events.append(
            GhostEvent(ev.id, ev.message, ev.refs, ev.add_to_ceph_s, ev.started_at,
                       failed=ev.failed, failure_message=ev.failure_message))
        assert ev.id
        del self._events[ev.id]
        self._prune_completed_events()
        self._dirty = True

    def complete(self, ev_id):
        """
        For calling from other mgr modules
        """
        if not self.enabled:
            return
        try:
            ev = self._events[ev_id]
            assert isinstance(ev, RemoteEvent)
            ev.set_progress(1.0)
            self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                                   ev.message))
            self._complete(ev)
        except KeyError:
            self.log.warning("complete: ev {0} does not exist".format(ev_id))

    def fail(self, ev_id, message):
        """
        For calling from other mgr modules to mark an event as failed (and
        complete)
        """
        try:
            ev = self._events[ev_id]
            assert isinstance(ev, RemoteEvent)
            ev.set_failed(message)
            self.log.info("fail: finished ev {0} ({1}): {2}".format(ev_id,
                                                                    ev.message,
                                                                    message))
            self._complete(ev)
        except KeyError:
            self.log.warning("fail: ev {0} does not exist".format(ev_id))

    def on(self):
        # Enable progress tracking (persisted module option)
        self.set_module_option('enabled', "true")

    def off(self):
        # Disable progress tracking (persisted module option)
        self.set_module_option('enabled', "false")

    def _handle_ls(self):
        # "progress" command: human-readable listing of live and
        # completed events, newest first.
        if len(self._events) or len(self._completed_events):
            out = ""
            chrono_order = sorted(self._events.values(),
                                  key=lambda x: x.started_at, reverse=True)
            for ev in chrono_order:
                out += ev.twoline_progress()
                out += "\n"

            if len(self._completed_events):
                # TODO: limit number of completed events to show
                out += "\n"
                for ghost_ev in self._completed_events:
                    out += "[{0}]: {1}\n".format("Complete" if not ghost_ev.failed else "Failed",
                                                 ghost_ev.twoline_progress())

            return 0, out, ""
        else:
            return 0, "", "Nothing in progress"

    def _json(self):
        return {
            'events': [ev.to_json() for ev in self._events.values()],
            'completed': [ev.to_json() for ev in self._completed_events]
        }

    def clear(self):
        # Drop all live and completed events, in memory and in the store.
        self._events = {}
        self._completed_events = []
        self._dirty = True
        self._save()
        self.clear_all_progress_events()

    def _handle_clear(self):
        self.clear()
        return 0, "", ""

    def handle_command(self, _, cmd):
        if cmd['prefix'] == "progress":
            return self._handle_ls()
        elif cmd['prefix'] == "progress clear":
            # The clear command isn't usually needed - it's to enable
            # the admin to "kick" this module if it seems to have done
            # something wrong (e.g. we have a bug causing a progress event
            # that never finishes)
            return self._handle_clear()
        elif cmd['prefix'] == "progress json":
            return 0, json.dumps(self._json(), indent=4, sort_keys=True), ""
        elif cmd['prefix'] == "progress on":
            if self.enabled:
                return 0, "", "progress already enabled!"
            self.on()
            return 0, "", "progress enabled"
        elif cmd['prefix'] == "progress off":
            if not self.enabled:
                return 0, "", "progress already disabled!"
            self.off()
            self.clear()
            return 0, "", "progress disabled"
        else:
            raise NotImplementedError(cmd['prefix'])