diff options
Diffstat (limited to 'collectors/python.d.plugin')
-rw-r--r-- | collectors/python.d.plugin/Makefile.am | 1 | ||||
-rw-r--r-- | collectors/python.d.plugin/README.md | 14 | ||||
-rw-r--r-- | collectors/python.d.plugin/dnsdist/README.md | 8 | ||||
-rw-r--r-- | collectors/python.d.plugin/elasticsearch/README.md | 21 | ||||
-rw-r--r-- | collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py | 271 | ||||
-rw-r--r-- | collectors/python.d.plugin/elasticsearch/elasticsearch.conf | 11 | ||||
-rw-r--r-- | collectors/python.d.plugin/gearman/Makefile.inc | 12 | ||||
-rw-r--r-- | collectors/python.d.plugin/gearman/README.md | 39 | ||||
-rw-r--r-- | collectors/python.d.plugin/gearman/gearman.chart.py | 229 | ||||
-rw-r--r-- | collectors/python.d.plugin/gearman/gearman.conf | 72 | ||||
-rw-r--r-- | collectors/python.d.plugin/mysql/README.md | 42 | ||||
-rw-r--r-- | collectors/python.d.plugin/mysql/mysql.chart.py | 259 | ||||
-rw-r--r-- | collectors/python.d.plugin/python.d.conf | 1 | ||||
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin.in | 32 | ||||
-rw-r--r-- | collectors/python.d.plugin/rabbitmq/README.md | 16 | ||||
-rw-r--r-- | collectors/python.d.plugin/rabbitmq/rabbitmq.chart.py | 123 |
16 files changed, 972 insertions, 179 deletions
diff --git a/collectors/python.d.plugin/Makefile.am b/collectors/python.d.plugin/Makefile.am index ad72cfaef..7d087fbfd 100644 --- a/collectors/python.d.plugin/Makefile.am +++ b/collectors/python.d.plugin/Makefile.am @@ -54,6 +54,7 @@ include example/Makefile.inc include exim/Makefile.inc include fail2ban/Makefile.inc include freeradius/Makefile.inc +include gearman/Makefile.inc include go_expvar/Makefile.inc include haproxy/Makefile.inc include hddtemp/Makefile.inc diff --git a/collectors/python.d.plugin/README.md b/collectors/python.d.plugin/README.md index d0074a124..f38ab6783 100644 --- a/collectors/python.d.plugin/README.md +++ b/collectors/python.d.plugin/README.md @@ -223,12 +223,12 @@ This is a generic checklist for submitting a new Python plugin for Netdata. It At minimum, to be buildable and testable, the PR needs to include: -- The module itself, following proper naming conventions: `python.d/<module_dir>/<module_name>.chart.py` -- A README.md file for the plugin under `python.d/<module_dir>`. -- The configuration file for the module: `conf.d/python.d/<module_name>.conf`. Python config files are in YAML format, and should include comments describing what options are present. The instructions are also needed in the configuration section of the README.md -- A basic configuration for the plugin in the appropriate global config file: `conf.d/python.d.conf`, which is also in YAML format. Either add a line that reads `# <module_name>: yes` if the module is to be enabled by default, or one that reads `<module_name>: no` if it is to be disabled by default. -- A line for the plugin in `python.d/Makefile.am` under `dist_python_DATA`. -- A line for the plugin configuration file in `conf.d/Makefile.am`, under `dist_pythonconfig_DATA` -- Optionally, chart information in `web/dashboard_info.js`. This generally involves specifying a name and icon for the section, and may include descriptions for the section or individual charts. +- The module itself, following proper naming conventions: `collectors/python.d.plugin/<module_dir>/<module_name>.chart.py` +- A README.md file for the plugin under `collectors/python.d.plugin/<module_dir>`. +- The configuration file for the module: `collectors/python.d.plugin/<module_dir>/<module_name>.conf`. Python config files are in YAML format, and should include comments describing what options are present. The instructions are also needed in the configuration section of the README.md +- A basic configuration for the plugin in the appropriate global config file: `collectors/python.d.plugin/python.d.conf`, which is also in YAML format. Either add a line that reads `# <module_name>: yes` if the module is to be enabled by default, or one that reads `<module_name>: no` if it is to be disabled by default. +- A makefile for the plugin at `collectors/python.d.plugin/<module_dir>/Makefile.inc`. Check an existing plugin for what this should look like. +- A line in `collectors/python.d.plugin/Makefile.am` including the above-mentioned makefile. Place it with the other plugin includes (please keep the includes sorted alphabetically). +- Optionally, chart information in `web/gui/dashboard_info.js`. This generally involves specifying a name and icon for the section, and may include descriptions for the section or individual charts. [![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fcollectors%2Fpython.d.plugin%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/collectors/python.d.plugin/dnsdist/README.md b/collectors/python.d.plugin/dnsdist/README.md index fecf4a5d8..4310fe28a 100644 --- a/collectors/python.d.plugin/dnsdist/README.md +++ b/collectors/python.d.plugin/dnsdist/README.md @@ -1,8 +1,8 @@ # dnsdist -Module monitor dnsdist performance and health metrics. +This module monitors dnsdist performance and health metrics. -Following charts are drawn: +The module draws the following charts: 1. **Response latency** @@ -45,7 +45,7 @@ Following charts are drawn: - servfail-responses - trunc-failures -## configuration +## Configuration ```yaml localhost: @@ -57,6 +57,4 @@ localhost: X-API-Key: 'dnsdist-api-key' ``` ---- - [![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fcollectors%2Fpython.d.plugin%2Fdnsdist%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/collectors/python.d.plugin/elasticsearch/README.md b/collectors/python.d.plugin/elasticsearch/README.md index a719a9431..211dfabfa 100644 --- a/collectors/python.d.plugin/elasticsearch/README.md +++ b/collectors/python.d.plugin/elasticsearch/README.md @@ -1,6 +1,6 @@ # elasticsearch -This module monitors Elasticsearch performance and health metrics. +This module monitors [Elasticsearch](https://www.elastic.co/products/elasticsearch) performance and health metrics. It produces: @@ -51,19 +51,28 @@ It produces: - Store statistics - Indices and shards statistics +9. **Indices** charts (per index statistics, disabled by default): + + - Docs count + - Store size + - Num of replicas + - Health status + ## configuration Sample: ```yaml local: - host : 'ipaddress' # Elasticsearch server ip address or hostname - port : 'port' # Port on which elasticsearch listens - cluster_health : True/False # Calls to cluster health elasticsearch API. Enabled by default. - cluster_stats : True/False # Calls to cluster stats elasticsearch API. Enabled by default. + host : 'ipaddress' # Elasticsearch server ip address or hostname. + port : 'port' # Port on which elasticsearch listens. + node_status : yes/no # Get metrics from "/_nodes/_local/stats". Enabled by default. + cluster_health : yes/no # Get metrics from "/_cluster/health". Enabled by default. + cluster_stats : yes/no # Get metrics from "'/_cluster/stats". Enabled by default. + indices_stats : yes/no # Get metrics from "/_cat/indices". Disabled by default. ``` -If no configuration is given, module will fail to run. +If no configuration is given, module will try to connect to `http://127.0.0.1:9200`. --- diff --git a/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py b/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py index 20109c64f..8aaa08583 100644 --- a/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py +++ b/collectors/python.d.plugin/elasticsearch/elasticsearch.chart.py @@ -168,6 +168,10 @@ ORDER = [ 'cluster_stats_store', 'cluster_stats_indices', 'cluster_stats_shards_total', + 'index_docs_count', + 'index_store_size', + 'index_replica', + 'index_health', ] CHARTS = { @@ -196,7 +200,8 @@ CHARTS = { ] }, 'search_latency': { - 'options': [None, 'Query And Fetch Latency', 'milliseconds', 'search performance', 'elastic.search_latency', 'stacked'], + 'options': [None, 'Query And Fetch Latency', 'milliseconds', 'search performance', 'elastic.search_latency', + 'stacked'], 'lines': [ ['query_latency', 'query', 'absolute', 1, 1000], ['fetch_latency', 'fetch', 'absolute', 1, 1000] @@ -397,9 +402,6 @@ CHARTS = { 'lines': [ ['status_green', 'green', 'absolute'], ['status_red', 'red', 'absolute'], - ['status_foo1', None, 'absolute'], - ['status_foo2', None, 'absolute'], - ['status_foo3', None, 'absolute'], ['status_yellow', 'yellow', 'absolute'] ] }, @@ -483,10 +485,61 @@ CHARTS = { 'lines': [ ['http_current_open', 'opened', 'absolute', 1, 1] ] - } + }, + 'index_docs_count': { + 'options': [None, 'Docs Count', 'count', 'indices', 'elastic.index_docs', 'line'], + 'lines': [] + }, + 'index_store_size': { + 'options': [None, 'Store Size', 'bytes', 'indices', 'elastic.index_store_size', 'line'], + 'lines': [] + }, + 'index_replica': { + 'options': [None, 'Replica', 'count', 'indices', 'elastic.index_replica', 'line'], + 'lines': [] + }, + 'index_health': { + 'options': [None, 'Health', 'status', 'indices', 'elastic.index_health', 'line'], + 'lines': [] + }, } +def convert_index_store_size_to_bytes(size): + # can be b, kb, mb, gb + if size.endswith('kb'): + return round(float(size[:-2]) * 1024) + elif size.endswith('mb'): + return round(float(size[:-2]) * 1024 * 1024) + elif size.endswith('gb'): + return round(float(size[:-2]) * 1024 * 1024 * 1024) + elif size.endswith('b'): + return round(float(size[:-1])) + return -1 + + +def convert_index_health(health): + if health == 'green': + return 0 + elif health == 'yellow': + return 1 + elif health == 'read': + return 2 + return -1 + + +def get_survive_any(method): + def w(*args): + try: + method(*args) + except Exception as error: + self, queue, url = args[0], args[1], args[2] + self.error("error during '{0}' : {1}".format(url, error)) + queue.put(dict()) + + return w + + class Service(UrlService): def __init__(self, configuration=None, name=None): UrlService.__init__(self, configuration=configuration, name=name) @@ -501,34 +554,41 @@ class Service(UrlService): ) self.latency = dict() self.methods = list() + self.collected_indices = set() def check(self): - if not all([self.host, - self.port, - isinstance(self.host, str), - isinstance(self.port, (str, int))]): + if not self.host: self.error('Host is not defined in the module configuration file') return False - # Hostname -> ip address try: self.host = gethostbyname(self.host) except gaierror as error: - self.error(str(error)) + self.error(repr(error)) return False - # Create URL for every Elasticsearch API - self.methods = [METHODS(get_data=self._get_node_stats, - url=self.url + '/_nodes/_local/stats', - run=self.configuration.get('node_stats', True)), - METHODS(get_data=self._get_cluster_health, - url=self.url + '/_cluster/health', - run=self.configuration.get('cluster_health', True)), - METHODS(get_data=self._get_cluster_stats, - url=self.url + '/_cluster/stats', - run=self.configuration.get('cluster_stats', True))] - - # Remove disabled API calls from 'avail methods' + self.methods = [ + METHODS( + get_data=self._get_node_stats, + url=self.url + '/_nodes/_local/stats', + run=self.configuration.get('node_stats', True), + ), + METHODS( + get_data=self._get_cluster_health, + url=self.url + '/_cluster/health', + run=self.configuration.get('cluster_health', True) + ), + METHODS( + get_data=self._get_cluster_stats, + url=self.url + '/_cluster/stats', + run=self.configuration.get('cluster_stats', True), + ), + METHODS( + get_data=self._get_indices, + url=self.url + '/_cat/indices?format=json', + run=self.configuration.get('indices_stats', False), + ), + ] return UrlService.check(self) def _get_data(self): @@ -539,8 +599,11 @@ class Service(UrlService): for method in self.methods: if not method.run: continue - th = threading.Thread(target=method.get_data, - args=(queue, method.url)) + th = threading.Thread( + target=method.get_data, + args=(queue, method.url), + ) + th.daemon = True th.start() threads.append(th) @@ -550,88 +613,128 @@ class Service(UrlService): return result or None - def _get_cluster_health(self, queue, url): - """ - Format data received from http request - :return: dict - """ - + def add_index_to_charts(self, idx_name): + for name in ('index_docs_count', 'index_store_size', 'index_replica', 'index_health'): + chart = self.charts[name] + dim = ['{0}_{1}'.format(idx_name, name), idx_name] + chart.add_dimension(dim) + + @get_survive_any + def _get_indices(self, queue, url): + # [ + # { + # "pri.store.size": "650b", + # "health": "yellow", + # "status": "open", + # "index": "twitter", + # "pri": "5", + # "rep": "1", + # "docs.count": "10", + # "docs.deleted": "3", + # "store.size": "650b" + # } + # ] raw_data = self._get_raw_data(url) - if not raw_data: return queue.put(dict()) - data = self.json_reply(raw_data) - - if not data: + indices = self.json_parse(raw_data) + if not indices: return queue.put(dict()) - to_netdata = fetch_data_(raw_data=data, - metrics=HEALTH_STATS) + charts_initialized = len(self.charts) != 0 + data = dict() + for idx in indices: + try: + name = idx['index'] + is_system_index = name.startswith('.') + if is_system_index: + continue + + v = { + '{0}_index_docs_count'.format(name): idx['docs.count'], + '{0}_index_replica'.format(name): idx['rep'], + '{0}_index_health'.format(name): convert_index_health(idx['health']), + } + size = convert_index_store_size_to_bytes(idx['store.size']) + if size != -1: + v['{0}_index_store_size'.format(name)] = size + except KeyError as error: + self.debug("error on parsing index : {0}".format(repr(error))) + continue - to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0, - 'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0}) - current_status = 'status_' + data['status'] - to_netdata[current_status] = 1 + data.update(v) + if name not in self.collected_indices and charts_initialized: + self.collected_indices.add(name) + self.add_index_to_charts(name) - return queue.put(to_netdata) + return queue.put(data) - def _get_cluster_stats(self, queue, url): - """ - Format data received from http request - :return: dict - """ - - raw_data = self._get_raw_data(url) + @get_survive_any + def _get_cluster_health(self, queue, url): + raw = self._get_raw_data(url) + if not raw: + return queue.put(dict()) - if not raw_data: + parsed = self.json_parse(raw) + if not parsed: return queue.put(dict()) - data = self.json_reply(raw_data) + data = fetch_data(raw_data=parsed, metrics=HEALTH_STATS) + dummy = { + 'status_green': 0, + 'status_red': 0, + 'status_yellow': 0, + } + data.update(dummy) + current_status = 'status_' + parsed['status'] + data[current_status] = 1 - if not data: - return queue.put(dict()) + return queue.put(data) - to_netdata = fetch_data_(raw_data=data, - metrics=CLUSTER_STATS) + @get_survive_any + def _get_cluster_stats(self, queue, url): + raw = self._get_raw_data(url) + if not raw: + return queue.put(dict()) - return queue.put(to_netdata) + parsed = self.json_parse(raw) + if not parsed: + return queue.put(dict()) - def _get_node_stats(self, queue, url): - """ - Format data received from http request - :return: dict - """ + data = fetch_data(raw_data=parsed, metrics=CLUSTER_STATS) - raw_data = self._get_raw_data(url) + return queue.put(data) - if not raw_data: + @get_survive_any + def _get_node_stats(self, queue, url): + raw = self._get_raw_data(url) + if not raw: return queue.put(dict()) - data = self.json_reply(raw_data) - - if not data: + parsed = self.json_parse(raw) + if not parsed: return queue.put(dict()) - node = list(data['nodes'].keys())[0] - to_netdata = fetch_data_(raw_data=data['nodes'][node], - metrics=NODE_STATS) + node = list(parsed['nodes'].keys())[0] + data = fetch_data(raw_data=parsed['nodes'][node], metrics=NODE_STATS) # Search, index, flush, fetch performance latency for key in LATENCY: try: - to_netdata[key] = self.find_avg(total=to_netdata[LATENCY[key]['total']], - spent_time=to_netdata[LATENCY[key]['spent_time']], - key=key) + data[key] = self.find_avg( + total=data[LATENCY[key]['total']], + spent_time=data[LATENCY[key]['spent_time']], + key=key) except KeyError: continue - if 'process_open_file_descriptors' in to_netdata and 'process_max_file_descriptors' in to_netdata: - to_netdata['file_descriptors_used'] = round(float(to_netdata['process_open_file_descriptors']) - / to_netdata['process_max_file_descriptors'] * 1000) + if 'process_open_file_descriptors' in data and 'process_max_file_descriptors' in data: + v = float(data['process_open_file_descriptors']) / data['process_max_file_descriptors'] * 1000 + data['file_descriptors_used'] = round(v) - return queue.put(to_netdata) + return queue.put(data) - def json_reply(self, reply): + def json_parse(self, reply): try: return json.loads(reply) except ValueError as err: @@ -640,20 +743,22 @@ class Service(UrlService): def find_avg(self, total, spent_time, key): if key not in self.latency: - self.latency[key] = dict(total=total, - spent_time=spent_time) + self.latency[key] = dict(total=total, spent_time=spent_time) return 0 + if self.latency[key]['total'] != total: - latency = float(spent_time - self.latency[key]['spent_time'])\ - / float(total - self.latency[key]['total']) * 1000 + spent_diff = spent_time - self.latency[key]['spent_time'] + total_diff = total - self.latency[key]['total'] + latency = float(spent_diff) / float(total_diff) * 1000 self.latency[key]['total'] = total self.latency[key]['spent_time'] = spent_time return latency + self.latency[key]['spent_time'] = spent_time return 0 -def fetch_data_(raw_data, metrics): +def fetch_data(raw_data, metrics): data = dict() for metric in metrics: value = raw_data @@ -661,7 +766,7 @@ def fetch_data_(raw_data, metrics): try: for m in metrics_list: value = value[m] - except KeyError: + except (KeyError, TypeError): continue data['_'.join(metrics_list)] = value return data diff --git a/collectors/python.d.plugin/elasticsearch/elasticsearch.conf b/collectors/python.d.plugin/elasticsearch/elasticsearch.conf index 5d8e746f5..4058debac 100644 --- a/collectors/python.d.plugin/elasticsearch/elasticsearch.conf +++ b/collectors/python.d.plugin/elasticsearch/elasticsearch.conf @@ -61,11 +61,12 @@ # # Additionally to the above, elasticsearch plugin also supports the following: # -# host: 'ipaddress' # Server ip address or hostname. -# port: 'port' # Port on which elasticsearch listen. -# scheme: 'scheme' # URL scheme. Default is 'http'. -# cluster_health: False/True # Calls to cluster health elasticsearch API. Enabled by default. -# cluster_stats: False/True # Calls to cluster stats elasticsearch API. Enabled by default. +# host : 'ipaddress' # Elasticsearch server ip address or hostname. +# port : 'port' # Port on which elasticsearch listens. +# node_status : yes/no # Get metrics from "/_nodes/_local/stats". Enabled by default. +# cluster_health : yes/no # Get metrics from "/_cluster/health". Enabled by default. +# cluster_stats : yes/no # Get metrics from "'/_cluster/stats". Enabled by default. +# indices_stats : yes/no # Get metrics from "/_cat/indices". Disabled by default. # # # if the URL is password protected, the following are supported: diff --git a/collectors/python.d.plugin/gearman/Makefile.inc b/collectors/python.d.plugin/gearman/Makefile.inc new file mode 100644 index 000000000..97ce8d201 --- /dev/null +++ b/collectors/python.d.plugin/gearman/Makefile.inc @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# THIS IS NOT A COMPLETE Makefile +# IT IS INCLUDED BY ITS PARENT'S Makefile.am +# IT IS REQUIRED TO REFERENCE ALL FILES RELATIVE TO THE PARENT + +# install these files +dist_python_DATA += gearman/gearman.chart.py +dist_pythonconfig_DATA += gearman/gearman.conf + +# do not install these files, but include them in the distribution +dist_noinst_DATA += gearman/README.md gearman/Makefile.inc diff --git a/collectors/python.d.plugin/gearman/README.md b/collectors/python.d.plugin/gearman/README.md new file mode 100644 index 000000000..cbb4da3e2 --- /dev/null +++ b/collectors/python.d.plugin/gearman/README.md @@ -0,0 +1,39 @@ +# Gearman + +Module monitors Gearman worker statistics. A chart +is shown for each job as well as one showing a summary +of all workers. + +Note: Charts may show as a line graph rather than an area +graph if you load Netdata with no jobs running. To change +this go to "Settings" > "Which dimensions to show?" and +select "All". + +Plugin can obtain data from tcp socket **OR** unix socket. + +**Requirement:** +Socket MUST be readable by netdata user. + +It produces: + + * Workers queued + * Workers idle + * Workers running + +### configuration + +```yaml +localhost: + name : 'local' + host : 'localhost' + port : 4730 + + # TLS information can be provided as well + tls : no + cert : /path/to/cert + key : /path/to/key +``` + +When no configuration file is found, module tries to connect to TCP/IP socket: `localhost:4730`. + +--- diff --git a/collectors/python.d.plugin/gearman/gearman.chart.py b/collectors/python.d.plugin/gearman/gearman.chart.py new file mode 100644 index 000000000..26f3533c4 --- /dev/null +++ b/collectors/python.d.plugin/gearman/gearman.chart.py @@ -0,0 +1,229 @@ +# Description: dovecot netdata python.d module +# Author: Kyle Agronick (agronick) +# SPDX-License-Identifier: GPL-3.0+ + +# Gearman Netdata Plugin + +from bases.FrameworkServices.SocketService import SocketService +from copy import deepcopy + + +CHARTS = { + 'total_workers': { + 'options': [None, 'Total Jobs', 'Jobs', 'Total Jobs', 'gearman.total_jobs', 'line'], + 'lines': [ + ['total_pending', 'Pending', 'absolute'], + ['total_running', 'Running', 'absolute'], + ] + }, +} + + +def job_chart_template(job_name): + return { + 'options': [None, job_name, 'Jobs', 'Activity by Job', 'gearman.single_job', 'stacked'], + 'lines': [ + ['{0}_pending'.format(job_name), 'Pending', 'absolute'], + ['{0}_idle'.format(job_name), 'Idle', 'absolute'], + ['{0}_running'.format(job_name), 'Running', 'absolute'], + ] + } + +def build_result_dict(job): + """ + Get the status for each job + :return: dict + """ + + total, running, available = job['metrics'] + + idle = available - running + pending = total - running + + return { + '{0}_pending'.format(job['job_name']): pending, + '{0}_idle'.format(job['job_name']): idle, + '{0}_running'.format(job['job_name']): running, + } + +def parse_worker_data(job): + job_name = job[0] + job_metrics = job[1:] + + return { + 'job_name': job_name, + 'metrics': job_metrics, + } + + +class GearmanReadException(BaseException): + pass + + +class Service(SocketService): + def __init__(self, configuration=None, name=None): + super(Service, self).__init__(configuration=configuration, name=name) + self.request = "status\n" + self._keep_alive = True + + self.host = self.configuration.get('host', 'localhost') + self.port = self.configuration.get('port', 4730) + + self.tls = self.configuration.get('tls', False) + self.cert = self.configuration.get('cert', None) + self.key = self.configuration.get('key', None) + + self.active_jobs = set() + self.definitions = deepcopy(CHARTS) + self.order = ['total_workers'] + + def _get_data(self): + """ + Format data received from socket + :return: dict + """ + + try: + active_jobs = self.get_active_jobs() + except GearmanReadException: + return None + + found_jobs, job_data = self.process_jobs(active_jobs) + self.remove_stale_jobs(found_jobs) + return job_data + + def get_active_jobs(self): + active_jobs = [] + + for job in self.get_worker_data(): + parsed_job = parse_worker_data(job) + + # Gearman does not clean up old jobs + # We only care about jobs that have + # some relevant data + if not any(parsed_job['metrics']): + continue + + active_jobs.append(parsed_job) + + return active_jobs + + def get_worker_data(self): + """ + Split the data returned from Gearman + into a list of lists + + This returns the same output that you + would get from a gearadmin --status + command. + + Example output returned from + _get_raw_data(): + generic_worker2 78 78 500 + generic_worker3 0 0 760 + generic_worker1 0 0 500 + + :return: list + """ + + try: + raw = self._get_raw_data() + except (ValueError, AttributeError): + raise GearmanReadException() + + if raw is None: + self.debug("Gearman returned no data") + raise GearmanReadException() + + job_lines = raw.splitlines()[:-1] + job_lines = [job.split() for job in sorted(job_lines)] + + for line in job_lines: + line[1:] = map(int, line[1:]) + + return job_lines + + def process_jobs(self, active_jobs): + + output = { + 'total_pending': 0, + 'total_idle': 0, + 'total_running': 0, + } + found_jobs = set() + + for parsed_job in active_jobs: + + job_name = self.add_job(parsed_job) + found_jobs.add(job_name) + job_data = build_result_dict(parsed_job) + + for sum_value in ('pending', 'running', 'idle'): + output['total_{0}'.format(sum_value)] += job_data['{0}_{1}'.format(job_name, sum_value)] + + output.update(job_data) + + return found_jobs, output + + def remove_stale_jobs(self, active_job_list): + """ + Removes jobs that have no workers, pending jobs, + or running jobs + :param active_job_list: The latest list of active jobs + :type active_job_list: iterable + :return: None + """ + + for to_remove in self.active_jobs - active_job_list: + self.remove_job(to_remove) + + def add_job(self, parsed_job): + """ + Adds a job to the list of active jobs + :param parsed_job: A parsed job dict + :type parsed_job: dict + :return: None + """ + + def add_chart(job_name): + """ + Adds a new job chart + :param job_name: The name of the job to add + :type job_name: string + :return: None + """ + + job_key = 'job_{0}'.format(job_name) + template = job_chart_template(job_name) + new_chart = self.charts.add_chart([job_key] + template['options']) + for dimension in template['lines']: + new_chart.add_dimension(dimension) + + if parsed_job['job_name'] not in self.active_jobs: + add_chart(parsed_job['job_name']) + self.active_jobs.add(parsed_job['job_name']) + + return parsed_job['job_name'] + + def remove_job(self, job_name): + """ + Removes a job to the list of active jobs + :param job_name: The name of the job to remove + :type job_name: string + :return: None + """ + + def remove_chart(job_name): + """ + Removes a job chart + :param job_name: The name of the job to remove + :type job_name: string + :return: None + """ + + job_key = 'job_{0}'.format(job_name) + self.charts[job_key].obsolete() + del self.charts[job_key] + + remove_chart(job_name) + self.active_jobs.remove(job_name) diff --git a/collectors/python.d.plugin/gearman/gearman.conf b/collectors/python.d.plugin/gearman/gearman.conf new file mode 100644 index 000000000..c41fd9ffd --- /dev/null +++ b/collectors/python.d.plugin/gearman/gearman.conf @@ -0,0 +1,72 @@ +# netdata python.d.plugin configuration for gearman +# +# This file is in YaML format. Generally the format is: +# +# name: value +# +# There are 2 sections: +# - global variables +# - one or more JOBS +# +# JOBS allow you to collect values from multiple sources. +# Each source will have its own set of charts. +# +# JOB parameters have to be indented (using spaces only, example below). + +# ---------------------------------------------------------------------- +# Global Variables +# These variables set the defaults for all JOBs, however each JOB +# may define its own, overriding the defaults. + +# update_every sets the default data collection frequency. +# If unset, the python.d.plugin default is used. +# update_every: 1 + +# priority controls the order of charts at the netdata dashboard. +# Lower numbers move the charts towards the top of the page. +# If unset, the default for python.d.plugin is used. +# priority: 60000 + +# penalty indicates whether to apply penalty to update_every in case of failures. +# Penalty will increase every 5 failed updates in a row. Maximum penalty is 10 minutes. +# penalty: yes + +# autodetection_retry sets the job re-check interval in seconds. +# The job is not deleted if check fails. +# Attempts to start the job are made once every autodetection_retry. +# This feature is disabled by default. +# autodetection_retry: 0 + +# ---------------------------------------------------------------------- +# JOBS (data collection sources) +# +# The default JOBS share the same *name*. JOBS with the same name +# are mutually exclusive. Only one of them will be allowed running at +# any time. This allows autodetection to try several alternatives and +# pick the one that works. +# +# Any number of jobs is supported. +# +# All python.d.plugin JOBS (for all its modules) support a set of +# predefined parameters. These are: +# +# job_name: +# name: myname # the JOB's name as it will appear at the +# # dashboard (by default is the job_name) +# # JOBs sharing a name are mutually exclusive +# update_every: 1 # the JOB's data collection frequency +# priority: 60000 # the JOB's order on the dashboard +# penalty: yes # the JOB's penalty +# autodetection_retry: 0 # the JOB's re-check interval in seconds +# +# Additionally to the above, gearman also supports the following: +# +# hostname: localhost # The host running the Gearman server +# port: 4730 # Port of the Gearman server +# ---------------------------------------------------------------------- +# AUTO-DETECTION JOB + +localhost: + name : 'local' + host : 'localhost' + port : 4730
\ No newline at end of file diff --git a/collectors/python.d.plugin/mysql/README.md b/collectors/python.d.plugin/mysql/README.md index 45f842d42..037153220 100644 --- a/collectors/python.d.plugin/mysql/README.md +++ b/collectors/python.d.plugin/mysql/README.md @@ -241,12 +241,12 @@ It will produce following charts (if data is available): - sql - io -42. **Replicated Writesets** in writesets/s +42. **Galera Replicated Writesets** in writesets/s - rx - tx -43. **Replicated Bytes** in KiB/s +43. **Galera Replicated Bytes** in KiB/s - rx - tx @@ -256,16 +256,48 @@ It will produce following charts (if data is available): - rx - tx -45. **Replication Conflicts** in transactions +45. **Galera Replication Conflicts** in transactions - bf aborts - cert fails -46. **Flow Control** in ms +46. **Galera Flow Control** in ms - paused -47. **Users CPU time** in percentage +47. **Galera Cluster Status** in status + + - status + +48. **Galera Cluster State** in state + + - state + +49. **Galera Number of Nodes in the Cluster** in num + + - nodes + +50. **Galera Total Weight of the Current Members in the Cluster** in weight + + - weight + +51. **Galera Whether the Node is Connected to the Cluster** in boolean + + - connected + +52. **Galera Whether the Node is Ready to Accept Queries** in boolean + + - ready + +53. **Galera Open Transactions** in num + + - open transactions + +54. **Galera Total Number of WSRep (applier/rollbacker) Threads** in num + + - threads + +55. **Users CPU time** in percentage - users diff --git a/collectors/python.d.plugin/mysql/mysql.chart.py b/collectors/python.d.plugin/mysql/mysql.chart.py index 46d0712fb..f37315479 100644 --- a/collectors/python.d.plugin/mysql/mysql.chart.py +++ b/collectors/python.d.plugin/mysql/mysql.chart.py @@ -117,6 +117,14 @@ GLOBAL_STATS = [ 'Connection_errors_peer_address', 'Connection_errors_select', 'Connection_errors_tcpwrap', + 'Com_delete', + 'Com_insert', + 'Com_select', + 'Com_update', + 'Com_replace' +] + +GALERA_STATS = [ 'wsrep_local_recv_queue', 'wsrep_local_send_queue', 'wsrep_received', @@ -126,11 +134,14 @@ GLOBAL_STATS = [ 'wsrep_local_bf_aborts', 'wsrep_local_cert_failures', 'wsrep_flow_control_paused_ns', - 'Com_delete', - 'Com_insert', - 'Com_select', - 'Com_update', - 'Com_replace' + 'wsrep_cluster_weight', + 'wsrep_cluster_size', + 'wsrep_cluster_status', + 'wsrep_local_state', + 'wsrep_open_transactions', + 'wsrep_connected', + 'wsrep_ready', + 'wsrep_thread_count' ] @@ -216,7 +227,15 @@ ORDER = [ 'galera_queue', 'galera_conflicts', 'galera_flow_control', - 'userstats_cpu' + 'galera_cluster_status', + 'galera_cluster_state', + 'galera_cluster_size', + 'galera_cluster_weight', + 'galera_connected', + 'galera_ready', + 'galera_open_transactions', + 'galera_thread_count', + 'userstats_cpu', ] CHARTS = { @@ -594,6 +613,58 @@ CHARTS = { ['wsrep_flow_control_paused_ns', 'paused', 'incremental', 1, 1000000], ] }, + 'galera_cluster_status': { + 'options': [None, 'Cluster Component Status', 'status', 'galera', 'mysql.galera_cluster_status', 'line'], + 'lines': [ + ['wsrep_cluster_status', 'status', 'absolute'], + ] + }, + 'galera_cluster_state': { + 'options': [None, 'Cluster Component State', 'state', 'galera', 'mysql.galera_cluster_state', 'line'], + 'lines': [ + ['wsrep_local_state', 'state', 'absolute'], + ] + }, + 'galera_cluster_size': { + 'options': [None, 'Number of Nodes in the Cluster', 'num', 'galera', 'mysql.galera_cluster_size', 'line'], + 'lines': [ + ['wsrep_cluster_size', 'nodes', 'absolute'], + ] + }, + 'galera_cluster_weight': { + 'options': [None, 'The Total Weight of the Current Members in the Cluster', 'weight', 'galera', + 'mysql.galera_cluster_weight', 'line'], + 'lines': [ + ['wsrep_cluster_weight', 'weight', 'absolute'], + ] + }, + 'galera_connected': { + 'options': [None, 'Whether the Node is Connected to the Cluster', 'boolean', 'galera', + 'mysql.galera_connected', 'line'], + 'lines': [ + ['wsrep_connected', 'connected', 'absolute'], + ] + }, + 'galera_ready': { + 'options': [None, 'Whether the Node is Ready to Accept Queries', 'boolean', 'galera', + 'mysql.galera_ready', 'line'], + 'lines': [ + ['wsrep_ready', 'ready', 'absolute'], + ] + }, + 'galera_open_transactions': { + 'options': [None, 'Open Transactions', 'num', 'galera', 'mysql.galera_open_transactions', 'line'], + 'lines': [ + ['wsrep_open_transactions', 'open transactions', 'absolute'], + ] + }, + 'galera_thread_count': { + 'options': [None, 'Total Number of WSRep (applier/rollbacker) Threads', 'num', 'galera', + 'mysql.galera_thread_count', 'line'], + 'lines': [ + ['wsrep_thread_count', 'threads', 'absolute'], + ] + }, 'userstats_cpu': { 'options': [None, 'Users CPU time', 'percentage', 'userstats', 'mysql.userstats_cpu', 'stacked'], 'lines': [] @@ -663,6 +734,59 @@ def userstats_chart_template(name): DEFAULT_REPL_CHANNEL = '' +# Write Set REPlication +# https://galeracluster.com/library/documentation/galera-status-variables.html +# https://www.percona.com/doc/percona-xtradb-cluster/LATEST/wsrep-status-index.html +class WSRepDataConverter: + unknown_value = -1 + + def convert(self, key, value): + if key == 'wsrep_connected': + return self.convert_connected(value) + elif key == 'wsrep_ready': + return self.convert_ready(value) + elif key == 'wsrep_cluster_status': + return self.convert_cluster_status(value) + return value + + def convert_connected(self, value): + # https://www.percona.com/doc/percona-xtradb-cluster/LATEST/wsrep-status-index.html#wsrep_connected + if value == 'OFF': + return 0 + if value == 'ON': + return 1 + return self.unknown_value + + def convert_ready(self, value): + # https://www.percona.com/doc/percona-xtradb-cluster/LATEST/wsrep-status-index.html#wsrep_ready + if value == 'OFF': + return 0 + if value == 'ON': + return 1 + return self.unknown_value + + def convert_cluster_status(self, value): + # https://www.percona.com/doc/percona-xtradb-cluster/LATEST/wsrep-status-index.html#wsrep_cluster_status + # https://github.com/codership/wsrep-API/blob/eab2d5d5a31672c0b7d116ef1629ff18392fd7d0/wsrep_api.h + # typedef enum wsrep_view_status { + # WSREP_VIEW_PRIMARY, //!< primary group configuration (quorum present) + # WSREP_VIEW_NON_PRIMARY, //!< non-primary group configuration (quorum lost) + # WSREP_VIEW_DISCONNECTED, //!< not connected to group, retrying. + # WSREP_VIEW_MAX + # } wsrep_view_status_t; + value = value.lower() + if value == 'primary': + return 0 + elif value == 'non-primary': + return 1 + elif value == 'disconnected': + return 2 + return self.unknown_value + + +wsrep_converter = WSRepDataConverter() + + class Service(MySQLService): def __init__(self, configuration=None, name=None): MySQLService.__init__(self, configuration=configuration, name=name) @@ -686,12 +810,9 @@ class Service(MySQLService): data = dict() if 'global_status' in raw_data: - global_status = dict(raw_data['global_status'][0]) - for key in GLOBAL_STATS: - if key in global_status: - data[key] = global_status[key] - if 'Threads_created' in data and 'Connections' in data: - data['Thread_cache_misses'] = round(int(data['Threads_created']) / float(data['Connections']) * 10000) + global_status = self.get_global_status(raw_data['global_status']) + if global_status: + data.update(global_status) if 'slave_status' in raw_data: status = self.get_slave_status(raw_data['slave_status']) @@ -712,6 +833,52 @@ class Service(MySQLService): return data or None + @staticmethod + def convert_wsrep(key, value): + return wsrep_converter.convert(key, value) + + def get_global_status(self, raw_global_status): + # ( + # ( + # ('Aborted_clients', '18'), + # ('Aborted_connects', '33'), + # ('Access_denied_errors', '80'), + # ('Acl_column_grants', '0'), + # ('Acl_database_grants', '0'), + # ('Acl_function_grants', '0'), + # ('wsrep_ready', 'OFF'), + # ('wsrep_rollbacker_thread_count', '0'), + # ('wsrep_thread_count', '0') + # ), + # ( + # ('Variable_name', 253, 60, 64, 64, 0, 0), + # ('Value', 253, 48, 2048, 2048, 0, 0), + # ) + # ) + rows = raw_global_status[0] + if not rows: + return + + global_status = dict(rows) + data = dict() + + for key in GLOBAL_STATS: + if key not in global_status: + continue + value = global_status[key] + data[key] = value + + for key in GALERA_STATS: + if key not in global_status: + continue + value = global_status[key] + value = self.convert_wsrep(key, value) + data[key] = value + + if 'Threads_created' in data and 'Connections' in data: + data['Thread_cache_misses'] = round(int(data['Threads_created']) / float(data['Connections']) * 10000) + return data + def get_slave_status(self, slave_status_data): rows, description = slave_status_data[0], slave_status_data[1] description_keys = [v[0] for v in description] @@ -742,41 +909,39 @@ class Service(MySQLService): self.add_new_charts(slave_status_chart_template, name) def get_userstats(self, raw_data): - # raw_data['user_statistics'] contains the following data structure: - # ( - # ( - # ('netdata', 42L, 0L, 1264L, 3.111252999999968, 2.968510299999994, 110267L, 19741424L, 0L, 0L, 1265L, 0L, - # 0L, 0L, 3L, 0L, 1301L, 0L, 0L, 7633L, 0L, 83L, 44L, 0L, 0L), - # ('root', 60L, 0L, 184L, 0.22856499999999966, 0.1601419999999998, 11605L, 1516513L, 0L, 9L, 220L, 0L, 2L, 1L, - # 6L, 4L,127L, 0L, 0L, 45L, 0L, 45L, 0L, 0L, 0L) - # ), - # ( - # ('User', 253, 9, 128, 128, 0, 0), - # ('Total_connections', 3, 2, 11, 11, 0, 0), - # ('Concurrent_connections', 3, 1, 11, 11, 0, 0), - # ('Connected_time', 3, 4, 11, 11, 0, 0), - # ('Busy_time', 5, 21, 21, 21, 31, 0), - # ('Cpu_time', 5, 18, 21, 21, 31, 0), - # ('Bytes_received', 8, 6, 21, 21, 0, 0), - # ('Bytes_sent', 8, 8, 21, 21, 0, 0), - # ('Binlog_bytes_written', 8, 1, 21, 21, 0, 0), - # ('Rows_read', 8, 1, 21, 21, 0, 0), - # ('Rows_sent', 8, 4, 21, 21, 0, 0), - # ('Rows_deleted', 8, 1, 21, 21, 0, 0), - # ('Rows_inserted', 8, 1, 21, 21, 0, 0), - # ('Rows_updated', 8, 1, 21, 21, 0, 0), - # ('Select_commands', 8, 1, 21, 21, 0, 0), - # ('Update_commands', 8, 1, 21, 21, 0, 0), - # ('Other_commands', 8, 4, 21, 21, 0, 0), - # ('Commit_transactions', 8, 1, 21, 21, 0, 0), - # ('Rollback_transactions', 8, 1, 21, 21, 0, 0), - # ('Denied_connections', 8, 4, 21, 21, 0, 0), - # ('Lost_connections', 8, 1, 21, 21, 0, 0), - # ('Access_denied', 8, 2, 21, 21, 0, 0), - # ('Empty_queries', 8, 2, 21, 21, 0, 0), - # ('Total_ssl_connections', 8, 1, 21, 21, 0, 0), - # ('Max_statement_time_exceeded', 8, 1, 21, 21, 0, 0)), - # ) + # ( + # ( + # ('netdata', 1L, 0L, 60L, 0.15842499999999984, 0.15767439999999996, 5206L, 963957L, 0L, 0L, + # 61L, 0L, 0L, 0L, 0L, 0L, 62L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), + # ), + # ( + # ('User', 253, 7, 128, 128, 0, 0), + # ('Total_connections', 3, 2, 11, 11, 0, 0), + # ('Concurrent_connections', 3, 1, 11, 11, 0, 0), + # ('Connected_time', 3, 2, 11, 11, 0, 0), + # ('Busy_time', 5, 20, 21, 21, 31, 0), + # ('Cpu_time', 5, 20, 21, 21, 31, 0), + # ('Bytes_received', 8, 4, 21, 21, 0, 0), + # ('Bytes_sent', 8, 6, 21, 21, 0, 0), + # ('Binlog_bytes_written', 8, 1, 21, 21, 0, 0), + # ('Rows_read', 8, 1, 21, 21, 0, 0), + # ('Rows_sent', 8, 2, 21, 21, 0, 0), + # ('Rows_deleted', 8, 1, 21, 21, 0, 0), + # ('Rows_inserted', 8, 1, 21, 21, 0, 0), + # ('Rows_updated', 8, 1, 21, 21, 0, 0), + # ('Select_commands', 8, 2, 21, 21, 0, 0), + # ('Update_commands', 8, 1, 21, 21, 0, 0), + # ('Other_commands', 8, 2, 21, 21, 0, 0), + # ('Commit_transactions', 8, 1, 21, 21, 0, 0), + # ('Rollback_transactions', 8, 1, 21, 21, 0, 0), + # ('Denied_connections', 8, 1, 21, 21, 0, 0), + # ('Lost_connections', 8, 1, 21, 21, 0, 0), + # ('Access_denied', 8, 1, 21, 21, 0, 0), + # ('Empty_queries', 8, 2, 21, 21, 0, 0), + # ('Total_ssl_connections', 8, 1, 21, 21, 0, 0), + # ('Max_statement_time_exceeded', 8, 1, 21, 21, 0, 0) + # ) + # ) data = dict() userstats_vars = [e[0] for e in raw_data['user_statistics'][1]] for i, _ in enumerate(raw_data['user_statistics'][0]): diff --git a/collectors/python.d.plugin/python.d.conf b/collectors/python.d.plugin/python.d.conf index e2ee8eeec..65a5cba28 100644 --- a/collectors/python.d.plugin/python.d.conf +++ b/collectors/python.d.plugin/python.d.conf @@ -49,6 +49,7 @@ example: no # exim: yes # fail2ban: yes # freeradius: yes +# gearman: yes go_expvar: no # gunicorn_log has been replaced by web_log diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in index 5b8b50a67..7aa48cbdd 100644 --- a/collectors/python.d.plugin/python.d.plugin.in +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -1,7 +1,21 @@ #!/usr/bin/env bash '''':; -exec "$(command -v python || command -v python3 || command -v python2 || -echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' +pybinary=$(which python || which python3 || which python2) +filtered=() +for arg in "$@" +do + case $arg in + -p*) pybinary=${arg:2} + shift 1 ;; + *) filtered+=("$arg") ;; + esac +done +if [ "$pybinary" = "" ] +then + echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM" + exit 1 +fi +exec "$pybinary" "$0" "${filtered[@]}" # ''' # -*- coding: utf-8 -*- # Description: @@ -197,7 +211,7 @@ class ModuleConfig: return [v for v in self.config if isinstance(self.config.get(v), dict)] def single_job(self): - return [self.create_job(self.name)] + return [self.create_job(self.name, self.config)] def multi_job(self): return [self.create_job(n, self.config[n]) for n in self.job_names()] @@ -574,16 +588,16 @@ class Plugin: try: job.init() except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job", - job.module_name, job.real_name, repr(error)) + self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( + job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue try: ok = job.check() except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job", - job.module_name, job.real_name, repr(error)) + self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( + job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue if not ok: @@ -595,8 +609,8 @@ class Plugin: try: job.create() except Exception as error: - self.log.error("{0}[{1}] : unhandled exception on create : {2}, skipping the job", - job.module_name, job.real_name, repr(error)) + self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format( + job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue diff --git a/collectors/python.d.plugin/rabbitmq/README.md b/collectors/python.d.plugin/rabbitmq/README.md index 346cc23f7..1d7ad956d 100644 --- a/collectors/python.d.plugin/rabbitmq/README.md +++ b/collectors/python.d.plugin/rabbitmq/README.md @@ -1,6 +1,6 @@ # rabbitmq -Module monitor rabbitmq performance and health metrics. +This module monitors [RabbitMQ](https://www.rabbitmq.com/) performance and health metrics. Following charts are drawn: @@ -48,6 +48,20 @@ Following charts are drawn: - free disk space in gigabytes + +Per Vhost charts: + +1. **Vhost Messages** + + - ack + - confirm + - deliver + - get + - get_no_ack + - publish + - redeliver + - return_unroutable + ## configuration ```yaml diff --git a/collectors/python.d.plugin/rabbitmq/rabbitmq.chart.py b/collectors/python.d.plugin/rabbitmq/rabbitmq.chart.py index d947121d6..ad7dcce21 100644 --- a/collectors/python.d.plugin/rabbitmq/rabbitmq.chart.py +++ b/collectors/python.d.plugin/rabbitmq/rabbitmq.chart.py @@ -9,6 +9,7 @@ from bases.FrameworkServices.UrlService import UrlService API_NODE = 'api/nodes' API_OVERVIEW = 'api/overview' +API_VHOSTS = 'api/vhosts' NODE_STATS = [ 'fd_used', @@ -33,6 +34,17 @@ OVERVIEW_STATS = [ 'message_stats.publish' ] +VHOST_MESSAGE_STATS = [ + 'message_stats.ack', + 'message_stats.confirm', + 'message_stats.deliver', + 'message_stats.get', + 'message_stats.get_no_ack', + 'message_stats.publish', + 'message_stats.redeliver', + 'message_stats.return_unroutable', +] + ORDER = [ 'queued_messages', 'message_rates', @@ -111,6 +123,51 @@ CHARTS = { } +def vhost_chart_template(name): + order = [ + 'vhost_{0}_message_stats'.format(name), + ] + family = 'vhost {0}'.format(name) + + charts = { + order[0]: { + 'options': [ + None, 'Vhost "{0}" Messages'.format(name), 'messages/s', family, 'rabbitmq.vhost_messages', 'stacked'], + 'lines': [ + ['vhost_{0}_message_stats_ack'.format(name), 'ack', 'incremental'], + ['vhost_{0}_message_stats_confirm'.format(name), 'confirm', 'incremental'], + ['vhost_{0}_message_stats_deliver'.format(name), 'deliver', 'incremental'], + ['vhost_{0}_message_stats_get'.format(name), 'get', 'incremental'], + ['vhost_{0}_message_stats_get_no_ack'.format(name), 'get_no_ack', 'incremental'], + ['vhost_{0}_message_stats_publish'.format(name), 'publish', 'incremental'], + ['vhost_{0}_message_stats_redeliver'.format(name), 'redeliver', 'incremental'], + ['vhost_{0}_message_stats_return_unroutable'.format(name), 'return_unroutable', 'incremental'], + ] + }, + } + + return order, charts + + +class VhostStatsBuilder: + def __init__(self): + self.stats = None + + def set(self, raw_stats): + self.stats = raw_stats + + def name(self): + return self.stats['name'] + + def has_msg_stats(self): + return bool(self.stats.get('message_stats')) + + def msg_stats(self): + name = self.name() + stats = fetch_data(raw_data=self.stats, metrics=VHOST_MESSAGE_STATS) + return dict(('vhost_{0}_{1}'.format(name, k), v) for k, v in stats.items()) + + class Service(UrlService): def __init__(self, configuration=None, name=None): UrlService.__init__(self, configuration=configuration, name=name) @@ -122,63 +179,107 @@ class Service(UrlService): configuration.get('port', 15672), ) self.node_name = str() + self.vhost = VhostStatsBuilder() + self.collected_vhosts = set() def _get_data(self): data = dict() stats = self.get_overview_stats() - if not stats: return None data.update(stats) stats = self.get_nodes_stats() - if not stats: return None data.update(stats) + stats = self.get_vhosts_stats() + if stats: + data.update(stats) + return data or None def get_overview_stats(self): url = '{0}/{1}'.format(self.url, API_OVERVIEW) - + self.debug("doing http request to '{0}'".format(url)) raw = self._get_raw_data(url) - if not raw: return None data = loads(raw) - self.node_name = data['node'] + self.debug("found node name: '{0}'".format(self.node_name)) - return fetch_data(raw_data=data, metrics=OVERVIEW_STATS) + stats = fetch_data(raw_data=data, metrics=OVERVIEW_STATS) + self.debug("number of metrics: {0}".format(len(stats))) + return stats def get_nodes_stats(self): - url = '{0}/{1}/{2}'.format(self.url, API_NODE, self.node_name) + if self.node_name == "": + self.error("trying to get node stats, but node name is not set") + return None + url = '{0}/{1}/{2}'.format(self.url, API_NODE, self.node_name) + self.debug("doing http request to '{0}'".format(url)) raw = self._get_raw_data(url) - if not raw: return None data = loads(raw) + stats = fetch_data(raw_data=data, metrics=NODE_STATS) + self.debug("number of metrics: {0}".format(len(stats))) + return stats - return fetch_data(raw_data=data, metrics=NODE_STATS) + def get_vhosts_stats(self): + url = '{0}/{1}'.format(self.url, API_VHOSTS) + self.debug("doing http request to '{0}'".format(url)) + raw = self._get_raw_data(url) + if not raw: + return None + + data = dict() + vhosts = loads(raw) + charts_initialized = len(self.charts) > 0 + + for vhost in vhosts: + self.vhost.set(vhost) + if not self.vhost.has_msg_stats(): + continue + + if charts_initialized and self.vhost.name() not in self.collected_vhosts: + self.collected_vhosts.add(self.vhost.name()) + self.add_vhost_charts(self.vhost.name()) + + data.update(self.vhost.msg_stats()) + + self.debug("number of vhosts: {0}, metrics: {1}".format(len(vhosts), len(data))) + return data + + def add_vhost_charts(self, vhost_name): + order, charts = vhost_chart_template(vhost_name) + + for chart_name in order: + params = [chart_name] + charts[chart_name]['options'] + dimensions = charts[chart_name]['lines'] + + new_chart = self.charts.add_chart(params) + for dimension in dimensions: + new_chart.add_dimension(dimension) def fetch_data(raw_data, metrics): data = dict() - for metric in metrics: value = raw_data metrics_list = metric.split('.') try: for m in metrics_list: value = value[m] - except KeyError: + except (KeyError, TypeError): continue data['_'.join(metrics_list)] = value |