summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/insights/health.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/insights/health.py')
-rw-r--r--src/pybind/mgr/insights/health.py191
1 files changed, 191 insertions, 0 deletions
diff --git a/src/pybind/mgr/insights/health.py b/src/pybind/mgr/insights/health.py
new file mode 100644
index 00000000..5235ca84
--- /dev/null
+++ b/src/pybind/mgr/insights/health.py
@@ -0,0 +1,191 @@
+import json
+import six
+from collections import defaultdict
+import datetime
+
+# freq to write cached state to disk
+PERSIST_PERIOD = datetime.timedelta(seconds = 10)
+# on disk key prefix
+HEALTH_HISTORY_KEY_PREFIX = "health_history/"
+# apply on offset to "now": used for testing
+NOW_OFFSET = None
+
+class HealthEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, set):
+ return list(obj)
+ return json.JSONEncoder.default(self, obj)
+
+class HealthCheckAccumulator(object):
+ """
+ Deuplicated storage of health checks.
+ """
+ def __init__(self, init_checks = None):
+ # check : severity : { summary, detail }
+ # summary and detail are deduplicated
+ self._checks = defaultdict(lambda:
+ defaultdict(lambda: {
+ "summary": set(),
+ "detail": set()
+ }))
+
+ if init_checks:
+ self._update(init_checks)
+
+ def __str__(self):
+ return "check count {}".format(len(self._checks))
+
+ def add(self, checks):
+ """
+ Add health checks to the current state
+
+ Returns:
+ bool: True if the state changed, False otherwise.
+ """
+ changed = False
+
+ for check, info in six.iteritems(checks):
+
+ # only keep the icky stuff
+ severity = info["severity"]
+ if severity == "HEALTH_OK":
+ continue
+
+ summary = info["summary"]["message"]
+ details = map(lambda d: d["message"], info["detail"])
+
+ if self._add_check(check, severity, [summary], details):
+ changed = True
+
+ return changed
+
+ def checks(self):
+ return self._checks
+
+ def merge(self, other):
+ assert isinstance(other, HealthCheckAccumulator)
+ self._update(other._checks)
+
+ def _update(self, checks):
+ """Merge checks with same structure. Does not set dirty bit"""
+ for check in checks:
+ for severity in checks[check]:
+ summaries = set(checks[check][severity]["summary"])
+ details = set(checks[check][severity]["detail"])
+ self._add_check(check, severity, summaries, details)
+
+ def _add_check(self, check, severity, summaries, details):
+ changed = False
+
+ for summary in summaries:
+ if summary not in self._checks[check][severity]["summary"]:
+ changed = True
+ self._checks[check][severity]["summary"].add(summary)
+
+ for detail in details:
+ if detail not in self._checks[check][severity]["detail"]:
+ changed = True
+ self._checks[check][severity]["detail"].add(detail)
+
+ return changed
+
+class HealthHistorySlot(object):
+ """
+ Manage the life cycle of a health history time slot.
+
+ A time slot is a fixed slice of wall clock time (e.g. every hours, from :00
+ to :59), and all health updates that occur during this time are deduplicated
+ together. A slot is initially in a clean state, and becomes dirty when a new
+ health check is observed. The state of a slot should be persisted when
+ need_flush returns true. Once the state has been flushed, reset the dirty
+ bit by calling mark_flushed.
+ """
+ def __init__(self, init_health = dict()):
+ self._checks = HealthCheckAccumulator(init_health.get("checks"))
+ self._slot = self._curr_slot()
+ self._next_flush = None
+
+ def __str__(self):
+ return "key {} next flush {} checks {}".format(
+ self.key(), self._next_flush, self._checks)
+
+ def health(self):
+ return dict(checks = self._checks.checks())
+
+ def key(self):
+ """Identifier in the persist store"""
+ return self._key(self._slot)
+
+ def expired(self):
+ """True if this slot is the current slot, False otherwise"""
+ return self._slot != self._curr_slot()
+
+ def need_flush(self):
+ """True if this slot needs to be flushed, False otherwise"""
+ now = HealthHistorySlot._now()
+ if self._next_flush is not None:
+ if self._next_flush <= now or self.expired():
+ return True
+ return False
+
+ def mark_flushed(self):
+ """Reset the dirty bit. Caller persists state"""
+ assert self._next_flush
+ self._next_flush = None
+
+ def add(self, health):
+ """
+ Add health to the underlying health accumulator. When the slot
+ transitions from clean to dirty a target flush time is computed.
+ """
+ changed = self._checks.add(health["checks"])
+ if changed and not self._next_flush:
+ self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD
+ return changed
+
+ def merge(self, other):
+ assert isinstance(other, HealthHistorySlot)
+ self._checks.merge(other._checks)
+
+ @staticmethod
+ def key_range(hours):
+ """Return the time slot keys for the past N hours"""
+ def inner(curr, hours):
+ slot = curr - datetime.timedelta(hours = hours)
+ return HealthHistorySlot._key(slot)
+ curr = HealthHistorySlot._curr_slot()
+ return map(lambda i: inner(curr, i), range(hours))
+
+ @staticmethod
+ def curr_key():
+ """Key for the current UTC time slot"""
+ return HealthHistorySlot._key(HealthHistorySlot._curr_slot())
+
+ @staticmethod
+ def key_to_time(key):
+ """Return key converted into datetime"""
+ timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):]
+ return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H")
+
+ @staticmethod
+ def _key(dt):
+ """Key format. Example: health_2018_11_05_00"""
+ return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H")
+
+ @staticmethod
+ def _now():
+ """Control now time for easier testing"""
+ now = datetime.datetime.utcnow()
+ if NOW_OFFSET is not None:
+ now = now + NOW_OFFSET
+ return now
+
+ @staticmethod
+ def _curr_slot():
+ """Slot for the current UTC time"""
+ dt = HealthHistorySlot._now()
+ return datetime.datetime(
+ year = dt.year,
+ month = dt.month,
+ day = dt.day,
+ hour = dt.hour)