summaryrefslogtreecommitdiffstats
path: root/python.d/elasticsearch.chart.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--python.d/elasticsearch.chart.py198
1 files changed, 101 insertions, 97 deletions
diff --git a/python.d/elasticsearch.chart.py b/python.d/elasticsearch.chart.py
index 430227f6..9ec08719 100644
--- a/python.d/elasticsearch.chart.py
+++ b/python.d/elasticsearch.chart.py
@@ -85,6 +85,21 @@ HEALTH_STATS = [
('active_shards_percent_as_number', 'health_active_shards_percent_as_number', None)
]
+LATENCY = {
+ 'query_latency':
+ {'total': 'query_total',
+ 'spent_time': 'query_time_in_millis'},
+ 'fetch_latency':
+ {'total': 'fetch_total',
+ 'spent_time': 'fetch_time_in_millis'},
+ 'indexing_latency':
+ {'total': 'indexing_index_total',
+ 'spent_time': 'indexing_index_time_in_millis'},
+ 'flushing_latency':
+ {'total': 'flush_total',
+ 'spent_time': 'flush_total_time_in_millis'}
+}
+
# charts order (can be overridden if you want less charts, or different order)
ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency', 'index_perf_total',
'index_perf_current', 'index_perf_time', 'index_latency', 'jvm_mem_heap', 'jvm_gc_count',
@@ -95,34 +110,34 @@ ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search
CHARTS = {
'search_perf_total': {
- 'options': [None, 'Total number of queries, fetches', 'number of', 'search performance',
+ 'options': [None, 'Queries And Fetches', 'number of', 'search performance',
'es.search_query_total', 'stacked'],
'lines': [
['query_total', 'queries', 'incremental'],
['fetch_total', 'fetches', 'incremental']
]},
'search_perf_current': {
- 'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance',
+ 'options': [None, 'Queries and Fetches In Progress', 'number of', 'search performance',
'es.search_query_current', 'stacked'],
'lines': [
['query_current', 'queries', 'absolute'],
['fetch_current', 'fetches', 'absolute']
]},
'search_perf_time': {
- 'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance',
+ 'options': [None, 'Time Spent On Queries And Fetches', 'seconds', 'search performance',
'es.search_time', 'stacked'],
'lines': [
['query_time_in_millis', 'query', 'incremental', 1, 1000],
['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
]},
'search_latency': {
- 'options': [None, 'Query and fetch latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
+ 'options': [None, 'Query And Fetch Latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
'lines': [
['query_latency', 'query', 'absolute', 1, 1000],
['fetch_latency', 'fetch', 'absolute', 1, 1000]
]},
'index_perf_total': {
- 'options': [None, 'Total number of documents indexed, index refreshes, index flushes to disk', 'number of',
+ 'options': [None, 'Indexed Documents, Index Refreshes, Index Flushes To Disk', 'number of',
'indexing performance', 'es.index_performance_total', 'stacked'],
'lines': [
['indexing_index_total', 'indexed', 'incremental'],
@@ -130,13 +145,13 @@ CHARTS = {
['flush_total', 'flushes', 'incremental']
]},
'index_perf_current': {
- 'options': [None, 'Number of documents currently being indexed', 'currently indexed',
+ 'options': [None, 'Number Of Documents Currently Being Indexed', 'currently indexed',
'indexing performance', 'es.index_performance_current', 'stacked'],
'lines': [
['indexing_index_current', 'documents', 'absolute']
]},
'index_perf_time': {
- 'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
+ 'options': [None, 'Time Spent On Indexing, Refreshing, Flushing', 'seconds', 'indexing performance',
'es.search_time', 'stacked'],
'lines': [
['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
@@ -144,33 +159,33 @@ CHARTS = {
['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
]},
'index_latency': {
- 'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
+ 'options': [None, 'Indexing And Flushing Latency', 'ms', 'indexing performance',
'es.index_latency', 'stacked'],
'lines': [
['indexing_latency', 'indexing', 'absolute', 1, 1000],
['flushing_latency', 'flushing', 'absolute', 1, 1000]
]},
'jvm_mem_heap': {
- 'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
+ 'options': [None, 'JVM Heap Currently in Use/Committed', 'percent/MB', 'memory usage and gc',
'es.jvm_heap', 'area'],
'lines': [
['jvm_heap_percent', 'inuse', 'absolute'],
['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
]},
'jvm_gc_count': {
- 'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
+ 'options': [None, 'Garbage Collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
'lines': [
['young_collection_count', 'young', 'incremental'],
['old_collection_count', 'old', 'incremental']
]},
'jvm_gc_time': {
- 'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
+ 'options': [None, 'Time Spent On Garbage Collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
'lines': [
['young_collection_time_in_millis', 'young', 'incremental'],
['old_collection_time_in_millis', 'old', 'incremental']
]},
'thread_pool_qr_q': {
- 'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
+ 'options': [None, 'Number Of Queued Threads In Thread Pool', 'queued threads', 'queues and rejections',
'es.thread_pool_queued', 'stacked'],
'lines': [
['bulk_queue', 'bulk', 'absolute'],
@@ -179,7 +194,7 @@ CHARTS = {
['merge_queue', 'merge', 'absolute']
]},
'thread_pool_qr_r': {
- 'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
+ 'options': [None, 'Rejected Threads In Thread Pool', 'rejected threads', 'queues and rejections',
'es.thread_pool_rejected', 'stacked'],
'lines': [
['bulk_rejected', 'bulk', 'absolute'],
@@ -188,19 +203,19 @@ CHARTS = {
['merge_rejected', 'merge', 'absolute']
]},
'fdata_cache': {
- 'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
+ 'options': [None, 'Fielddata Cache', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
'lines': [
['index_fdata_memory', 'cache', 'absolute', 1, 1048576]
]},
'fdata_ev_tr': {
- 'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
+ 'options': [None, 'Fielddata Evictions And Circuit Breaker Tripped Count', 'number of events',
'fielddata cache', 'es.evictions_tripped', 'line'],
'lines': [
['evictions', None, 'incremental'],
['tripped', None, 'incremental']
]},
'cluster_health_nodes': {
- 'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
+ 'options': [None, 'Nodes And Tasks Statistics', 'units', 'cluster health API',
'es.cluster_health_nodes', 'stacked'],
'lines': [
['health_number_of_nodes', 'nodes', 'absolute'],
@@ -209,7 +224,7 @@ CHARTS = {
['health_number_of_in_flight_fetch', 'in_flight_fetch', 'absolute']
]},
'cluster_health_status': {
- 'options': [None, 'Cluster status', 'status', 'cluster health API',
+ 'options': [None, 'Cluster Status', 'status', 'cluster health API',
'es.cluster_health_status', 'area'],
'lines': [
['status_green', 'green', 'absolute'],
@@ -220,7 +235,7 @@ CHARTS = {
['status_yellow', 'yellow', 'absolute']
]},
'cluster_health_shards': {
- 'options': [None, 'Shards statistics', 'shards', 'cluster health API',
+ 'options': [None, 'Shards Statistics', 'shards', 'cluster health API',
'es.cluster_health_shards', 'stacked'],
'lines': [
['health_active_shards', 'active_shards', 'absolute'],
@@ -231,7 +246,7 @@ CHARTS = {
['health_active_shards_percent_as_number', 'active_percent', 'absolute']
]},
'cluster_stats_nodes': {
- 'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
+ 'options': [None, 'Nodes Statistics', 'nodes', 'cluster stats API',
'es.cluster_nodes', 'stacked'],
'lines': [
['count_data_only', 'data_only', 'absolute'],
@@ -241,46 +256,46 @@ CHARTS = {
['count_client', 'client', 'absolute']
]},
'cluster_stats_query_cache': {
- 'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
+ 'options': [None, 'Query Cache Statistics', 'queries', 'cluster stats API',
'es.cluster_query_cache', 'stacked'],
'lines': [
['query_cache_hit_count', 'hit', 'incremental'],
['query_cache_miss_count', 'miss', 'incremental']
]},
'cluster_stats_docs': {
- 'options': [None, 'Docs statistics', 'count', 'cluster stats API',
+ 'options': [None, 'Docs Statistics', 'count', 'cluster stats API',
'es.cluster_docs', 'line'],
'lines': [
['docs_count', 'docs', 'absolute']
]},
'cluster_stats_store': {
- 'options': [None, 'Store statistics', 'MB', 'cluster stats API',
+ 'options': [None, 'Store Statistics', 'MB', 'cluster stats API',
'es.cluster_store', 'line'],
'lines': [
['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
]},
'cluster_stats_indices_shards': {
- 'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
+ 'options': [None, 'Indices And Shards Statistics', 'count', 'cluster stats API',
'es.cluster_indices_shards', 'stacked'],
'lines': [
['indices_count', 'indices', 'absolute'],
['shards_total', 'shards', 'absolute']
]},
'host_metrics_transport': {
- 'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
+ 'options': [None, 'Cluster Communication Transport Metrics', 'kilobit/s', 'host metrics',
'es.host_transport', 'area'],
'lines': [
['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
]},
'host_metrics_file_descriptors': {
- 'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
+ 'options': [None, 'Available File Descriptors In Percent', 'percent', 'host metrics',
'es.host_descriptors', 'area'],
'lines': [
['file_descriptors_used', 'used', 'absolute', 1, 10]
]},
'host_metrics_http': {
- 'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
+ 'options': [None, 'Opened HTTP Connections', 'connections', 'host metrics',
'es.host_http_connections', 'line'],
'lines': [
['http_current_open', 'opened', 'absolute', 1, 1]
@@ -300,12 +315,13 @@ class Service(UrlService):
self.methods = list()
def check(self):
- # We can't start if <host> AND <port> not specified
- if not all([self.host, self.port, isinstance(self.host, str), isinstance(self.port, (str, int))]):
+ if not all([self.host,
+ self.port,
+ isinstance(self.host, str),
+ isinstance(self.port, (str, int))]):
self.error('Host is not defined in the module configuration file')
return False
- # It as a bad idea to use hostname.
# Hostname -> ip address
try:
self.host = gethostbyname(self.host)
@@ -313,45 +329,33 @@ class Service(UrlService):
self.error(str(error))
return False
- scheme = 'http' if self.scheme else 'https'
+ scheme = 'http' if self.scheme == 'http' else 'https'
# Add handlers (auth, self signed cert accept)
self.url = '%s://%s:%s' % (scheme, self.host, self.port)
- self._UrlService__add_openers()
+ self.opener = self._build_opener()
# Create URL for every Elasticsearch API
url_node_stats = '%s://%s:%s/_nodes/_local/stats' % (scheme, self.host, self.port)
url_cluster_health = '%s://%s:%s/_cluster/health' % (scheme, self.host, self.port)
url_cluster_stats = '%s://%s:%s/_cluster/stats' % (scheme, self.host, self.port)
- # Create list of enabled API calls
user_choice = [bool(self.configuration.get('node_stats', True)),
bool(self.configuration.get('cluster_health', True)),
bool(self.configuration.get('cluster_stats', True))]
- avail_methods = [METHODS(get_data_function=self._get_node_stats_, url=url_node_stats),
- METHODS(get_data_function=self._get_cluster_health_, url=url_cluster_health),
- METHODS(get_data_function=self._get_cluster_stats_, url=url_cluster_stats)]
+ avail_methods = [METHODS(get_data_function=self._get_node_stats_,
+ url=url_node_stats),
+ METHODS(get_data_function=self._get_cluster_health_,
+ url=url_cluster_health),
+ METHODS(get_data_function=self._get_cluster_stats_,
+ url=url_cluster_stats)]
# Remove disabled API calls from 'avail methods'
self.methods = [avail_methods[e[0]] for e in enumerate(avail_methods) if user_choice[e[0]]]
-
- # Run _get_data for ALL active API calls.
- api_check_result = dict()
- data_from_check = dict()
- for method in self.methods:
- try:
- api_check_result[method.url] = method.get_data_function(None, method.url)
- data_from_check.update(api_check_result[method.url] or dict())
- except KeyError as error:
- self.error('Failed to parse %s. Error: %s' % (method.url, str(error)))
- return False
-
- # We can start ONLY if all active API calls returned NOT None
- if not all(api_check_result.values()):
- self.error('Plugin could not get data from all APIs')
+ data = self._get_data()
+ if not data:
return False
- else:
- self._data_from_check = data_from_check
- return True
+ self._data_from_check = data
+ return True
def _get_data(self):
threads = list()
@@ -359,7 +363,8 @@ class Service(UrlService):
result = dict()
for method in self.methods:
- th = Thread(target=method.get_data_function, args=(queue, method.url))
+ th = Thread(target=method.get_data_function,
+ args=(queue, method.url))
th.start()
threads.append(th)
@@ -378,18 +383,18 @@ class Service(UrlService):
raw_data = self._get_raw_data(url)
if not raw_data:
- return queue.put(dict()) if queue else None
- else:
- data = loads(raw_data)
+ return queue.put(dict())
- to_netdata = fetch_data_(raw_data=data, metrics_list=HEALTH_STATS)
+ data = loads(raw_data)
+ to_netdata = fetch_data_(raw_data=data,
+ metrics_list=HEALTH_STATS)
- to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
- 'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
- current_status = 'status_' + data['status']
- to_netdata[current_status] = 1
+ to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
+ 'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
+ current_status = 'status_' + data['status']
+ to_netdata[current_status] = 1
- return queue.put(to_netdata) if queue else to_netdata
+ return queue.put(to_netdata)
def _get_cluster_stats_(self, queue, url):
"""
@@ -400,13 +405,13 @@ class Service(UrlService):
raw_data = self._get_raw_data(url)
if not raw_data:
- return queue.put(dict()) if queue else None
- else:
- data = loads(raw_data)
+ return queue.put(dict())
- to_netdata = fetch_data_(raw_data=data, metrics_list=CLUSTER_STATS)
+ data = loads(raw_data)
+ to_netdata = fetch_data_(raw_data=data,
+ metrics_list=CLUSTER_STATS)
- return queue.put(to_netdata) if queue else to_netdata
+ return queue.put(to_netdata)
def _get_node_stats_(self, queue, url):
"""
@@ -417,47 +422,46 @@ class Service(UrlService):
raw_data = self._get_raw_data(url)
if not raw_data:
- return queue.put(dict()) if queue else None
- else:
- data = loads(raw_data)
-
- node = list(data['nodes'].keys())[0]
- to_netdata = fetch_data_(raw_data=data['nodes'][node], metrics_list=NODE_STATS)
+ return queue.put(dict())
- # Search performance latency
- to_netdata['query_latency'] = self.find_avg_(to_netdata['query_total'],
- to_netdata['query_time_in_millis'], 'query_latency')
- to_netdata['fetch_latency'] = self.find_avg_(to_netdata['fetch_total'],
- to_netdata['fetch_time_in_millis'], 'fetch_latency')
+ data = loads(raw_data)
- # Indexing performance latency
- to_netdata['indexing_latency'] = self.find_avg_(to_netdata['indexing_index_total'],
- to_netdata['indexing_index_time_in_millis'], 'index_latency')
- to_netdata['flushing_latency'] = self.find_avg_(to_netdata['flush_total'],
- to_netdata['flush_total_time_in_millis'], 'flush_latency')
+ node = list(data['nodes'].keys())[0]
+ to_netdata = fetch_data_(raw_data=data['nodes'][node],
+ metrics_list=NODE_STATS)
+ # Search, index, flush, fetch performance latency
+ for key in LATENCY:
+ try:
+ to_netdata[key] = self.find_avg_(total=to_netdata[LATENCY[key]['total']],
+ spent_time=to_netdata[LATENCY[key]['spent_time']],
+ key=key)
+ except KeyError:
+ continue
+ if 'open_file_descriptors' in to_netdata and 'max_file_descriptors' in to_netdata:
to_netdata['file_descriptors_used'] = round(float(to_netdata['open_file_descriptors'])
/ to_netdata['max_file_descriptors'] * 1000)
- return queue.put(to_netdata) if queue else to_netdata
+ return queue.put(to_netdata)
- def find_avg_(self, value1, value2, key):
+ def find_avg_(self, total, spent_time, key):
if key not in self.latency:
- self.latency.update({key: [value1, value2]})
+ self.latency[key] = dict(total=total,
+ spent_time=spent_time)
return 0
- else:
- if not self.latency[key][0] == value1:
- latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
- self.latency.update({key: [value1, value2]})
- return latency
- else:
- self.latency.update({key: [value1, value2]})
- return 0
+ if self.latency[key]['total'] != total:
+ latency = float(spent_time - self.latency[key]['spent_time'])\
+ / float(total - self.latency[key]['total']) * 1000
+ self.latency[key]['total'] = total
+ self.latency[key]['spent_time'] = spent_time
+ return latency
+ self.latency[key]['spent_time'] = spent_time
+ return 0
def fetch_data_(raw_data, metrics_list):
to_netdata = dict()
- for metric, new_name, function in metrics_list:
+ for metric, new_name, func in metrics_list:
value = raw_data
for key in metric.split('.'):
try:
@@ -465,7 +469,7 @@ def fetch_data_(raw_data, metrics_list):
except KeyError:
break
if not isinstance(value, dict) and key:
- to_netdata[new_name or key] = value if not function else function(value)
+ to_netdata[new_name or key] = value if not func else func(value)
return to_netdata