diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/diskprediction_cloud/module.py | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21. (refs: upstream/14.2.21, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/diskprediction_cloud/module.py')
-rw-r--r-- | src/pybind/mgr/diskprediction_cloud/module.py | 454 |
1 files changed, 454 insertions, 0 deletions
diff --git a/src/pybind/mgr/diskprediction_cloud/module.py b/src/pybind/mgr/diskprediction_cloud/module.py new file mode 100644 index 00000000..5052d55e --- /dev/null +++ b/src/pybind/mgr/diskprediction_cloud/module.py @@ -0,0 +1,454 @@ +""" +diskprediction with cloud predictor +""" +from __future__ import absolute_import + +import base64 +from datetime import datetime +import errno +import json +from mgr_module import MgrModule +import os +from threading import Event + +try: + from string import maketrans +except ImportError: + maketrans = str.maketrans + +from .common import DP_MGR_STAT_ENABLED, DP_MGR_STAT_DISABLED +from .task import MetricsRunner, SmartRunner, PredictRunner, TestRunner + +TIME_DAYS = 24*60*60 +TIME_WEEK = TIME_DAYS * 7 +DP_AGENTS = [MetricsRunner, SmartRunner, PredictRunner] +CUSTOMER_ALPHABET = "ABCDEFG&HIJKLMN@OQRS.TUV(WXYZabcd)efghijlmn-opqrstu*vwxyz0123=45" +ORIGIN_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" + + +def get_transtable(): + transtable = maketrans(ORIGIN_ALPHABET, CUSTOMER_ALPHABET) + return transtable + + +def get_reverse_transtable(): + transtable = maketrans(CUSTOMER_ALPHABET, ORIGIN_ALPHABET) + return transtable + + +def encode_string(value): + if len(value) == 0: + return "" + transtable = get_transtable() + e = str((base64.b64encode(str(value).encode())).decode("utf-8")) + e = e.rstrip("=") + return e.translate(transtable) + + +class Module(MgrModule): + + MODULE_OPTIONS = [ + { + 'name': 'diskprediction_server', + 'default': '' + }, + { + 'name': 'diskprediction_port', + 'default': '31400' + }, + { + 'name': 'diskprediction_user', + 'default': '' + }, + { + 'name': 'diskprediction_password', + 'default': '' + }, + { + 'name': 'diskprediction_upload_metrics_interval', + 'default': '600' + }, + { + 'name': 'diskprediction_upload_smart_interval', + 'default': '43200' + }, + { + 'name': 'diskprediction_retrieve_prediction_interval', + 'default': '43200' + }, + { + 'name': 
'diskprediction_cert_context', + 'default': '' + }, + { + 'name': 'diskprediction_ssl_target_name_override', + 'default': 'localhost' + }, + { + 'name': 'diskprediction_default_authority', + 'default': 'localhost' + }, + { + 'name': 'sleep_interval', + 'default': str(600), + } + ] + + COMMANDS = [ + { + 'cmd': 'device show-prediction-config', + 'desc': 'Prints diskprediction configuration', + 'perm': 'r' + }, + { + 'cmd': 'device set-cloud-prediction-config ' + 'name=server,type=CephString,req=true ' + 'name=user,type=CephString,req=true ' + 'name=password,type=CephString,req=true ' + 'name=certfile,type=CephString,req=true ' + 'name=port,type=CephString,req=false ', + 'desc': 'Configure Disk Prediction service', + 'perm': 'rw' + }, + { + 'cmd': 'device debug metrics-forced', + 'desc': 'Run metrics agent forced', + 'perm': 'r' + }, + { + 'cmd': 'device debug smart-forced', + 'desc': 'Run smart agent forced', + 'perm': 'r' + }, + { + 'cmd': 'diskprediction_cloud status', + 'desc': 'Check diskprediction_cloud status', + 'perm': 'r' + } + ] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.status = {'status': DP_MGR_STAT_DISABLED} + self._event = Event() + self._predict_event = Event() + self._agents = [] + self._activated_cloud = False + self.prediction_result = {} + self.config = dict() + self._run = True + + def config_notify(self): + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], + self.get_module_option(opt['name'])) + self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) + if not self._activated_cloud and self.get_ceph_option('device_failure_prediction_mode') == 'cloud': + self._event.set() + if self._activated_cloud and self.get_ceph_option('device_failure_prediction_mode') != 'cloud': + self._event.set() + + @property + def config_keys(self): + return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) + + def set_config_option(self, option, value): + if option not 
in self.config_keys.keys(): + raise RuntimeError('{0} is a unknown configuration ' + 'option'.format(option)) + + if option in ['diskprediction_port', + 'diskprediction_upload_metrics_interval', + 'diskprediction_upload_smart_interval', + 'diskprediction_retrieve_prediction_interval']: + if not str(value).isdigit(): + raise RuntimeError('invalid {} configured. Please specify ' + 'a valid integer {}'.format(option, value)) + + self.log.debug('Setting in-memory config option %s to: %s', option, + value) + self.set_module_option(option, value) + self.config[option] = value + + return True + + def get_configuration(self, key): + return self.get_module_option(key, self.config_keys[key]) + + @staticmethod + def _convert_timestamp(predicted_timestamp, life_expectancy_day): + """ + :param predicted_timestamp: unit is nanoseconds + :param life_expectancy_day: unit is seconds + :return: + date format '%Y-%m-%d' ex. 2018-01-01 + """ + return datetime.fromtimestamp( + predicted_timestamp / (1000 ** 3) + life_expectancy_day).strftime('%Y-%m-%d') + + def _show_prediction_config(self, cmd): + self.show_module_config() + return 0, json.dumps(self.config, indent=4), '' + + def _set_ssl_target_name(self, cmd): + str_ssl_target = cmd.get('ssl_target_name', '') + try: + self.set_module_option('diskprediction_ssl_target_name_override', str_ssl_target) + return (0, + 'success to config ssl target name', 0) + except Exception as e: + return -errno.EINVAL, '', str(e) + + def _set_ssl_default_authority(self, cmd): + str_ssl_authority = cmd.get('ssl_authority', '') + try: + self.set_module_option('diskprediction_default_authority', str_ssl_authority) + return 0, 'success to config ssl default authority', 0 + except Exception as e: + return -errno.EINVAL, '', str(e) + + def _set_cloud_prediction_config(self, cmd): + str_cert_path = cmd.get('certfile', '') + if os.path.exists(str_cert_path): + with open(str_cert_path, 'rb') as f: + trusted_certs = f.read() + self.set_config_option( + 
'diskprediction_cert_context', trusted_certs) + for _agent in self._agents: + _agent.event.set() + self.set_module_option('diskprediction_server', cmd['server']) + self.set_module_option('diskprediction_user', cmd['user']) + self.set_module_option('diskprediction_password', encode_string(cmd['password'])) + if cmd.get('port'): + self.set_module_option('diskprediction_port', cmd['port']) + return 0, 'succeed to config cloud mode connection', '' + else: + return -errno.EINVAL, '', 'certification file not existed' + + def _debug_metrics_forced(self, cmd): + msg = '' + for _agent in self._agents: + if isinstance(_agent, MetricsRunner): + msg = 'run metrics agent successfully' + _agent.event.set() + return 0, msg, '' + + def _debug_smart_forced(self, cmd): + msg = ' ' + for _agent in self._agents: + if isinstance(_agent, SmartRunner): + msg = 'run smart agent successfully' + _agent.event.set() + return 0, msg, '' + + def refresh_config(self): + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], + self.get_module_option(opt['name'])) + self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) + + def _status(self, cmd): + return 0, json.dumps(self.status), '' + + def _refresh_cloud_prediction_result(self): + for _agent in self._agents: + if isinstance(_agent, PredictRunner): + self._predict_event.clear() + _agent.event.set() + self._predict_event.wait(300) + if self._predict_event.is_set(): + self._predict_event.clear() + break + + def predict_life_expectancy(self, devid): + assert devid + result = self.get('device {}'.format(devid)) + if not result: + return -1, '', 'device {} not found'.format(devid) + dev_info = result.get('device', {}) + if not dev_info: + return -1, '', 'device {} not found'.format(devid) + self._refresh_cloud_prediction_result() + prediction_data = self.prediction_result.get(devid) + if not prediction_data: + return -1, '', 'device {} prediction data not ready'.format(devid) + elif prediction_data.get('near_failure', 
'').lower() == 'good': + return 0, '>6w', '' + elif prediction_data.get('near_failure', '').lower() == 'warning': + return 0, '>=2w and <=6w', '' + elif prediction_data.get('near_failure', '').lower() == 'bad': + return 0, '<2w', '' + else: + return 0, 'unknown', '' + + def _update_device_life_expectancy_day(self, devid, prediction): + # Update osd life-expectancy + from .common.clusterdata import ClusterAPI + predicted = None + life_expectancy_day_min = None + life_expectancy_day_max = None + if prediction.get('predicted'): + predicted = int(prediction['predicted']) + if prediction.get('near_failure'): + if prediction['near_failure'].lower() == 'good': + life_expectancy_day_min = (TIME_WEEK * 6) + TIME_DAYS + life_expectancy_day_max = None + elif prediction['near_failure'].lower() == 'warning': + life_expectancy_day_min = (TIME_WEEK * 2) + life_expectancy_day_max = (TIME_WEEK * 6) + elif prediction['near_failure'].lower() == 'bad': + life_expectancy_day_min = 0 + life_expectancy_day_max = (TIME_WEEK * 2) - TIME_DAYS + else: + # Near failure state is unknown. 
+ predicted = None + life_expectancy_day_min = None + life_expectancy_day_max = None + + obj_api = ClusterAPI(self) + if predicted and devid and life_expectancy_day_min is not None: + from_date = None + to_date = None + try: + if life_expectancy_day_min is not None: + from_date = self._convert_timestamp(predicted, life_expectancy_day_min) + + if life_expectancy_day_max is not None: + to_date = self._convert_timestamp(predicted, life_expectancy_day_max) + + obj_api.set_device_life_expectancy(devid, from_date, to_date) + self.log.info( + 'succeed to set device {} life expectancy from: {}, to: {}'.format( + devid, from_date, to_date)) + except Exception as e: + self.log.error( + 'failed to set device {} life expectancy from: {}, to: {}, {}'.format( + devid, from_date, to_date, str(e))) + else: + obj_api.reset_device_life_expectancy(devid) + + def predict_all_devices(self): + if not self._activated_cloud: + return -1, '', 'diskprecition_cloud not ready' + self.refresh_config() + result = self.get('devices') + if not result: + return -1, '', 'unable to get all devices for prediction' + self._refresh_cloud_prediction_result() + for dev in result.get('devices', []): + devid = dev.get('devid') + if not devid: + continue + prediction_data = self.prediction_result.get(devid) + if prediction_data: + break + if not prediction_data: + return -1, '', 'device {} prediction data not ready'.format(dev.get('devid')) + else: + self._update_device_life_expectancy_day(dev.get('devid'), prediction_data) + return 0, '', '' + + def handle_command(self, _, cmd): + for o_cmd in self.COMMANDS: + if cmd['prefix'] == o_cmd['cmd'][:len(cmd['prefix'])]: + fun_name = '' + avgs = o_cmd['cmd'].split(' ') + for avg in avgs: + if avg.lower() == 'diskprediction_cloud': + continue + if avg.lower() == 'device': + continue + if '=' in avg or ',' in avg or not avg: + continue + fun_name += '_%s' % avg.replace('-', '_') + if fun_name: + fun = getattr( + self, fun_name) + if fun: + return fun(cmd) + return 
-errno.EINVAL, '', 'cmd not found' + + def show_module_config(self): + for key, default in self.config_keys.items(): + self.set_config_option(key, self.get_module_option(key, default)) + + def serve(self): + self.log.info('Starting diskprediction module') + self.config_notify() + self.status = {'status': DP_MGR_STAT_ENABLED} + + while self._run: + self.refresh_config() + mode = self.get_ceph_option('device_failure_prediction_mode') + if mode == 'cloud': + if not self._activated_cloud: + self.start_cloud_disk_prediction() + else: + if self._activated_cloud: + self.stop_disk_prediction() + + # Check agent hang is? + restart_agent = False + try: + for dp_agent in self._agents: + if dp_agent.is_timeout(): + self.log.error('agent name: {] timeout'.format(dp_agent.task_name)) + restart_agent = True + break + except Exception as IOError: + self.log.error('disk prediction plugin failed to started and try to restart') + restart_agent = True + + if restart_agent: + self.stop_disk_prediction() + else: + sleep_interval = int(self.sleep_interval) or 60 + self._event.wait(sleep_interval) + self._event.clear() + self.stop_disk_prediction() + + def _agent_call_back(self): + self.log.debug('notify refresh devices prediction result') + self._predict_event.set() + + def start_cloud_disk_prediction(self): + assert not self._activated_cloud + for dp_agent in DP_AGENTS: + if dp_agent == PredictRunner: + obj_agent = dp_agent(self, 300, self._agent_call_back) + else: + obj_agent = dp_agent(self, 300) + if obj_agent: + obj_agent.start() + else: + raise Exception('failed to start task %s' % obj_agent.task_name) + self._agents.append(obj_agent) + self._activated_cloud = True + self.log.info('start cloud disk prediction') + + def stop_disk_prediction(self): + assert self._activated_cloud + try: + self.status = {'status': DP_MGR_STAT_DISABLED} + while self._agents: + dp_agent = self._agents.pop() + self.log.info('agent name: {}'.format(dp_agent.task_name)) + dp_agent.terminate() + 
dp_agent.join(5) + del dp_agent + self._activated_cloud = False + self.log.info('stop disk prediction') + except Exception as IOError: + self.log.error('failed to stop disk prediction clould plugin') + + def shutdown(self): + self._run = False + self._event.set() + super(Module, self).shutdown() + + def self_test(self): + obj_test = TestRunner(self) + obj_test.run() + self.log.info('self test completed') |