summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/insights
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/insights
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/insights')
-rw-r--r--src/pybind/mgr/insights/__init__.py6
-rw-r--r--src/pybind/mgr/insights/health.py195
-rw-r--r--src/pybind/mgr/insights/module.py321
-rw-r--r--src/pybind/mgr/insights/tests/__init__.py0
-rw-r--r--src/pybind/mgr/insights/tests/test_health.py275
5 files changed, 797 insertions, 0 deletions
diff --git a/src/pybind/mgr/insights/__init__.py b/src/pybind/mgr/insights/__init__.py
new file mode 100644
index 000000000..99e806328
--- /dev/null
+++ b/src/pybind/mgr/insights/__init__.py
@@ -0,0 +1,6 @@
+import os
+
+if 'UNITTEST' in os.environ:
+ import tests
+
+from .module import Module
diff --git a/src/pybind/mgr/insights/health.py b/src/pybind/mgr/insights/health.py
new file mode 100644
index 000000000..1deb5d3a6
--- /dev/null
+++ b/src/pybind/mgr/insights/health.py
@@ -0,0 +1,195 @@
+import json
+from collections import defaultdict
+import datetime
+
+# freq to write cached state to disk
+PERSIST_PERIOD = datetime.timedelta(seconds=10)
+# on disk key prefix
+HEALTH_HISTORY_KEY_PREFIX = "health_history/"
+# apply on offset to "now": used for testing
+NOW_OFFSET = None
+
+
+class HealthEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, set):
+ return list(obj)
+ return json.JSONEncoder.default(self, obj)
+
+
+class HealthCheckAccumulator(object):
+ """
+ Deuplicated storage of health checks.
+ """
+
+ def __init__(self, init_checks=None):
+ # check : severity : { summary, detail }
+ # summary and detail are deduplicated
+ self._checks = defaultdict(lambda:
+ defaultdict(lambda: {
+ "summary": set(),
+ "detail": set()
+ }))
+
+ if init_checks:
+ self._update(init_checks)
+
+ def __str__(self):
+ return "check count {}".format(len(self._checks))
+
+ def add(self, checks):
+ """
+ Add health checks to the current state
+
+ Returns:
+ bool: True if the state changed, False otherwise.
+ """
+ changed = False
+
+ for check, info in checks.items():
+
+ # only keep the icky stuff
+ severity = info["severity"]
+ if severity == "HEALTH_OK":
+ continue
+
+ summary = info["summary"]["message"]
+ details = map(lambda d: d["message"], info["detail"])
+
+ if self._add_check(check, severity, [summary], details):
+ changed = True
+
+ return changed
+
+ def checks(self):
+ return self._checks
+
+ def merge(self, other):
+ assert isinstance(other, HealthCheckAccumulator)
+ self._update(other._checks)
+
+ def _update(self, checks):
+ """Merge checks with same structure. Does not set dirty bit"""
+ for check in checks:
+ for severity in checks[check]:
+ summaries = set(checks[check][severity]["summary"])
+ details = set(checks[check][severity]["detail"])
+ self._add_check(check, severity, summaries, details)
+
+ def _add_check(self, check, severity, summaries, details):
+ changed = False
+
+ for summary in summaries:
+ if summary not in self._checks[check][severity]["summary"]:
+ changed = True
+ self._checks[check][severity]["summary"].add(summary)
+
+ for detail in details:
+ if detail not in self._checks[check][severity]["detail"]:
+ changed = True
+ self._checks[check][severity]["detail"].add(detail)
+
+ return changed
+
+
+class HealthHistorySlot(object):
+ """
+ Manage the life cycle of a health history time slot.
+
+ A time slot is a fixed slice of wall clock time (e.g. every hours, from :00
+ to :59), and all health updates that occur during this time are deduplicated
+ together. A slot is initially in a clean state, and becomes dirty when a new
+ health check is observed. The state of a slot should be persisted when
+ need_flush returns true. Once the state has been flushed, reset the dirty
+ bit by calling mark_flushed.
+ """
+
+ def __init__(self, init_health=dict()):
+ self._checks = HealthCheckAccumulator(init_health.get("checks"))
+ self._slot = self._curr_slot()
+ self._next_flush = None
+
+ def __str__(self):
+ return "key {} next flush {} checks {}".format(
+ self.key(), self._next_flush, self._checks)
+
+ def health(self):
+ return dict(checks=self._checks.checks())
+
+ def key(self):
+ """Identifier in the persist store"""
+ return self._key(self._slot)
+
+ def expired(self):
+ """True if this slot is the current slot, False otherwise"""
+ return self._slot != self._curr_slot()
+
+ def need_flush(self):
+ """True if this slot needs to be flushed, False otherwise"""
+ now = HealthHistorySlot._now()
+ if self._next_flush is not None:
+ if self._next_flush <= now or self.expired():
+ return True
+ return False
+
+ def mark_flushed(self):
+ """Reset the dirty bit. Caller persists state"""
+ assert self._next_flush
+ self._next_flush = None
+
+ def add(self, health):
+ """
+ Add health to the underlying health accumulator. When the slot
+ transitions from clean to dirty a target flush time is computed.
+ """
+ changed = self._checks.add(health["checks"])
+ if changed and not self._next_flush:
+ self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD
+ return changed
+
+ def merge(self, other):
+ assert isinstance(other, HealthHistorySlot)
+ self._checks.merge(other._checks)
+
+ @staticmethod
+ def key_range(hours):
+ """Return the time slot keys for the past N hours"""
+ def inner(curr, hours):
+ slot = curr - datetime.timedelta(hours=hours)
+ return HealthHistorySlot._key(slot)
+ curr = HealthHistorySlot._curr_slot()
+ return map(lambda i: inner(curr, i), range(hours))
+
+ @staticmethod
+ def curr_key():
+ """Key for the current UTC time slot"""
+ return HealthHistorySlot._key(HealthHistorySlot._curr_slot())
+
+ @staticmethod
+ def key_to_time(key):
+ """Return key converted into datetime"""
+ timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):]
+ return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H")
+
+ @staticmethod
+ def _key(dt):
+ """Key format. Example: health_2018_11_05_00"""
+ return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H")
+
+ @staticmethod
+ def _now():
+ """Control now time for easier testing"""
+ now = datetime.datetime.utcnow()
+ if NOW_OFFSET is not None:
+ now = now + NOW_OFFSET
+ return now
+
+ @staticmethod
+ def _curr_slot():
+ """Slot for the current UTC time"""
+ dt = HealthHistorySlot._now()
+ return datetime.datetime(
+ year=dt.year,
+ month=dt.month,
+ day=dt.day,
+ hour=dt.hour)
diff --git a/src/pybind/mgr/insights/module.py b/src/pybind/mgr/insights/module.py
new file mode 100644
index 000000000..5e891069e
--- /dev/null
+++ b/src/pybind/mgr/insights/module.py
@@ -0,0 +1,321 @@
+import datetime
+import json
+import re
+import threading
+
+from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult
+from mgr_module import MgrModule, CommandResult, NotifyType
+from . import health as health_util
+
+# hours of crash history to report
+CRASH_HISTORY_HOURS = 24
+# hours of health history to report
+HEALTH_HISTORY_HOURS = 24
+# how many hours of health history to keep
+HEALTH_RETENTION_HOURS = 30
+# health check name for insights health
+INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
+# version tag for persistent data format
+ON_DISK_VERSION = 1
+
+
+class Module(MgrModule):
+
+ NOTIFY_TYPES = [NotifyType.health]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+
+ self._shutdown = False
+ self._evt = threading.Event()
+
+ # health history tracking
+ self._pending_health = []
+ self._health_slot = None
+ self._store = {}
+
+ # The following three functions, get_store, set_store, and get_store_prefix
+ # mask the functions defined in the parent to avoid storing large keys
+ # persistently to disk as that was proving problematic. Long term we may
+ # implement a different mechanism to make these persistent. When that day
+ # comes it should just be a matter of deleting these three functions.
+ def get_store(self, key):
+ return self._store.get(key)
+
+ def set_store(self, key, value):
+ if value is None:
+ if key in self._store:
+ del self._store[key]
+ else:
+ self._store[key] = value
+
+ def get_store_prefix(self, prefix):
+ return { k: v for k, v in self._store.items() if k.startswith(prefix) }
+
+
+ def notify(self, ttype: NotifyType, ident):
+ """Queue updates for processing"""
+ if ttype == NotifyType.health:
+ self.log.info("Received health check update {} pending".format(
+ len(self._pending_health)))
+ health = json.loads(self.get("health")["json"])
+ self._pending_health.append(health)
+ self._evt.set()
+
+ def serve(self):
+ self._health_reset()
+ while True:
+ self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
+ self._evt.clear()
+ if self._shutdown:
+ break
+
+ # when the current health slot expires, finalize it by flushing it to
+ # the store, and initializing a new empty slot.
+ if self._health_slot.expired():
+ self.log.info("Health history slot expired {}".format(
+ self._health_slot))
+ self._health_maybe_flush()
+ self._health_reset()
+ self._health_prune_history(HEALTH_RETENTION_HOURS)
+
+ # fold in pending health snapshots and flush
+ self.log.info("Applying {} health updates to slot {}".format(
+ len(self._pending_health), self._health_slot))
+ for health in self._pending_health:
+ self._health_slot.add(health)
+ self._pending_health = []
+ self._health_maybe_flush()
+
+ def shutdown(self):
+ self._shutdown = True
+ self._evt.set()
+
+ def _health_reset(self):
+ """Initialize the current health slot
+
+ The slot will be initialized with any state found to have already been
+ persisted, otherwise the slot will start empty.
+ """
+ key = health_util.HealthHistorySlot.curr_key()
+ data = self.get_store(key)
+ if data:
+ init_health = json.loads(data)
+ self._health_slot = health_util.HealthHistorySlot(init_health)
+ else:
+ self._health_slot = health_util.HealthHistorySlot()
+ self.log.info("Reset curr health slot {}".format(self._health_slot))
+
+ def _health_maybe_flush(self):
+ """Store the health for the current time slot if needed"""
+
+ self.log.info("Maybe flushing slot {} needed {}".format(
+ self._health_slot, self._health_slot.need_flush()))
+
+ if self._health_slot.need_flush():
+ key = self._health_slot.key()
+
+ # build store data entry
+ slot = self._health_slot.health()
+ assert "version" not in slot
+ slot.update(dict(version=ON_DISK_VERSION))
+ data = json.dumps(slot, cls=health_util.HealthEncoder)
+
+ self.log.debug("Storing health key {} data {}".format(
+ key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))
+
+ self.set_store(key, data)
+ self._health_slot.mark_flushed()
+
+ def _health_filter(self, f):
+ """Filter hourly health reports timestamp"""
+ matches = filter(
+ lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])),
+ self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX).items())
+ return map(lambda t: t[0], matches)
+
+ def _health_prune_history(self, hours):
+ """Prune old health entries"""
+ cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
+ for key in self._health_filter(lambda ts: ts <= cutoff):
+ self.log.info("Removing old health slot key {}".format(key))
+ self.set_store(key, None)
+ if not hours:
+ self._health_slot = health_util.HealthHistorySlot()
+
+ def _health_report(self, hours):
+ """
+ Report a consolidated health report for the past N hours.
+ """
+ # roll up the past N hours of health info
+ collector = health_util.HealthHistorySlot()
+ keys = health_util.HealthHistorySlot.key_range(hours)
+ for key in keys:
+ data = self.get_store(key)
+ self.log.info("Reporting health key {} found {}".format(
+ key, bool(data)))
+ health = json.loads(data) if data else {}
+ slot = health_util.HealthHistorySlot(health)
+ collector.merge(slot)
+
+ # include history that hasn't yet been flushed
+ collector.merge(self._health_slot)
+
+ return dict(
+ current=json.loads(self.get("health")["json"]),
+ history=collector.health()
+ )
+
+ def _version_parse(self, version):
+ """
+ Return the components of a Ceph version string.
+
+ This returns nothing when the version string cannot be parsed into its
+ constituent components, such as when Ceph has been built with
+ ENABLE_GIT_VERSION=OFF.
+ """
+ r = r"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
+ m = re.match(r, version)
+ ver = {} if not m else {
+ "release": m.group("release"),
+ "major": m.group("major"),
+ "minor": m.group("minor")
+ }
+ return {k: int(v) for k, v in ver.items()}
+
+ def _crash_history(self, hours):
+ """
+ Load crash history for the past N hours from the crash module.
+ """
+ result = dict(
+ summary={},
+ hours=hours,
+ )
+
+ health_check_details = []
+
+ try:
+ _, _, crashes = self.remote("crash", "do_json_report", hours)
+ result["summary"] = json.loads(crashes)
+ except Exception as e:
+ errmsg = "failed to invoke crash module"
+ self.log.warning("{}: {}".format(errmsg, str(e)))
+ health_check_details.append(errmsg)
+ else:
+ self.log.debug("Crash module invocation succeeded {}".format(
+ json.dumps(result["summary"], indent=2)))
+
+ return result, health_check_details
+
+ def _apply_osd_stats(self, osd_map):
+ # map from osd id to its index in the map structure
+ osd_id_to_idx = {}
+ for idx in range(len(osd_map["osds"])):
+ osd_id_to_idx[osd_map["osds"][idx]["osd"]] = idx
+
+ # include stats, including space utilization performance counters.
+ # adapted from dashboard api controller
+ for s in self.get('osd_stats')['osd_stats']:
+ try:
+ idx = osd_id_to_idx[s["osd"]]
+ osd_map["osds"][idx].update({'osd_stats': s})
+ except KeyError as e:
+ self.log.warning("inconsistent api state: {}".format(str(e)))
+
+ for osd in osd_map["osds"]:
+ osd['stats'] = {}
+ for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
+ osd['stats'][s.split('.')[1]] = self.get_latest('osd', str(osd["osd"]), s)
+
+ def _config_dump(self):
+ """Report cluster configuration
+
+ This report is the standard `config dump` report. It does not include
+ configuration defaults; these can be inferred from the version number.
+ """
+ result = CommandResult("")
+ args = dict(prefix="config dump", format="json")
+ self.send_command(result, "mon", "", json.dumps(args), "")
+ ret, outb, outs = result.wait()
+ if ret == 0:
+ return json.loads(outb), []
+ else:
+ self.log.warning("send_command 'config dump' failed. \
+ ret={}, outs=\"{}\"".format(ret, outs))
+ return [], ["Failed to read monitor config dump"]
+
+ @CLIReadCommand('insights')
+ def do_report(self):
+ '''
+ Retrieve insights report
+ '''
+ health_check_details = []
+ report = {}
+
+ report.update({
+ "version": dict(full=self.version,
+ **self._version_parse(self.version))
+ })
+
+ # crash history
+ crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
+ report["crashes"] = crashes
+ health_check_details.extend(health_details)
+
+ # health history
+ report["health"] = self._health_report(HEALTH_HISTORY_HOURS)
+
+ # cluster configuration
+ config, health_details = self._config_dump()
+ report["config"] = config
+ health_check_details.extend(health_details)
+
+ osd_map = self.get("osd_map")
+ del osd_map['pg_temp']
+ self._apply_osd_stats(osd_map)
+ report["osd_dump"] = osd_map
+
+ report["df"] = self.get("df")
+ report["osd_tree"] = self.get("osd_map_tree")
+ report["fs_map"] = self.get("fs_map")
+ report["crush_map"] = self.get("osd_map_crush")
+ report["mon_map"] = self.get("mon_map")
+ report["service_map"] = self.get("service_map")
+ report["manager_map"] = self.get("mgr_map")
+ report["mon_status"] = json.loads(self.get("mon_status")["json"])
+ report["pg_summary"] = self.get("pg_summary")
+ report["osd_metadata"] = self.get("osd_metadata")
+
+ report.update({
+ "errors": health_check_details
+ })
+
+ if health_check_details:
+ self.set_health_checks({
+ INSIGHTS_HEALTH_CHECK: {
+ "severity": "warning",
+ "summary": "Generated incomplete Insights report",
+ "detail": health_check_details
+ }
+ })
+
+ result = json.dumps(report, indent=2, cls=health_util.HealthEncoder)
+ return HandleCommandResult(stdout=result)
+
+ @CLICommand('insights prune-health')
+ def do_prune_health(self, hours: int):
+ '''
+ Remove health history older than <hours> hours
+ '''
+ self._health_prune_history(hours)
+ return HandleCommandResult()
+
+ def testing_set_now_time_offset(self, hours):
+ """
+ Control what "now" time it is by applying an offset. This is called from
+ the selftest module to manage testing scenarios related to tracking
+ health history.
+ """
+ hours = int(hours)
+ health_util.NOW_OFFSET = datetime.timedelta(hours=hours)
+ self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))
diff --git a/src/pybind/mgr/insights/tests/__init__.py b/src/pybind/mgr/insights/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/insights/tests/__init__.py
diff --git a/src/pybind/mgr/insights/tests/test_health.py b/src/pybind/mgr/insights/tests/test_health.py
new file mode 100644
index 000000000..06e6454f9
--- /dev/null
+++ b/src/pybind/mgr/insights/tests/test_health.py
@@ -0,0 +1,275 @@
+import unittest
+from tests import mock
+from ..health import *
+
+
+class HealthChecksTest(unittest.TestCase):
+ def test_check_accum_empty(self):
+ # health checks accum initially empty reports empty
+ h = HealthCheckAccumulator()
+ self.assertEqual(h.checks(), {})
+
+ h = HealthCheckAccumulator({})
+ self.assertEqual(h.checks(), {})
+
+ def _get_init_checks(self):
+ return HealthCheckAccumulator({
+ "C0": {
+ "S0": {
+ "summary": ["s0", "s1"],
+ "detail": ("d0", "d1")
+ }
+ }
+ })
+
+ def test_check_init(self):
+ # initialization with lists and tuples is OK
+ h = self._get_init_checks()
+ self.assertEqual(h.checks(), {
+ "C0": {
+ "S0": {
+ "summary": set(["s0", "s1"]),
+ "detail": set(["d0", "d1"])
+ }
+ }
+ })
+
+ def _get_merged_checks(self):
+ h = self._get_init_checks()
+ h.merge(HealthCheckAccumulator({
+ "C0": {
+ "S0": {
+ "summary": ["s0", "s1", "s2"],
+ "detail": ("d2",)
+ },
+ "S1": {
+ "summary": ["s0", "s1", "s2"],
+ "detail": ()
+ }
+ },
+ "C1": {
+ "S0": {
+ "summary": [],
+ "detail": ("d0", "d1", "d2")
+ }
+ }
+ }))
+ return h
+
+ def test_check_merge(self):
+ # merging combines and de-duplicates
+ h = self._get_merged_checks()
+ self.assertEqual(h.checks(), {
+ "C0": {
+ "S0": {
+ "summary": set(["s0", "s1", "s2"]),
+ "detail": set(["d0", "d1", "d2"])
+ },
+ "S1": {
+ "summary": set(["s0", "s1", "s2"]),
+ "detail": set([])
+ }
+ },
+ "C1": {
+ "S0": {
+ "summary": set([]),
+ "detail": set(["d0", "d1", "d2"])
+ }
+ }
+ })
+
+ def test_check_add_no_change(self):
+ # returns false when nothing changes
+ h = self._get_merged_checks()
+
+ self.assertFalse(h.add({}))
+
+ self.assertFalse(h.add({
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s0"},
+ "detail": []
+ }
+ }))
+
+ self.assertFalse(h.add({
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s1"},
+ "detail": [{"message": "d1"}]
+ }
+ }))
+
+ self.assertFalse(h.add({
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s0"},
+ "detail": [{"message": "d1"}, {"message": "d2"}]
+ }
+ }))
+
+ def test_check_add_changed(self):
+ # new checks report change
+ h = self._get_merged_checks()
+
+ self.assertTrue(h.add({
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s3"},
+ "detail": []
+ }
+ }))
+
+ self.assertTrue(h.add({
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s1"},
+ "detail": [{"message": "d4"}]
+ }
+ }))
+
+ self.assertTrue(h.add({
+ "C0": {
+ "severity": "S2",
+ "summary": {"message": "s0"},
+ "detail": [{"message": "d0"}]
+ }
+ }))
+
+ self.assertTrue(h.add({
+ "C2": {
+ "severity": "S0",
+ "summary": {"message": "s0"},
+ "detail": [{"message": "d0"}, {"message": "d1"}]
+ }
+ }))
+
+ self.assertEqual(h.checks(), {
+ "C0": {
+ "S0": {
+ "summary": set(["s0", "s1", "s2", "s3"]),
+ "detail": set(["d0", "d1", "d2", "d4"])
+ },
+ "S1": {
+ "summary": set(["s0", "s1", "s2"]),
+ "detail": set([])
+ },
+ "S2": {
+ "summary": set(["s0"]),
+ "detail": set(["d0"])
+ }
+ },
+ "C1": {
+ "S0": {
+ "summary": set([]),
+ "detail": set(["d0", "d1", "d2"])
+ }
+ },
+ "C2": {
+ "S0": {
+ "summary": set(["s0"]),
+ "detail": set(["d0", "d1"])
+ }
+ }
+ })
+
+
+class HealthHistoryTest(unittest.TestCase):
+ def _now(self):
+ # return some time truncated at 30 minutes past the hour. this lets us
+ # fiddle with time offsets without worrying about accidentally landing
+ # on exactly the top of the hour which is the edge of a time slot for
+ # tracking health history.
+ dt = datetime.datetime.utcnow()
+ return datetime.datetime(
+ year=dt.year,
+ month=dt.month,
+ day=dt.day,
+ hour=dt.hour,
+ minute=30)
+
+ def test_empty_slot(self):
+ now = self._now()
+
+ HealthHistorySlot._now = mock.Mock(return_value=now)
+ h = HealthHistorySlot()
+
+ # reports no historical checks
+ self.assertEqual(h.health(), {"checks": {}})
+
+ # an empty slot doesn't need to be flushed
+ self.assertFalse(h.need_flush())
+
+ def test_expires(self):
+ now = self._now()
+
+ HealthHistorySlot._now = mock.Mock(return_value=now)
+ h = HealthHistorySlot()
+ self.assertFalse(h.expired())
+
+ # an hour from now it would be expired
+ future = now + datetime.timedelta(hours=1)
+ HealthHistorySlot._now = mock.Mock(return_value=future)
+ self.assertTrue(h.expired())
+
+ def test_need_flush(self):
+ now = self._now()
+
+ HealthHistorySlot._now = mock.Mock(return_value=now)
+ h = HealthHistorySlot()
+ self.assertFalse(h.need_flush())
+
+ self.assertTrue(h.add(dict(checks={
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s0"},
+ "detail": [{"message": "d0"}]
+ }
+ })))
+ # no flush needed, yet...
+ self.assertFalse(h.need_flush())
+
+ # after persist period time elapses, a flush is needed
+ future = now + PERSIST_PERIOD
+ HealthHistorySlot._now = mock.Mock(return_value=future)
+ self.assertTrue(h.need_flush())
+
+ # mark flush resets
+ h.mark_flushed()
+ self.assertFalse(h.need_flush())
+
+ def test_need_flush_edge(self):
+ # test needs flush is true because it has expired, not because it has
+ # been dirty for the persistence period
+ dt = datetime.datetime.utcnow()
+ now = datetime.datetime(
+ year=dt.year,
+ month=dt.month,
+ day=dt.day,
+ hour=dt.hour,
+ minute=59,
+ second=59)
+ HealthHistorySlot._now = mock.Mock(return_value=now)
+ h = HealthHistorySlot()
+ self.assertFalse(h.expired())
+ self.assertFalse(h.need_flush())
+
+ # now it is dirty, but it doesn't need a flush
+ self.assertTrue(h.add(dict(checks={
+ "C0": {
+ "severity": "S0",
+ "summary": {"message": "s0"},
+ "detail": [{"message": "d0"}]
+ }
+ })))
+ self.assertFalse(h.expired())
+ self.assertFalse(h.need_flush())
+
+ # advance time past the hour so it expires, but not past the persistence
+ # period deadline for the last event that set the dirty bit
+ self.assertTrue(PERSIST_PERIOD.total_seconds() > 5)
+ future = now + datetime.timedelta(seconds=5)
+ HealthHistorySlot._now = mock.Mock(return_value=future)
+
+ self.assertTrue(h.expired())
+ self.assertTrue(h.need_flush())