diff options
Diffstat (limited to 'collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py')
-rw-r--r-- | collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py | 271 |
1 files changed, 188 insertions, 83 deletions
diff --git a/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py b/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py index 20109c64f..8aaa08583 100644 --- a/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py +++ b/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py @@ -168,6 +168,10 @@ ORDER = [ 'cluster_stats_store', 'cluster_stats_indices', 'cluster_stats_shards_total', + 'index_docs_count', + 'index_store_size', + 'index_replica', + 'index_health', ] CHARTS = { @@ -196,7 +200,8 @@ CHARTS = { ] }, 'search_latency': { - 'options': [None, 'Query And Fetch Latency', 'milliseconds', 'search performance', 'elastic.search_latency', 'stacked'], + 'options': [None, 'Query And Fetch Latency', 'milliseconds', 'search performance', 'elastic.search_latency', + 'stacked'], 'lines': [ ['query_latency', 'query', 'absolute', 1, 1000], ['fetch_latency', 'fetch', 'absolute', 1, 1000] @@ -397,9 +402,6 @@ CHARTS = { 'lines': [ ['status_green', 'green', 'absolute'], ['status_red', 'red', 'absolute'], - ['status_foo1', None, 'absolute'], - ['status_foo2', None, 'absolute'], - ['status_foo3', None, 'absolute'], ['status_yellow', 'yellow', 'absolute'] ] }, @@ -483,10 +485,61 @@ CHARTS = { 'lines': [ ['http_current_open', 'opened', 'absolute', 1, 1] ] - } + }, + 'index_docs_count': { + 'options': [None, 'Docs Count', 'count', 'indices', 'elastic.index_docs', 'line'], + 'lines': [] + }, + 'index_store_size': { + 'options': [None, 'Store Size', 'bytes', 'indices', 'elastic.index_store_size', 'line'], + 'lines': [] + }, + 'index_replica': { + 'options': [None, 'Replica', 'count', 'indices', 'elastic.index_replica', 'line'], + 'lines': [] + }, + 'index_health': { + 'options': [None, 'Health', 'status', 'indices', 'elastic.index_health', 'line'], + 'lines': [] + }, } +def convert_index_store_size_to_bytes(size): + # can be b, kb, mb, gb + if size.endswith('kb'): + return round(float(size[:-2]) * 1024) + elif size.endswith('mb'): + return round(float(size[:-2]) * 1024 * 1024) + elif size.endswith('gb'): + return round(float(size[:-2]) * 1024 * 1024 * 1024) + elif size.endswith('b'): + return round(float(size[:-1])) + return -1 + + +def convert_index_health(health): + if health == 'green': + return 0 + elif health == 'yellow': + return 1 + elif health == 'read': + return 2 + return -1 + + +def get_survive_any(method): + def w(*args): + try: + method(*args) + except Exception as error: + self, queue, url = args[0], args[1], args[2] + self.error("error during '{0}' : {1}".format(url, error)) + queue.put(dict()) + + return w + + class Service(UrlService): def __init__(self, configuration=None, name=None): UrlService.__init__(self, configuration=configuration, name=name) @@ -501,34 +554,41 @@ class Service(UrlService): ) self.latency = dict() self.methods = list() + self.collected_indices = set() def check(self): - if not all([self.host, - self.port, - isinstance(self.host, str), - isinstance(self.port, (str, int))]): + if not self.host: self.error('Host is not defined in the module configuration file') return False - # Hostname -> ip address try: self.host = gethostbyname(self.host) except gaierror as error: - self.error(str(error)) + self.error(repr(error)) return False - # Create URL for every Elasticsearch API - self.methods = [METHODS(get_data=self._get_node_stats, - url=self.url + '/_nodes/_local/stats', - run=self.configuration.get('node_stats', True)), - METHODS(get_data=self._get_cluster_health, - url=self.url + '/_cluster/health', - run=self.configuration.get('cluster_health', True)), - METHODS(get_data=self._get_cluster_stats, - url=self.url + '/_cluster/stats', - run=self.configuration.get('cluster_stats', True))] - - # Remove disabled API calls from 'avail methods' + self.methods = [ + METHODS( + get_data=self._get_node_stats, + url=self.url + '/_nodes/_local/stats', + run=self.configuration.get('node_stats', True), + ), + METHODS( + get_data=self._get_cluster_health, + url=self.url + '/_cluster/health', + run=self.configuration.get('cluster_health', True) + ), + METHODS( + get_data=self._get_cluster_stats, + url=self.url + '/_cluster/stats', + run=self.configuration.get('cluster_stats', True), + ), + METHODS( + get_data=self._get_indices, + url=self.url + '/_cat/indices?format=json', + run=self.configuration.get('indices_stats', False), + ), + ] return UrlService.check(self) def _get_data(self): @@ -539,8 +599,11 @@ class Service(UrlService): for method in self.methods: if not method.run: continue - th = threading.Thread(target=method.get_data, - args=(queue, method.url)) + th = threading.Thread( + target=method.get_data, + args=(queue, method.url), + ) + th.daemon = True th.start() threads.append(th) @@ -550,88 +613,128 @@ class Service(UrlService): return result or None - def _get_cluster_health(self, queue, url): - """ - Format data received from http request - :return: dict - """ - + def add_index_to_charts(self, idx_name): + for name in ('index_docs_count', 'index_store_size', 'index_replica', 'index_health'): + chart = self.charts[name] + dim = ['{0}_{1}'.format(idx_name, name), idx_name] + chart.add_dimension(dim) + + @get_survive_any + def _get_indices(self, queue, url): + # [ + # { + # "pri.store.size": "650b", + # "health": "yellow", + # "status": "open", + # "index": "twitter", + # "pri": "5", + # "rep": "1", + # "docs.count": "10", + # "docs.deleted": "3", + # "store.size": "650b" + # } + # ] raw_data = self._get_raw_data(url) - if not raw_data: return queue.put(dict()) - data = self.json_reply(raw_data) - - if not data: + indices = self.json_parse(raw_data) + if not indices: return queue.put(dict()) - to_netdata = fetch_data_(raw_data=data, - metrics=HEALTH_STATS) + charts_initialized = len(self.charts) != 0 + data = dict() + for idx in indices: + try: + name = idx['index'] + is_system_index = name.startswith('.') + if is_system_index: + continue + + v = { + '{0}_index_docs_count'.format(name): idx['docs.count'], + '{0}_index_replica'.format(name): idx['rep'], + '{0}_index_health'.format(name): convert_index_health(idx['health']), + } + size = convert_index_store_size_to_bytes(idx['store.size']) + if size != -1: + v['{0}_index_store_size'.format(name)] = size + except KeyError as error: + self.debug("error on parsing index : {0}".format(repr(error))) + continue - to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0, - 'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0}) - current_status = 'status_' + data['status'] - to_netdata[current_status] = 1 + data.update(v) + if name not in self.collected_indices and charts_initialized: + self.collected_indices.add(name) + self.add_index_to_charts(name) - return queue.put(to_netdata) + return queue.put(data) - def _get_cluster_stats(self, queue, url): - """ - Format data received from http request - :return: dict - """ - - raw_data = self._get_raw_data(url) + @get_survive_any + def _get_cluster_health(self, queue, url): + raw = self._get_raw_data(url) + if not raw: + return queue.put(dict()) - if not raw_data: + parsed = self.json_parse(raw) + if not parsed: return queue.put(dict()) - data = self.json_reply(raw_data) + data = fetch_data(raw_data=parsed, metrics=HEALTH_STATS) + dummy = { + 'status_green': 0, + 'status_red': 0, + 'status_yellow': 0, + } + data.update(dummy) + current_status = 'status_' + parsed['status'] + data[current_status] = 1 - if not data: - return queue.put(dict()) + return queue.put(data) - to_netdata = fetch_data_(raw_data=data, - metrics=CLUSTER_STATS) + @get_survive_any + def _get_cluster_stats(self, queue, url): + raw = self._get_raw_data(url) + if not raw: + return queue.put(dict()) - return queue.put(to_netdata) + parsed = self.json_parse(raw) + if not parsed: + return queue.put(dict()) - def _get_node_stats(self, queue, url): - """ - Format data received from http request - :return: dict - """ + data = fetch_data(raw_data=parsed, metrics=CLUSTER_STATS) - raw_data = self._get_raw_data(url) + return queue.put(data) - if not raw_data: + @get_survive_any + def _get_node_stats(self, queue, url): + raw = self._get_raw_data(url) + if not raw: return queue.put(dict()) - data = self.json_reply(raw_data) - - if not data: + parsed = self.json_parse(raw) + if not parsed: return queue.put(dict()) - node = list(data['nodes'].keys())[0] - to_netdata = fetch_data_(raw_data=data['nodes'][node], - metrics=NODE_STATS) + node = list(parsed['nodes'].keys())[0] + data = fetch_data(raw_data=parsed['nodes'][node], metrics=NODE_STATS) # Search, index, flush, fetch performance latency for key in LATENCY: try: - to_netdata[key] = self.find_avg(total=to_netdata[LATENCY[key]['total']], - spent_time=to_netdata[LATENCY[key]['spent_time']], - key=key) + data[key] = self.find_avg( + total=data[LATENCY[key]['total']], + spent_time=data[LATENCY[key]['spent_time']], + key=key) except KeyError: continue - if 'process_open_file_descriptors' in to_netdata and 'process_max_file_descriptors' in to_netdata: - to_netdata['file_descriptors_used'] = round(float(to_netdata['process_open_file_descriptors']) - / to_netdata['process_max_file_descriptors'] * 1000) + if 'process_open_file_descriptors' in data and 'process_max_file_descriptors' in data: + v = float(data['process_open_file_descriptors']) / data['process_max_file_descriptors'] * 1000 + data['file_descriptors_used'] = round(v) - return queue.put(to_netdata) + return queue.put(data) - def json_reply(self, reply): + def json_parse(self, reply): try: return json.loads(reply) except ValueError as err: @@ -640,20 +743,22 @@ class Service(UrlService): def find_avg(self, total, spent_time, key): if key not in self.latency: - self.latency[key] = dict(total=total, - spent_time=spent_time) + self.latency[key] = dict(total=total, spent_time=spent_time) return 0 + if self.latency[key]['total'] != total: - latency = float(spent_time - self.latency[key]['spent_time'])\ - / float(total - self.latency[key]['total']) * 1000 + spent_diff = spent_time - self.latency[key]['spent_time'] + total_diff = total - self.latency[key]['total'] + latency = float(spent_diff) / float(total_diff) * 1000 self.latency[key]['total'] = total self.latency[key]['spent_time'] = spent_time return latency + self.latency[key]['spent_time'] = spent_time return 0 -def fetch_data_(raw_data, metrics): +def fetch_data(raw_data, metrics): data = dict() for metric in metrics: value = raw_data @@ -661,7 +766,7 @@ def fetch_data_(raw_data, metrics): try: for m in metrics_list: value = value[m] - except KeyError: + except (KeyError, TypeError): continue data['_'.join(metrics_list)] = value return data |