summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/diskprediction_cloud/module.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/diskprediction_cloud/module.py')
-rw-r--r--src/pybind/mgr/diskprediction_cloud/module.py454
1 files changed, 454 insertions, 0 deletions
diff --git a/src/pybind/mgr/diskprediction_cloud/module.py b/src/pybind/mgr/diskprediction_cloud/module.py
new file mode 100644
index 00000000..5052d55e
--- /dev/null
+++ b/src/pybind/mgr/diskprediction_cloud/module.py
@@ -0,0 +1,454 @@
+"""
+diskprediction with cloud predictor
+"""
+from __future__ import absolute_import
+
+import base64
+from datetime import datetime
+import errno
+import json
+from mgr_module import MgrModule
+import os
+from threading import Event
+
+try:
+ from string import maketrans
+except ImportError:
+ maketrans = str.maketrans
+
+from .common import DP_MGR_STAT_ENABLED, DP_MGR_STAT_DISABLED
+from .task import MetricsRunner, SmartRunner, PredictRunner, TestRunner
+
+TIME_DAYS = 24*60*60
+TIME_WEEK = TIME_DAYS * 7
+DP_AGENTS = [MetricsRunner, SmartRunner, PredictRunner]
+CUSTOMER_ALPHABET = "ABCDEFG&HIJKLMN@OQRS.TUV(WXYZabcd)efghijlmn-opqrstu*vwxyz0123=45"
+ORIGIN_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
+
+
+def get_transtable():
+ transtable = maketrans(ORIGIN_ALPHABET, CUSTOMER_ALPHABET)
+ return transtable
+
+
+def get_reverse_transtable():
+ transtable = maketrans(CUSTOMER_ALPHABET, ORIGIN_ALPHABET)
+ return transtable
+
+
+def encode_string(value):
+ if len(value) == 0:
+ return ""
+ transtable = get_transtable()
+ e = str((base64.b64encode(str(value).encode())).decode("utf-8"))
+ e = e.rstrip("=")
+ return e.translate(transtable)
+
+
+class Module(MgrModule):
+
+ MODULE_OPTIONS = [
+ {
+ 'name': 'diskprediction_server',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_port',
+ 'default': '31400'
+ },
+ {
+ 'name': 'diskprediction_user',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_password',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_upload_metrics_interval',
+ 'default': '600'
+ },
+ {
+ 'name': 'diskprediction_upload_smart_interval',
+ 'default': '43200'
+ },
+ {
+ 'name': 'diskprediction_retrieve_prediction_interval',
+ 'default': '43200'
+ },
+ {
+ 'name': 'diskprediction_cert_context',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_ssl_target_name_override',
+ 'default': 'localhost'
+ },
+ {
+ 'name': 'diskprediction_default_authority',
+ 'default': 'localhost'
+ },
+ {
+ 'name': 'sleep_interval',
+ 'default': str(600),
+ }
+ ]
+
+ COMMANDS = [
+ {
+ 'cmd': 'device show-prediction-config',
+ 'desc': 'Prints diskprediction configuration',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device set-cloud-prediction-config '
+ 'name=server,type=CephString,req=true '
+ 'name=user,type=CephString,req=true '
+ 'name=password,type=CephString,req=true '
+ 'name=certfile,type=CephString,req=true '
+ 'name=port,type=CephString,req=false ',
+ 'desc': 'Configure Disk Prediction service',
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'device debug metrics-forced',
+ 'desc': 'Run metrics agent forced',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device debug smart-forced',
+ 'desc': 'Run smart agent forced',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'diskprediction_cloud status',
+ 'desc': 'Check diskprediction_cloud status',
+ 'perm': 'r'
+ }
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.status = {'status': DP_MGR_STAT_DISABLED}
+ self._event = Event()
+ self._predict_event = Event()
+ self._agents = []
+ self._activated_cloud = False
+ self.prediction_result = {}
+ self.config = dict()
+ self._run = True
+
+ def config_notify(self):
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']))
+ self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+ if not self._activated_cloud and self.get_ceph_option('device_failure_prediction_mode') == 'cloud':
+ self._event.set()
+ if self._activated_cloud and self.get_ceph_option('device_failure_prediction_mode') != 'cloud':
+ self._event.set()
+
+ @property
+ def config_keys(self):
+ return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
+
+ def set_config_option(self, option, value):
+ if option not in self.config_keys.keys():
+ raise RuntimeError('{0} is a unknown configuration '
+ 'option'.format(option))
+
+ if option in ['diskprediction_port',
+ 'diskprediction_upload_metrics_interval',
+ 'diskprediction_upload_smart_interval',
+ 'diskprediction_retrieve_prediction_interval']:
+ if not str(value).isdigit():
+ raise RuntimeError('invalid {} configured. Please specify '
+ 'a valid integer {}'.format(option, value))
+
+ self.log.debug('Setting in-memory config option %s to: %s', option,
+ value)
+ self.set_module_option(option, value)
+ self.config[option] = value
+
+ return True
+
+ def get_configuration(self, key):
+ return self.get_module_option(key, self.config_keys[key])
+
+ @staticmethod
+ def _convert_timestamp(predicted_timestamp, life_expectancy_day):
+ """
+ :param predicted_timestamp: unit is nanoseconds
+ :param life_expectancy_day: unit is seconds
+ :return:
+ date format '%Y-%m-%d' ex. 2018-01-01
+ """
+ return datetime.fromtimestamp(
+ predicted_timestamp / (1000 ** 3) + life_expectancy_day).strftime('%Y-%m-%d')
+
+ def _show_prediction_config(self, cmd):
+ self.show_module_config()
+ return 0, json.dumps(self.config, indent=4), ''
+
+ def _set_ssl_target_name(self, cmd):
+ str_ssl_target = cmd.get('ssl_target_name', '')
+ try:
+ self.set_module_option('diskprediction_ssl_target_name_override', str_ssl_target)
+ return (0,
+ 'success to config ssl target name', 0)
+ except Exception as e:
+ return -errno.EINVAL, '', str(e)
+
+ def _set_ssl_default_authority(self, cmd):
+ str_ssl_authority = cmd.get('ssl_authority', '')
+ try:
+ self.set_module_option('diskprediction_default_authority', str_ssl_authority)
+ return 0, 'success to config ssl default authority', 0
+ except Exception as e:
+ return -errno.EINVAL, '', str(e)
+
+ def _set_cloud_prediction_config(self, cmd):
+ str_cert_path = cmd.get('certfile', '')
+ if os.path.exists(str_cert_path):
+ with open(str_cert_path, 'rb') as f:
+ trusted_certs = f.read()
+ self.set_config_option(
+ 'diskprediction_cert_context', trusted_certs)
+ for _agent in self._agents:
+ _agent.event.set()
+ self.set_module_option('diskprediction_server', cmd['server'])
+ self.set_module_option('diskprediction_user', cmd['user'])
+ self.set_module_option('diskprediction_password', encode_string(cmd['password']))
+ if cmd.get('port'):
+ self.set_module_option('diskprediction_port', cmd['port'])
+ return 0, 'succeed to config cloud mode connection', ''
+ else:
+ return -errno.EINVAL, '', 'certification file not existed'
+
+ def _debug_metrics_forced(self, cmd):
+ msg = ''
+ for _agent in self._agents:
+ if isinstance(_agent, MetricsRunner):
+ msg = 'run metrics agent successfully'
+ _agent.event.set()
+ return 0, msg, ''
+
+ def _debug_smart_forced(self, cmd):
+ msg = ' '
+ for _agent in self._agents:
+ if isinstance(_agent, SmartRunner):
+ msg = 'run smart agent successfully'
+ _agent.event.set()
+ return 0, msg, ''
+
+ def refresh_config(self):
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']))
+ self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+
+ def _status(self, cmd):
+ return 0, json.dumps(self.status), ''
+
+ def _refresh_cloud_prediction_result(self):
+ for _agent in self._agents:
+ if isinstance(_agent, PredictRunner):
+ self._predict_event.clear()
+ _agent.event.set()
+ self._predict_event.wait(300)
+ if self._predict_event.is_set():
+ self._predict_event.clear()
+ break
+
+ def predict_life_expectancy(self, devid):
+ assert devid
+ result = self.get('device {}'.format(devid))
+ if not result:
+ return -1, '', 'device {} not found'.format(devid)
+ dev_info = result.get('device', {})
+ if not dev_info:
+ return -1, '', 'device {} not found'.format(devid)
+ self._refresh_cloud_prediction_result()
+ prediction_data = self.prediction_result.get(devid)
+ if not prediction_data:
+ return -1, '', 'device {} prediction data not ready'.format(devid)
+ elif prediction_data.get('near_failure', '').lower() == 'good':
+ return 0, '>6w', ''
+ elif prediction_data.get('near_failure', '').lower() == 'warning':
+ return 0, '>=2w and <=6w', ''
+ elif prediction_data.get('near_failure', '').lower() == 'bad':
+ return 0, '<2w', ''
+ else:
+ return 0, 'unknown', ''
+
+ def _update_device_life_expectancy_day(self, devid, prediction):
+ # Update osd life-expectancy
+ from .common.clusterdata import ClusterAPI
+ predicted = None
+ life_expectancy_day_min = None
+ life_expectancy_day_max = None
+ if prediction.get('predicted'):
+ predicted = int(prediction['predicted'])
+ if prediction.get('near_failure'):
+ if prediction['near_failure'].lower() == 'good':
+ life_expectancy_day_min = (TIME_WEEK * 6) + TIME_DAYS
+ life_expectancy_day_max = None
+ elif prediction['near_failure'].lower() == 'warning':
+ life_expectancy_day_min = (TIME_WEEK * 2)
+ life_expectancy_day_max = (TIME_WEEK * 6)
+ elif prediction['near_failure'].lower() == 'bad':
+ life_expectancy_day_min = 0
+ life_expectancy_day_max = (TIME_WEEK * 2) - TIME_DAYS
+ else:
+ # Near failure state is unknown.
+ predicted = None
+ life_expectancy_day_min = None
+ life_expectancy_day_max = None
+
+ obj_api = ClusterAPI(self)
+ if predicted and devid and life_expectancy_day_min is not None:
+ from_date = None
+ to_date = None
+ try:
+ if life_expectancy_day_min is not None:
+ from_date = self._convert_timestamp(predicted, life_expectancy_day_min)
+
+ if life_expectancy_day_max is not None:
+ to_date = self._convert_timestamp(predicted, life_expectancy_day_max)
+
+ obj_api.set_device_life_expectancy(devid, from_date, to_date)
+ self.log.info(
+ 'succeed to set device {} life expectancy from: {}, to: {}'.format(
+ devid, from_date, to_date))
+ except Exception as e:
+ self.log.error(
+ 'failed to set device {} life expectancy from: {}, to: {}, {}'.format(
+ devid, from_date, to_date, str(e)))
+ else:
+ obj_api.reset_device_life_expectancy(devid)
+
+ def predict_all_devices(self):
+ if not self._activated_cloud:
+ return -1, '', 'diskprecition_cloud not ready'
+ self.refresh_config()
+ result = self.get('devices')
+ if not result:
+ return -1, '', 'unable to get all devices for prediction'
+ self._refresh_cloud_prediction_result()
+ for dev in result.get('devices', []):
+ devid = dev.get('devid')
+ if not devid:
+ continue
+ prediction_data = self.prediction_result.get(devid)
+ if prediction_data:
+ break
+ if not prediction_data:
+ return -1, '', 'device {} prediction data not ready'.format(dev.get('devid'))
+ else:
+ self._update_device_life_expectancy_day(dev.get('devid'), prediction_data)
+ return 0, '', ''
+
+ def handle_command(self, _, cmd):
+ for o_cmd in self.COMMANDS:
+ if cmd['prefix'] == o_cmd['cmd'][:len(cmd['prefix'])]:
+ fun_name = ''
+ avgs = o_cmd['cmd'].split(' ')
+ for avg in avgs:
+ if avg.lower() == 'diskprediction_cloud':
+ continue
+ if avg.lower() == 'device':
+ continue
+ if '=' in avg or ',' in avg or not avg:
+ continue
+ fun_name += '_%s' % avg.replace('-', '_')
+ if fun_name:
+ fun = getattr(
+ self, fun_name)
+ if fun:
+ return fun(cmd)
+ return -errno.EINVAL, '', 'cmd not found'
+
+ def show_module_config(self):
+ for key, default in self.config_keys.items():
+ self.set_config_option(key, self.get_module_option(key, default))
+
+ def serve(self):
+ self.log.info('Starting diskprediction module')
+ self.config_notify()
+ self.status = {'status': DP_MGR_STAT_ENABLED}
+
+ while self._run:
+ self.refresh_config()
+ mode = self.get_ceph_option('device_failure_prediction_mode')
+ if mode == 'cloud':
+ if not self._activated_cloud:
+ self.start_cloud_disk_prediction()
+ else:
+ if self._activated_cloud:
+ self.stop_disk_prediction()
+
+ # Check agent hang is?
+ restart_agent = False
+ try:
+ for dp_agent in self._agents:
+ if dp_agent.is_timeout():
+ self.log.error('agent name: {] timeout'.format(dp_agent.task_name))
+ restart_agent = True
+ break
+ except Exception as IOError:
+ self.log.error('disk prediction plugin failed to started and try to restart')
+ restart_agent = True
+
+ if restart_agent:
+ self.stop_disk_prediction()
+ else:
+ sleep_interval = int(self.sleep_interval) or 60
+ self._event.wait(sleep_interval)
+ self._event.clear()
+ self.stop_disk_prediction()
+
+ def _agent_call_back(self):
+ self.log.debug('notify refresh devices prediction result')
+ self._predict_event.set()
+
+ def start_cloud_disk_prediction(self):
+ assert not self._activated_cloud
+ for dp_agent in DP_AGENTS:
+ if dp_agent == PredictRunner:
+ obj_agent = dp_agent(self, 300, self._agent_call_back)
+ else:
+ obj_agent = dp_agent(self, 300)
+ if obj_agent:
+ obj_agent.start()
+ else:
+ raise Exception('failed to start task %s' % obj_agent.task_name)
+ self._agents.append(obj_agent)
+ self._activated_cloud = True
+ self.log.info('start cloud disk prediction')
+
+ def stop_disk_prediction(self):
+ assert self._activated_cloud
+ try:
+ self.status = {'status': DP_MGR_STAT_DISABLED}
+ while self._agents:
+ dp_agent = self._agents.pop()
+ self.log.info('agent name: {}'.format(dp_agent.task_name))
+ dp_agent.terminate()
+ dp_agent.join(5)
+ del dp_agent
+ self._activated_cloud = False
+ self.log.info('stop disk prediction')
+ except Exception as IOError:
+ self.log.error('failed to stop disk prediction clould plugin')
+
+ def shutdown(self):
+ self._run = False
+ self._event.set()
+ super(Module, self).shutdown()
+
+ def self_test(self):
+ obj_test = TestRunner(self)
+ obj_test.run()
+ self.log.info('self test completed')