summaryrefslogtreecommitdiffstats
path: root/python.d/elasticsearch.chart.py
diff options
context:
space:
mode:
authorFederico Ceratto <federico.ceratto@gmail.com>2017-04-30 16:09:37 +0000
committerFederico Ceratto <federico.ceratto@gmail.com>2017-04-30 16:09:37 +0000
commit51f689a8e17ff3929acd2dbf39e936d2cd3ac723 (patch)
tree92e54f543171b69dcbc639be09d11221cf96ba28 /python.d/elasticsearch.chart.py
parentNew upstream version 1.5.0+dfsg (diff)
downloadnetdata-51f689a8e17ff3929acd2dbf39e936d2cd3ac723.tar.xz
netdata-51f689a8e17ff3929acd2dbf39e936d2cd3ac723.zip
New upstream version 1.6.0+dfsgupstream/1.6.0+dfsg
Diffstat (limited to 'python.d/elasticsearch.chart.py')
-rw-r--r--python.d/elasticsearch.chart.py401
1 files changed, 235 insertions, 166 deletions
diff --git a/python.d/elasticsearch.chart.py b/python.d/elasticsearch.chart.py
index ff841f17..430227f6 100644
--- a/python.d/elasticsearch.chart.py
+++ b/python.d/elasticsearch.chart.py
@@ -3,13 +3,14 @@
# Author: l2isbad
from base import UrlService
-from requests import get
-from socket import gethostbyname
+from socket import gethostbyname, gaierror
try:
from queue import Queue
except ImportError:
from Queue import Queue
from threading import Thread
+from collections import namedtuple
+from json import loads
# default module values (can be overridden per job in `config`)
# update_every = 2
@@ -17,45 +18,125 @@ update_every = 5
priority = 60000
retries = 60
+METHODS = namedtuple('METHODS', ['get_data_function', 'url'])
+
+NODE_STATS = [
+ ('indices.search.fetch_current', None, None),
+ ('indices.search.fetch_total', None, None),
+ ('indices.search.query_current', None, None),
+ ('indices.search.query_total', None, None),
+ ('indices.search.query_time_in_millis', None, None),
+ ('indices.search.fetch_time_in_millis', None, None),
+ ('indices.indexing.index_total', 'indexing_index_total', None),
+ ('indices.indexing.index_current', 'indexing_index_current', None),
+ ('indices.indexing.index_time_in_millis', 'indexing_index_time_in_millis', None),
+ ('indices.refresh.total', 'refresh_total', None),
+ ('indices.refresh.total_time_in_millis', 'refresh_total_time_in_millis', None),
+ ('indices.flush.total', 'flush_total', None),
+ ('indices.flush.total_time_in_millis', 'flush_total_time_in_millis', None),
+ ('jvm.gc.collectors.young.collection_count', 'young_collection_count', None),
+ ('jvm.gc.collectors.old.collection_count', 'old_collection_count', None),
+ ('jvm.gc.collectors.young.collection_time_in_millis', 'young_collection_time_in_millis', None),
+ ('jvm.gc.collectors.old.collection_time_in_millis', 'old_collection_time_in_millis', None),
+ ('jvm.mem.heap_used_percent', 'jvm_heap_percent', None),
+ ('jvm.mem.heap_committed_in_bytes', 'jvm_heap_commit', None),
+ ('thread_pool.bulk.queue', 'bulk_queue', None),
+ ('thread_pool.bulk.rejected', 'bulk_rejected', None),
+ ('thread_pool.index.queue', 'index_queue', None),
+ ('thread_pool.index.rejected', 'index_rejected', None),
+ ('thread_pool.search.queue', 'search_queue', None),
+ ('thread_pool.search.rejected', 'search_rejected', None),
+ ('thread_pool.merge.queue', 'merge_queue', None),
+ ('thread_pool.merge.rejected', 'merge_rejected', None),
+ ('indices.fielddata.memory_size_in_bytes', 'index_fdata_memory', None),
+ ('indices.fielddata.evictions', None, None),
+ ('breakers.fielddata.tripped', None, None),
+ ('http.current_open', 'http_current_open', None),
+ ('transport.rx_size_in_bytes', 'transport_rx_size_in_bytes', None),
+ ('transport.tx_size_in_bytes', 'transport_tx_size_in_bytes', None),
+ ('process.max_file_descriptors', None, None),
+ ('process.open_file_descriptors', None, None)
+]
+
+CLUSTER_STATS = [
+ ('nodes.count.data_only', 'count_data_only', None),
+ ('nodes.count.master_data', 'count_master_data', None),
+ ('nodes.count.total', 'count_total', None),
+ ('nodes.count.master_only', 'count_master_only', None),
+ ('nodes.count.client', 'count_client', None),
+ ('indices.docs.count', 'docs_count', None),
+ ('indices.query_cache.hit_count', 'query_cache_hit_count', None),
+ ('indices.query_cache.miss_count', 'query_cache_miss_count', None),
+ ('indices.store.size_in_bytes', 'store_size_in_bytes', None),
+ ('indices.count', 'indices_count', None),
+ ('indices.shards.total', 'shards_total', None)
+]
+
+HEALTH_STATS = [
+ ('number_of_nodes', 'health_number_of_nodes', None),
+ ('number_of_data_nodes', 'health_number_of_data_nodes', None),
+ ('number_of_pending_tasks', 'health_number_of_pending_tasks', None),
+ ('number_of_in_flight_fetch', 'health_number_of_in_flight_fetch', None),
+ ('active_shards', 'health_active_shards', None),
+ ('relocating_shards', 'health_relocating_shards', None),
+ ('unassigned_shards', 'health_unassigned_shards', None),
+ ('delayed_unassigned_shards', 'health_delayed_unassigned_shards', None),
+ ('initializing_shards', 'health_initializing_shards', None),
+ ('active_shards_percent_as_number', 'health_active_shards_percent_as_number', None)
+]
+
# charts order (can be overridden if you want less charts, or different order)
-ORDER = ['search_perf_total', 'search_perf_time', 'search_latency', 'index_perf_total', 'index_perf_time',
- 'index_latency', 'jvm_mem_heap', 'jvm_gc_count', 'jvm_gc_time', 'host_metrics_file_descriptors',
- 'host_metrics_http', 'host_metrics_transport', 'thread_pool_qr', 'fdata_cache', 'fdata_ev_tr',
- 'cluster_health_status', 'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes',
- 'cluster_stats_query_cache', 'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
+ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency', 'index_perf_total',
+ 'index_perf_current', 'index_perf_time', 'index_latency', 'jvm_mem_heap', 'jvm_gc_count',
+ 'jvm_gc_time', 'host_metrics_file_descriptors', 'host_metrics_http', 'host_metrics_transport',
+ 'thread_pool_qr_q', 'thread_pool_qr_r', 'fdata_cache', 'fdata_ev_tr', 'cluster_health_status',
+ 'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes', 'cluster_stats_query_cache',
+ 'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
CHARTS = {
'search_perf_total': {
- 'options': [None, 'Number of queries, fetches', 'queries', 'Search performance', 'es.search_query', 'stacked'],
+ 'options': [None, 'Total number of queries, fetches', 'number of', 'search performance',
+ 'es.search_query_total', 'stacked'],
+ 'lines': [
+ ['query_total', 'queries', 'incremental'],
+ ['fetch_total', 'fetches', 'incremental']
+ ]},
+ 'search_perf_current': {
+ 'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance',
+ 'es.search_query_current', 'stacked'],
'lines': [
- ['query_total', 'search_total', 'incremental'],
- ['fetch_total', 'fetch_total', 'incremental'],
- ['query_current', 'search_current', 'absolute'],
- ['fetch_current', 'fetch_current', 'absolute']
+ ['query_current', 'queries', 'absolute'],
+ ['fetch_current', 'fetches', 'absolute']
]},
'search_perf_time': {
- 'options': [None, 'Time spent on queries, fetches', 'seconds', 'Search performance', 'es.search_time', 'stacked'],
+ 'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance',
+ 'es.search_time', 'stacked'],
'lines': [
['query_time_in_millis', 'query', 'incremental', 1, 1000],
['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
]},
'search_latency': {
- 'options': [None, 'Query and fetch latency', 'ms', 'Search performance', 'es.search_latency', 'stacked'],
+ 'options': [None, 'Query and fetch latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
'lines': [
['query_latency', 'query', 'absolute', 1, 1000],
['fetch_latency', 'fetch', 'absolute', 1, 1000]
]},
'index_perf_total': {
- 'options': [None, 'Number of documents indexed, index refreshes, flushes', 'documents/indexes',
- 'Indexing performance', 'es.index_doc', 'stacked'],
+ 'options': [None, 'Total number of documents indexed, index refreshes, index flushes to disk', 'number of',
+ 'indexing performance', 'es.index_performance_total', 'stacked'],
'lines': [
['indexing_index_total', 'indexed', 'incremental'],
['refresh_total', 'refreshes', 'incremental'],
- ['flush_total', 'flushes', 'incremental'],
- ['indexing_index_current', 'indexed_current', 'absolute'],
+ ['flush_total', 'flushes', 'incremental']
+ ]},
+ 'index_perf_current': {
+ 'options': [None, 'Number of documents currently being indexed', 'currently indexed',
+ 'indexing performance', 'es.index_performance_current', 'stacked'],
+ 'lines': [
+ ['indexing_index_current', 'documents', 'absolute']
]},
'index_perf_time': {
- 'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'Indexing performance',
+ 'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
'es.search_time', 'stacked'],
'lines': [
['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
@@ -63,67 +144,72 @@ CHARTS = {
['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
]},
'index_latency': {
- 'options': [None, 'Indexing and flushing latency', 'ms', 'Indexing performance',
+ 'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
'es.index_latency', 'stacked'],
'lines': [
['indexing_latency', 'indexing', 'absolute', 1, 1000],
['flushing_latency', 'flushing', 'absolute', 1, 1000]
]},
'jvm_mem_heap': {
- 'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'Memory usage and gc',
+ 'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
'es.jvm_heap', 'area'],
'lines': [
['jvm_heap_percent', 'inuse', 'absolute'],
['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
]},
'jvm_gc_count': {
- 'options': [None, 'Count of garbage collections', 'counts', 'Memory usage and gc', 'es.gc_count', 'stacked'],
+ 'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
'lines': [
['young_collection_count', 'young', 'incremental'],
['old_collection_count', 'old', 'incremental']
]},
'jvm_gc_time': {
- 'options': [None, 'Time spent on garbage collections', 'ms', 'Memory usage and gc', 'es.gc_time', 'stacked'],
+ 'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
'lines': [
['young_collection_time_in_millis', 'young', 'incremental'],
['old_collection_time_in_millis', 'old', 'incremental']
]},
- 'thread_pool_qr': {
- 'options': [None, 'Number of queued/rejected threads in thread pool', 'threads', 'Queues and rejections',
- 'es.qr', 'stacked'],
+ 'thread_pool_qr_q': {
+ 'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
+ 'es.thread_pool_queued', 'stacked'],
'lines': [
- ['bulk_queue', 'bulk_queue', 'absolute'],
- ['index_queue', 'index_queue', 'absolute'],
- ['search_queue', 'search_queue', 'absolute'],
- ['merge_queue', 'merge_queue', 'absolute'],
- ['bulk_rejected', 'bulk_rej', 'absolute'],
- ['index_rejected', 'index_rej', 'absolute'],
- ['search_rejected', 'search_rej', 'absolute'],
- ['merge_rejected', 'merge_rej', 'absolute']
+ ['bulk_queue', 'bulk', 'absolute'],
+ ['index_queue', 'index', 'absolute'],
+ ['search_queue', 'search', 'absolute'],
+ ['merge_queue', 'merge', 'absolute']
+ ]},
+ 'thread_pool_qr_r': {
+ 'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
+ 'es.thread_pool_rejected', 'stacked'],
+ 'lines': [
+ ['bulk_rejected', 'bulk', 'absolute'],
+ ['index_rejected', 'index', 'absolute'],
+ ['search_rejected', 'search', 'absolute'],
+ ['merge_rejected', 'merge', 'absolute']
]},
'fdata_cache': {
- 'options': [None, 'Fielddata cache size', 'MB', 'Fielddata cache', 'es.fdata_cache', 'line'],
+ 'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
'lines': [
- ['index_fdata_mem', 'mem_size', 'absolute', 1, 1048576]
+ ['index_fdata_memory', 'cache', 'absolute', 1, 1048576]
]},
'fdata_ev_tr': {
'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
- 'Fielddata cache', 'es.fdata_ev_tr', 'line'],
+ 'fielddata cache', 'es.evictions_tripped', 'line'],
'lines': [
- ['index_fdata_evic', 'evictions', 'incremental'],
- ['breakers_fdata_trip', 'tripped', 'incremental']
+ ['evictions', None, 'incremental'],
+ ['tripped', None, 'incremental']
]},
'cluster_health_nodes': {
- 'options': [None, 'Nodes and tasks statistics', 'units', 'Cluster health API',
- 'es.cluster_health', 'stacked'],
+ 'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
+ 'es.cluster_health_nodes', 'stacked'],
'lines': [
['health_number_of_nodes', 'nodes', 'absolute'],
['health_number_of_data_nodes', 'data_nodes', 'absolute'],
['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
- ['health_number_of_in_flight_fetch', 'inflight_fetch', 'absolute']
+ ['health_number_of_in_flight_fetch', 'in_flight_fetch', 'absolute']
]},
'cluster_health_status': {
- 'options': [None, 'Cluster status', 'status', 'Cluster health API',
+ 'options': [None, 'Cluster status', 'status', 'cluster health API',
'es.cluster_health_status', 'area'],
'lines': [
['status_green', 'green', 'absolute'],
@@ -134,8 +220,8 @@ CHARTS = {
['status_yellow', 'yellow', 'absolute']
]},
'cluster_health_shards': {
- 'options': [None, 'Shards statistics', 'shards', 'Cluster health API',
- 'es.cluster_health_sharts', 'stacked'],
+ 'options': [None, 'Shards statistics', 'shards', 'cluster health API',
+ 'es.cluster_health_shards', 'stacked'],
'lines': [
['health_active_shards', 'active_shards', 'absolute'],
['health_relocating_shards', 'relocating_shards', 'absolute'],
@@ -145,8 +231,8 @@ CHARTS = {
['health_active_shards_percent_as_number', 'active_percent', 'absolute']
]},
'cluster_stats_nodes': {
- 'options': [None, 'Nodes statistics', 'nodes', 'Cluster stats API',
- 'es.cluster_stats_nodes', 'stacked'],
+ 'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
+ 'es.cluster_nodes', 'stacked'],
'lines': [
['count_data_only', 'data_only', 'absolute'],
['count_master_data', 'master_data', 'absolute'],
@@ -155,47 +241,47 @@ CHARTS = {
['count_client', 'client', 'absolute']
]},
'cluster_stats_query_cache': {
- 'options': [None, 'Query cache statistics', 'queries', 'Cluster stats API',
- 'es.cluster_stats_query_cache', 'stacked'],
+ 'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
+ 'es.cluster_query_cache', 'stacked'],
'lines': [
['query_cache_hit_count', 'hit', 'incremental'],
['query_cache_miss_count', 'miss', 'incremental']
]},
'cluster_stats_docs': {
- 'options': [None, 'Docs statistics', 'count', 'Cluster stats API',
- 'es.cluster_stats_docs', 'line'],
+ 'options': [None, 'Docs statistics', 'count', 'cluster stats API',
+ 'es.cluster_docs', 'line'],
'lines': [
['docs_count', 'docs', 'absolute']
]},
'cluster_stats_store': {
- 'options': [None, 'Store statistics', 'MB', 'Cluster stats API',
- 'es.cluster_stats_store', 'line'],
+ 'options': [None, 'Store statistics', 'MB', 'cluster stats API',
+ 'es.cluster_store', 'line'],
'lines': [
['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
]},
'cluster_stats_indices_shards': {
- 'options': [None, 'Indices and shards statistics', 'count', 'Cluster stats API',
- 'es.cluster_stats_ind_sha', 'stacked'],
+ 'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
+ 'es.cluster_indices_shards', 'stacked'],
'lines': [
['indices_count', 'indices', 'absolute'],
['shards_total', 'shards', 'absolute']
]},
'host_metrics_transport': {
- 'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'Host metrics',
- 'es.host_metrics_transport', 'area'],
+ 'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
+ 'es.host_transport', 'area'],
'lines': [
['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
]},
'host_metrics_file_descriptors': {
- 'options': [None, 'Available file descriptors in percent', 'percent', 'Host metrics',
- 'es.host_metrics_descriptors', 'area'],
+ 'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
+ 'es.host_descriptors', 'area'],
'lines': [
['file_descriptors_used', 'used', 'absolute', 1, 10]
]},
'host_metrics_http': {
- 'options': [None, 'Opened HTTP connections', 'connections', 'Host metrics',
- 'es.host_metrics_http', 'line'],
+ 'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
+ 'es.host_http_connections', 'line'],
'lines': [
['http_current_open', 'opened', 'absolute', 1, 1]
]}
@@ -208,78 +294,72 @@ class Service(UrlService):
self.order = ORDER
self.definitions = CHARTS
self.host = self.configuration.get('host')
- self.port = self.configuration.get('port')
- self.user = self.configuration.get('user')
- self.password = self.configuration.get('pass')
+ self.port = self.configuration.get('port', 9200)
+ self.scheme = self.configuration.get('scheme', 'http')
self.latency = dict()
+ self.methods = list()
def check(self):
# We can't start if <host> AND <port> not specified
- if not all([self.host, self.port]):
+ if not all([self.host, self.port, isinstance(self.host, str), isinstance(self.port, (str, int))]):
+ self.error('Host is not defined in the module configuration file')
return False
# It as a bad idea to use hostname.
- # Hostname -> ipaddress
+ # Hostname -> ip address
try:
self.host = gethostbyname(self.host)
- except Exception as e:
- self.error(str(e))
+ except gaierror as error:
+ self.error(str(error))
return False
- # HTTP Auth? NOT TESTED
- self.auth = self.user and self.password
-
+ scheme = 'http' if self.scheme else 'https'
+ # Add handlers (auth, self signed cert accept)
+ self.url = '%s://%s:%s' % (scheme, self.host, self.port)
+ self._UrlService__add_openers()
# Create URL for every Elasticsearch API
- url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
- url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
- url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
+ url_node_stats = '%s://%s:%s/_nodes/_local/stats' % (scheme, self.host, self.port)
+ url_cluster_health = '%s://%s:%s/_cluster/health' % (scheme, self.host, self.port)
+ url_cluster_stats = '%s://%s:%s/_cluster/stats' % (scheme, self.host, self.port)
# Create list of enabled API calls
user_choice = [bool(self.configuration.get('node_stats', True)),
bool(self.configuration.get('cluster_health', True)),
bool(self.configuration.get('cluster_stats', True))]
-
- avail_methods = [(self._get_node_stats, url_node_stats),
- (self._get_cluster_health, url_cluster_health),
- (self._get_cluster_stats, url_cluster_stats)]
+
+ avail_methods = [METHODS(get_data_function=self._get_node_stats_, url=url_node_stats),
+ METHODS(get_data_function=self._get_cluster_health_, url=url_cluster_health),
+ METHODS(get_data_function=self._get_cluster_stats_, url=url_cluster_stats)]
# Remove disabled API calls from 'avail methods'
- self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
+ self.methods = [avail_methods[e[0]] for e in enumerate(avail_methods) if user_choice[e[0]]]
# Run _get_data for ALL active API calls.
- api_result = {}
+ api_check_result = dict()
+ data_from_check = dict()
for method in self.methods:
- api_result[method[1]] = (bool(self._get_raw_data(method[1])))
+ try:
+ api_check_result[method.url] = method.get_data_function(None, method.url)
+ data_from_check.update(api_check_result[method.url] or dict())
+ except KeyError as error:
+ self.error('Failed to parse %s. Error: %s' % (method.url, str(error)))
+ return False
# We can start ONLY if all active API calls returned NOT None
- if not all(api_result.values()):
+ if not all(api_check_result.values()):
self.error('Plugin could not get data from all APIs')
- self.error('%s' % api_result)
return False
else:
- self.info('%s' % api_result)
- self.info('Plugin was started successfully')
-
+ self._data_from_check = data_from_check
return True
- def _get_raw_data(self, url):
- try:
- if not self.auth:
- raw_data = get(url)
- else:
- raw_data = get(url, auth=(self.user, self.password))
- except Exception:
- return None
-
- return raw_data
-
def _get_data(self):
threads = list()
queue = Queue()
result = dict()
for method in self.methods:
- th = Thread(target=method[0], args=(queue, method[1]))
+ th = Thread(target=method.get_data_function, args=(queue, method.url))
th.start()
threads.append(th)
@@ -289,102 +369,79 @@ class Service(UrlService):
return result or None
- def _get_cluster_health(self, queue, url):
+ def _get_cluster_health_(self, queue, url):
"""
Format data received from http request
:return: dict
"""
- data = self._get_raw_data(url)
+ raw_data = self._get_raw_data(url)
- if not data:
- queue.put({})
+ if not raw_data:
+ return queue.put(dict()) if queue else None
else:
- data = data.json()
+ data = loads(raw_data)
+
+ to_netdata = fetch_data_(raw_data=data, metrics_list=HEALTH_STATS)
- to_netdata = dict()
- to_netdata.update(update_key('health', data))
to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
- to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1
+ current_status = 'status_' + data['status']
+ to_netdata[current_status] = 1
- queue.put(to_netdata)
+ return queue.put(to_netdata) if queue else to_netdata
- def _get_cluster_stats(self, queue, url):
+ def _get_cluster_stats_(self, queue, url):
"""
Format data received from http request
:return: dict
"""
- data = self._get_raw_data(url)
+ raw_data = self._get_raw_data(url)
- if not data:
- queue.put({})
+ if not raw_data:
+ return queue.put(dict()) if queue else None
else:
- data = data.json()
+ data = loads(raw_data)
- to_netdata = dict()
- to_netdata.update(update_key('count', data['nodes']['count']))
- to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
- to_netdata.update(update_key('docs', data['indices']['docs']))
- to_netdata.update(update_key('store', data['indices']['store']))
- to_netdata['indices_count'] = data['indices']['count']
- to_netdata['shards_total'] = data['indices']['shards']['total']
+ to_netdata = fetch_data_(raw_data=data, metrics_list=CLUSTER_STATS)
- queue.put(to_netdata)
+ return queue.put(to_netdata) if queue else to_netdata
- def _get_node_stats(self, queue, url):
+ def _get_node_stats_(self, queue, url):
"""
Format data received from http request
:return: dict
"""
- data = self._get_raw_data(url)
+ raw_data = self._get_raw_data(url)
- if not data:
- queue.put({})
+ if not raw_data:
+ return queue.put(dict()) if queue else None
else:
- data = data.json()
+ data = loads(raw_data)
+
node = list(data['nodes'].keys())[0]
- to_netdata = dict()
- # Search performance metrics
- to_netdata.update(data['nodes'][node]['indices']['search'])
- to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
- to_netdata['query_time_in_millis'], 'query_latency')
- to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
- to_netdata['fetch_time_in_millis'], 'fetch_latency')
-
- # Indexing performance metrics
- for key in ['indexing', 'refresh', 'flush']:
- to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
- to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
- to_netdata['indexing_index_time_in_millis'], 'index_latency')
- to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
- to_netdata['flush_total_time_in_millis'], 'flush_latency')
- # Memory usage and garbage collection
- to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
- to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
- to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
- to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']
-
- # Thread pool queues and rejections
- for key in ['bulk', 'index', 'search', 'merge']:
- to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))
-
- # Fielddata cache
- to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
- to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
- to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']
-
- # Host metrics
- to_netdata.update(update_key('http', data['nodes'][node]['http']))
- to_netdata.update(update_key('transport', data['nodes'][node]['transport']))
- to_netdata['file_descriptors_used'] = round(float(data['nodes'][node]['process']['open_file_descriptors'])
- / data['nodes'][node]['process']['max_file_descriptors'] * 1000)
-
- queue.put(to_netdata)
-
- def find_avg(self, value1, value2, key):
+ to_netdata = fetch_data_(raw_data=data['nodes'][node], metrics_list=NODE_STATS)
+
+ # Search performance latency
+ to_netdata['query_latency'] = self.find_avg_(to_netdata['query_total'],
+ to_netdata['query_time_in_millis'], 'query_latency')
+ to_netdata['fetch_latency'] = self.find_avg_(to_netdata['fetch_total'],
+ to_netdata['fetch_time_in_millis'], 'fetch_latency')
+
+ # Indexing performance latency
+ to_netdata['indexing_latency'] = self.find_avg_(to_netdata['indexing_index_total'],
+ to_netdata['indexing_index_time_in_millis'], 'index_latency')
+ to_netdata['flushing_latency'] = self.find_avg_(to_netdata['flush_total'],
+ to_netdata['flush_total_time_in_millis'], 'flush_latency')
+
+ to_netdata['file_descriptors_used'] = round(float(to_netdata['open_file_descriptors'])
+ / to_netdata['max_file_descriptors'] * 1000)
+
+ return queue.put(to_netdata) if queue else to_netdata
+
+ def find_avg_(self, value1, value2, key):
if key not in self.latency:
self.latency.update({key: [value1, value2]})
return 0
@@ -398,5 +455,17 @@ class Service(UrlService):
return 0
-def update_key(string, dictionary):
- return {'_'.join([string, k]): v for k, v in dictionary.items()}
+def fetch_data_(raw_data, metrics_list):
+ to_netdata = dict()
+ for metric, new_name, function in metrics_list:
+ value = raw_data
+ for key in metric.split('.'):
+ try:
+ value = value[key]
+ except KeyError:
+ break
+ if not isinstance(value, dict) and key:
+ to_netdata[new_name or key] = value if not function else function(value)
+
+ return to_netdata
+