summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/diskprediction_cloud/common/grpcclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/diskprediction_cloud/common/grpcclient.py')
-rw-r--r--src/pybind/mgr/diskprediction_cloud/common/grpcclient.py242
1 files changed, 242 insertions, 0 deletions
diff --git a/src/pybind/mgr/diskprediction_cloud/common/grpcclient.py b/src/pybind/mgr/diskprediction_cloud/common/grpcclient.py
new file mode 100644
index 00000000..5a1d5e7e
--- /dev/null
+++ b/src/pybind/mgr/diskprediction_cloud/common/grpcclient.py
@@ -0,0 +1,242 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+import grpc
+import json
+from logging import getLogger
+import time
+
+from . import DummyResonse
+from . import client_pb2
+from . import client_pb2_grpc
+
+
+def gen_configuration(**kwargs):
+ configuration = {
+ 'host': kwargs.get('host', 'api.diskprophet.com'),
+ 'user': kwargs.get('user'),
+ 'password': kwargs.get('password'),
+ 'port': kwargs.get('port', 31400),
+ 'mgr_inst': kwargs.get('mgr_inst', None),
+ 'cert_context': kwargs.get('cert_context'),
+ 'ssl_target_name': kwargs.get('ssl_target_name', 'api.diskprophet.com'),
+ 'default_authority': kwargs.get('default_authority', 'api.diskprophet.com')}
+ return configuration
+
+
+class GRPcClient(object):
+
+ def __init__(self, configuration):
+ self.auth = None
+ self.host = configuration.get('host')
+ self.port = configuration.get('port')
+ if configuration.get('user') and configuration.get('password'):
+ self.auth = (
+ ('account', configuration.get('user')),
+ ('password', configuration.get('password')))
+ self.cert_context = configuration.get('cert_context')
+ self.ssl_target_name = configuration.get('ssl_target_name')
+ self.default_authority = configuration.get('default_authority')
+ self.mgr_inst = configuration.get('mgr_inst')
+ if self.mgr_inst:
+ self._logger = self.mgr_inst.log
+ else:
+ self._logger = getLogger()
+ self._client = self._get_channel()
+
+ def close(self):
+ if self._client:
+ self._client.close()
+
+ @staticmethod
+ def connectivity_update(connectivity):
+ pass
+
+ def _get_channel(self):
+ try:
+ creds = grpc.ssl_channel_credentials(
+ root_certificates=self.cert_context)
+ channel = \
+ grpc.secure_channel('{}:{}'.format(
+ self.host, self.port), creds,
+ options=(('grpc.ssl_target_name_override', self.ssl_target_name,),
+ ('grpc.default_authority', self.default_authority),))
+ channel.subscribe(self.connectivity_update, try_to_connect=True)
+ return channel
+ except Exception as e:
+ self._logger.error(
+ 'failed to create connection exception: {}'.format(
+ ';'.join(str(e).split('\n\t'))))
+ return None
+
+ def test_connection(self):
+ try:
+ stub_accout = client_pb2_grpc.AccountStub(self._client)
+ result = stub_accout.AccountHeartbeat(client_pb2.Empty())
+ self._logger.debug('text connection result: {}'.format(str(result)))
+ if result and "is alive" in str(result.message):
+ return True
+ else:
+ return False
+ except Exception as e:
+ self._logger.error(
+ 'failed to test connection exception: {}'.format(
+ ';'.join(str(e).split('\n\t'))))
+ return False
+
+ def _send_metrics(self, data, measurement):
+ status_info = dict()
+ status_info['measurement'] = None
+ status_info['success_count'] = 0
+ status_info['failure_count'] = 0
+ for dp_data in data:
+ d_measurement = dp_data.measurement
+ if not d_measurement:
+ status_info['measurement'] = measurement
+ else:
+ status_info['measurement'] = d_measurement
+ tag_list = []
+ field_list = []
+ for name in dp_data.tags:
+ tag = '{}={}'.format(name, dp_data.tags[name])
+ tag_list.append(tag)
+ for name in dp_data.fields:
+ if dp_data.fields[name] is None:
+ continue
+ if isinstance(dp_data.fields[name], str):
+ field = '{}=\"{}\"'.format(name, dp_data.fields[name])
+ elif isinstance(dp_data.fields[name], bool):
+ field = '{}={}'.format(name,
+ str(dp_data.fields[name]).lower())
+ elif (isinstance(dp_data.fields[name], int) or
+ isinstance(dp_data.fields[name], long)):
+ field = '{}={}i'.format(name, dp_data.fields[name])
+ else:
+ field = '{}={}'.format(name, dp_data.fields[name])
+ field_list.append(field)
+ data = '{},{} {} {}'.format(
+ status_info['measurement'],
+ ','.join(tag_list),
+ ','.join(field_list),
+ int(time.time() * 1000 * 1000 * 1000))
+ try:
+ resp = self._send_info(data=[data], measurement=status_info['measurement'])
+ status_code = resp.status_code
+ if 200 <= status_code < 300:
+ self._logger.debug(
+ '{} send diskprediction api success(ret: {})'.format(
+ status_info['measurement'], status_code))
+ status_info['success_count'] += 1
+ else:
+ self._logger.error(
+ 'return code: {}, content: {}'.format(
+ status_code, resp.content))
+ status_info['failure_count'] += 1
+ except Exception as e:
+ status_info['failure_count'] += 1
+ self._logger.error(str(e))
+ return status_info
+
+ def _send_db_relay(self, data, measurement):
+ status_info = dict()
+ status_info['measurement'] = measurement
+ status_info['success_count'] = 0
+ status_info['failure_count'] = 0
+ for dp_data in data:
+ try:
+ resp = self._send_info(
+ data=[dp_data.fields['cmd']], measurement=measurement)
+ status_code = resp.status_code
+ if 200 <= status_code < 300:
+ self._logger.debug(
+ '{} send diskprediction api success(ret: {})'.format(
+ measurement, status_code))
+ status_info['success_count'] += 1
+ else:
+ self._logger.error(
+ 'return code: {}, content: {}'.format(
+ status_code, resp.content))
+ status_info['failure_count'] += 1
+ except Exception as e:
+ status_info['failure_count'] += 1
+ self._logger.error(str(e))
+ return status_info
+
+ def send_info(self, data, measurement):
+ """
+ :param data: data structure
+ :param measurement: data measurement class name
+ :return:
+ status_info = {
+ 'success_count': <count>,
+ 'failure_count': <count>
+ }
+ """
+ if measurement == 'db_relay':
+ return self._send_db_relay(data, measurement)
+ else:
+ return self._send_metrics(data, measurement)
+
+ def _send_info(self, data, measurement):
+ resp = DummyResonse()
+ try:
+ stub_collection = client_pb2_grpc.CollectionStub(self._client)
+ if measurement == 'db_relay':
+ result = stub_collection.PostDBRelay(
+ client_pb2.PostDBRelayInput(cmds=data), metadata=self.auth)
+ else:
+ result = stub_collection.PostMetrics(
+ client_pb2.PostMetricsInput(points=data), metadata=self.auth)
+ if result and 'success' in str(result.message).lower():
+ resp.status_code = 200
+ resp.content = ''
+ else:
+ resp.status_code = 400
+ resp.content = ';'.join(str(result).split('\n\t'))
+ self._logger.error(
+ 'failed to send info: {}'.format(resp.content))
+ except Exception as e:
+ resp.status_code = 400
+ resp.content = ';'.join(str(e).split('\n\t'))
+ self._logger.error(
+ 'failed to send info exception: {}'.format(resp.content))
+ return resp
+
+ def query_info(self, host_domain_id, disk_domain_id, measurement):
+ resp = DummyResonse()
+ try:
+ stub_dp = client_pb2_grpc.DiskprophetStub(self._client)
+ predicted = stub_dp.DPGetDisksPrediction(
+ client_pb2.DPGetDisksPredictionInput(
+ physicalDiskIds=disk_domain_id),
+ metadata=self.auth)
+ if predicted and hasattr(predicted, 'data'):
+ resp.status_code = 200
+ resp.content = ''
+ resp_json = json.loads(predicted.data)
+ rc = resp_json.get('results', [])
+ if rc:
+ series = rc[0].get('series', [])
+ if series:
+ values = series[0].get('values', [])
+ if not values:
+ resp.resp_json = {}
+ else:
+ columns = series[0].get('columns', [])
+ for item in values:
+ # get prediction key and value from server.
+ for name, value in zip(columns, item):
+ # process prediction data
+ resp.resp_json[name] = value
+ self._logger.debug("query {}:{} result:{}".format(host_domain_id, disk_domain_id, resp))
+ return resp
+ else:
+ resp.status_code = 400
+ resp.content = ''
+ resp.resp_json = {'error': ';'.join(str(predicted).split('\n\t'))}
+ self._logger.debug("query {}:{} result:{}".format(host_domain_id, disk_domain_id, resp))
+ return resp
+ except Exception as e:
+ resp.status_code = 400
+ resp.content = ';'.join(str(e).split('\n\t'))
+ resp.resp_json = {'error': resp.content}
+ self._logger.debug("query {}:{} result:{}".format(host_domain_id, disk_domain_id, resp))
+ return resp