diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/insights/health.py | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/src/pybind/mgr/insights/health.py b/src/pybind/mgr/insights/health.py new file mode 100644 index 00000000..5235ca84 --- /dev/null +++ b/src/pybind/mgr/insights/health.py @@ -0,0 +1,191 @@ +import json +import six +from collections import defaultdict +import datetime + +# freq to write cached state to disk +PERSIST_PERIOD = datetime.timedelta(seconds = 10) +# on disk key prefix +HEALTH_HISTORY_KEY_PREFIX = "health_history/" +# apply on offset to "now": used for testing +NOW_OFFSET = None + +class HealthEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + +class HealthCheckAccumulator(object): + """ + Deuplicated storage of health checks. + """ + def __init__(self, init_checks = None): + # check : severity : { summary, detail } + # summary and detail are deduplicated + self._checks = defaultdict(lambda: + defaultdict(lambda: { + "summary": set(), + "detail": set() + })) + + if init_checks: + self._update(init_checks) + + def __str__(self): + return "check count {}".format(len(self._checks)) + + def add(self, checks): + """ + Add health checks to the current state + + Returns: + bool: True if the state changed, False otherwise. + """ + changed = False + + for check, info in six.iteritems(checks): + + # only keep the icky stuff + severity = info["severity"] + if severity == "HEALTH_OK": + continue + + summary = info["summary"]["message"] + details = map(lambda d: d["message"], info["detail"]) + + if self._add_check(check, severity, [summary], details): + changed = True + + return changed + + def checks(self): + return self._checks + + def merge(self, other): + assert isinstance(other, HealthCheckAccumulator) + self._update(other._checks) + + def _update(self, checks): + """Merge checks with same structure. Does not set dirty bit""" + for check in checks: + for severity in checks[check]: + summaries = set(checks[check][severity]["summary"]) + details = set(checks[check][severity]["detail"]) + self._add_check(check, severity, summaries, details) + + def _add_check(self, check, severity, summaries, details): + changed = False + + for summary in summaries: + if summary not in self._checks[check][severity]["summary"]: + changed = True + self._checks[check][severity]["summary"].add(summary) + + for detail in details: + if detail not in self._checks[check][severity]["detail"]: + changed = True + self._checks[check][severity]["detail"].add(detail) + + return changed + +class HealthHistorySlot(object): + """ + Manage the life cycle of a health history time slot. + + A time slot is a fixed slice of wall clock time (e.g. every hours, from :00 + to :59), and all health updates that occur during this time are deduplicated + together. A slot is initially in a clean state, and becomes dirty when a new + health check is observed. The state of a slot should be persisted when + need_flush returns true. Once the state has been flushed, reset the dirty + bit by calling mark_flushed. + """ + def __init__(self, init_health = dict()): + self._checks = HealthCheckAccumulator(init_health.get("checks")) + self._slot = self._curr_slot() + self._next_flush = None + + def __str__(self): + return "key {} next flush {} checks {}".format( + self.key(), self._next_flush, self._checks) + + def health(self): + return dict(checks = self._checks.checks()) + + def key(self): + """Identifier in the persist store""" + return self._key(self._slot) + + def expired(self): + """True if this slot is the current slot, False otherwise""" + return self._slot != self._curr_slot() + + def need_flush(self): + """True if this slot needs to be flushed, False otherwise""" + now = HealthHistorySlot._now() + if self._next_flush is not None: + if self._next_flush <= now or self.expired(): + return True + return False + + def mark_flushed(self): + """Reset the dirty bit. Caller persists state""" + assert self._next_flush + self._next_flush = None + + def add(self, health): + """ + Add health to the underlying health accumulator. When the slot + transitions from clean to dirty a target flush time is computed. + """ + changed = self._checks.add(health["checks"]) + if changed and not self._next_flush: + self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD + return changed + + def merge(self, other): + assert isinstance(other, HealthHistorySlot) + self._checks.merge(other._checks) + + @staticmethod + def key_range(hours): + """Return the time slot keys for the past N hours""" + def inner(curr, hours): + slot = curr - datetime.timedelta(hours = hours) + return HealthHistorySlot._key(slot) + curr = HealthHistorySlot._curr_slot() + return map(lambda i: inner(curr, i), range(hours)) + + @staticmethod + def curr_key(): + """Key for the current UTC time slot""" + return HealthHistorySlot._key(HealthHistorySlot._curr_slot()) + + @staticmethod + def key_to_time(key): + """Return key converted into datetime""" + timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):] + return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H") + + @staticmethod + def _key(dt): + """Key format. Example: health_2018_11_05_00""" + return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H") + + @staticmethod + def _now(): + """Control now time for easier testing""" + now = datetime.datetime.utcnow() + if NOW_OFFSET is not None: + now = now + NOW_OFFSET + return now + + @staticmethod + def _curr_slot(): + """Slot for the current UTC time""" + dt = HealthHistorySlot._now() + return datetime.datetime( + year = dt.year, + month = dt.month, + day = dt.day, + hour = dt.hour) |