Diffstat (limited to '')
-rw-r--r--  src/pybind/mgr/diskprediction_cloud/task.py  181
1 file changed, 181 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/diskprediction_cloud/task.py b/src/pybind/mgr/diskprediction_cloud/task.py
new file mode 100644
index 00000000..6ed04e60
--- /dev/null
+++ b/src/pybind/mgr/diskprediction_cloud/task.py
@@ -0,0 +1,181 @@
+from __future__ import absolute_import
+
+import time
+from threading import Event, Thread
+
+from .agent.predictor import PredictAgent
+from .agent.metrics.ceph_cluster import CephClusterAgent
+from .agent.metrics.ceph_mon_osd import CephMonOsdAgent
+from .agent.metrics.ceph_pool import CephPoolAgent
+from .agent.metrics.db_relay import DBRelayAgent
+from .agent.metrics.sai_agent import SAIAgent
+from .agent.metrics.sai_cluster import SAICluserAgent
+from .agent.metrics.sai_disk import SAIDiskAgent
+from .agent.metrics.sai_disk_smart import SAIDiskSmartAgent
+from .agent.metrics.sai_host import SAIHostAgent
+from .common import DP_MGR_STAT_FAILED, DP_MGR_STAT_OK, DP_MGR_STAT_WARNING
+
+
+class AgentRunner(Thread):
+
+ task_name = ''
+ interval_key = ''
+ agents = []
+
+ def __init__(self, mgr_module, agent_timeout=60, call_back=None):
+        """
+        :param mgr_module: parent ceph-mgr module instance
+        :param agent_timeout: agent execution timeout in seconds (default: 60)
+        :param call_back: optional callable invoked after each round of agents
+        """
+ Thread.__init__(self)
+ self._agent_timeout = agent_timeout
+ self._module_inst = mgr_module
+ self._log = mgr_module.log
+ self._start_time = time.time()
+ self._th = None
+ self._call_back = call_back
+ self.exit = False
+ self.event = Event()
+ self.task_interval = \
+ int(self._module_inst.get_configuration(self.interval_key))
+
+ def terminate(self):
+ self.exit = True
+ self.event.set()
+ self._log.info('PDS terminate %s complete' % self.task_name)
+
+ def run(self):
+ self._start_time = time.time()
+ self._log.info(
+ 'start %s, interval: %s'
+ % (self.task_name, self.task_interval))
+ while not self.exit:
+ self.run_agents()
+ if self._call_back:
+ self._call_back()
+ if self.event:
+ self.event.wait(int(self.task_interval))
+ self.event.clear()
+ self._log.info(
+ 'completed %s(%s)' % (self.task_name, time.time()-self._start_time))
+
+ def run_agents(self):
+ obj_sender = None
+ try:
+ self._log.debug('run_agents %s' % self.task_name)
+ from .common.grpcclient import GRPcClient, gen_configuration
+ conf = gen_configuration(
+ host=self._module_inst.get_configuration('diskprediction_server'),
+ user=self._module_inst.get_configuration('diskprediction_user'),
+ password=self._module_inst.get_configuration(
+ 'diskprediction_password'),
+ port=self._module_inst.get_configuration('diskprediction_port'),
+ cert_context=self._module_inst.get_configuration('diskprediction_cert_context'),
+ mgr_inst=self._module_inst,
+ ssl_target_name=self._module_inst.get_configuration('diskprediction_ssl_target_name_override'),
+ default_authority=self._module_inst.get_configuration('diskprediction_default_authority'))
+ obj_sender = GRPcClient(conf)
+ if not obj_sender:
+ self._log.error('invalid diskprediction sender')
+ self._module_inst.status = \
+ {'status': DP_MGR_STAT_FAILED,
+ 'reason': 'invalid diskprediction sender'}
+ raise Exception('invalid diskprediction sender')
+ if obj_sender.test_connection():
+ self._module_inst.status = {'status': DP_MGR_STAT_OK}
+ self._log.debug('succeed to test connection')
+ self._run(self._module_inst, obj_sender)
+ else:
+ self._log.error('failed to test connection')
+ self._module_inst.status = \
+ {'status': DP_MGR_STAT_FAILED,
+ 'reason': 'failed to test connection'}
+ raise Exception('failed to test connection')
+ except Exception as e:
+ self._module_inst.status = \
+ {'status': DP_MGR_STAT_FAILED,
+ 'reason': 'failed to start %s agents, %s'
+ % (self.task_name, str(e))}
+ self._log.error(
+ 'failed to start %s agents, %s' % (self.task_name, str(e)))
+ raise
+ finally:
+ if obj_sender:
+ obj_sender.close()
+
+ def is_timeout(self):
+ now = time.time()
+ if (now - self._start_time) > self._agent_timeout:
+ return True
+ else:
+ return False
+
+ def _run(self, module_inst, sender):
+ self._log.debug('%s run' % self.task_name)
+ for agent in self.agents:
+ self._start_time = time.time()
+ retry_count = 3
+ while retry_count:
+ retry_count -= 1
+ try:
+ obj_agent = agent(module_inst, sender, self._agent_timeout)
+ obj_agent.run()
+ del obj_agent
+ break
+ except Exception as e:
+                    if 'configuring' in str(e):
+ self._log.debug(
+ 'failed to execute {}, {}, retry again.'.format(
+ agent.measurement, str(e)))
+ time.sleep(1)
+ continue
+ else:
+ module_inst.status = \
+ {'status': DP_MGR_STAT_WARNING,
+ 'reason': 'failed to execute {}, {}'.format(
+ agent.measurement, ';'.join(str(e).split('\n\t')))}
+ self._log.warning(
+ 'failed to execute {}, {}'.format(
+ agent.measurement, ';'.join(str(e).split('\n\t'))))
+ break
+
+
+class MetricsRunner(AgentRunner):
+
+ task_name = 'Metrics Agent'
+ interval_key = 'diskprediction_upload_metrics_interval'
+ agents = [CephClusterAgent, CephMonOsdAgent, CephPoolAgent,
+ SAICluserAgent, SAIDiskAgent, SAIHostAgent, DBRelayAgent,
+ SAIAgent]
+
+
+class PredictRunner(AgentRunner):
+
+ task_name = 'Predictor Agent'
+ interval_key = 'diskprediction_retrieve_prediction_interval'
+ agents = [PredictAgent]
+
+
+class SmartRunner(AgentRunner):
+
+ task_name = 'Smart data Agent'
+ interval_key = 'diskprediction_upload_smart_interval'
+ agents = [SAIDiskSmartAgent]
+
+
+class TestRunner(object):
+ task_name = 'Test Agent'
+ interval_key = 'diskprediction_upload_metrics_interval'
+ agents = [CephClusterAgent, CephMonOsdAgent, CephPoolAgent,
+ SAICluserAgent, SAIDiskAgent, SAIHostAgent, DBRelayAgent,
+ SAIAgent, SAIDiskSmartAgent]
+
+ def __init__(self, mgr_module):
+ self._module_inst = mgr_module
+
+ def run(self):
+ for agent in self.agents:
+ obj_agent = agent(self._module_inst, None)
+ obj_agent.run()
+ del obj_agent
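
For reference, these runners are constructed and driven by the plugin's main module, which is outside this diff. The sketch below is a minimal illustration rather than the actual module.py: the FakeMgrModule stub and the interval values are assumptions, and it only mirrors the interface task.py itself uses (get_configuration(), .log and .status). A real run additionally needs the diskprediction_server/user/password/port settings and a reachable DiskPrediction endpoint, since run_agents() opens a gRPC connection.

# Hypothetical sketch: exercising the runner lifecycle outside ceph-mgr.
# Assumes this file sits next to task.py inside the diskprediction_cloud package.
import logging

from .task import MetricsRunner, PredictRunner, SmartRunner


class FakeMgrModule(object):
    """Stand-in exposing only what AgentRunner touches:
    log, status and get_configuration()."""
    log = logging.getLogger('diskprediction_cloud')
    status = {}

    def get_configuration(self, key):
        # Made-up values; the real module presumably maps these to mgr config options.
        defaults = {
            'diskprediction_upload_metrics_interval': '600',
            'diskprediction_upload_smart_interval': '43200',
            'diskprediction_retrieve_prediction_interval': '43200',
        }
        return defaults.get(key, '')


def start_runners(mgr):
    runners = [MetricsRunner(mgr), SmartRunner(mgr), PredictRunner(mgr)]
    for runner in runners:
        runner.start()      # Thread.start() enters the AgentRunner.run() loop
    return runners


def stop_runners(runners):
    for runner in runners:
        runner.terminate()  # sets .exit and wakes the Event.wait()
        runner.join()

Note that terminate() only signals the loop; the thread still finishes the round it is currently in, so join() is needed to wait for it to exit.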