summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python_modules/bases
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/python.d.plugin/python_modules/bases')
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py91
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py82
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py163
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py256
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py336
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py207
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py0
-rw-r--r--collectors/python.d.plugin/python_modules/bases/__init__.py0
-rw-r--r--collectors/python.d.plugin/python_modules/bases/charts.py414
-rw-r--r--collectors/python.d.plugin/python_modules/bases/collection.py117
-rw-r--r--collectors/python.d.plugin/python_modules/bases/loaders.py46
-rw-r--r--collectors/python.d.plugin/python_modules/bases/loggers.py206
12 files changed, 1918 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py
new file mode 100644
index 0000000..dea50ee
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import os
+
+from subprocess import Popen, PIPE
+
+from bases.FrameworkServices.SimpleService import SimpleService
+from bases.collection import find_binary
+
+
+class ExecutableService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.command = None
+
+ def _get_raw_data(self, stderr=False, command=None):
+ """
+ Get raw data from executed command
+ :return: <list>
+ """
+ command = command or self.command
+ self.debug("Executing command '{0}'".format(' '.join(command)))
+ try:
+ p = Popen(command, stdout=PIPE, stderr=PIPE)
+ except Exception as error:
+ self.error('Executing command {0} resulted in error: {1}'.format(command, error))
+ return None
+
+ data = list()
+ std = p.stderr if stderr else p.stdout
+ for line in std:
+ try:
+ data.append(line.decode('utf-8'))
+ except TypeError:
+ continue
+
+ return data
+
+ def check(self):
+ """
+ Parse basic configuration, check if command is whitelisted and is returning values
+ :return: <boolean>
+ """
+ # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified)
+ if 'command' in self.configuration:
+ self.command = self.configuration['command']
+
+ # "command" must be: 1.not None 2. type <str>
+ if not (self.command and isinstance(self.command, str)):
+ self.error('Command is not defined or command type is not <str>')
+ return False
+
+ # Split "command" into: 1. command <str> 2. options <list>
+ command, opts = self.command.split()[0], self.command.split()[1:]
+
+ # Check for "bad" symbols in options. No pipes, redirects etc.
+ opts_list = ['&', '|', ';', '>', '<']
+ bad_opts = set(''.join(opts)) & set(opts_list)
+ if bad_opts:
+ self.error("Bad command argument(s): {opts}".format(opts=bad_opts))
+ return False
+
+ # Find absolute path ('echo' => '/bin/echo')
+ if '/' not in command:
+ command = find_binary(command)
+ if not command:
+ self.error('Can\'t locate "{command}" binary'.format(command=self.command))
+ return False
+ # Check if binary exist and executable
+ else:
+ if not os.access(command, os.X_OK):
+ self.error('"{binary}" is not executable'.format(binary=command))
+ return False
+
+ self.command = [command] + opts if opts else [command]
+
+ try:
+ data = self._get_data()
+ except Exception as error:
+ self.error('_get_data() failed. Command: {command}. Error: {error}'.format(command=self.command,
+ error=error))
+ return False
+
+ if isinstance(data, dict) and data:
+ return True
+ self.error('Command "{command}" returned no data'.format(command=self.command))
+ return False
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py
new file mode 100644
index 0000000..a55e33f
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from glob import glob
+import sys
+import os
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+
+class LogService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.log_path = self.configuration.get('path')
+ self.__glob_path = self.log_path
+ self._last_position = 0
+ self.__re_find = dict(current=0, run=0, maximum=60)
+ self.__open_args = {'errors': 'replace'} if sys.version_info[0] > 2 else {}
+
+ def _get_raw_data(self):
+ """
+ Get log lines since last poll
+ :return: list
+ """
+ lines = list()
+ try:
+ if self.__re_find['current'] == self.__re_find['run']:
+ self._find_recent_log_file()
+ size = os.path.getsize(self.log_path)
+ if size == self._last_position:
+ self.__re_find['current'] += 1
+ return list() # return empty list if nothing has changed
+ elif size < self._last_position:
+ self._last_position = 0 # read from beginning if file has shrunk
+
+ with open(self.log_path, **self.__open_args) as fp:
+ fp.seek(self._last_position)
+ for line in fp:
+ lines.append(line)
+ self._last_position = fp.tell()
+ self.__re_find['current'] = 0
+ except (OSError, IOError) as error:
+ self.__re_find['current'] += 1
+ self.error(str(error))
+
+ return lines or None
+
+ def _find_recent_log_file(self):
+ """
+ :return:
+ """
+ self.__re_find['run'] = self.__re_find['maximum']
+ self.__re_find['current'] = 0
+ self.__glob_path = self.__glob_path or self.log_path # workaround for modules w/o config files
+ path_list = glob(self.__glob_path)
+ if path_list:
+ self.log_path = max(path_list)
+ return True
+ return False
+
+ def check(self):
+ """
+ Parse basic configuration and check if log file exists
+ :return: boolean
+ """
+ if not self.log_path:
+ self.error('No path to log specified')
+ return None
+
+ if self._find_recent_log_file() and os.access(self.log_path, os.R_OK) and os.path.isfile(self.log_path):
+ return True
+ self.error('Cannot access {0}'.format(self.log_path))
+ return False
+
+ def create(self):
+ # set cursor at last byte of log file
+ self._last_position = os.path.getsize(self.log_path)
+ status = SimpleService.create(self)
+ return status
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py
new file mode 100644
index 0000000..7f5c7d2
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py
@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from sys import exc_info
+
+try:
+ import MySQLdb
+
+ PY_MYSQL = True
+except ImportError:
+ try:
+ import pymysql as MySQLdb
+
+ PY_MYSQL = True
+ except ImportError:
+ PY_MYSQL = False
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+
+class MySQLService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.__connection = None
+ self.__conn_properties = dict()
+ self.extra_conn_properties = dict()
+ self.__queries = self.configuration.get('queries', dict())
+ self.queries = dict()
+
+ def __connect(self):
+ try:
+ connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties)
+ except (MySQLdb.MySQLError, TypeError, AttributeError) as error:
+ return None, str(error)
+ else:
+ return connection, None
+
+ def check(self):
+ def get_connection_properties(conf, extra_conf):
+ properties = dict()
+ if conf.get('user'):
+ properties['user'] = conf['user']
+ if conf.get('pass'):
+ properties['passwd'] = conf['pass']
+
+ if conf.get('socket'):
+ properties['unix_socket'] = conf['socket']
+ elif conf.get('host'):
+ properties['host'] = conf['host']
+ properties['port'] = int(conf.get('port', 3306))
+ elif conf.get('my.cnf'):
+ properties['read_default_file'] = conf['my.cnf']
+
+ if conf.get('ssl'):
+ properties['ssl'] = conf['ssl']
+
+ if isinstance(extra_conf, dict) and extra_conf:
+ properties.update(extra_conf)
+
+ return properties or None
+
+ def is_valid_queries_dict(raw_queries, log_error):
+ """
+ :param raw_queries: dict:
+ :param log_error: function:
+ :return: dict or None
+
+ raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries)
+ """
+
+ def is_valid_query(query):
+ return all([isinstance(query, str),
+ query.startswith(('SELECT', 'select', 'SHOW', 'show'))])
+
+ if hasattr(raw_queries, 'keys') and raw_queries:
+ valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)])
+ bad_queries = set(raw_queries) - set(valid_queries)
+
+ if bad_queries:
+ log_error('Removed query(s): {queries}'.format(queries=bad_queries))
+ return valid_queries
+ else:
+ log_error('Unsupported "queries" format. Must be not empty <dict>')
+ return None
+
+ if not PY_MYSQL:
+ self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin')
+ return False
+
+ # Preference: 1. "queries" from the configuration file 2. "queries" from the module
+ self.queries = self.__queries or self.queries
+ # Check if "self.queries" exist, not empty and all queries are in valid format
+ self.queries = is_valid_queries_dict(self.queries, self.error)
+ if not self.queries:
+ return None
+
+ # Get connection properties
+ self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties)
+ if not self.__conn_properties:
+ self.error('Connection properties are missing')
+ return False
+
+ # Create connection to the database
+ self.__connection, error = self.__connect()
+ if error:
+ self.error('Can\'t establish connection to MySQL: {error}'.format(error=error))
+ return False
+
+ try:
+ data = self._get_data()
+ except Exception as error:
+ self.error('_get_data() failed. Error: {error}'.format(error=error))
+ return False
+
+ if isinstance(data, dict) and data:
+ return True
+ self.error("_get_data() returned no data or type is not <dict>")
+ return False
+
+ def _get_raw_data(self, description=None):
+ """
+ Get raw data from MySQL server
+ :return: dict: fetchall() or (fetchall(), description)
+ """
+
+ if not self.__connection:
+ self.__connection, error = self.__connect()
+ if error:
+ return None
+
+ raw_data = dict()
+ queries = dict(self.queries)
+ try:
+ cursor = self.__connection.cursor()
+ for name, query in queries.items():
+ try:
+ cursor.execute(query)
+ except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error:
+ if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)):
+ cursor.close()
+ raise RuntimeError
+ self.error('Removed query: {name}[{query}]. Error: error'.format(name=name,
+ query=query,
+ error=error))
+ self.queries.pop(name)
+ continue
+ else:
+ raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall()
+ cursor.close()
+ self.__connection.commit()
+ except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError):
+ self.__connection.close()
+ self.__connection = None
+ return None
+ else:
+ return raw_data or None
+
+ @staticmethod
+ def __is_error_critical(err_class, err_text):
+ return err_class == MySQLdb.OperationalError and all(['denied' not in err_text,
+ 'Unknown column' not in err_text])
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
new file mode 100644
index 0000000..c304cce
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+
+from time import sleep, time
+
+from third_party.monotonic import monotonic
+
+from bases.charts import Charts, ChartError, create_runtime_chart
+from bases.collection import safe_print
+from bases.loggers import PythonDLimitedLogger
+
+RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \
+ 'SET run_time = {elapsed}\n' \
+ 'END\n'
+
+PENALTY_EVERY = 5
+MAX_PENALTY = 10 * 60 # 10 minutes
+
+
+class RuntimeCounters:
+ def __init__(self, configuration):
+ """
+ :param configuration: <dict>
+ """
+ self.update_every = int(configuration.pop('update_every'))
+ self.do_penalty = configuration.pop('penalty')
+
+ self.start_mono = 0
+ self.start_real = 0
+ self.retries = 0
+ self.penalty = 0
+ self.elapsed = 0
+ self.prev_update = 0
+
+ self.runs = 1
+
+ def calc_next(self):
+ self.start_mono = monotonic()
+ return self.start_mono - (self.start_mono % self.update_every) + self.update_every + self.penalty
+
+ def sleep_until_next(self):
+ next_time = self.calc_next()
+ while self.start_mono < next_time:
+ sleep(next_time - self.start_mono)
+ self.start_mono = monotonic()
+ self.start_real = time()
+
+ def handle_retries(self):
+ self.retries += 1
+ if self.do_penalty and self.retries % PENALTY_EVERY == 0:
+ self.penalty = round(min(self.retries * self.update_every / 2, MAX_PENALTY))
+
+
+def clean_module_name(name):
+ if name.startswith('pythond_'):
+ return name[8:]
+ return name
+
+
+class SimpleService(PythonDLimitedLogger, object):
+ """
+ Prototype of Service class.
+ Implemented basic functionality to run jobs by `python.d.plugin`
+ """
+
+ def __init__(self, configuration, name=''):
+ """
+ :param configuration: <dict>
+ :param name: <str>
+ """
+ PythonDLimitedLogger.__init__(self)
+ self.configuration = configuration
+ self.order = list()
+ self.definitions = dict()
+
+ self.module_name = clean_module_name(self.__module__)
+ self.job_name = configuration.pop('job_name')
+ self.override_name = configuration.pop('override_name')
+ self.fake_name = None
+
+ self._runtime_counters = RuntimeCounters(configuration=configuration)
+ self.charts = Charts(job_name=self.actual_name,
+ priority=configuration.pop('priority'),
+ cleanup=configuration.pop('chart_cleanup'),
+ get_update_every=self.get_update_every,
+ module_name=self.module_name)
+
+ def __repr__(self):
+ return '<{cls_bases}: {name}>'.format(cls_bases=', '.join(c.__name__ for c in self.__class__.__bases__),
+ name=self.name)
+
+ @property
+ def name(self):
+ if self.job_name and self.job_name != self.module_name:
+ return '_'.join([self.module_name, self.override_name or self.job_name])
+ return self.module_name
+
+ def actual_name(self):
+ return self.fake_name or self.name
+
+ @property
+ def runs_counter(self):
+ return self._runtime_counters.runs
+
+ @property
+ def update_every(self):
+ return self._runtime_counters.update_every
+
+ @update_every.setter
+ def update_every(self, value):
+ """
+ :param value: <int>
+ :return:
+ """
+ self._runtime_counters.update_every = value
+
+ def get_update_every(self):
+ return self.update_every
+
+ def check(self):
+ """
+ check() prototype
+ :return: boolean
+ """
+ self.debug("job doesn't implement check() method. Using default which simply invokes get_data().")
+ data = self.get_data()
+ if data and isinstance(data, dict):
+ return True
+ self.debug('returned value is wrong: {0}'.format(data))
+ return False
+
+ @create_runtime_chart
+ def create(self):
+ for chart_name in self.order:
+ chart_config = self.definitions.get(chart_name)
+
+ if not chart_config:
+ self.debug("create() => [NOT ADDED] chart '{chart_name}' not in definitions. "
+ "Skipping it.".format(chart_name=chart_name))
+ continue
+
+ # create chart
+ chart_params = [chart_name] + chart_config['options']
+ try:
+ self.charts.add_chart(params=chart_params)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (chart '{chart}': {error})".format(chart=chart_name,
+ error=error))
+ continue
+
+ # add dimensions to chart
+ for dimension in chart_config['lines']:
+ try:
+ self.charts[chart_name].add_dimension(dimension)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (dimension '{dimension}': {error})".format(dimension=dimension,
+ error=error))
+ continue
+
+ # add variables to chart
+ if 'variables' in chart_config:
+ for variable in chart_config['variables']:
+ try:
+ self.charts[chart_name].add_variable(variable)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (variable '{var}': {error})".format(var=variable,
+ error=error))
+ continue
+
+ del self.order
+ del self.definitions
+
+ # True if job has at least 1 chart else False
+ return bool(self.charts)
+
+ def run(self):
+ """
+ Runs job in thread. Handles retries.
+ Exits when job failed or timed out.
+ :return: None
+ """
+ job = self._runtime_counters
+ self.debug('started, update frequency: {freq}'.format(freq=job.update_every))
+
+ while True:
+ job.sleep_until_next()
+
+ since = 0
+ if job.prev_update:
+ since = int((job.start_real - job.prev_update) * 1e6)
+
+ try:
+ updated = self.update(interval=since)
+ except Exception as error:
+ self.error('update() unhandled exception: {error}'.format(error=error))
+ updated = False
+
+ job.runs += 1
+
+ if not updated:
+ job.handle_retries()
+ else:
+ job.elapsed = int((monotonic() - job.start_mono) * 1e3)
+ job.prev_update = job.start_real
+ job.retries, job.penalty = 0, 0
+ safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name,
+ since_last=since,
+ elapsed=job.elapsed))
+ self.debug('update => [{status}] (elapsed time: {elapsed}, failed retries in a row: {retries})'.format(
+ status='OK' if updated else 'FAILED',
+ elapsed=job.elapsed if updated else '-',
+ retries=job.retries))
+
+ def update(self, interval):
+ """
+ :return:
+ """
+ data = self.get_data()
+ if not data:
+ self.debug('get_data() returned no data')
+ return False
+ elif not isinstance(data, dict):
+ self.debug('get_data() returned incorrect type data')
+ return False
+
+ updated = False
+
+ for chart in self.charts:
+ if chart.flags.obsoleted:
+ if chart.can_be_updated(data):
+ chart.refresh()
+ else:
+ continue
+ elif self.charts.cleanup and chart.penalty >= self.charts.cleanup:
+ chart.obsolete()
+ self.info("chart '{0}' was suppressed due to non updating".format(chart.name))
+ continue
+
+ ok = chart.update(data, interval)
+ if ok:
+ updated = True
+
+ if not updated:
+ self.debug('none of the charts has been updated')
+
+ return updated
+
+ def get_data(self):
+ return self._get_data()
+
+ def _get_data(self):
+ raise NotImplementedError
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py
new file mode 100644
index 0000000..d6c7550
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py
@@ -0,0 +1,336 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import errno
+import socket
+
+try:
+ import ssl
+except ImportError:
+ _TLS_SUPPORT = False
+else:
+ _TLS_SUPPORT = True
+
+if _TLS_SUPPORT:
+ try:
+ PROTOCOL_TLS = ssl.PROTOCOL_TLS
+ except AttributeError:
+ PROTOCOL_TLS = ssl.PROTOCOL_SSLv23
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+
+DEFAULT_CONNECT_TIMEOUT = 2.0
+DEFAULT_READ_TIMEOUT = 2.0
+DEFAULT_WRITE_TIMEOUT = 2.0
+
+
+class SocketService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ self._sock = None
+ self._keep_alive = False
+ self.host = 'localhost'
+ self.port = None
+ self.unix_socket = None
+ self.dgram_socket = False
+ self.request = ''
+ self.tls = False
+ self.cert = None
+ self.key = None
+ self.__socket_config = None
+ self.__empty_request = "".encode()
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.connect_timeout = configuration.get('connect_timeout', DEFAULT_CONNECT_TIMEOUT)
+ self.read_timeout = configuration.get('read_timeout', DEFAULT_READ_TIMEOUT)
+ self.write_timeout = configuration.get('write_timeout', DEFAULT_WRITE_TIMEOUT)
+
+ def _socket_error(self, message=None):
+ if self.unix_socket is not None:
+ self.error('unix socket "{socket}": {message}'.format(socket=self.unix_socket,
+ message=message))
+ else:
+ if self.__socket_config is not None:
+ _, _, _, _, sa = self.__socket_config
+ self.error('socket to "{address}" port {port}: {message}'.format(address=sa[0],
+ port=sa[1],
+ message=message))
+ else:
+ self.error('unknown socket: {0}'.format(message))
+
+ def _connect2socket(self, res=None):
+ """
+ Connect to a socket, passing the result of getaddrinfo()
+ :return: boolean
+ """
+ if res is None:
+ res = self.__socket_config
+ if res is None:
+ self.error("Cannot create socket to 'None':")
+ return False
+
+ af, sock_type, proto, _, sa = res
+ try:
+ self.debug('Creating socket to "{address}", port {port}'.format(address=sa[0], port=sa[1]))
+ self._sock = socket.socket(af, sock_type, proto)
+ except socket.error as error:
+ self.error('Failed to create socket "{address}", port {port}, error: {error}'.format(address=sa[0],
+ port=sa[1],
+ error=error))
+ self._sock = None
+ self.__socket_config = None
+ return False
+
+ if self.tls:
+ try:
+ self.debug('Encapsulating socket with TLS')
+ self.debug('Using keyfile: {0}, certfile: {1}, cert_reqs: {2}, ssl_version: {3}'.format(
+ self.key, self.cert, ssl.CERT_NONE, PROTOCOL_TLS
+ ))
+ self._sock = ssl.wrap_socket(self._sock,
+ keyfile=self.key,
+ certfile=self.cert,
+ server_side=False,
+ cert_reqs=ssl.CERT_NONE,
+ ssl_version=PROTOCOL_TLS,
+ )
+ except (socket.error, ssl.SSLError, IOError, OSError) as error:
+ self.error('failed to wrap socket : {0}'.format(repr(error)))
+ self._disconnect()
+ self.__socket_config = None
+ return False
+
+ try:
+ self.debug('connecting socket to "{address}", port {port}'.format(address=sa[0], port=sa[1]))
+ self._sock.settimeout(self.connect_timeout)
+ self.debug('set socket connect timeout to: {0}'.format(self._sock.gettimeout()))
+ self._sock.connect(sa)
+ except (socket.error, ssl.SSLError) as error:
+ self.error('Failed to connect to "{address}", port {port}, error: {error}'.format(address=sa[0],
+ port=sa[1],
+ error=error))
+ self._disconnect()
+ self.__socket_config = None
+ return False
+
+ self.debug('connected to "{address}", port {port}'.format(address=sa[0], port=sa[1]))
+ self.__socket_config = res
+ return True
+
+ def _connect2unixsocket(self):
+ """
+ Connect to a unix socket, given its filename
+ :return: boolean
+ """
+ if self.unix_socket is None:
+ self.error("cannot connect to unix socket 'None'")
+ return False
+
+ try:
+ self.debug('attempting DGRAM unix socket "{0}"'.format(self.unix_socket))
+ self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self._sock.settimeout(self.connect_timeout)
+ self.debug('set socket connect timeout to: {0}'.format(self._sock.gettimeout()))
+ self._sock.connect(self.unix_socket)
+ self.debug('connected DGRAM unix socket "{0}"'.format(self.unix_socket))
+ return True
+ except socket.error as error:
+ self.debug('Failed to connect DGRAM unix socket "{socket}": {error}'.format(socket=self.unix_socket,
+ error=error))
+
+ try:
+ self.debug('attempting STREAM unix socket "{0}"'.format(self.unix_socket))
+ self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self._sock.settimeout(self.connect_timeout)
+ self.debug('set socket connect timeout to: {0}'.format(self._sock.gettimeout()))
+ self._sock.connect(self.unix_socket)
+ self.debug('connected STREAM unix socket "{0}"'.format(self.unix_socket))
+ return True
+ except socket.error as error:
+ self.debug('Failed to connect STREAM unix socket "{socket}": {error}'.format(socket=self.unix_socket,
+ error=error))
+ self._sock = None
+ return False
+
+ def _connect(self):
+ """
+ Recreate socket and connect to it since sockets cannot be reused after closing
+ Available configurations are IPv6, IPv4 or UNIX socket
+ :return:
+ """
+ try:
+ if self.unix_socket is not None:
+ self._connect2unixsocket()
+
+ else:
+ if self.__socket_config is not None:
+ self._connect2socket()
+ else:
+ if self.dgram_socket:
+ sock_type = socket.SOCK_DGRAM
+ else:
+ sock_type = socket.SOCK_STREAM
+ for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, sock_type):
+ if self._connect2socket(res):
+ break
+
+ except Exception as error:
+ self.error('unhandled exception during connect : {0}'.format(repr(error)))
+ self._sock = None
+ self.__socket_config = None
+
+ def _disconnect(self):
+ """
+ Close socket connection
+ :return:
+ """
+ if self._sock is not None:
+ try:
+ self.debug('closing socket')
+ self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all
+ self._sock.close()
+ except Exception as error:
+ if not (hasattr(error, 'errno') and error.errno == errno.ENOTCONN):
+ self.error(error)
+ self._sock = None
+
+ def _send(self, request=None):
+ """
+ Send request.
+ :return: boolean
+ """
+ # Send request if it is needed
+ if self.request != self.__empty_request:
+ try:
+ self.debug('set socket write timeout to: {0}'.format(self._sock.gettimeout()))
+ self._sock.settimeout(self.write_timeout)
+ self.debug('sending request: {0}'.format(request or self.request))
+ self._sock.send(request or self.request)
+ except Exception as error:
+ self._socket_error('error sending request: {0}'.format(error))
+ self._disconnect()
+ return False
+ return True
+
+ def _receive(self, raw=False):
+ """
+ Receive data from socket
+ :param raw: set `True` to return bytes
+ :type raw: bool
+ :return: decoded str or raw bytes
+ :rtype: str/bytes
+ """
+ data = "" if not raw else b""
+ while True:
+ self.debug('receiving response')
+ try:
+ self.debug('set socket read timeout to: {0}'.format(self._sock.gettimeout()))
+ self._sock.settimeout(self.read_timeout)
+ buf = self._sock.recv(4096)
+ except Exception as error:
+ self._socket_error('failed to receive response: {0}'.format(error))
+ self._disconnect()
+ break
+
+ if buf is None or len(buf) == 0: # handle server disconnect
+ if data == "" or data == b"":
+ self._socket_error('unexpectedly disconnected')
+ else:
+ self.debug('server closed the connection')
+ self._disconnect()
+ break
+
+ self.debug('received data')
+ data += buf.decode('utf-8', 'ignore') if not raw else buf
+ if self._check_raw_data(data):
+ break
+
+ self.debug(u'final response: {0}'.format(data if not raw else u'binary data'))
+ return data
+
+ def _get_raw_data(self, raw=False, request=None):
+ """
+ Get raw data with low-level "socket" module.
+ :param raw: set `True` to return bytes
+ :type raw: bool
+ :return: decoded data (str) or raw data (bytes)
+ :rtype: str/bytes
+ """
+ if self._sock is None:
+ self._connect()
+ if self._sock is None:
+ return None
+
+ # Send request if it is needed
+ if not self._send(request):
+ return None
+
+ data = self._receive(raw)
+
+ if not self._keep_alive:
+ self._disconnect()
+
+ return data
+
+ @staticmethod
+ def _check_raw_data(data):
+ """
+ Check if all data has been gathered from socket
+ :param data: str
+ :return: boolean
+ """
+ return bool(data)
+
+ def _parse_config(self):
+ """
+ Parse configuration data
+ :return: boolean
+ """
+ try:
+ self.unix_socket = str(self.configuration['socket'])
+ except (KeyError, TypeError):
+ self.debug('No unix socket specified. Trying TCP/IP socket.')
+ self.unix_socket = None
+ try:
+ self.host = str(self.configuration['host'])
+ except (KeyError, TypeError):
+ self.debug('No host specified. Using: "{0}"'.format(self.host))
+ try:
+ self.port = int(self.configuration['port'])
+ except (KeyError, TypeError):
+ self.debug('No port specified. Using: "{0}"'.format(self.port))
+
+ self.tls = bool(self.configuration.get('tls', self.tls))
+ if self.tls and not _TLS_SUPPORT:
+ self.warning('TLS requested but no TLS module found, disabling TLS support.')
+ self.tls = False
+ if _TLS_SUPPORT and not self.tls:
+ self.debug('No TLS preference specified, not using TLS.')
+
+ if self.tls and _TLS_SUPPORT:
+ self.key = self.configuration.get('tls_key_file')
+ self.cert = self.configuration.get('tls_cert_file')
+ if not self.cert:
+ # If there's not a valid certificate, clear the key too.
+ self.debug('No valid TLS client certificate configuration found.')
+ self.key = None
+ self.cert = None
+ elif not self.key:
+ # If a key isn't listed, the config may still be
+ # valid, because there may be a key attached to the
+ # certificate.
+ self.info('No TLS client key specified, assuming it\'s attached to the certificate.')
+ self.key = None
+
+ try:
+ self.request = str(self.configuration['request'])
+ except (KeyError, TypeError):
+ self.debug('No request specified. Using: "{0}"'.format(self.request))
+
+ self.request = self.request.encode()
+
+ def check(self):
+ self._parse_config()
+ return SimpleService.check(self)
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py
new file mode 100644
index 0000000..1faf036
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import urllib3
+
+from distutils.version import StrictVersion as version
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+try:
+ urllib3.disable_warnings()
+except AttributeError:
+ pass
+
+# https://github.com/urllib3/urllib3/blob/master/CHANGES.rst#19-2014-07-04
+# New retry logic and urllib3.util.retry.Retry configuration object. (Issue https://github.com/urllib3/urllib3/pull/326)
+URLLIB3_MIN_REQUIRED_VERSION = '1.9'
+URLLIB3_VERSION = urllib3.__version__
+URLLIB3 = 'urllib3'
+
+
+def version_check():
+ if version(URLLIB3_VERSION) >= version(URLLIB3_MIN_REQUIRED_VERSION):
+ return
+
+ err = '{0} version: {1}, minimum required version: {2}, please upgrade'.format(
+ URLLIB3,
+ URLLIB3_VERSION,
+ URLLIB3_MIN_REQUIRED_VERSION,
+ )
+ raise Exception(err)
+
+
+class UrlService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ version_check()
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.debug("{0} version: {1}".format(URLLIB3, URLLIB3_VERSION))
+ self.url = self.configuration.get('url')
+ self.user = self.configuration.get('user')
+ self.password = self.configuration.get('pass')
+ self.proxy_user = self.configuration.get('proxy_user')
+ self.proxy_password = self.configuration.get('proxy_pass')
+ self.proxy_url = self.configuration.get('proxy_url')
+ self.method = self.configuration.get('method', 'GET')
+ self.header = self.configuration.get('header')
+ self.body = self.configuration.get('body')
+ self.request_timeout = self.configuration.get('timeout', 1)
+ self.respect_retry_after_header = self.configuration.get('respect_retry_after_header')
+ self.tls_verify = self.configuration.get('tls_verify')
+ self.tls_ca_file = self.configuration.get('tls_ca_file')
+ self.tls_key_file = self.configuration.get('tls_key_file')
+ self.tls_cert_file = self.configuration.get('tls_cert_file')
+ self._manager = None
+
+ def __make_headers(self, **header_kw):
+ user = header_kw.get('user') or self.user
+ password = header_kw.get('pass') or self.password
+ proxy_user = header_kw.get('proxy_user') or self.proxy_user
+ proxy_password = header_kw.get('proxy_pass') or self.proxy_password
+ custom_header = header_kw.get('header') or self.header
+ header_params = dict(keep_alive=True)
+ proxy_header_params = dict()
+ if user and password:
+ header_params['basic_auth'] = '{user}:{password}'.format(user=user,
+ password=password)
+ if proxy_user and proxy_password:
+ proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user,
+ password=proxy_password)
+ try:
+ header, proxy_header = urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params)
+ except TypeError as error:
+ self.error('build_header() error: {error}'.format(error=error))
+ return None, None
+ else:
+ header.update(custom_header or dict())
+ return header, proxy_header
+
+ def _build_manager(self, **header_kw):
+ header, proxy_header = self.__make_headers(**header_kw)
+ if header is None or proxy_header is None:
+ return None
+ proxy_url = header_kw.get('proxy_url') or self.proxy_url
+ if proxy_url:
+ manager = urllib3.ProxyManager
+ params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header)
+ else:
+ manager = urllib3.PoolManager
+ params = dict(headers=header)
+ tls_cert_file = self.tls_cert_file
+ if tls_cert_file:
+ params['cert_file'] = tls_cert_file
+ # NOTE: key_file is useless without cert_file, but
+ # cert_file may include the key as well.
+ tls_key_file = self.tls_key_file
+ if tls_key_file:
+ params['key_file'] = tls_key_file
+ tls_ca_file = self.tls_ca_file
+ if tls_ca_file:
+ params['ca_certs'] = tls_ca_file
+ try:
+ url = header_kw.get('url') or self.url
+ is_https = url.startswith('https')
+ if skip_tls_verify(is_https, self.tls_verify, tls_ca_file):
+ params['ca_certs'] = None
+ params['cert_reqs'] = 'CERT_NONE'
+ if is_https:
+ params['assert_hostname'] = False
+ return manager(**params)
+ except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error:
+ self.error('build_manager() error:', str(error))
+ return None
+
+ def _get_raw_data(self, url=None, manager=None, **kwargs):
+ """
+ Get raw data from http request
+ :return: str
+ """
+ try:
+ response = self._do_request(url, manager, **kwargs)
+ except Exception as error:
+ self.error('Url: {url}. Error: {error}'.format(url=url or self.url, error=error))
+ return None
+
+ if response.status == 200:
+ if isinstance(response.data, str):
+ return response.data
+ return response.data.decode(errors='ignore')
+ else:
+ self.debug('Url: {url}. Http response status code: {code}'.format(url=url or self.url, code=response.status))
+ return None
+
+ def _get_raw_data_with_status(self, url=None, manager=None, retries=1, redirect=True, **kwargs):
+ """
+ Get status and response body content from http request. Does not catch exceptions
+ :return: int, str
+ """
+ response = self._do_request(url, manager, retries, redirect, **kwargs)
+
+ if isinstance(response.data, str):
+ return response.status, response.data
+ return response.status, response.data.decode(errors='ignore')
+
+ def _do_request(self, url=None, manager=None, retries=1, redirect=True, **kwargs):
+ """
+ Get response from http request. Does not catch exceptions
+ :return: HTTPResponse
+ """
+ url = url or self.url
+ manager = manager or self._manager
+ retry = urllib3.Retry(retries)
+ if hasattr(retry, 'respect_retry_after_header'):
+ retry.respect_retry_after_header = bool(self.respect_retry_after_header)
+
+ if self.body:
+ kwargs['body'] = self.body
+
+ response = manager.request(
+ method=self.method,
+ url=url,
+ timeout=self.request_timeout,
+ retries=retry,
+ headers=manager.headers,
+ redirect=redirect,
+ **kwargs
+ )
+ return response
+
+ def check(self):
+ """
+ Format configuration data and try to connect to server
+ :return: boolean
+ """
+ if not (self.url and isinstance(self.url, str)):
+ self.error('URL is not defined or type is not <str>')
+ return False
+
+ self._manager = self._build_manager()
+ if not self._manager:
+ return False
+
+ try:
+ data = self._get_data()
+ except Exception as error:
+ self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error))
+ return False
+
+ if isinstance(data, dict) and data:
+ return True
+ self.error('_get_data() returned no data or type is not <dict>')
+ return False
+
+
+def skip_tls_verify(is_https, tls_verify, tls_ca_file):
+ # default 'tls_verify' value is None
+ # logic is:
+ # - never skip if there is 'tls_ca_file' file
+ # - skip by default for https
+ # - do not skip by default for http
+ if tls_ca_file:
+ return False
+ if is_https and not tls_verify:
+ return True
+ return tls_verify is False
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py
diff --git a/collectors/python.d.plugin/python_modules/bases/__init__.py b/collectors/python.d.plugin/python_modules/bases/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/__init__.py
diff --git a/collectors/python.d.plugin/python_modules/bases/charts.py b/collectors/python.d.plugin/python_modules/bases/charts.py
new file mode 100644
index 0000000..93be43d
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/charts.py
@@ -0,0 +1,414 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from bases.collection import safe_print
+
+CHART_PARAMS = ['type', 'id', 'name', 'title', 'units', 'family', 'context', 'chart_type', 'hidden']
+DIMENSION_PARAMS = ['id', 'name', 'algorithm', 'multiplier', 'divisor', 'hidden']
+VARIABLE_PARAMS = ['id', 'value']
+
+CHART_TYPES = ['line', 'area', 'stacked']
+DIMENSION_ALGORITHMS = ['absolute', 'incremental', 'percentage-of-absolute-row', 'percentage-of-incremental-row']
+
+CHART_BEGIN = 'BEGIN {type}.{id} {since_last}\n'
+CHART_CREATE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \
+ "{chart_type} {priority} {update_every} '{hidden}' 'python.d.plugin' '{module_name}'\n"
+CHART_OBSOLETE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \
+ "{chart_type} {priority} {update_every} '{hidden} obsolete'\n"
+
+DIMENSION_CREATE = "DIMENSION '{id}' '{name}' {algorithm} {multiplier} {divisor} '{hidden} {obsolete}'\n"
+DIMENSION_SET = "SET '{id}' = {value}\n"
+
+CHART_VARIABLE_SET = "VARIABLE CHART '{id}' = {value}\n"
+
+RUNTIME_CHART_CREATE = "CHART netdata.runtime_{job_name} '' 'Execution time for {job_name}' 'ms' 'python.d' " \
+ "netdata.pythond_runtime line 145000 {update_every}\n" \
+ "DIMENSION run_time 'run time' absolute 1 1\n"
+
+
+def create_runtime_chart(func):
+ """
+ Calls a wrapped function, then prints runtime chart to stdout.
+
+ Used as a decorator for SimpleService.create() method.
+ The whole point of making 'create runtime chart' functionality as a decorator was
+ to help users who re-implements create() in theirs classes.
+
+ :param func: class method
+ :return:
+ """
+
+ def wrapper(*args, **kwargs):
+ self = args[0]
+ chart = RUNTIME_CHART_CREATE.format(
+ job_name=self.name,
+ update_every=self._runtime_counters.update_every,
+ )
+ safe_print(chart)
+ ok = func(*args, **kwargs)
+ return ok
+
+ return wrapper
+
+
+class ChartError(Exception):
+ """Base-class for all exceptions raised by this module"""
+
+
+class DuplicateItemError(ChartError):
+ """Occurs when user re-adds a chart or a dimension that has already been added"""
+
+
+class ItemTypeError(ChartError):
+ """Occurs when user passes value of wrong type to Chart, Dimension or ChartVariable class"""
+
+
+class ItemValueError(ChartError):
+ """Occurs when user passes inappropriate value to Chart, Dimension or ChartVariable class"""
+
+
+class Charts:
+ """Represent a collection of charts
+
+ All charts stored in a dict.
+ Chart is a instance of Chart class.
+ Charts adding must be done using Charts.add_chart() method only"""
+
+ def __init__(self, job_name, priority, cleanup, get_update_every, module_name):
+ """
+ :param job_name: <bound method>
+ :param priority: <int>
+ :param get_update_every: <bound method>
+ """
+ self.job_name = job_name
+ self.priority = priority
+ self.cleanup = cleanup
+ self.get_update_every = get_update_every
+ self.module_name = module_name
+ self.charts = dict()
+
+ def __len__(self):
+ return len(self.charts)
+
+ def __iter__(self):
+ return iter(self.charts.values())
+
+ def __repr__(self):
+ return 'Charts({0})'.format(self)
+
+ def __str__(self):
+ return str([chart for chart in self.charts])
+
+ def __contains__(self, item):
+ return item in self.charts
+
+ def __getitem__(self, item):
+ return self.charts[item]
+
+ def __delitem__(self, key):
+ del self.charts[key]
+
+ def __bool__(self):
+ return bool(self.charts)
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def add_chart(self, params):
+ """
+ Create Chart instance and add it to the dict
+
+ Manually adds job name, priority and update_every to params.
+ :param params: <list>
+ :return:
+ """
+ params = [self.job_name()] + params
+ new_chart = Chart(params)
+
+ new_chart.params['update_every'] = self.get_update_every()
+ new_chart.params['priority'] = self.priority
+ new_chart.params['module_name'] = self.module_name
+
+ self.priority += 1
+ self.charts[new_chart.id] = new_chart
+
+ return new_chart
+
+ def active_charts(self):
+ return [chart.id for chart in self if not chart.flags.obsoleted]
+
+
+class Chart:
+ """Represent a chart"""
+
+ def __init__(self, params):
+ """
+ :param params: <list>
+ """
+ if not isinstance(params, list):
+ raise ItemTypeError("'chart' must be a list type")
+ if not len(params) >= 8:
+ raise ItemValueError("invalid value for 'chart', must be {0}".format(CHART_PARAMS))
+
+ self.params = dict(zip(CHART_PARAMS, (p or str() for p in params)))
+ self.name = '{type}.{id}'.format(type=self.params['type'],
+ id=self.params['id'])
+ if self.params.get('chart_type') not in CHART_TYPES:
+ self.params['chart_type'] = 'absolute'
+ hidden = str(self.params.get('hidden', ''))
+ self.params['hidden'] = 'hidden' if hidden == 'hidden' else ''
+
+ self.dimensions = list()
+ self.variables = set()
+ self.flags = ChartFlags()
+ self.penalty = 0
+
+ def __getattr__(self, item):
+ try:
+ return self.params[item]
+ except KeyError:
+ raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self),
+ attr=item))
+
+ def __repr__(self):
+ return 'Chart({0})'.format(self.id)
+
+ def __str__(self):
+ return self.id
+
+ def __iter__(self):
+ return iter(self.dimensions)
+
+ def __contains__(self, item):
+ return item in [dimension.id for dimension in self.dimensions]
+
+ def add_variable(self, variable):
+ """
+ :param variable: <list>
+ :return:
+ """
+ self.variables.add(ChartVariable(variable))
+
+ def add_dimension(self, dimension):
+ """
+ :param dimension: <list>
+ :return:
+ """
+ dim = Dimension(dimension)
+
+ if dim.id in self:
+ raise DuplicateItemError("'{dimension}' already in '{chart}' dimensions".format(dimension=dim.id,
+ chart=self.name))
+ self.refresh()
+ self.dimensions.append(dim)
+ return dim
+
+ def del_dimension(self, dimension_id, hide=True):
+ if dimension_id not in self:
+ return
+ idx = self.dimensions.index(dimension_id)
+ dimension = self.dimensions[idx]
+ if hide:
+ dimension.params['hidden'] = 'hidden'
+ dimension.params['obsolete'] = 'obsolete'
+ self.create()
+ self.dimensions.remove(dimension)
+
+ def hide_dimension(self, dimension_id, reverse=False):
+ if dimension_id not in self:
+ return
+ idx = self.dimensions.index(dimension_id)
+ dimension = self.dimensions[idx]
+ dimension.params['hidden'] = 'hidden' if not reverse else str()
+ self.refresh()
+
+ def create(self):
+ """
+ :return:
+ """
+ chart = CHART_CREATE.format(**self.params)
+ dimensions = ''.join([dimension.create() for dimension in self.dimensions])
+ variables = ''.join([var.set(var.value) for var in self.variables if var])
+
+ self.flags.push = False
+ self.flags.created = True
+
+ safe_print(chart + dimensions + variables)
+
+ def can_be_updated(self, data):
+ for dim in self.dimensions:
+ if dim.get_value(data) is not None:
+ return True
+ return False
+
+ def update(self, data, interval):
+ updated_dimensions, updated_variables = str(), str()
+
+ for dim in self.dimensions:
+ value = dim.get_value(data)
+ if value is not None:
+ updated_dimensions += dim.set(value)
+
+ for var in self.variables:
+ value = var.get_value(data)
+ if value is not None:
+ updated_variables += var.set(value)
+
+ if updated_dimensions:
+ since_last = interval if self.flags.updated else 0
+
+ if self.flags.push:
+ self.create()
+
+ chart_begin = CHART_BEGIN.format(type=self.type, id=self.id, since_last=since_last)
+ safe_print(chart_begin, updated_dimensions, updated_variables, 'END\n')
+
+ self.flags.updated = True
+ self.penalty = 0
+ else:
+ self.penalty += 1
+ self.flags.updated = False
+
+ return bool(updated_dimensions)
+
+ def obsolete(self):
+ self.flags.obsoleted = True
+ if self.flags.created:
+ safe_print(CHART_OBSOLETE.format(**self.params))
+
+ def refresh(self):
+ self.penalty = 0
+ self.flags.push = True
+ self.flags.obsoleted = False
+
+
+class Dimension:
+ """Represent a dimension"""
+
+ def __init__(self, params):
+ """
+ :param params: <list>
+ """
+ if not isinstance(params, list):
+ raise ItemTypeError("'dimension' must be a list type")
+ if not params:
+ raise ItemValueError("invalid value for 'dimension', must be {0}".format(DIMENSION_PARAMS))
+
+ self.params = dict(zip(DIMENSION_PARAMS, (p or str() for p in params)))
+ self.params['name'] = self.params.get('name') or self.params['id']
+
+ if self.params.get('algorithm') not in DIMENSION_ALGORITHMS:
+ self.params['algorithm'] = 'absolute'
+ if not isinstance(self.params.get('multiplier'), int):
+ self.params['multiplier'] = 1
+ if not isinstance(self.params.get('divisor'), int):
+ self.params['divisor'] = 1
+ self.params.setdefault('hidden', '')
+ self.params.setdefault('obsolete', '')
+
+ def __getattr__(self, item):
+ try:
+ return self.params[item]
+ except KeyError:
+ raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self),
+ attr=item))
+
+ def __repr__(self):
+ return 'Dimension({0})'.format(self.id)
+
+ def __str__(self):
+ return self.id
+
+ def __eq__(self, other):
+ if not isinstance(other, Dimension):
+ return self.id == other
+ return self.id == other.id
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash(repr(self))
+
+ def create(self):
+ return DIMENSION_CREATE.format(**self.params)
+
+ def set(self, value):
+ """
+ :param value: <str>: must be a digit
+ :return:
+ """
+ return DIMENSION_SET.format(id=self.id,
+ value=value)
+
+ def get_value(self, data):
+ try:
+ return int(data[self.id])
+ except (KeyError, TypeError):
+ return None
+
+
+class ChartVariable:
+ """Represent a chart variable"""
+
+ def __init__(self, params):
+ """
+ :param params: <list>
+ """
+ if not isinstance(params, list):
+ raise ItemTypeError("'variable' must be a list type")
+ if not params:
+ raise ItemValueError("invalid value for 'variable' must be: {0}".format(VARIABLE_PARAMS))
+
+ self.params = dict(zip(VARIABLE_PARAMS, params))
+ self.params.setdefault('value', None)
+
+ def __getattr__(self, item):
+ try:
+ return self.params[item]
+ except KeyError:
+ raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self),
+ attr=item))
+
+ def __bool__(self):
+ return self.value is not None
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def __repr__(self):
+ return 'ChartVariable({0})'.format(self.id)
+
+ def __str__(self):
+ return self.id
+
+ def __eq__(self, other):
+ if isinstance(other, ChartVariable):
+ return self.id == other.id
+ return False
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash(repr(self))
+
+ def set(self, value):
+ return CHART_VARIABLE_SET.format(id=self.id,
+ value=value)
+
+ def get_value(self, data):
+ try:
+ return int(data[self.id])
+ except (KeyError, TypeError):
+ return None
+
+
+class ChartFlags:
+ def __init__(self):
+ self.push = True
+ self.created = False
+ self.updated = False
+ self.obsoleted = False
diff --git a/collectors/python.d.plugin/python_modules/bases/collection.py b/collectors/python.d.plugin/python_modules/bases/collection.py
new file mode 100644
index 0000000..93bf8cf
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/collection.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import os
+
+from threading import Lock
+
+PATH = os.getenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin').split(':')
+
+CHART_BEGIN = 'BEGIN {0} {1}\n'
+CHART_CREATE = "CHART {0} '{1}' '{2}' '{3}' '{4}' '{5}' {6} {7} {8}\n"
+DIMENSION_CREATE = "DIMENSION '{0}' '{1}' {2} {3} {4} '{5}'\n"
+DIMENSION_SET = "SET '{0}' = {1}\n"
+
+print_lock = Lock()
+
+
+def setdefault_values(config, base_dict):
+ for key, value in base_dict.items():
+ config.setdefault(key, value)
+ return config
+
+
+def run_and_exit(func):
+ def wrapper(*args, **kwargs):
+ func(*args, **kwargs)
+ exit(1)
+
+ return wrapper
+
+
+def on_try_except_finally(on_except=(None,), on_finally=(None,)):
+ except_func = on_except[0]
+ finally_func = on_finally[0]
+
+ def decorator(func):
+ def wrapper(*args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except Exception:
+ if except_func:
+ except_func(*on_except[1:])
+ finally:
+ if finally_func:
+ finally_func(*on_finally[1:])
+
+ return wrapper
+
+ return decorator
+
+
+def static_vars(**kwargs):
+ def decorate(func):
+ for k in kwargs:
+ setattr(func, k, kwargs[k])
+ return func
+
+ return decorate
+
+
+@on_try_except_finally(on_except=(exit, 1))
+def safe_print(*msg):
+ """
+ :param msg:
+ :return:
+ """
+ print_lock.acquire()
+ print(''.join(msg))
+ print_lock.release()
+
+
+def find_binary(binary):
+ """
+ :param binary: <str>
+ :return:
+ """
+ for directory in PATH:
+ binary_name = os.path.join(directory, binary)
+ if os.path.isfile(binary_name) and os.access(binary_name, os.X_OK):
+ return binary_name
+ return None
+
+
+def read_last_line(f):
+ with open(f, 'rb') as opened:
+ opened.seek(-2, 2)
+ while opened.read(1) != b'\n':
+ opened.seek(-2, 1)
+ if opened.tell() == 0:
+ break
+ result = opened.readline()
+ return result.decode()
+
+
+def unicode_str(arg):
+ """Return the argument as a unicode string.
+
+ The `unicode` function has been removed from Python3 and `str` takes its
+ place. This function is a helper which will try using Python 2's `unicode`
+ and if it doesn't exist, assume we're using Python 3 and use `str`.
+
+ :param arg:
+ :return: <str>
+ """
+ # TODO: fix
+ try:
+ # https://github.com/netdata/netdata/issues/7613
+ if isinstance(arg, unicode):
+ return arg
+ return unicode(arg, errors='ignore')
+ # https://github.com/netdata/netdata/issues/7642
+ except TypeError:
+ return unicode(arg)
+ except NameError:
+ return str(arg)
diff --git a/collectors/python.d.plugin/python_modules/bases/loaders.py b/collectors/python.d.plugin/python_modules/bases/loaders.py
new file mode 100644
index 0000000..095f3a3
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/loaders.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+
+from sys import version_info
+
+PY_VERSION = version_info[:2]
+
+try:
+ if PY_VERSION > (3, 1):
+ from pyyaml3 import SafeLoader as YamlSafeLoader
+ else:
+ from pyyaml2 import SafeLoader as YamlSafeLoader
+except ImportError:
+ from yaml import SafeLoader as YamlSafeLoader
+
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
+
+
+DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map' if PY_VERSION > (3, 1) else u'tag:yaml.org,2002:map'
+
+
+def dict_constructor(loader, node):
+ return OrderedDict(loader.construct_pairs(node))
+
+
+YamlSafeLoader.add_constructor(DEFAULT_MAPPING_TAG, dict_constructor)
+
+
+def load_yaml(stream):
+ loader = YamlSafeLoader(stream)
+ try:
+ return loader.get_single_data()
+ finally:
+ loader.dispose()
+
+
+def load_config(file_name):
+ with open(file_name, 'r') as stream:
+ return load_yaml(stream)
diff --git a/collectors/python.d.plugin/python_modules/bases/loggers.py b/collectors/python.d.plugin/python_modules/bases/loggers.py
new file mode 100644
index 0000000..47f196a
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/loggers.py
@@ -0,0 +1,206 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import logging
+import traceback
+
+from sys import exc_info
+
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time
+
+from bases.collection import on_try_except_finally, unicode_str
+
+
+LOGGING_LEVELS = {'CRITICAL': 50,
+ 'ERROR': 40,
+ 'WARNING': 30,
+ 'INFO': 20,
+ 'DEBUG': 10,
+ 'NOTSET': 0}
+
+DEFAULT_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s : %(message)s'
+DEFAULT_LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
+
+PYTHON_D_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s: %(module_name)s[%(job_name)s] : %(message)s'
+PYTHON_D_LOG_NAME = 'python.d'
+
+
+def limiter(log_max_count=30, allowed_in_seconds=60):
+ def on_decorator(func):
+
+ def on_call(*args):
+ current_time = args[0]._runtime_counters.start_mono
+ lc = args[0]._logger_counters
+
+ if lc.logged and lc.logged % log_max_count == 0:
+ if current_time - lc.time_to_compare <= allowed_in_seconds:
+ lc.dropped += 1
+ return
+ lc.time_to_compare = current_time
+
+ lc.logged += 1
+ func(*args)
+
+ return on_call
+ return on_decorator
+
+
+def add_traceback(func):
+ def on_call(*args):
+ self = args[0]
+
+ if not self.log_traceback:
+ func(*args)
+ else:
+ if exc_info()[0]:
+ func(*args)
+ func(self, traceback.format_exc())
+ else:
+ func(*args)
+
+ return on_call
+
+
+class LoggerCounters:
+ def __init__(self):
+ self.logged = 0
+ self.dropped = 0
+ self.time_to_compare = time()
+
+ def __repr__(self):
+ return 'LoggerCounter(logged: {logged}, dropped: {dropped})'.format(logged=self.logged,
+ dropped=self.dropped)
+
+
+class BaseLogger(object):
+ def __init__(self, logger_name, log_fmt=DEFAULT_LOG_LINE_FORMAT, date_fmt=DEFAULT_LOG_TIME_FORMAT,
+ handler=logging.StreamHandler):
+ """
+ :param logger_name: <str>
+ :param log_fmt: <str>
+ :param date_fmt: <str>
+ :param handler: <logging handler>
+ """
+ self.logger = logging.getLogger(logger_name)
+ if not self.has_handlers():
+ self.severity = 'INFO'
+ self.logger.addHandler(handler())
+ self.set_formatter(fmt=log_fmt, date_fmt=date_fmt)
+
+ def __repr__(self):
+ return '<Logger: {name})>'.format(name=self.logger.name)
+
+ def set_formatter(self, fmt, date_fmt=DEFAULT_LOG_TIME_FORMAT):
+ """
+ :param fmt: <str>
+ :param date_fmt: <str>
+ :return:
+ """
+ if self.has_handlers():
+ self.logger.handlers[0].setFormatter(logging.Formatter(fmt=fmt, datefmt=date_fmt))
+
+ def has_handlers(self):
+ return self.logger.handlers
+
+ @property
+ def severity(self):
+ return self.logger.getEffectiveLevel()
+
+ @severity.setter
+ def severity(self, level):
+ """
+ :param level: <str> or <int>
+ :return:
+ """
+ if level in LOGGING_LEVELS:
+ self.logger.setLevel(LOGGING_LEVELS[level])
+
+ def debug(self, *msg, **kwargs):
+ self.logger.debug(' '.join(map(unicode_str, msg)), **kwargs)
+
+ def info(self, *msg, **kwargs):
+ self.logger.info(' '.join(map(unicode_str, msg)), **kwargs)
+
+ def warning(self, *msg, **kwargs):
+ self.logger.warning(' '.join(map(unicode_str, msg)), **kwargs)
+
+ def error(self, *msg, **kwargs):
+ self.logger.error(' '.join(map(unicode_str, msg)), **kwargs)
+
+ def alert(self, *msg, **kwargs):
+ self.logger.critical(' '.join(map(unicode_str, msg)), **kwargs)
+
+ @on_try_except_finally(on_finally=(exit, 1))
+ def fatal(self, *msg, **kwargs):
+ self.logger.critical(' '.join(map(unicode_str, msg)), **kwargs)
+
+
+class PythonDLogger(object):
+ def __init__(self, logger_name=PYTHON_D_LOG_NAME, log_fmt=PYTHON_D_LOG_LINE_FORMAT):
+ """
+ :param logger_name: <str>
+ :param log_fmt: <str>
+ """
+ self.logger = BaseLogger(logger_name, log_fmt=log_fmt)
+ self.module_name = 'plugin'
+ self.job_name = 'main'
+ self._logger_counters = LoggerCounters()
+
+ _LOG_TRACEBACK = False
+
+ @property
+ def log_traceback(self):
+ return PythonDLogger._LOG_TRACEBACK
+
+ @log_traceback.setter
+ def log_traceback(self, value):
+ PythonDLogger._LOG_TRACEBACK = value
+
+ def debug(self, *msg):
+ self.logger.debug(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ def info(self, *msg):
+ self.logger.info(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ def warning(self, *msg):
+ self.logger.warning(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ @add_traceback
+ def error(self, *msg):
+ self.logger.error(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ @add_traceback
+ def alert(self, *msg):
+ self.logger.alert(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ def fatal(self, *msg):
+ self.logger.fatal(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+
+class PythonDLimitedLogger(PythonDLogger):
+ @limiter()
+ def info(self, *msg):
+ PythonDLogger.info(self, *msg)
+
+ @limiter()
+ def warning(self, *msg):
+ PythonDLogger.warning(self, *msg)
+
+ @limiter()
+ def error(self, *msg):
+ PythonDLogger.error(self, *msg)
+
+ @limiter()
+ def alert(self, *msg):
+ PythonDLogger.alert(self, *msg)