author    Daniel Baumann <daniel.baumann@progress-linux.org>    2019-10-13 08:36:33 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>    2019-10-13 08:36:33 +0000
commit    a30a849b78fa4fe8552141b7b2802d1af1b18c09 (patch)
tree      fab3c8bf29bf2d565595d4fa6a9413916ff02fee /collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py
parent    Adding upstream version 1.17.1. (diff)
Adding upstream version 1.18.0. (upstream/1.18.0)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py')
-rw-r--r--  collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py | 271
1 file changed, 188 insertions(+), 83 deletions(-)
diff --git a/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py b/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py
index 20109c64f..8aaa08583 100644
--- a/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py
+++ b/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py
@@ -168,6 +168,10 @@ ORDER = [
'cluster_stats_store',
'cluster_stats_indices',
'cluster_stats_shards_total',
+ 'index_docs_count',
+ 'index_store_size',
+ 'index_replica',
+ 'index_health',
]
CHARTS = {
@@ -196,7 +200,8 @@ CHARTS = {
]
},
'search_latency': {
- 'options': [None, 'Query And Fetch Latency', 'milliseconds', 'search performance', 'elastic.search_latency', 'stacked'],
+ 'options': [None, 'Query And Fetch Latency', 'milliseconds', 'search performance', 'elastic.search_latency',
+ 'stacked'],
'lines': [
['query_latency', 'query', 'absolute', 1, 1000],
['fetch_latency', 'fetch', 'absolute', 1, 1000]
@@ -397,9 +402,6 @@ CHARTS = {
'lines': [
['status_green', 'green', 'absolute'],
['status_red', 'red', 'absolute'],
- ['status_foo1', None, 'absolute'],
- ['status_foo2', None, 'absolute'],
- ['status_foo3', None, 'absolute'],
['status_yellow', 'yellow', 'absolute']
]
},
@@ -483,10 +485,61 @@ CHARTS = {
'lines': [
['http_current_open', 'opened', 'absolute', 1, 1]
]
- }
+ },
+ 'index_docs_count': {
+ 'options': [None, 'Docs Count', 'count', 'indices', 'elastic.index_docs', 'line'],
+ 'lines': []
+ },
+ 'index_store_size': {
+ 'options': [None, 'Store Size', 'bytes', 'indices', 'elastic.index_store_size', 'line'],
+ 'lines': []
+ },
+ 'index_replica': {
+ 'options': [None, 'Replica', 'count', 'indices', 'elastic.index_replica', 'line'],
+ 'lines': []
+ },
+ 'index_health': {
+ 'options': [None, 'Health', 'status', 'indices', 'elastic.index_health', 'line'],
+ 'lines': []
+ },
}
+def convert_index_store_size_to_bytes(size):
+ # can be b, kb, mb, gb
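+ # ('b' is tested last because the other suffixes also end in 'b')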
+ if size.endswith('kb'):
+ return round(float(size[:-2]) * 1024)
+ elif size.endswith('mb'):
+ return round(float(size[:-2]) * 1024 * 1024)
+ elif size.endswith('gb'):
+ return round(float(size[:-2]) * 1024 * 1024 * 1024)
+ elif size.endswith('b'):
+ return round(float(size[:-1]))
+ return -1
+
+
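+# map index health ('green'/'yellow'/'red') to the numeric value used by the 'index_health' chart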
+def convert_index_health(health):
+ if health == 'green':
+ return 0
+ elif health == 'yellow':
+ return 1
+ elif health == 'red':
+ return 2
+ return -1
+
+
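+# decorator: catch any exception from the wrapped collector method, log it,
+# and put an empty dict on the queue so the consumer in _get_data() never blocks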
+def get_survive_any(method):
+ def w(*args):
+ try:
+ method(*args)
+ except Exception as error:
+ self, queue, url = args[0], args[1], args[2]
+ self.error("error during '{0}' : {1}".format(url, error))
+ queue.put(dict())
+
+ return w
+
+
class Service(UrlService):
def __init__(self, configuration=None, name=None):
UrlService.__init__(self, configuration=configuration, name=name)
@@ -501,34 +554,41 @@ class Service(UrlService):
)
self.latency = dict()
self.methods = list()
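+ # indices that already have their dimensions added to the per-index charts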
+ self.collected_indices = set()
def check(self):
- if not all([self.host,
- self.port,
- isinstance(self.host, str),
- isinstance(self.port, (str, int))]):
+ if not self.host:
self.error('Host is not defined in the module configuration file')
return False
- # Hostname -> ip address
try:
self.host = gethostbyname(self.host)
except gaierror as error:
- self.error(str(error))
+ self.error(repr(error))
return False
- # Create URL for every Elasticsearch API
- self.methods = [METHODS(get_data=self._get_node_stats,
- url=self.url + '/_nodes/_local/stats',
- run=self.configuration.get('node_stats', True)),
- METHODS(get_data=self._get_cluster_health,
- url=self.url + '/_cluster/health',
- run=self.configuration.get('cluster_health', True)),
- METHODS(get_data=self._get_cluster_stats,
- url=self.url + '/_cluster/stats',
- run=self.configuration.get('cluster_stats', True))]
-
- # Remove disabled API calls from 'avail methods'
+ self.methods = [
+ METHODS(
+ get_data=self._get_node_stats,
+ url=self.url + '/_nodes/_local/stats',
+ run=self.configuration.get('node_stats', True),
+ ),
+ METHODS(
+ get_data=self._get_cluster_health,
+ url=self.url + '/_cluster/health',
+ run=self.configuration.get('cluster_health', True)
+ ),
+ METHODS(
+ get_data=self._get_cluster_stats,
+ url=self.url + '/_cluster/stats',
+ run=self.configuration.get('cluster_stats', True),
+ ),
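+ # per-index stats are opt-in ('indices_stats' defaults to off)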
+ METHODS(
+ get_data=self._get_indices,
+ url=self.url + '/_cat/indices?format=json',
+ run=self.configuration.get('indices_stats', False),
+ ),
+ ]
return UrlService.check(self)
def _get_data(self):
@@ -539,8 +599,11 @@ class Service(UrlService):
for method in self.methods:
if not method.run:
continue
- th = threading.Thread(target=method.get_data,
- args=(queue, method.url))
+ th = threading.Thread(
+ target=method.get_data,
+ args=(queue, method.url),
+ )
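+ # daemon thread: a hung request won't block plugin shutdown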
+ th.daemon = True
th.start()
threads.append(th)
@@ -550,88 +613,128 @@ class Service(UrlService):
return result or None
- def _get_cluster_health(self, queue, url):
- """
- Format data received from http request
- :return: dict
- """
-
+ def add_index_to_charts(self, idx_name):
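+ # register a dimension for a newly seen index on each of the per-index charts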
+ for name in ('index_docs_count', 'index_store_size', 'index_replica', 'index_health'):
+ chart = self.charts[name]
+ dim = ['{0}_{1}'.format(idx_name, name), idx_name]
+ chart.add_dimension(dim)
+
+ @get_survive_any
+ def _get_indices(self, queue, url):
+ # [
+ # {
+ # "pri.store.size": "650b",
+ # "health": "yellow",
+ # "status": "open",
+ # "index": "twitter",
+ # "pri": "5",
+ # "rep": "1",
+ # "docs.count": "10",
+ # "docs.deleted": "3",
+ # "store.size": "650b"
+ # }
+ # ]
raw_data = self._get_raw_data(url)
-
if not raw_data:
return queue.put(dict())
- data = self.json_reply(raw_data)
-
- if not data:
+ indices = self.json_parse(raw_data)
+ if not indices:
return queue.put(dict())
- to_netdata = fetch_data_(raw_data=data,
- metrics=HEALTH_STATS)
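+ # new dimensions can be added only once the charts have been created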
+ charts_initialized = len(self.charts) != 0
+ data = dict()
+ for idx in indices:
+ try:
+ name = idx['index']
+ is_system_index = name.startswith('.')
+ if is_system_index:
+ continue
+
+ v = {
+ '{0}_index_docs_count'.format(name): idx['docs.count'],
+ '{0}_index_replica'.format(name): idx['rep'],
+ '{0}_index_health'.format(name): convert_index_health(idx['health']),
+ }
+ size = convert_index_store_size_to_bytes(idx['store.size'])
+ if size != -1:
+ v['{0}_index_store_size'.format(name)] = size
+ except KeyError as error:
+ self.debug("error on parsing index : {0}".format(repr(error)))
+ continue
- to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
- 'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
- current_status = 'status_' + data['status']
- to_netdata[current_status] = 1
+ data.update(v)
+ if name not in self.collected_indices and charts_initialized:
+ self.collected_indices.add(name)
+ self.add_index_to_charts(name)
- return queue.put(to_netdata)
+ return queue.put(data)
- def _get_cluster_stats(self, queue, url):
- """
- Format data received from http request
- :return: dict
- """
-
- raw_data = self._get_raw_data(url)
+ @get_survive_any
+ def _get_cluster_health(self, queue, url):
+ raw = self._get_raw_data(url)
+ if not raw:
+ return queue.put(dict())
- if not raw_data:
+ parsed = self.json_parse(raw)
+ if not parsed:
return queue.put(dict())
- data = self.json_reply(raw_data)
+ data = fetch_data(raw_data=parsed, metrics=HEALTH_STATS)
+ dummy = {
+ 'status_green': 0,
+ 'status_red': 0,
+ 'status_yellow': 0,
+ }
+ data.update(dummy)
+ current_status = 'status_' + parsed['status']
+ data[current_status] = 1
- if not data:
- return queue.put(dict())
+ return queue.put(data)
- to_netdata = fetch_data_(raw_data=data,
- metrics=CLUSTER_STATS)
+ @get_survive_any
+ def _get_cluster_stats(self, queue, url):
+ raw = self._get_raw_data(url)
+ if not raw:
+ return queue.put(dict())
- return queue.put(to_netdata)
+ parsed = self.json_parse(raw)
+ if not parsed:
+ return queue.put(dict())
- def _get_node_stats(self, queue, url):
- """
- Format data received from http request
- :return: dict
- """
+ data = fetch_data(raw_data=parsed, metrics=CLUSTER_STATS)
- raw_data = self._get_raw_data(url)
+ return queue.put(data)
- if not raw_data:
+ @get_survive_any
+ def _get_node_stats(self, queue, url):
+ raw = self._get_raw_data(url)
+ if not raw:
return queue.put(dict())
- data = self.json_reply(raw_data)
-
- if not data:
+ parsed = self.json_parse(raw)
+ if not parsed:
return queue.put(dict())
- node = list(data['nodes'].keys())[0]
- to_netdata = fetch_data_(raw_data=data['nodes'][node],
- metrics=NODE_STATS)
+ node = list(parsed['nodes'].keys())[0]
+ data = fetch_data(raw_data=parsed['nodes'][node], metrics=NODE_STATS)
# Search, index, flush, fetch performance latency
for key in LATENCY:
try:
- to_netdata[key] = self.find_avg(total=to_netdata[LATENCY[key]['total']],
- spent_time=to_netdata[LATENCY[key]['spent_time']],
- key=key)
+ data[key] = self.find_avg(
+ total=data[LATENCY[key]['total']],
+ spent_time=data[LATENCY[key]['spent_time']],
+ key=key)
except KeyError:
continue
- if 'process_open_file_descriptors' in to_netdata and 'process_max_file_descriptors' in to_netdata:
- to_netdata['file_descriptors_used'] = round(float(to_netdata['process_open_file_descriptors'])
- / to_netdata['process_max_file_descriptors'] * 1000)
+ if 'process_open_file_descriptors' in data and 'process_max_file_descriptors' in data:
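+ # open/max ratio scaled to 0-1000 for the chart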
+ v = float(data['process_open_file_descriptors']) / data['process_max_file_descriptors'] * 1000
+ data['file_descriptors_used'] = round(v)
- return queue.put(to_netdata)
+ return queue.put(data)
- def json_reply(self, reply):
+ def json_parse(self, reply):
try:
return json.loads(reply)
except ValueError as err:
@@ -640,20 +743,22 @@ class Service(UrlService):
def find_avg(self, total, spent_time, key):
if key not in self.latency:
- self.latency[key] = dict(total=total,
- spent_time=spent_time)
+ self.latency[key] = dict(total=total, spent_time=spent_time)
return 0
+
if self.latency[key]['total'] != total:
- latency = float(spent_time - self.latency[key]['spent_time'])\
- / float(total - self.latency[key]['total']) * 1000
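+ # per-operation latency: time spent delta over operation count delta, scaled by 1000 (the chart divides it back)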
+ spent_diff = spent_time - self.latency[key]['spent_time']
+ total_diff = total - self.latency[key]['total']
+ latency = float(spent_diff) / float(total_diff) * 1000
self.latency[key]['total'] = total
self.latency[key]['spent_time'] = spent_time
return latency
+
self.latency[key]['spent_time'] = spent_time
return 0
-def fetch_data_(raw_data, metrics):
+def fetch_data(raw_data, metrics):
data = dict()
for metric in metrics:
value = raw_data
@@ -661,7 +766,7 @@ def fetch_data_(raw_data, metrics):
try:
for m in metrics_list:
value = value[m]
- except KeyError:
+ except (KeyError, TypeError):
continue
data['_'.join(metrics_list)] = value
return data
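
Usage note: the per-index charts introduced here are collected only when 'indices_stats' is enabled for the job, as read by check() above. A minimal sketch of a job definition, assuming the stock python.d YAML format of elasticsearch.conf (the job name, host and port are illustrative placeholders):

    local:
      host: 127.0.0.1
      port: 9200
      indices_stats: yes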