summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/devicehealth/module.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/devicehealth/module.py')
-rw-r--r--src/pybind/mgr/devicehealth/module.py780
1 files changed, 780 insertions, 0 deletions
diff --git a/src/pybind/mgr/devicehealth/module.py b/src/pybind/mgr/devicehealth/module.py
new file mode 100644
index 000000000..07768db75
--- /dev/null
+++ b/src/pybind/mgr/devicehealth/module.py
@@ -0,0 +1,780 @@
+"""
+Device health monitoring
+"""
+
+import errno
+import json
+from mgr_module import MgrModule, CommandResult, MgrModuleRecoverDB, CLIRequiresDB, CLICommand, CLIReadCommand, Option, MgrDBNotReady
+import operator
+import rados
+import re
+from threading import Event
+from datetime import datetime, timedelta, timezone
+from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
+
+TIME_FORMAT = '%Y%m%d-%H%M%S'
+
+DEVICE_HEALTH = 'DEVICE_HEALTH'
+DEVICE_HEALTH_IN_USE = 'DEVICE_HEALTH_IN_USE'
+DEVICE_HEALTH_TOOMANY = 'DEVICE_HEALTH_TOOMANY'
+HEALTH_MESSAGES = {
+ DEVICE_HEALTH: '%d device(s) expected to fail soon',
+ DEVICE_HEALTH_IN_USE: '%d daemon(s) expected to fail soon and still contain data',
+ DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
+}
+
+
+def get_ata_wear_level(data: Dict[Any, Any]) -> Optional[float]:
+ """
+ Extract wear level (as float) from smartctl -x --json output for SATA SSD
+ """
+ for page in data.get("ata_device_statistics", {}).get("pages", []):
+ if page is None or page.get("number") != 7:
+ continue
+ for item in page.get("table", []):
+ if item["offset"] == 8:
+ return item["value"] / 100.0
+ return None
+
+
+def get_nvme_wear_level(data: Dict[Any, Any]) -> Optional[float]:
+ """
+ Extract wear level (as float) from smartctl -x --json output for NVME SSD
+ """
+ pct_used = data.get("nvme_smart_health_information_log", {}).get("percentage_used")
+ if pct_used is None:
+ return None
+ return pct_used / 100.0
+
+
+class Module(MgrModule):
+
+ # latest (if db does not exist)
+ SCHEMA = """
+CREATE TABLE Device (
+ devid TEXT PRIMARY KEY
+) WITHOUT ROWID;
+CREATE TABLE DeviceHealthMetrics (
+ time DATETIME DEFAULT (strftime('%s', 'now')),
+ devid TEXT NOT NULL REFERENCES Device (devid),
+ raw_smart TEXT NOT NULL,
+ PRIMARY KEY (time, devid)
+);
+"""
+
+ SCHEMA_VERSIONED = [
+ # v1
+ """
+CREATE TABLE Device (
+ devid TEXT PRIMARY KEY
+) WITHOUT ROWID;
+CREATE TABLE DeviceHealthMetrics (
+ time DATETIME DEFAULT (strftime('%s', 'now')),
+ devid TEXT NOT NULL REFERENCES Device (devid),
+ raw_smart TEXT NOT NULL,
+ PRIMARY KEY (time, devid)
+);
+"""
+ ]
+
+ MODULE_OPTIONS = [
+ Option(
+ name='enable_monitoring',
+ default=True,
+ type='bool',
+ desc='monitor device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='scrape_frequency',
+ default=86400,
+ type='secs',
+ desc='how frequently to scrape device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='pool_name',
+ default='device_health_metrics',
+ type='str',
+ desc='name of pool in which to store device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='retention_period',
+ default=(86400 * 180),
+ type='secs',
+ desc='how long to retain device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='mark_out_threshold',
+ default=(86400 * 14 * 2),
+ type='secs',
+ desc='automatically mark OSD if it may fail before this long',
+ runtime=True,
+ ),
+ Option(
+ name='warn_threshold',
+ default=(86400 * 14 * 6),
+ type='secs',
+ desc='raise health warning if OSD may fail before this long',
+ runtime=True,
+ ),
+ Option(
+ name='self_heal',
+ default=True,
+ type='bool',
+ desc='preemptively heal cluster around devices that may fail',
+ runtime=True,
+ ),
+ Option(
+ name='sleep_interval',
+ default=600,
+ type='secs',
+ desc='how frequently to wake up and check device health',
+ runtime=True,
+ ),
+ ]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(Module, self).__init__(*args, **kwargs)
+
+ # populate options (just until serve() runs)
+ for opt in self.MODULE_OPTIONS:
+ setattr(self, opt['name'], opt['default'])
+
+ # other
+ self.run = True
+ self.event = Event()
+
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.enable_monitoring = True
+ self.scrape_frequency = 0.0
+ self.pool_name = ''
+ self.device_health_metrics = ''
+ self.retention_period = 0.0
+ self.mark_out_threshold = 0.0
+ self.warn_threshold = 0.0
+ self.self_heal = True
+ self.sleep_interval = 0.0
+
+ def is_valid_daemon_name(self, who: str) -> bool:
+ parts = who.split('.', 1)
+ if len(parts) != 2:
+ return False
+ return parts[0] in ('osd', 'mon')
+
+ @CLIReadCommand('device query-daemon-health-metrics')
+ def do_query_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
+ '''
+ Get device health metrics for a given daemon
+ '''
+ if not self.is_valid_daemon_name(who):
+ return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+ (daemon_type, daemon_id) = who.split('.')
+ result = CommandResult('')
+ self.send_command(result, daemon_type, daemon_id, json.dumps({
+ 'prefix': 'smart',
+ 'format': 'json',
+ }), '')
+ return result.wait()
+
+ @CLIRequiresDB
+ @CLIReadCommand('device scrape-daemon-health-metrics')
+ @MgrModuleRecoverDB
+ def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
+ '''
+ Scrape and store device health metrics for a given daemon
+ '''
+ if not self.is_valid_daemon_name(who):
+ return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+ (daemon_type, daemon_id) = who.split('.')
+ return self.scrape_daemon(daemon_type, daemon_id)
+
+ @CLIRequiresDB
+ @CLIReadCommand('device scrape-health-metrics')
+ @MgrModuleRecoverDB
+ def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Scrape and store device health metrics
+ '''
+ if devid is None:
+ return self.scrape_all()
+ else:
+ return self.scrape_device(devid)
+
+ @CLIRequiresDB
+ @CLIReadCommand('device get-health-metrics')
+ @MgrModuleRecoverDB
+ def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Show stored device metrics for the device
+ '''
+ return self.show_device_metrics(devid, sample)
+
+ @CLIRequiresDB
+ @CLICommand('device check-health')
+ @MgrModuleRecoverDB
+ def do_check_health(self) -> Tuple[int, str, str]:
+ '''
+ Check life expectancy of devices
+ '''
+ return self.check_health()
+
+ @CLICommand('device monitoring on')
+ def do_monitoring_on(self) -> Tuple[int, str, str]:
+ '''
+ Enable device health monitoring
+ '''
+ self.set_module_option('enable_monitoring', True)
+ self.event.set()
+ return 0, '', ''
+
+ @CLICommand('device monitoring off')
+ def do_monitoring_off(self) -> Tuple[int, str, str]:
+ '''
+ Disable device health monitoring
+ '''
+ self.set_module_option('enable_monitoring', False)
+ self.set_health_checks({}) # avoid stuck health alerts
+ return 0, '', ''
+
+ @CLIRequiresDB
+ @CLIReadCommand('device predict-life-expectancy')
+ @MgrModuleRecoverDB
+ def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
+ '''
+ Predict life expectancy with local predictor
+ '''
+ return self.predict_lift_expectancy(devid)
+
+ def self_test(self) -> None:
+ assert self.db_ready()
+ self.config_notify()
+ osdmap = self.get('osd_map')
+ osd_id = osdmap['osds'][0]['osd']
+ osdmeta = self.get('osd_metadata')
+ devs = osdmeta.get(str(osd_id), {}).get('device_ids')
+ if devs:
+ devid = devs.split()[0].split('=')[1]
+ self.log.debug(f"getting devid {devid}")
+ (r, before, err) = self.show_device_metrics(devid, None)
+ assert r == 0
+ self.log.debug(f"before: {before}")
+ (r, out, err) = self.scrape_device(devid)
+ assert r == 0
+ (r, after, err) = self.show_device_metrics(devid, None)
+ assert r == 0
+ self.log.debug(f"after: {after}")
+ assert before != after
+
+ def config_notify(self) -> None:
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']))
+ self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+
+ def _legacy_put_device_metrics(self, t: str, devid: str, data: str) -> None:
+ SQL = """
+ INSERT OR IGNORE INTO DeviceHealthMetrics (time, devid, raw_smart)
+ VALUES (?, ?, ?);
+ """
+
+ self._create_device(devid)
+ epoch = self._t2epoch(t)
+ json.loads(data) # valid?
+ self.db.execute(SQL, (epoch, devid, data))
+
+ devre = r"[a-zA-Z0-9-]+[_-][a-zA-Z0-9-]+[_-][a-zA-Z0-9-]+"
+
+ def _load_legacy_object(self, ioctx: rados.Ioctx, oid: str) -> bool:
+ MAX_OMAP = 10000
+ self.log.debug(f"loading object {oid}")
+ if re.search(self.devre, oid) is None:
+ return False
+ with rados.ReadOpCtx() as op:
+ it, rc = ioctx.get_omap_vals(op, None, None, MAX_OMAP)
+ if rc == 0:
+ ioctx.operate_read_op(op, oid)
+ count = 0
+ for t, raw_smart in it:
+ self.log.debug(f"putting {oid} {t}")
+ self._legacy_put_device_metrics(t, oid, raw_smart)
+ count += 1
+ assert count < MAX_OMAP
+ self.log.debug(f"removing object {oid}")
+ ioctx.remove_object(oid)
+ return True
+
+ def check_legacy_pool(self) -> bool:
+ try:
+ # 'device_health_metrics' is automatically renamed '.mgr' in
+ # create_mgr_pool
+ ioctx = self.rados.open_ioctx(self.MGR_POOL_NAME)
+ except rados.ObjectNotFound:
+ return True
+ if not ioctx:
+ return True
+
+ done = False
+ with ioctx, self._db_lock, self.db:
+ count = 0
+ for obj in ioctx.list_objects():
+ try:
+ if self._load_legacy_object(ioctx, obj.key):
+ count += 1
+ except json.decoder.JSONDecodeError:
+ pass
+ if count >= 10:
+ break
+ done = count < 10
+ self.log.debug(f"finished reading legacy pool, complete = {done}")
+ return done
+
+ @MgrModuleRecoverDB
+ def _do_serve(self) -> None:
+ last_scrape = None
+ finished_loading_legacy = False
+
+ while self.run:
+ # sleep first, in case of exceptions causing retry:
+ sleep_interval = self.sleep_interval or 60
+ if not finished_loading_legacy:
+ sleep_interval = 2
+ self.log.debug('Sleeping for %d seconds', sleep_interval)
+ self.event.wait(sleep_interval)
+ self.event.clear()
+
+ if self.db_ready() and self.enable_monitoring:
+ self.log.debug('Running')
+
+ if not finished_loading_legacy:
+ finished_loading_legacy = self.check_legacy_pool()
+
+ if last_scrape is None:
+ ls = self.get_kv('last_scrape')
+ if ls:
+ try:
+ last_scrape = datetime.strptime(ls, TIME_FORMAT)
+ except ValueError:
+ pass
+ self.log.debug('Last scrape %s', last_scrape)
+
+ self.check_health()
+
+ now = datetime.utcnow()
+ if not last_scrape:
+ next_scrape = now
+ else:
+ # align to scrape interval
+ scrape_frequency = self.scrape_frequency or 86400
+ seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
+ seconds -= seconds % scrape_frequency
+ seconds += scrape_frequency
+ next_scrape = datetime.utcfromtimestamp(seconds)
+ if last_scrape:
+ self.log.debug('Last scrape %s, next scrape due %s',
+ last_scrape.strftime(TIME_FORMAT),
+ next_scrape.strftime(TIME_FORMAT))
+ else:
+ self.log.debug('Last scrape never, next scrape due %s',
+ next_scrape.strftime(TIME_FORMAT))
+ if now >= next_scrape:
+ self.scrape_all()
+ self.predict_all_devices()
+ last_scrape = now
+ self.set_kv('last_scrape', last_scrape.strftime(TIME_FORMAT))
+
+ def serve(self) -> None:
+ self.log.info("Starting")
+ self.config_notify()
+
+ self._do_serve()
+
+ def shutdown(self) -> None:
+ self.log.info('Stopping')
+ self.run = False
+ self.event.set()
+
+ def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]:
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
+ raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
+ if raw_smart_data:
+ for device, raw_data in raw_smart_data.items():
+ data = self.extract_smart_features(raw_data)
+ if device and data:
+ self.put_device_metrics(device, data)
+ return 0, "", ""
+
+ def scrape_all(self) -> Tuple[int, str, str]:
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
+ osdmap = self.get("osd_map")
+ assert osdmap is not None
+ did_device = {}
+ ids = []
+ for osd in osdmap['osds']:
+ ids.append(('osd', str(osd['osd'])))
+ monmap = self.get("mon_map")
+ for mon in monmap['mons']:
+ ids.append(('mon', mon['name']))
+ for daemon_type, daemon_id in ids:
+ raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
+ if not raw_smart_data:
+ continue
+ for device, raw_data in raw_smart_data.items():
+ if device in did_device:
+ self.log.debug('skipping duplicate %s' % device)
+ continue
+ did_device[device] = 1
+ data = self.extract_smart_features(raw_data)
+ if device and data:
+ self.put_device_metrics(device, data)
+ return 0, "", ""
+
+ def scrape_device(self, devid: str) -> Tuple[int, str, str]:
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
+ r = self.get("device " + devid)
+ if not r or 'device' not in r.keys():
+ return -errno.ENOENT, '', 'device ' + devid + ' not found'
+ daemons = r['device'].get('daemons', [])
+ if not daemons:
+ return (-errno.EAGAIN, '',
+ 'device ' + devid + ' not claimed by any active daemons')
+ (daemon_type, daemon_id) = daemons[0].split('.')
+ raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
+ devid=devid)
+ if raw_smart_data:
+ for device, raw_data in raw_smart_data.items():
+ data = self.extract_smart_features(raw_data)
+ if device and data:
+ self.put_device_metrics(device, data)
+ return 0, "", ""
+
+ def do_scrape_daemon(self,
+ daemon_type: str,
+ daemon_id: str,
+ devid: str = '') -> Optional[Dict[str, Any]]:
+ """
+ :return: a dict, or None if the scrape failed.
+ """
+ self.log.debug('do_scrape_daemon %s.%s' % (daemon_type, daemon_id))
+ result = CommandResult('')
+ self.send_command(result, daemon_type, daemon_id, json.dumps({
+ 'prefix': 'smart',
+ 'format': 'json',
+ 'devid': devid,
+ }), '')
+ r, outb, outs = result.wait()
+
+ try:
+ return json.loads(outb)
+ except (IndexError, ValueError):
+ self.log.error(
+ "Fail to parse JSON result from daemon {0}.{1} ({2})".format(
+ daemon_type, daemon_id, outb))
+ return None
+
+ def _prune_device_metrics(self) -> None:
+ SQL = """
+ DELETE FROM DeviceHealthMetrics
+ WHERE time < (strftime('%s', 'now') - ?);
+ """
+
+ cursor = self.db.execute(SQL, (self.retention_period,))
+ if cursor.rowcount >= 1:
+ self.log.info(f"pruned {cursor.rowcount} metrics")
+
+ def _create_device(self, devid: str) -> None:
+ SQL = """
+ INSERT OR IGNORE INTO Device VALUES (?);
+ """
+
+ cursor = self.db.execute(SQL, (devid,))
+ if cursor.rowcount >= 1:
+ self.log.info(f"created device {devid}")
+ else:
+ self.log.debug(f"device {devid} already exists")
+
+ def put_device_metrics(self, devid: str, data: Any) -> None:
+ SQL = """
+ INSERT INTO DeviceHealthMetrics (devid, raw_smart)
+ VALUES (?, ?);
+ """
+
+ with self._db_lock, self.db:
+ self._create_device(devid)
+ self.db.execute(SQL, (devid, json.dumps(data)))
+ self._prune_device_metrics()
+
+ # extract wear level?
+ wear_level = get_ata_wear_level(data)
+ if wear_level is None:
+ wear_level = get_nvme_wear_level(data)
+ dev_data = self.get(f"device {devid}") or {}
+ if wear_level is not None:
+ if dev_data.get(wear_level) != str(wear_level):
+ dev_data["wear_level"] = str(wear_level)
+ self.log.debug(f"updating {devid} wear level to {wear_level}")
+ self.set_device_wear_level(devid, wear_level)
+ else:
+ if "wear_level" in dev_data:
+ del dev_data["wear_level"]
+ self.log.debug(f"removing {devid} wear level")
+ self.set_device_wear_level(devid, -1.0)
+
+ def _t2epoch(self, t: Optional[str]) -> int:
+ if not t:
+ return 0
+ else:
+ return int(datetime.strptime(t, TIME_FORMAT).strftime("%s"))
+
+ def _get_device_metrics(self, devid: str,
+ sample: Optional[str] = None,
+ min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
+ res = {}
+
+ SQL_EXACT = """
+ SELECT time, raw_smart
+ FROM DeviceHealthMetrics
+ WHERE devid = ? AND time = ?
+ ORDER BY time DESC;
+ """
+ SQL_MIN = """
+ SELECT time, raw_smart
+ FROM DeviceHealthMetrics
+ WHERE devid = ? AND ? <= time
+ ORDER BY time DESC;
+ """
+
+ isample = None
+ imin_sample = None
+ if sample:
+ isample = self._t2epoch(sample)
+ else:
+ imin_sample = self._t2epoch(min_sample)
+
+ self.log.debug(f"_get_device_metrics: {devid} {sample} {min_sample}")
+
+ with self._db_lock, self.db:
+ if isample:
+ cursor = self.db.execute(SQL_EXACT, (devid, isample))
+ else:
+ cursor = self.db.execute(SQL_MIN, (devid, imin_sample))
+ for row in cursor:
+ t = row['time']
+ dt = datetime.utcfromtimestamp(t).strftime(TIME_FORMAT)
+ try:
+ res[dt] = json.loads(row['raw_smart'])
+ except (ValueError, IndexError):
+ self.log.debug(f"unable to parse value for {devid}:{t}")
+ pass
+ return res
+
+ def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]:
+ # verify device exists
+ r = self.get("device " + devid)
+ if not r or 'device' not in r.keys():
+ return -errno.ENOENT, '', 'device ' + devid + ' not found'
+ # fetch metrics
+ res = self._get_device_metrics(devid, sample=sample)
+ return 0, json.dumps(res, indent=4, sort_keys=True), ''
+
+ def check_health(self) -> Tuple[int, str, str]:
+ self.log.info('Check health')
+ config = self.get('config')
+ min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
+ mark_out_threshold_td = timedelta(seconds=self.mark_out_threshold)
+ warn_threshold_td = timedelta(seconds=self.warn_threshold)
+ checks: Dict[str, Dict[str, Union[int, str, Sequence[str]]]] = {}
+ health_warnings: Dict[str, List[str]] = {
+ DEVICE_HEALTH: [],
+ DEVICE_HEALTH_IN_USE: [],
+ }
+ devs = self.get("devices")
+ osds_in = {}
+ osds_out = {}
+ now = datetime.now(timezone.utc) # e.g. '2021-09-22 13:18:45.021712+00:00'
+ osdmap = self.get("osd_map")
+ assert osdmap is not None
+ for dev in devs['devices']:
+ if 'life_expectancy_max' not in dev:
+ continue
+ # ignore devices that are not consumed by any daemons
+ if not dev['daemons']:
+ continue
+ if not dev['life_expectancy_max'] or \
+ dev['life_expectancy_max'] == '0.000000':
+ continue
+ # life_expectancy_(min/max) is in the format of:
+ # '%Y-%m-%dT%H:%M:%S.%f%z', e.g.:
+ # '2019-01-20 21:12:12.000000+00:00'
+ life_expectancy_max = datetime.strptime(
+ dev['life_expectancy_max'],
+ '%Y-%m-%dT%H:%M:%S.%f%z')
+ self.log.debug('device %s expectancy max %s', dev,
+ life_expectancy_max)
+
+ if life_expectancy_max - now <= mark_out_threshold_td:
+ if self.self_heal:
+ # dev['daemons'] == ["osd.0","osd.1","osd.2"]
+ if dev['daemons']:
+ osds = [x for x in dev['daemons']
+ if x.startswith('osd.')]
+ osd_ids = map(lambda x: x[4:], osds)
+ for _id in osd_ids:
+ if self.is_osd_in(osdmap, _id):
+ osds_in[_id] = life_expectancy_max
+ else:
+ osds_out[_id] = 1
+
+ if life_expectancy_max - now <= warn_threshold_td:
+ # device can appear in more than one location in case
+ # of SCSI multipath
+ device_locations = map(lambda x: x['host'] + ':' + x['dev'],
+ dev['location'])
+ health_warnings[DEVICE_HEALTH].append(
+ '%s (%s); daemons %s; life expectancy between %s and %s'
+ % (dev['devid'],
+ ','.join(device_locations),
+ ','.join(dev.get('daemons', ['none'])),
+ dev['life_expectancy_max'],
+ dev.get('life_expectancy_max', 'unknown')))
+
+ # OSD might be marked 'out' (which means it has no
+ # data), however PGs are still attached to it.
+ for _id in osds_out:
+ num_pgs = self.get_osd_num_pgs(_id)
+ if num_pgs > 0:
+ health_warnings[DEVICE_HEALTH_IN_USE].append(
+ 'osd.%s is marked out '
+ 'but still has %s PG(s)' %
+ (_id, num_pgs))
+ if osds_in:
+ self.log.debug('osds_in %s' % osds_in)
+ # calculate target in ratio
+ num_osds = len(osdmap['osds'])
+ num_in = len([x for x in osdmap['osds'] if x['in']])
+ num_bad = len(osds_in)
+ # sort with next-to-fail first
+ bad_osds = sorted(osds_in.items(), key=operator.itemgetter(1))
+ did = 0
+ to_mark_out = []
+ for osd_id, when in bad_osds:
+ ratio = float(num_in - did - 1) / float(num_osds)
+ if ratio < min_in_ratio:
+ final_ratio = float(num_in - num_bad) / float(num_osds)
+ checks[DEVICE_HEALTH_TOOMANY] = {
+ 'severity': 'warning',
+ 'summary': HEALTH_MESSAGES[DEVICE_HEALTH_TOOMANY],
+ 'detail': [
+ '%d OSDs with failing device(s) would bring "in" ratio to %f < mon_osd_min_in_ratio %f' % (
+ num_bad - did, final_ratio, min_in_ratio)
+ ]
+ }
+ break
+ to_mark_out.append(osd_id)
+ did += 1
+ if to_mark_out:
+ self.mark_out_etc(to_mark_out)
+ for warning, ls in health_warnings.items():
+ n = len(ls)
+ if n:
+ checks[warning] = {
+ 'severity': 'warning',
+ 'summary': HEALTH_MESSAGES[warning] % n,
+ 'count': len(ls),
+ 'detail': ls,
+ }
+ self.set_health_checks(checks)
+ return 0, "", ""
+
+ def is_osd_in(self, osdmap: Dict[str, Any], osd_id: str) -> bool:
+ for osd in osdmap['osds']:
+ if osd_id == str(osd['osd']):
+ return bool(osd['in'])
+ return False
+
+ def get_osd_num_pgs(self, osd_id: str) -> int:
+ stats = self.get('osd_stats')
+ assert stats is not None
+ for stat in stats['osd_stats']:
+ if osd_id == str(stat['osd']):
+ return stat['num_pgs']
+ return -1
+
+ def mark_out_etc(self, osd_ids: List[str]) -> None:
+ self.log.info('Marking out OSDs: %s' % osd_ids)
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd out',
+ 'format': 'json',
+ 'ids': osd_ids,
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]',
+ osd_ids, r, outb, outs)
+ for osd_id in osd_ids:
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd primary-affinity',
+ 'format': 'json',
+ 'id': int(osd_id),
+ 'weight': 0.0,
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.warning('Could not set osd.%s primary-affinity, '
+ 'r: [%s], outb: [%s], outs: [%s]',
+ osd_id, r, outb, outs)
+
+ def extract_smart_features(self, raw: Any) -> Any:
+ # FIXME: extract and normalize raw smartctl --json output and
+ # generate a dict of the fields we care about.
+ return raw
+
+ def predict_lift_expectancy(self, devid: str) -> Tuple[int, str, str]:
+ plugin_name = ''
+ model = self.get_ceph_option('device_failure_prediction_mode')
+ if cast(str, model).lower() == 'local':
+ plugin_name = 'diskprediction_local'
+ else:
+ return -1, '', 'unable to enable any disk prediction model[local/cloud]'
+ try:
+ can_run, _ = self.remote(plugin_name, 'can_run')
+ if can_run:
+ return self.remote(plugin_name, 'predict_life_expectancy', devid=devid)
+ else:
+ return -1, '', f'{plugin_name} is not available'
+ except Exception:
+ return -1, '', 'unable to invoke diskprediction local or remote plugin'
+
+ def predict_all_devices(self) -> Tuple[int, str, str]:
+ plugin_name = ''
+ model = self.get_ceph_option('device_failure_prediction_mode')
+ if cast(str, model).lower() == 'local':
+ plugin_name = 'diskprediction_local'
+ else:
+ return -1, '', 'unable to enable any disk prediction model[local/cloud]'
+ try:
+ can_run, _ = self.remote(plugin_name, 'can_run')
+ if can_run:
+ return self.remote(plugin_name, 'predict_all_devices')
+ else:
+ return -1, '', f'{plugin_name} is not available'
+ except Exception:
+ return -1, '', 'unable to invoke diskprediction local or remote plugin'
+
+ def get_recent_device_metrics(self, devid: str, min_sample: str) -> Dict[str, Dict[str, Any]]:
+ try:
+ return self._get_device_metrics(devid, min_sample=min_sample)
+ except MgrDBNotReady:
+ return dict()
+
+ def get_time_format(self) -> str:
+ return TIME_FORMAT