summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/devicehealth
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/devicehealth/__init__.py2
-rw-r--r--src/pybind/mgr/devicehealth/module.py651
2 files changed, 653 insertions, 0 deletions
diff --git a/src/pybind/mgr/devicehealth/__init__.py b/src/pybind/mgr/devicehealth/__init__.py
new file mode 100644
index 00000000..4c5b97ce
--- /dev/null
+++ b/src/pybind/mgr/devicehealth/__init__.py
@@ -0,0 +1,2 @@
+
+from .module import Module
diff --git a/src/pybind/mgr/devicehealth/module.py b/src/pybind/mgr/devicehealth/module.py
new file mode 100644
index 00000000..9bddc66f
--- /dev/null
+++ b/src/pybind/mgr/devicehealth/module.py
@@ -0,0 +1,651 @@
+"""
+Device health monitoring
+"""
+
+import errno
+import json
+from mgr_module import MgrModule, CommandResult
+import operator
+import rados
+from threading import Event
+from datetime import datetime, timedelta, date, time
+import _strptime
+from six import iteritems
+
+TIME_FORMAT = '%Y%m%d-%H%M%S'
+
+DEVICE_HEALTH = 'DEVICE_HEALTH'
+DEVICE_HEALTH_IN_USE = 'DEVICE_HEALTH_IN_USE'
+DEVICE_HEALTH_TOOMANY = 'DEVICE_HEALTH_TOOMANY'
+HEALTH_MESSAGES = {
+ DEVICE_HEALTH: '%d device(s) expected to fail soon',
+ DEVICE_HEALTH_IN_USE: '%d daemons(s) expected to fail soon and still contain data',
+ DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
+}
+
+MAX_SAMPLES=500
+
+
+class Module(MgrModule):
+ MODULE_OPTIONS = [
+ {
+ 'name': 'enable_monitoring',
+ 'default': False,
+ 'type': 'bool',
+ 'desc': 'monitor device health metrics',
+ 'runtime': True,
+ },
+ {
+ 'name': 'scrape_frequency',
+ 'default': 86400,
+ 'type': 'secs',
+ 'desc': 'how frequently to scrape device health metrics',
+ 'runtime': True,
+ },
+ {
+ 'name': 'pool_name',
+ 'default': 'device_health_metrics',
+ 'type': 'str',
+ 'desc': 'name of pool in which to store device health metrics',
+ 'runtime': True,
+ },
+ {
+ 'name': 'retention_period',
+ 'default': (86400 * 180),
+ 'type': 'secs',
+ 'desc': 'how long to retain device health metrics',
+ 'runtime': True,
+ },
+ {
+ 'name': 'mark_out_threshold',
+ 'default': (86400 * 14 * 2),
+ 'type': 'secs',
+ 'desc': 'automatically mark OSD if it may fail before this long',
+ 'runtime': True,
+ },
+ {
+ 'name': 'warn_threshold',
+ 'default': (86400 * 14 * 6),
+ 'type': 'secs',
+ 'desc': 'raise health warning if OSD may fail before this long',
+ 'runtime': True,
+ },
+ {
+ 'name': 'self_heal',
+ 'default': True,
+ 'type': 'bool',
+ 'desc': 'preemptively heal cluster around devices that may fail',
+ 'runtime': True,
+ },
+ {
+ 'name': 'sleep_interval',
+ 'default': 600,
+ 'type': 'secs',
+ 'desc': 'how frequently to wake up and check device health',
+ 'runtime': True,
+ },
+ ]
+
+ COMMANDS = [
+ {
+ "cmd": "device query-daemon-health-metrics "
+ "name=who,type=CephString",
+ "desc": "Get device health metrics for a given daemon",
+ "perm": "r"
+ },
+ {
+ "cmd": "device scrape-daemon-health-metrics "
+ "name=who,type=CephString",
+ "desc": "Scrape and store device health metrics "
+ "for a given daemon",
+ "perm": "r"
+ },
+ {
+ "cmd": "device scrape-health-metrics "
+ "name=devid,type=CephString,req=False",
+ "desc": "Scrape and store health metrics",
+ "perm": "r"
+ },
+ {
+ "cmd": "device get-health-metrics "
+ "name=devid,type=CephString "
+ "name=sample,type=CephString,req=False",
+ "desc": "Show stored device metrics for the device",
+ "perm": "r"
+ },
+ {
+ "cmd": "device check-health",
+ "desc": "Check life expectancy of devices",
+ "perm": "rw",
+ },
+ {
+ "cmd": "device monitoring on",
+ "desc": "Enable device health monitoring",
+ "perm": "rw",
+ },
+ {
+ "cmd": "device monitoring off",
+ "desc": "Disable device health monitoring",
+ "perm": "rw",
+ },
+ {
+ 'cmd': 'device predict-life-expectancy '
+ 'name=devid,type=CephString,req=true',
+ 'desc': 'Predict life expectancy with local predictor',
+ 'perm': 'r'
+ },
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+
+ # populate options (just until serve() runs)
+ for opt in self.MODULE_OPTIONS:
+ setattr(self, opt['name'], opt['default'])
+
+ # other
+ self.run = True
+ self.event = Event()
+
+ def is_valid_daemon_name(self, who):
+ l = who.split('.')
+ if len(l) != 2:
+ return False
+ if l[0] not in ('osd', 'mon'):
+ return False;
+ return True;
+
+ def handle_command(self, _, cmd):
+ self.log.error("handle_command")
+
+ if cmd['prefix'] == 'device query-daemon-health-metrics':
+ who = cmd.get('who', '')
+ if not self.is_valid_daemon_name(who):
+ return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+ (daemon_type, daemon_id) = cmd.get('who', '').split('.')
+ result = CommandResult('')
+ self.send_command(result, daemon_type, daemon_id, json.dumps({
+ 'prefix': 'smart',
+ 'format': 'json',
+ }), '')
+ r, outb, outs = result.wait()
+ return r, outb, outs
+ elif cmd['prefix'] == 'device scrape-daemon-health-metrics':
+ who = cmd.get('who', '')
+ if not self.is_valid_daemon_name(who):
+ return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+ (daemon_type, daemon_id) = cmd.get('who', '').split('.')
+ return self.scrape_daemon(daemon_type, daemon_id)
+ elif cmd['prefix'] == 'device scrape-health-metrics':
+ if 'devid' in cmd:
+ return self.scrape_device(cmd['devid'])
+ return self.scrape_all()
+ elif cmd['prefix'] == 'device get-health-metrics':
+ return self.show_device_metrics(cmd['devid'], cmd.get('sample'))
+ elif cmd['prefix'] == 'device check-health':
+ return self.check_health()
+ elif cmd['prefix'] == 'device monitoring on':
+ self.set_module_option('enable_monitoring', True)
+ self.event.set()
+ return 0, '', ''
+ elif cmd['prefix'] == 'device monitoring off':
+ self.set_module_option('enable_monitoring', False)
+ self.set_health_checks({}) # avoid stuck health alerts
+ return 0, '', ''
+ elif cmd['prefix'] == 'device predict-life-expectancy':
+ return self.predict_lift_expectancy(cmd['devid'])
+ else:
+ # mgr should respect our self.COMMANDS and not call us for
+ # any prefix we don't advertise
+ raise NotImplementedError(cmd['prefix'])
+
+ def self_test(self):
+ self.config_notify()
+ osdmap = self.get('osd_map')
+ osd_id = osdmap['osds'][0]['osd']
+ osdmeta = self.get('osd_metadata')
+ devs = osdmeta.get(str(osd_id), {}).get('device_ids')
+ if devs:
+ devid = devs.split()[0].split('=')[1]
+ (r, before, err) = self.show_device_metrics(devid, '')
+ assert r == 0
+ (r, out, err) = self.scrape_device(devid)
+ assert r == 0
+ (r, after, err) = self.show_device_metrics(devid, '')
+ assert r == 0
+ assert before != after
+
+ def config_notify(self):
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']))
+ self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+
+ def serve(self):
+ self.log.info("Starting")
+ self.config_notify()
+
+ last_scrape = None
+ ls = self.get_store('last_scrape')
+ if ls:
+ try:
+ last_scrape = datetime.strptime(ls, TIME_FORMAT)
+ except ValueError as e:
+ pass
+ self.log.debug('Last scrape %s', last_scrape)
+
+ while self.run:
+ if self.enable_monitoring:
+ self.log.debug('Running')
+ self.check_health()
+
+ now = datetime.utcnow()
+ if not last_scrape:
+ next_scrape = now
+ else:
+ # align to scrape interval
+ scrape_frequency = int(self.scrape_frequency) or 86400
+ seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
+ seconds -= seconds % scrape_frequency
+ seconds += scrape_frequency
+ next_scrape = datetime.utcfromtimestamp(seconds)
+ if last_scrape:
+ self.log.debug('Last scrape %s, next scrape due %s',
+ last_scrape.strftime(TIME_FORMAT),
+ next_scrape.strftime(TIME_FORMAT))
+ else:
+ self.log.debug('Last scrape never, next scrape due %s',
+ next_scrape.strftime(TIME_FORMAT))
+ if now >= next_scrape:
+ self.scrape_all()
+ self.predict_all_devices()
+ last_scrape = now
+ self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))
+
+ # sleep
+ sleep_interval = int(self.sleep_interval) or 60
+ self.log.debug('Sleeping for %d seconds', sleep_interval)
+ ret = self.event.wait(sleep_interval)
+ self.event.clear()
+
+ def shutdown(self):
+ self.log.info('Stopping')
+ self.run = False
+ self.event.set()
+
+ def open_connection(self, create_if_missing=True):
+ pools = self.rados.list_pools()
+ is_pool = False
+ for pool in pools:
+ if pool == self.pool_name:
+ is_pool = True
+ break
+ if not is_pool:
+ if not create_if_missing:
+ return None
+ self.log.debug('create %s pool' % self.pool_name)
+ # create pool
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd pool create',
+ 'format': 'json',
+ 'pool': self.pool_name,
+ 'pg_num': 1,
+ 'pg_num_min': 1,
+ }), '')
+ r, outb, outs = result.wait()
+ assert r == 0
+
+ # set pool application
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd pool application enable',
+ 'format': 'json',
+ 'pool': self.pool_name,
+ 'app': 'mgr_devicehealth',
+ }), '')
+ r, outb, outs = result.wait()
+ assert r == 0
+
+ ioctx = self.rados.open_ioctx(self.pool_name)
+ return ioctx
+
+ def scrape_daemon(self, daemon_type, daemon_id):
+ ioctx = self.open_connection()
+ if daemon_type != 'osd':
+ return -errno.EINVAL, '', 'scraping non-OSDs not currently supported'
+ raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
+ if raw_smart_data:
+ for device, raw_data in raw_smart_data.items():
+ data = self.extract_smart_features(raw_data)
+ if device and data:
+ self.put_device_metrics(ioctx, device, data)
+ ioctx.close()
+ return 0, "", ""
+
+ def scrape_all(self):
+ osdmap = self.get("osd_map")
+ assert osdmap is not None
+ ioctx = self.open_connection()
+ did_device = {}
+ ids = []
+ for osd in osdmap['osds']:
+ ids.append(('osd', str(osd['osd'])))
+ for daemon_type, daemon_id in ids:
+ raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
+ if not raw_smart_data:
+ continue
+ for device, raw_data in raw_smart_data.items():
+ if device in did_device:
+ self.log.debug('skipping duplicate %s' % device)
+ continue
+ did_device[device] = 1
+ data = self.extract_smart_features(raw_data)
+ if device and data:
+ self.put_device_metrics(ioctx, device, data)
+ ioctx.close()
+ return 0, "", ""
+
+ def scrape_device(self, devid):
+ r = self.get("device " + devid)
+ if not r or 'device' not in r.keys():
+ return -errno.ENOENT, '', 'device ' + devid + ' not found'
+ daemons = [d for d in r['device'].get('daemons', []) if d.startswith('osd.')]
+ if not daemons:
+ return (-errno.EAGAIN, '',
+ 'device ' + devid + ' not claimed by any active OSD daemons')
+ (daemon_type, daemon_id) = daemons[0].split('.')
+ ioctx = self.open_connection()
+ raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
+ devid=devid)
+ if raw_smart_data:
+ for device, raw_data in raw_smart_data.items():
+ data = self.extract_smart_features(raw_data)
+ if device and data:
+ self.put_device_metrics(ioctx, device, data)
+ ioctx.close()
+ return 0, "", ""
+
+ def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
+ """
+ :return: a dict, or None if the scrape failed.
+ """
+ self.log.debug('do_scrape_daemon %s.%s' % (daemon_type, daemon_id))
+ result = CommandResult('')
+ self.send_command(result, daemon_type, daemon_id, json.dumps({
+ 'prefix': 'smart',
+ 'format': 'json',
+ 'devid': devid,
+ }), '')
+ r, outb, outs = result.wait()
+
+ try:
+ return json.loads(outb)
+ except (IndexError, ValueError):
+ self.log.error(
+ "Fail to parse JSON result from daemon {0}.{1} ({2})".format(
+ daemon_type, daemon_id, outb))
+
+ def put_device_metrics(self, ioctx, devid, data):
+ assert devid
+ old_key = datetime.utcnow() - timedelta(
+ seconds=int(self.retention_period))
+ prune = old_key.strftime(TIME_FORMAT)
+ self.log.debug('put_device_metrics device %s prune %s' %
+ (devid, prune))
+ erase = []
+ try:
+ with rados.ReadOpCtx() as op:
+ omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES) # fixme
+ assert ret == 0
+ ioctx.operate_read_op(op, devid)
+ for key, _ in list(omap_iter):
+ if key >= prune:
+ break
+ erase.append(key)
+ except rados.ObjectNotFound:
+ # The object doesn't already exist, no problem.
+ pass
+ except rados.Error as e:
+ # Do not proceed with writes if something unexpected
+ # went wrong with the reads.
+ self.log.exception("Error reading OMAP: {0}".format(e))
+ return
+
+ key = datetime.utcnow().strftime(TIME_FORMAT)
+ self.log.debug('put_device_metrics device %s key %s = %s, erase %s' %
+ (devid, key, data, erase))
+ with rados.WriteOpCtx() as op:
+ ioctx.set_omap(op, (key,), (str(json.dumps(data)),))
+ if len(erase):
+ ioctx.remove_omap_keys(op, tuple(erase))
+ ioctx.operate_write_op(op, devid)
+
+ def _get_device_metrics(self, devid, sample=None, min_sample=None):
+ res = {}
+ ioctx = self.open_connection(create_if_missing=False)
+ if not ioctx:
+ return {}
+ with ioctx:
+ with rados.ReadOpCtx() as op:
+ omap_iter, ret = ioctx.get_omap_vals(op, min_sample or '', sample or '',
+ MAX_SAMPLES) # fixme
+ assert ret == 0
+ try:
+ ioctx.operate_read_op(op, devid)
+ for key, value in list(omap_iter):
+ if sample and key != sample:
+ break
+ if min_sample and key < min_sample:
+ break
+ try:
+ v = json.loads(value)
+ except (ValueError, IndexError):
+ self.log.debug('unable to parse value for %s: "%s"' %
+ (key, value))
+ pass
+ res[key] = v
+ except rados.ObjectNotFound:
+ pass
+ except rados.Error as e:
+ self.log.exception("RADOS error reading omap: {0}".format(e))
+ raise
+ return res
+
+ def show_device_metrics(self, devid, sample):
+ # verify device exists
+ r = self.get("device " + devid)
+ if not r or 'device' not in r.keys():
+ return -errno.ENOENT, '', 'device ' + devid + ' not found'
+ # fetch metrics
+ res = self._get_device_metrics(devid, sample=sample)
+ return 0, json.dumps(res, indent=4, sort_keys=True), ''
+
+ def check_health(self):
+ self.log.info('Check health')
+ config = self.get('config')
+ min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
+ mark_out_threshold_td = timedelta(seconds=int(self.mark_out_threshold))
+ warn_threshold_td = timedelta(seconds=int(self.warn_threshold))
+ checks = {}
+ health_warnings = {
+ DEVICE_HEALTH: [],
+ DEVICE_HEALTH_IN_USE: [],
+ }
+ devs = self.get("devices")
+ osds_in = {}
+ osds_out = {}
+ now = datetime.utcnow()
+ osdmap = self.get("osd_map")
+ assert osdmap is not None
+ for dev in devs['devices']:
+ devid = dev['devid']
+ if 'life_expectancy_max' not in dev:
+ continue
+ # ignore devices that are not consumed by any daemons
+ if not dev['daemons']:
+ continue
+ if not dev['life_expectancy_max'] or \
+ dev['life_expectancy_max'] == '0.000000':
+ continue
+ # life_expectancy_(min/max) is in the format of:
+ # '%Y-%m-%d %H:%M:%S.%f', e.g.:
+ # '2019-01-20 21:12:12.000000'
+ life_expectancy_max = datetime.strptime(
+ dev['life_expectancy_max'],
+ '%Y-%m-%d %H:%M:%S.%f')
+ self.log.debug('device %s expectancy max %s', dev,
+ life_expectancy_max)
+
+ if life_expectancy_max - now <= mark_out_threshold_td:
+ if self.self_heal:
+ # dev['daemons'] == ["osd.0","osd.1","osd.2"]
+ if dev['daemons']:
+ osds = [x for x in dev['daemons']
+ if x.startswith('osd.')]
+ osd_ids = map(lambda x: x[4:], osds)
+ for _id in osd_ids:
+ if self.is_osd_in(osdmap, _id):
+ osds_in[_id] = life_expectancy_max
+ else:
+ osds_out[_id] = 1
+
+ if life_expectancy_max - now <= warn_threshold_td:
+ # device can appear in more than one location in case
+ # of SCSI multipath
+ device_locations = map(lambda x: x['host'] + ':' + x['dev'],
+ dev['location'])
+ health_warnings[DEVICE_HEALTH].append(
+ '%s (%s); daemons %s; life expectancy between %s and %s'
+ % (dev['devid'],
+ ','.join(device_locations),
+ ','.join(dev.get('daemons', ['none'])),
+ dev['life_expectancy_max'],
+ dev.get('life_expectancy_max', 'unknown')))
+
+ # OSD might be marked 'out' (which means it has no
+ # data), however PGs are still attached to it.
+ for _id in osds_out:
+ num_pgs = self.get_osd_num_pgs(_id)
+ if num_pgs > 0:
+ health_warnings[DEVICE_HEALTH_IN_USE].append(
+ 'osd.%s is marked out '
+ 'but still has %s PG(s)' %
+ (_id, num_pgs))
+ if osds_in:
+ self.log.debug('osds_in %s' % osds_in)
+ # calculate target in ratio
+ num_osds = len(osdmap['osds'])
+ num_in = len([x for x in osdmap['osds'] if x['in']])
+ num_bad = len(osds_in)
+ # sort with next-to-fail first
+ bad_osds = sorted(osds_in.items(), key=operator.itemgetter(1))
+ did = 0
+ to_mark_out = []
+ for osd_id, when in bad_osds:
+ ratio = float(num_in - did - 1) / float(num_osds)
+ if ratio < min_in_ratio:
+ final_ratio = float(num_in - num_bad) / float(num_osds)
+ checks[DEVICE_HEALTH_TOOMANY] = {
+ 'severity': 'warning',
+ 'summary': HEALTH_MESSAGES[DEVICE_HEALTH_TOOMANY],
+ 'detail': [
+ '%d OSDs with failing device(s) would bring "in" ratio to %f < mon_osd_min_in_ratio %f' % (num_bad - did, final_ratio, min_in_ratio)
+ ]
+ }
+ break
+ to_mark_out.append(osd_id)
+ did += 1
+ if to_mark_out:
+ self.mark_out_etc(to_mark_out)
+ for warning, ls in iteritems(health_warnings):
+ n = len(ls)
+ if n:
+ checks[warning] = {
+ 'severity': 'warning',
+ 'summary': HEALTH_MESSAGES[warning] % n,
+ 'detail': ls,
+ }
+ self.set_health_checks(checks)
+ return 0, "", ""
+
+ def is_osd_in(self, osdmap, osd_id):
+ for osd in osdmap['osds']:
+ if str(osd_id) == str(osd['osd']):
+ return bool(osd['in'])
+ return False
+
+ def get_osd_num_pgs(self, osd_id):
+ stats = self.get('osd_stats')
+ assert stats is not None
+ for stat in stats['osd_stats']:
+ if str(osd_id) == str(stat['osd']):
+ return stat['num_pgs']
+ return -1
+
+ def mark_out_etc(self, osd_ids):
+ self.log.info('Marking out OSDs: %s' % osd_ids)
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd out',
+ 'format': 'json',
+ 'ids': osd_ids,
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.warn('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]' % (osd_ids, r, outb, outs))
+ for osd_id in osd_ids:
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd primary-affinity',
+ 'format': 'json',
+ 'id': int(osd_id),
+ 'weight': 0.0,
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.warn('Could not set osd.%s primary-affinity, r: [%s], outs: [%s]' % (osd_id, r, outb, outs))
+
+ def extract_smart_features(self, raw):
+ # FIXME: extract and normalize raw smartctl --json output and
+ # generate a dict of the fields we care about.
+ return raw
+
+ def predict_lift_expectancy(self, devid):
+ plugin_name = ''
+ model = self.get_ceph_option('device_failure_prediction_mode')
+ if model and model.lower() == 'cloud':
+ plugin_name = 'diskprediction_cloud'
+ elif model and model.lower() == 'local':
+ plugin_name = 'diskprediction_local'
+ else:
+ return -1, '', 'unable to enable any disk prediction model[local/cloud]'
+ try:
+ can_run, _ = self.remote(plugin_name, 'can_run')
+ if can_run:
+ return self.remote(plugin_name, 'predict_life_expectancy', devid=devid)
+ except:
+ return -1, '', 'unable to invoke diskprediction local or remote plugin'
+
+ def predict_all_devices(self):
+ plugin_name = ''
+ model = self.get_ceph_option('device_failure_prediction_mode')
+ if model and model.lower() == 'cloud':
+ plugin_name = 'diskprediction_cloud'
+ elif model and model.lower() == 'local':
+ plugin_name = 'diskprediction_local'
+ else:
+ return -1, '', 'unable to enable any disk prediction model[local/cloud]'
+ try:
+ can_run, _ = self.remote(plugin_name, 'can_run')
+ if can_run:
+ return self.remote(plugin_name, 'predict_all_devices')
+ except:
+ return -1, '', 'unable to invoke diskprediction local or remote plugin'
+
+ def get_recent_device_metrics(self, devid, min_sample):
+ return self._get_device_metrics(devid, min_sample=min_sample)
+
+ def get_time_format(self):
+ return TIME_FORMAT