diff options
Diffstat (limited to 'collectors/python.d.plugin/python_modules/bases')
12 files changed, 1918 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py new file mode 100644 index 0000000..dea50ee --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/ExecutableService.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +import os + +from subprocess import Popen, PIPE + +from bases.FrameworkServices.SimpleService import SimpleService +from bases.collection import find_binary + + +class ExecutableService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.command = None + + def _get_raw_data(self, stderr=False, command=None): + """ + Get raw data from executed command + :return: <list> + """ + command = command or self.command + self.debug("Executing command '{0}'".format(' '.join(command))) + try: + p = Popen(command, stdout=PIPE, stderr=PIPE) + except Exception as error: + self.error('Executing command {0} resulted in error: {1}'.format(command, error)) + return None + + data = list() + std = p.stderr if stderr else p.stdout + for line in std: + try: + data.append(line.decode('utf-8')) + except TypeError: + continue + + return data + + def check(self): + """ + Parse basic configuration, check if command is whitelisted and is returning values + :return: <boolean> + """ + # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified) + if 'command' in self.configuration: + self.command = self.configuration['command'] + + # "command" must be: 1.not None 2. type <str> + if not (self.command and isinstance(self.command, str)): + self.error('Command is not defined or command type is not <str>') + return False + + # Split "command" into: 1. command <str> 2. options <list> + command, opts = self.command.split()[0], self.command.split()[1:] + + # Check for "bad" symbols in options. No pipes, redirects etc. + opts_list = ['&', '|', ';', '>', '<'] + bad_opts = set(''.join(opts)) & set(opts_list) + if bad_opts: + self.error("Bad command argument(s): {opts}".format(opts=bad_opts)) + return False + + # Find absolute path ('echo' => '/bin/echo') + if '/' not in command: + command = find_binary(command) + if not command: + self.error('Can\'t locate "{command}" binary'.format(command=self.command)) + return False + # Check if binary exist and executable + else: + if not os.access(command, os.X_OK): + self.error('"{binary}" is not executable'.format(binary=command)) + return False + + self.command = [command] + opts if opts else [command] + + try: + data = self._get_data() + except Exception as error: + self.error('_get_data() failed. Command: {command}. Error: {error}'.format(command=self.command, + error=error)) + return False + + if isinstance(data, dict) and data: + return True + self.error('Command "{command}" returned no data'.format(command=self.command)) + return False diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py new file mode 100644 index 0000000..a55e33f --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/LogService.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +from glob import glob +import sys +import os + +from bases.FrameworkServices.SimpleService import SimpleService + + +class LogService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.log_path = self.configuration.get('path') + self.__glob_path = self.log_path + self._last_position = 0 + self.__re_find = dict(current=0, run=0, maximum=60) + self.__open_args = {'errors': 'replace'} if sys.version_info[0] > 2 else {} + + def _get_raw_data(self): + """ + Get log lines since last poll + :return: list + """ + lines = list() + try: + if self.__re_find['current'] == self.__re_find['run']: + self._find_recent_log_file() + size = os.path.getsize(self.log_path) + if size == self._last_position: + self.__re_find['current'] += 1 + return list() # return empty list if nothing has changed + elif size < self._last_position: + self._last_position = 0 # read from beginning if file has shrunk + + with open(self.log_path, **self.__open_args) as fp: + fp.seek(self._last_position) + for line in fp: + lines.append(line) + self._last_position = fp.tell() + self.__re_find['current'] = 0 + except (OSError, IOError) as error: + self.__re_find['current'] += 1 + self.error(str(error)) + + return lines or None + + def _find_recent_log_file(self): + """ + :return: + """ + self.__re_find['run'] = self.__re_find['maximum'] + self.__re_find['current'] = 0 + self.__glob_path = self.__glob_path or self.log_path # workaround for modules w/o config files + path_list = glob(self.__glob_path) + if path_list: + self.log_path = max(path_list) + return True + return False + + def check(self): + """ + Parse basic configuration and check if log file exists + :return: boolean + """ + if not self.log_path: + self.error('No path to log specified') + return None + + if self._find_recent_log_file() and os.access(self.log_path, os.R_OK) and os.path.isfile(self.log_path): + return True + self.error('Cannot access {0}'.format(self.log_path)) + return False + + def create(self): + # set cursor at last byte of log file + self._last_position = os.path.getsize(self.log_path) + status = SimpleService.create(self) + return status diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py new file mode 100644 index 0000000..7f5c7d2 --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +from sys import exc_info + +try: + import MySQLdb + + PY_MYSQL = True +except ImportError: + try: + import pymysql as MySQLdb + + PY_MYSQL = True + except ImportError: + PY_MYSQL = False + +from bases.FrameworkServices.SimpleService import SimpleService + + +class MySQLService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.__connection = None + self.__conn_properties = dict() + self.extra_conn_properties = dict() + self.__queries = self.configuration.get('queries', dict()) + self.queries = dict() + + def __connect(self): + try: + connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties) + except (MySQLdb.MySQLError, TypeError, AttributeError) as error: + return None, str(error) + else: + return connection, None + + def check(self): + def get_connection_properties(conf, extra_conf): + properties = dict() + if conf.get('user'): + properties['user'] = conf['user'] + if conf.get('pass'): + properties['passwd'] = conf['pass'] + + if conf.get('socket'): + properties['unix_socket'] = conf['socket'] + elif conf.get('host'): + properties['host'] = conf['host'] + properties['port'] = int(conf.get('port', 3306)) + elif conf.get('my.cnf'): + properties['read_default_file'] = conf['my.cnf'] + + if conf.get('ssl'): + properties['ssl'] = conf['ssl'] + + if isinstance(extra_conf, dict) and extra_conf: + properties.update(extra_conf) + + return properties or None + + def is_valid_queries_dict(raw_queries, log_error): + """ + :param raw_queries: dict: + :param log_error: function: + :return: dict or None + + raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries) + """ + + def is_valid_query(query): + return all([isinstance(query, str), + query.startswith(('SELECT', 'select', 'SHOW', 'show'))]) + + if hasattr(raw_queries, 'keys') and raw_queries: + valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)]) + bad_queries = set(raw_queries) - set(valid_queries) + + if bad_queries: + log_error('Removed query(s): {queries}'.format(queries=bad_queries)) + return valid_queries + else: + log_error('Unsupported "queries" format. Must be not empty <dict>') + return None + + if not PY_MYSQL: + self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin') + return False + + # Preference: 1. "queries" from the configuration file 2. "queries" from the module + self.queries = self.__queries or self.queries + # Check if "self.queries" exist, not empty and all queries are in valid format + self.queries = is_valid_queries_dict(self.queries, self.error) + if not self.queries: + return None + + # Get connection properties + self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties) + if not self.__conn_properties: + self.error('Connection properties are missing') + return False + + # Create connection to the database + self.__connection, error = self.__connect() + if error: + self.error('Can\'t establish connection to MySQL: {error}'.format(error=error)) + return False + + try: + data = self._get_data() + except Exception as error: + self.error('_get_data() failed. Error: {error}'.format(error=error)) + return False + + if isinstance(data, dict) and data: + return True + self.error("_get_data() returned no data or type is not <dict>") + return False + + def _get_raw_data(self, description=None): + """ + Get raw data from MySQL server + :return: dict: fetchall() or (fetchall(), description) + """ + + if not self.__connection: + self.__connection, error = self.__connect() + if error: + return None + + raw_data = dict() + queries = dict(self.queries) + try: + cursor = self.__connection.cursor() + for name, query in queries.items(): + try: + cursor.execute(query) + except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error: + if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)): + cursor.close() + raise RuntimeError + self.error('Removed query: {name}[{query}]. Error: error'.format(name=name, + query=query, + error=error)) + self.queries.pop(name) + continue + else: + raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall() + cursor.close() + self.__connection.commit() + except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError): + self.__connection.close() + self.__connection = None + return None + else: + return raw_data or None + + @staticmethod + def __is_error_critical(err_class, err_text): + return err_class == MySQLdb.OperationalError and all(['denied' not in err_text, + 'Unknown column' not in err_text]) diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py new file mode 100644 index 0000000..c304cce --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + + +from time import sleep, time + +from third_party.monotonic import monotonic + +from bases.charts import Charts, ChartError, create_runtime_chart +from bases.collection import safe_print +from bases.loggers import PythonDLimitedLogger + +RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \ + 'SET run_time = {elapsed}\n' \ + 'END\n' + +PENALTY_EVERY = 5 +MAX_PENALTY = 10 * 60 # 10 minutes + + +class RuntimeCounters: + def __init__(self, configuration): + """ + :param configuration: <dict> + """ + self.update_every = int(configuration.pop('update_every')) + self.do_penalty = configuration.pop('penalty') + + self.start_mono = 0 + self.start_real = 0 + self.retries = 0 + self.penalty = 0 + self.elapsed = 0 + self.prev_update = 0 + + self.runs = 1 + + def calc_next(self): + self.start_mono = monotonic() + return self.start_mono - (self.start_mono % self.update_every) + self.update_every + self.penalty + + def sleep_until_next(self): + next_time = self.calc_next() + while self.start_mono < next_time: + sleep(next_time - self.start_mono) + self.start_mono = monotonic() + self.start_real = time() + + def handle_retries(self): + self.retries += 1 + if self.do_penalty and self.retries % PENALTY_EVERY == 0: + self.penalty = round(min(self.retries * self.update_every / 2, MAX_PENALTY)) + + +def clean_module_name(name): + if name.startswith('pythond_'): + return name[8:] + return name + + +class SimpleService(PythonDLimitedLogger, object): + """ + Prototype of Service class. + Implemented basic functionality to run jobs by `python.d.plugin` + """ + + def __init__(self, configuration, name=''): + """ + :param configuration: <dict> + :param name: <str> + """ + PythonDLimitedLogger.__init__(self) + self.configuration = configuration + self.order = list() + self.definitions = dict() + + self.module_name = clean_module_name(self.__module__) + self.job_name = configuration.pop('job_name') + self.override_name = configuration.pop('override_name') + self.fake_name = None + + self._runtime_counters = RuntimeCounters(configuration=configuration) + self.charts = Charts(job_name=self.actual_name, + priority=configuration.pop('priority'), + cleanup=configuration.pop('chart_cleanup'), + get_update_every=self.get_update_every, + module_name=self.module_name) + + def __repr__(self): + return '<{cls_bases}: {name}>'.format(cls_bases=', '.join(c.__name__ for c in self.__class__.__bases__), + name=self.name) + + @property + def name(self): + if self.job_name and self.job_name != self.module_name: + return '_'.join([self.module_name, self.override_name or self.job_name]) + return self.module_name + + def actual_name(self): + return self.fake_name or self.name + + @property + def runs_counter(self): + return self._runtime_counters.runs + + @property + def update_every(self): + return self._runtime_counters.update_every + + @update_every.setter + def update_every(self, value): + """ + :param value: <int> + :return: + """ + self._runtime_counters.update_every = value + + def get_update_every(self): + return self.update_every + + def check(self): + """ + check() prototype + :return: boolean + """ + self.debug("job doesn't implement check() method. Using default which simply invokes get_data().") + data = self.get_data() + if data and isinstance(data, dict): + return True + self.debug('returned value is wrong: {0}'.format(data)) + return False + + @create_runtime_chart + def create(self): + for chart_name in self.order: + chart_config = self.definitions.get(chart_name) + + if not chart_config: + self.debug("create() => [NOT ADDED] chart '{chart_name}' not in definitions. " + "Skipping it.".format(chart_name=chart_name)) + continue + + # create chart + chart_params = [chart_name] + chart_config['options'] + try: + self.charts.add_chart(params=chart_params) + except ChartError as error: + self.error("create() => [NOT ADDED] (chart '{chart}': {error})".format(chart=chart_name, + error=error)) + continue + + # add dimensions to chart + for dimension in chart_config['lines']: + try: + self.charts[chart_name].add_dimension(dimension) + except ChartError as error: + self.error("create() => [NOT ADDED] (dimension '{dimension}': {error})".format(dimension=dimension, + error=error)) + continue + + # add variables to chart + if 'variables' in chart_config: + for variable in chart_config['variables']: + try: + self.charts[chart_name].add_variable(variable) + except ChartError as error: + self.error("create() => [NOT ADDED] (variable '{var}': {error})".format(var=variable, + error=error)) + continue + + del self.order + del self.definitions + + # True if job has at least 1 chart else False + return bool(self.charts) + + def run(self): + """ + Runs job in thread. Handles retries. + Exits when job failed or timed out. + :return: None + """ + job = self._runtime_counters + self.debug('started, update frequency: {freq}'.format(freq=job.update_every)) + + while True: + job.sleep_until_next() + + since = 0 + if job.prev_update: + since = int((job.start_real - job.prev_update) * 1e6) + + try: + updated = self.update(interval=since) + except Exception as error: + self.error('update() unhandled exception: {error}'.format(error=error)) + updated = False + + job.runs += 1 + + if not updated: + job.handle_retries() + else: + job.elapsed = int((monotonic() - job.start_mono) * 1e3) + job.prev_update = job.start_real + job.retries, job.penalty = 0, 0 + safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name, + since_last=since, + elapsed=job.elapsed)) + self.debug('update => [{status}] (elapsed time: {elapsed}, failed retries in a row: {retries})'.format( + status='OK' if updated else 'FAILED', + elapsed=job.elapsed if updated else '-', + retries=job.retries)) + + def update(self, interval): + """ + :return: + """ + data = self.get_data() + if not data: + self.debug('get_data() returned no data') + return False + elif not isinstance(data, dict): + self.debug('get_data() returned incorrect type data') + return False + + updated = False + + for chart in self.charts: + if chart.flags.obsoleted: + if chart.can_be_updated(data): + chart.refresh() + else: + continue + elif self.charts.cleanup and chart.penalty >= self.charts.cleanup: + chart.obsolete() + self.info("chart '{0}' was suppressed due to non updating".format(chart.name)) + continue + + ok = chart.update(data, interval) + if ok: + updated = True + + if not updated: + self.debug('none of the charts has been updated') + + return updated + + def get_data(self): + return self._get_data() + + def _get_data(self): + raise NotImplementedError diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py new file mode 100644 index 0000000..d6c7550 --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py @@ -0,0 +1,336 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +import errno +import socket + +try: + import ssl +except ImportError: + _TLS_SUPPORT = False +else: + _TLS_SUPPORT = True + +if _TLS_SUPPORT: + try: + PROTOCOL_TLS = ssl.PROTOCOL_TLS + except AttributeError: + PROTOCOL_TLS = ssl.PROTOCOL_SSLv23 + +from bases.FrameworkServices.SimpleService import SimpleService + + +DEFAULT_CONNECT_TIMEOUT = 2.0 +DEFAULT_READ_TIMEOUT = 2.0 +DEFAULT_WRITE_TIMEOUT = 2.0 + + +class SocketService(SimpleService): + def __init__(self, configuration=None, name=None): + self._sock = None + self._keep_alive = False + self.host = 'localhost' + self.port = None + self.unix_socket = None + self.dgram_socket = False + self.request = '' + self.tls = False + self.cert = None + self.key = None + self.__socket_config = None + self.__empty_request = "".encode() + SimpleService.__init__(self, configuration=configuration, name=name) + self.connect_timeout = configuration.get('connect_timeout', DEFAULT_CONNECT_TIMEOUT) + self.read_timeout = configuration.get('read_timeout', DEFAULT_READ_TIMEOUT) + self.write_timeout = configuration.get('write_timeout', DEFAULT_WRITE_TIMEOUT) + + def _socket_error(self, message=None): + if self.unix_socket is not None: + self.error('unix socket "{socket}": {message}'.format(socket=self.unix_socket, + message=message)) + else: + if self.__socket_config is not None: + _, _, _, _, sa = self.__socket_config + self.error('socket to "{address}" port {port}: {message}'.format(address=sa[0], + port=sa[1], + message=message)) + else: + self.error('unknown socket: {0}'.format(message)) + + def _connect2socket(self, res=None): + """ + Connect to a socket, passing the result of getaddrinfo() + :return: boolean + """ + if res is None: + res = self.__socket_config + if res is None: + self.error("Cannot create socket to 'None':") + return False + + af, sock_type, proto, _, sa = res + try: + self.debug('Creating socket to "{address}", port {port}'.format(address=sa[0], port=sa[1])) + self._sock = socket.socket(af, sock_type, proto) + except socket.error as error: + self.error('Failed to create socket "{address}", port {port}, error: {error}'.format(address=sa[0], + port=sa[1], + error=error)) + self._sock = None + self.__socket_config = None + return False + + if self.tls: + try: + self.debug('Encapsulating socket with TLS') + self.debug('Using keyfile: {0}, certfile: {1}, cert_reqs: {2}, ssl_version: {3}'.format( + self.key, self.cert, ssl.CERT_NONE, PROTOCOL_TLS + )) + self._sock = ssl.wrap_socket(self._sock, + keyfile=self.key, + certfile=self.cert, + server_side=False, + cert_reqs=ssl.CERT_NONE, + ssl_version=PROTOCOL_TLS, + ) + except (socket.error, ssl.SSLError, IOError, OSError) as error: + self.error('failed to wrap socket : {0}'.format(repr(error))) + self._disconnect() + self.__socket_config = None + return False + + try: + self.debug('connecting socket to "{address}", port {port}'.format(address=sa[0], port=sa[1])) + self._sock.settimeout(self.connect_timeout) + self.debug('set socket connect timeout to: {0}'.format(self._sock.gettimeout())) + self._sock.connect(sa) + except (socket.error, ssl.SSLError) as error: + self.error('Failed to connect to "{address}", port {port}, error: {error}'.format(address=sa[0], + port=sa[1], + error=error)) + self._disconnect() + self.__socket_config = None + return False + + self.debug('connected to "{address}", port {port}'.format(address=sa[0], port=sa[1])) + self.__socket_config = res + return True + + def _connect2unixsocket(self): + """ + Connect to a unix socket, given its filename + :return: boolean + """ + if self.unix_socket is None: + self.error("cannot connect to unix socket 'None'") + return False + + try: + self.debug('attempting DGRAM unix socket "{0}"'.format(self.unix_socket)) + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.settimeout(self.connect_timeout) + self.debug('set socket connect timeout to: {0}'.format(self._sock.gettimeout())) + self._sock.connect(self.unix_socket) + self.debug('connected DGRAM unix socket "{0}"'.format(self.unix_socket)) + return True + except socket.error as error: + self.debug('Failed to connect DGRAM unix socket "{socket}": {error}'.format(socket=self.unix_socket, + error=error)) + + try: + self.debug('attempting STREAM unix socket "{0}"'.format(self.unix_socket)) + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.settimeout(self.connect_timeout) + self.debug('set socket connect timeout to: {0}'.format(self._sock.gettimeout())) + self._sock.connect(self.unix_socket) + self.debug('connected STREAM unix socket "{0}"'.format(self.unix_socket)) + return True + except socket.error as error: + self.debug('Failed to connect STREAM unix socket "{socket}": {error}'.format(socket=self.unix_socket, + error=error)) + self._sock = None + return False + + def _connect(self): + """ + Recreate socket and connect to it since sockets cannot be reused after closing + Available configurations are IPv6, IPv4 or UNIX socket + :return: + """ + try: + if self.unix_socket is not None: + self._connect2unixsocket() + + else: + if self.__socket_config is not None: + self._connect2socket() + else: + if self.dgram_socket: + sock_type = socket.SOCK_DGRAM + else: + sock_type = socket.SOCK_STREAM + for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, sock_type): + if self._connect2socket(res): + break + + except Exception as error: + self.error('unhandled exception during connect : {0}'.format(repr(error))) + self._sock = None + self.__socket_config = None + + def _disconnect(self): + """ + Close socket connection + :return: + """ + if self._sock is not None: + try: + self.debug('closing socket') + self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all + self._sock.close() + except Exception as error: + if not (hasattr(error, 'errno') and error.errno == errno.ENOTCONN): + self.error(error) + self._sock = None + + def _send(self, request=None): + """ + Send request. + :return: boolean + """ + # Send request if it is needed + if self.request != self.__empty_request: + try: + self.debug('set socket write timeout to: {0}'.format(self._sock.gettimeout())) + self._sock.settimeout(self.write_timeout) + self.debug('sending request: {0}'.format(request or self.request)) + self._sock.send(request or self.request) + except Exception as error: + self._socket_error('error sending request: {0}'.format(error)) + self._disconnect() + return False + return True + + def _receive(self, raw=False): + """ + Receive data from socket + :param raw: set `True` to return bytes + :type raw: bool + :return: decoded str or raw bytes + :rtype: str/bytes + """ + data = "" if not raw else b"" + while True: + self.debug('receiving response') + try: + self.debug('set socket read timeout to: {0}'.format(self._sock.gettimeout())) + self._sock.settimeout(self.read_timeout) + buf = self._sock.recv(4096) + except Exception as error: + self._socket_error('failed to receive response: {0}'.format(error)) + self._disconnect() + break + + if buf is None or len(buf) == 0: # handle server disconnect + if data == "" or data == b"": + self._socket_error('unexpectedly disconnected') + else: + self.debug('server closed the connection') + self._disconnect() + break + + self.debug('received data') + data += buf.decode('utf-8', 'ignore') if not raw else buf + if self._check_raw_data(data): + break + + self.debug(u'final response: {0}'.format(data if not raw else u'binary data')) + return data + + def _get_raw_data(self, raw=False, request=None): + """ + Get raw data with low-level "socket" module. + :param raw: set `True` to return bytes + :type raw: bool + :return: decoded data (str) or raw data (bytes) + :rtype: str/bytes + """ + if self._sock is None: + self._connect() + if self._sock is None: + return None + + # Send request if it is needed + if not self._send(request): + return None + + data = self._receive(raw) + + if not self._keep_alive: + self._disconnect() + + return data + + @staticmethod + def _check_raw_data(data): + """ + Check if all data has been gathered from socket + :param data: str + :return: boolean + """ + return bool(data) + + def _parse_config(self): + """ + Parse configuration data + :return: boolean + """ + try: + self.unix_socket = str(self.configuration['socket']) + except (KeyError, TypeError): + self.debug('No unix socket specified. Trying TCP/IP socket.') + self.unix_socket = None + try: + self.host = str(self.configuration['host']) + except (KeyError, TypeError): + self.debug('No host specified. Using: "{0}"'.format(self.host)) + try: + self.port = int(self.configuration['port']) + except (KeyError, TypeError): + self.debug('No port specified. Using: "{0}"'.format(self.port)) + + self.tls = bool(self.configuration.get('tls', self.tls)) + if self.tls and not _TLS_SUPPORT: + self.warning('TLS requested but no TLS module found, disabling TLS support.') + self.tls = False + if _TLS_SUPPORT and not self.tls: + self.debug('No TLS preference specified, not using TLS.') + + if self.tls and _TLS_SUPPORT: + self.key = self.configuration.get('tls_key_file') + self.cert = self.configuration.get('tls_cert_file') + if not self.cert: + # If there's not a valid certificate, clear the key too. + self.debug('No valid TLS client certificate configuration found.') + self.key = None + self.cert = None + elif not self.key: + # If a key isn't listed, the config may still be + # valid, because there may be a key attached to the + # certificate. + self.info('No TLS client key specified, assuming it\'s attached to the certificate.') + self.key = None + + try: + self.request = str(self.configuration['request']) + except (KeyError, TypeError): + self.debug('No request specified. Using: "{0}"'.format(self.request)) + + self.request = self.request.encode() + + def check(self): + self._parse_config() + return SimpleService.check(self) diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py new file mode 100644 index 0000000..1faf036 --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +import urllib3 + +from distutils.version import StrictVersion as version + +from bases.FrameworkServices.SimpleService import SimpleService + +try: + urllib3.disable_warnings() +except AttributeError: + pass + +# https://github.com/urllib3/urllib3/blob/master/CHANGES.rst#19-2014-07-04 +# New retry logic and urllib3.util.retry.Retry configuration object. (Issue https://github.com/urllib3/urllib3/pull/326) +URLLIB3_MIN_REQUIRED_VERSION = '1.9' +URLLIB3_VERSION = urllib3.__version__ +URLLIB3 = 'urllib3' + + +def version_check(): + if version(URLLIB3_VERSION) >= version(URLLIB3_MIN_REQUIRED_VERSION): + return + + err = '{0} version: {1}, minimum required version: {2}, please upgrade'.format( + URLLIB3, + URLLIB3_VERSION, + URLLIB3_MIN_REQUIRED_VERSION, + ) + raise Exception(err) + + +class UrlService(SimpleService): + def __init__(self, configuration=None, name=None): + version_check() + SimpleService.__init__(self, configuration=configuration, name=name) + self.debug("{0} version: {1}".format(URLLIB3, URLLIB3_VERSION)) + self.url = self.configuration.get('url') + self.user = self.configuration.get('user') + self.password = self.configuration.get('pass') + self.proxy_user = self.configuration.get('proxy_user') + self.proxy_password = self.configuration.get('proxy_pass') + self.proxy_url = self.configuration.get('proxy_url') + self.method = self.configuration.get('method', 'GET') + self.header = self.configuration.get('header') + self.body = self.configuration.get('body') + self.request_timeout = self.configuration.get('timeout', 1) + self.respect_retry_after_header = self.configuration.get('respect_retry_after_header') + self.tls_verify = self.configuration.get('tls_verify') + self.tls_ca_file = self.configuration.get('tls_ca_file') + self.tls_key_file = self.configuration.get('tls_key_file') + self.tls_cert_file = self.configuration.get('tls_cert_file') + self._manager = None + + def __make_headers(self, **header_kw): + user = header_kw.get('user') or self.user + password = header_kw.get('pass') or self.password + proxy_user = header_kw.get('proxy_user') or self.proxy_user + proxy_password = header_kw.get('proxy_pass') or self.proxy_password + custom_header = header_kw.get('header') or self.header + header_params = dict(keep_alive=True) + proxy_header_params = dict() + if user and password: + header_params['basic_auth'] = '{user}:{password}'.format(user=user, + password=password) + if proxy_user and proxy_password: + proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user, + password=proxy_password) + try: + header, proxy_header = urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params) + except TypeError as error: + self.error('build_header() error: {error}'.format(error=error)) + return None, None + else: + header.update(custom_header or dict()) + return header, proxy_header + + def _build_manager(self, **header_kw): + header, proxy_header = self.__make_headers(**header_kw) + if header is None or proxy_header is None: + return None + proxy_url = header_kw.get('proxy_url') or self.proxy_url + if proxy_url: + manager = urllib3.ProxyManager + params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header) + else: + manager = urllib3.PoolManager + params = dict(headers=header) + tls_cert_file = self.tls_cert_file + if tls_cert_file: + params['cert_file'] = tls_cert_file + # NOTE: key_file is useless without cert_file, but + # cert_file may include the key as well. + tls_key_file = self.tls_key_file + if tls_key_file: + params['key_file'] = tls_key_file + tls_ca_file = self.tls_ca_file + if tls_ca_file: + params['ca_certs'] = tls_ca_file + try: + url = header_kw.get('url') or self.url + is_https = url.startswith('https') + if skip_tls_verify(is_https, self.tls_verify, tls_ca_file): + params['ca_certs'] = None + params['cert_reqs'] = 'CERT_NONE' + if is_https: + params['assert_hostname'] = False + return manager(**params) + except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error: + self.error('build_manager() error:', str(error)) + return None + + def _get_raw_data(self, url=None, manager=None, **kwargs): + """ + Get raw data from http request + :return: str + """ + try: + response = self._do_request(url, manager, **kwargs) + except Exception as error: + self.error('Url: {url}. Error: {error}'.format(url=url or self.url, error=error)) + return None + + if response.status == 200: + if isinstance(response.data, str): + return response.data + return response.data.decode(errors='ignore') + else: + self.debug('Url: {url}. Http response status code: {code}'.format(url=url or self.url, code=response.status)) + return None + + def _get_raw_data_with_status(self, url=None, manager=None, retries=1, redirect=True, **kwargs): + """ + Get status and response body content from http request. Does not catch exceptions + :return: int, str + """ + response = self._do_request(url, manager, retries, redirect, **kwargs) + + if isinstance(response.data, str): + return response.status, response.data + return response.status, response.data.decode(errors='ignore') + + def _do_request(self, url=None, manager=None, retries=1, redirect=True, **kwargs): + """ + Get response from http request. Does not catch exceptions + :return: HTTPResponse + """ + url = url or self.url + manager = manager or self._manager + retry = urllib3.Retry(retries) + if hasattr(retry, 'respect_retry_after_header'): + retry.respect_retry_after_header = bool(self.respect_retry_after_header) + + if self.body: + kwargs['body'] = self.body + + response = manager.request( + method=self.method, + url=url, + timeout=self.request_timeout, + retries=retry, + headers=manager.headers, + redirect=redirect, + **kwargs + ) + return response + + def check(self): + """ + Format configuration data and try to connect to server + :return: boolean + """ + if not (self.url and isinstance(self.url, str)): + self.error('URL is not defined or type is not <str>') + return False + + self._manager = self._build_manager() + if not self._manager: + return False + + try: + data = self._get_data() + except Exception as error: + self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error)) + return False + + if isinstance(data, dict) and data: + return True + self.error('_get_data() returned no data or type is not <dict>') + return False + + +def skip_tls_verify(is_https, tls_verify, tls_ca_file): + # default 'tls_verify' value is None + # logic is: + # - never skip if there is 'tls_ca_file' file + # - skip by default for https + # - do not skip by default for http + if tls_ca_file: + return False + if is_https and not tls_verify: + return True + return tls_verify is False diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/__init__.py diff --git a/collectors/python.d.plugin/python_modules/bases/__init__.py b/collectors/python.d.plugin/python_modules/bases/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/__init__.py diff --git a/collectors/python.d.plugin/python_modules/bases/charts.py b/collectors/python.d.plugin/python_modules/bases/charts.py new file mode 100644 index 0000000..93be43d --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/charts.py @@ -0,0 +1,414 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +from bases.collection import safe_print + +CHART_PARAMS = ['type', 'id', 'name', 'title', 'units', 'family', 'context', 'chart_type', 'hidden'] +DIMENSION_PARAMS = ['id', 'name', 'algorithm', 'multiplier', 'divisor', 'hidden'] +VARIABLE_PARAMS = ['id', 'value'] + +CHART_TYPES = ['line', 'area', 'stacked'] +DIMENSION_ALGORITHMS = ['absolute', 'incremental', 'percentage-of-absolute-row', 'percentage-of-incremental-row'] + +CHART_BEGIN = 'BEGIN {type}.{id} {since_last}\n' +CHART_CREATE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \ + "{chart_type} {priority} {update_every} '{hidden}' 'python.d.plugin' '{module_name}'\n" +CHART_OBSOLETE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \ + "{chart_type} {priority} {update_every} '{hidden} obsolete'\n" + +DIMENSION_CREATE = "DIMENSION '{id}' '{name}' {algorithm} {multiplier} {divisor} '{hidden} {obsolete}'\n" +DIMENSION_SET = "SET '{id}' = {value}\n" + +CHART_VARIABLE_SET = "VARIABLE CHART '{id}' = {value}\n" + +RUNTIME_CHART_CREATE = "CHART netdata.runtime_{job_name} '' 'Execution time for {job_name}' 'ms' 'python.d' " \ + "netdata.pythond_runtime line 145000 {update_every}\n" \ + "DIMENSION run_time 'run time' absolute 1 1\n" + + +def create_runtime_chart(func): + """ + Calls a wrapped function, then prints runtime chart to stdout. + + Used as a decorator for SimpleService.create() method. + The whole point of making 'create runtime chart' functionality as a decorator was + to help users who re-implements create() in theirs classes. + + :param func: class method + :return: + """ + + def wrapper(*args, **kwargs): + self = args[0] + chart = RUNTIME_CHART_CREATE.format( + job_name=self.name, + update_every=self._runtime_counters.update_every, + ) + safe_print(chart) + ok = func(*args, **kwargs) + return ok + + return wrapper + + +class ChartError(Exception): + """Base-class for all exceptions raised by this module""" + + +class DuplicateItemError(ChartError): + """Occurs when user re-adds a chart or a dimension that has already been added""" + + +class ItemTypeError(ChartError): + """Occurs when user passes value of wrong type to Chart, Dimension or ChartVariable class""" + + +class ItemValueError(ChartError): + """Occurs when user passes inappropriate value to Chart, Dimension or ChartVariable class""" + + +class Charts: + """Represent a collection of charts + + All charts stored in a dict. + Chart is a instance of Chart class. + Charts adding must be done using Charts.add_chart() method only""" + + def __init__(self, job_name, priority, cleanup, get_update_every, module_name): + """ + :param job_name: <bound method> + :param priority: <int> + :param get_update_every: <bound method> + """ + self.job_name = job_name + self.priority = priority + self.cleanup = cleanup + self.get_update_every = get_update_every + self.module_name = module_name + self.charts = dict() + + def __len__(self): + return len(self.charts) + + def __iter__(self): + return iter(self.charts.values()) + + def __repr__(self): + return 'Charts({0})'.format(self) + + def __str__(self): + return str([chart for chart in self.charts]) + + def __contains__(self, item): + return item in self.charts + + def __getitem__(self, item): + return self.charts[item] + + def __delitem__(self, key): + del self.charts[key] + + def __bool__(self): + return bool(self.charts) + + def __nonzero__(self): + return self.__bool__() + + def add_chart(self, params): + """ + Create Chart instance and add it to the dict + + Manually adds job name, priority and update_every to params. + :param params: <list> + :return: + """ + params = [self.job_name()] + params + new_chart = Chart(params) + + new_chart.params['update_every'] = self.get_update_every() + new_chart.params['priority'] = self.priority + new_chart.params['module_name'] = self.module_name + + self.priority += 1 + self.charts[new_chart.id] = new_chart + + return new_chart + + def active_charts(self): + return [chart.id for chart in self if not chart.flags.obsoleted] + + +class Chart: + """Represent a chart""" + + def __init__(self, params): + """ + :param params: <list> + """ + if not isinstance(params, list): + raise ItemTypeError("'chart' must be a list type") + if not len(params) >= 8: + raise ItemValueError("invalid value for 'chart', must be {0}".format(CHART_PARAMS)) + + self.params = dict(zip(CHART_PARAMS, (p or str() for p in params))) + self.name = '{type}.{id}'.format(type=self.params['type'], + id=self.params['id']) + if self.params.get('chart_type') not in CHART_TYPES: + self.params['chart_type'] = 'absolute' + hidden = str(self.params.get('hidden', '')) + self.params['hidden'] = 'hidden' if hidden == 'hidden' else '' + + self.dimensions = list() + self.variables = set() + self.flags = ChartFlags() + self.penalty = 0 + + def __getattr__(self, item): + try: + return self.params[item] + except KeyError: + raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self), + attr=item)) + + def __repr__(self): + return 'Chart({0})'.format(self.id) + + def __str__(self): + return self.id + + def __iter__(self): + return iter(self.dimensions) + + def __contains__(self, item): + return item in [dimension.id for dimension in self.dimensions] + + def add_variable(self, variable): + """ + :param variable: <list> + :return: + """ + self.variables.add(ChartVariable(variable)) + + def add_dimension(self, dimension): + """ + :param dimension: <list> + :return: + """ + dim = Dimension(dimension) + + if dim.id in self: + raise DuplicateItemError("'{dimension}' already in '{chart}' dimensions".format(dimension=dim.id, + chart=self.name)) + self.refresh() + self.dimensions.append(dim) + return dim + + def del_dimension(self, dimension_id, hide=True): + if dimension_id not in self: + return + idx = self.dimensions.index(dimension_id) + dimension = self.dimensions[idx] + if hide: + dimension.params['hidden'] = 'hidden' + dimension.params['obsolete'] = 'obsolete' + self.create() + self.dimensions.remove(dimension) + + def hide_dimension(self, dimension_id, reverse=False): + if dimension_id not in self: + return + idx = self.dimensions.index(dimension_id) + dimension = self.dimensions[idx] + dimension.params['hidden'] = 'hidden' if not reverse else str() + self.refresh() + + def create(self): + """ + :return: + """ + chart = CHART_CREATE.format(**self.params) + dimensions = ''.join([dimension.create() for dimension in self.dimensions]) + variables = ''.join([var.set(var.value) for var in self.variables if var]) + + self.flags.push = False + self.flags.created = True + + safe_print(chart + dimensions + variables) + + def can_be_updated(self, data): + for dim in self.dimensions: + if dim.get_value(data) is not None: + return True + return False + + def update(self, data, interval): + updated_dimensions, updated_variables = str(), str() + + for dim in self.dimensions: + value = dim.get_value(data) + if value is not None: + updated_dimensions += dim.set(value) + + for var in self.variables: + value = var.get_value(data) + if value is not None: + updated_variables += var.set(value) + + if updated_dimensions: + since_last = interval if self.flags.updated else 0 + + if self.flags.push: + self.create() + + chart_begin = CHART_BEGIN.format(type=self.type, id=self.id, since_last=since_last) + safe_print(chart_begin, updated_dimensions, updated_variables, 'END\n') + + self.flags.updated = True + self.penalty = 0 + else: + self.penalty += 1 + self.flags.updated = False + + return bool(updated_dimensions) + + def obsolete(self): + self.flags.obsoleted = True + if self.flags.created: + safe_print(CHART_OBSOLETE.format(**self.params)) + + def refresh(self): + self.penalty = 0 + self.flags.push = True + self.flags.obsoleted = False + + +class Dimension: + """Represent a dimension""" + + def __init__(self, params): + """ + :param params: <list> + """ + if not isinstance(params, list): + raise ItemTypeError("'dimension' must be a list type") + if not params: + raise ItemValueError("invalid value for 'dimension', must be {0}".format(DIMENSION_PARAMS)) + + self.params = dict(zip(DIMENSION_PARAMS, (p or str() for p in params))) + self.params['name'] = self.params.get('name') or self.params['id'] + + if self.params.get('algorithm') not in DIMENSION_ALGORITHMS: + self.params['algorithm'] = 'absolute' + if not isinstance(self.params.get('multiplier'), int): + self.params['multiplier'] = 1 + if not isinstance(self.params.get('divisor'), int): + self.params['divisor'] = 1 + self.params.setdefault('hidden', '') + self.params.setdefault('obsolete', '') + + def __getattr__(self, item): + try: + return self.params[item] + except KeyError: + raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self), + attr=item)) + + def __repr__(self): + return 'Dimension({0})'.format(self.id) + + def __str__(self): + return self.id + + def __eq__(self, other): + if not isinstance(other, Dimension): + return self.id == other + return self.id == other.id + + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash(repr(self)) + + def create(self): + return DIMENSION_CREATE.format(**self.params) + + def set(self, value): + """ + :param value: <str>: must be a digit + :return: + """ + return DIMENSION_SET.format(id=self.id, + value=value) + + def get_value(self, data): + try: + return int(data[self.id]) + except (KeyError, TypeError): + return None + + +class ChartVariable: + """Represent a chart variable""" + + def __init__(self, params): + """ + :param params: <list> + """ + if not isinstance(params, list): + raise ItemTypeError("'variable' must be a list type") + if not params: + raise ItemValueError("invalid value for 'variable' must be: {0}".format(VARIABLE_PARAMS)) + + self.params = dict(zip(VARIABLE_PARAMS, params)) + self.params.setdefault('value', None) + + def __getattr__(self, item): + try: + return self.params[item] + except KeyError: + raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self), + attr=item)) + + def __bool__(self): + return self.value is not None + + def __nonzero__(self): + return self.__bool__() + + def __repr__(self): + return 'ChartVariable({0})'.format(self.id) + + def __str__(self): + return self.id + + def __eq__(self, other): + if isinstance(other, ChartVariable): + return self.id == other.id + return False + + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash(repr(self)) + + def set(self, value): + return CHART_VARIABLE_SET.format(id=self.id, + value=value) + + def get_value(self, data): + try: + return int(data[self.id]) + except (KeyError, TypeError): + return None + + +class ChartFlags: + def __init__(self): + self.push = True + self.created = False + self.updated = False + self.obsoleted = False diff --git a/collectors/python.d.plugin/python_modules/bases/collection.py b/collectors/python.d.plugin/python_modules/bases/collection.py new file mode 100644 index 0000000..93bf8cf --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/collection.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +import os + +from threading import Lock + +PATH = os.getenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin').split(':') + +CHART_BEGIN = 'BEGIN {0} {1}\n' +CHART_CREATE = "CHART {0} '{1}' '{2}' '{3}' '{4}' '{5}' {6} {7} {8}\n" +DIMENSION_CREATE = "DIMENSION '{0}' '{1}' {2} {3} {4} '{5}'\n" +DIMENSION_SET = "SET '{0}' = {1}\n" + +print_lock = Lock() + + +def setdefault_values(config, base_dict): + for key, value in base_dict.items(): + config.setdefault(key, value) + return config + + +def run_and_exit(func): + def wrapper(*args, **kwargs): + func(*args, **kwargs) + exit(1) + + return wrapper + + +def on_try_except_finally(on_except=(None,), on_finally=(None,)): + except_func = on_except[0] + finally_func = on_finally[0] + + def decorator(func): + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except Exception: + if except_func: + except_func(*on_except[1:]) + finally: + if finally_func: + finally_func(*on_finally[1:]) + + return wrapper + + return decorator + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + + return decorate + + +@on_try_except_finally(on_except=(exit, 1)) +def safe_print(*msg): + """ + :param msg: + :return: + """ + print_lock.acquire() + print(''.join(msg)) + print_lock.release() + + +def find_binary(binary): + """ + :param binary: <str> + :return: + """ + for directory in PATH: + binary_name = os.path.join(directory, binary) + if os.path.isfile(binary_name) and os.access(binary_name, os.X_OK): + return binary_name + return None + + +def read_last_line(f): + with open(f, 'rb') as opened: + opened.seek(-2, 2) + while opened.read(1) != b'\n': + opened.seek(-2, 1) + if opened.tell() == 0: + break + result = opened.readline() + return result.decode() + + +def unicode_str(arg): + """Return the argument as a unicode string. + + The `unicode` function has been removed from Python3 and `str` takes its + place. This function is a helper which will try using Python 2's `unicode` + and if it doesn't exist, assume we're using Python 3 and use `str`. + + :param arg: + :return: <str> + """ + # TODO: fix + try: + # https://github.com/netdata/netdata/issues/7613 + if isinstance(arg, unicode): + return arg + return unicode(arg, errors='ignore') + # https://github.com/netdata/netdata/issues/7642 + except TypeError: + return unicode(arg) + except NameError: + return str(arg) diff --git a/collectors/python.d.plugin/python_modules/bases/loaders.py b/collectors/python.d.plugin/python_modules/bases/loaders.py new file mode 100644 index 0000000..095f3a3 --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/loaders.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + + +from sys import version_info + +PY_VERSION = version_info[:2] + +try: + if PY_VERSION > (3, 1): + from pyyaml3 import SafeLoader as YamlSafeLoader + else: + from pyyaml2 import SafeLoader as YamlSafeLoader +except ImportError: + from yaml import SafeLoader as YamlSafeLoader + + +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict + + +DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map' if PY_VERSION > (3, 1) else u'tag:yaml.org,2002:map' + + +def dict_constructor(loader, node): + return OrderedDict(loader.construct_pairs(node)) + + +YamlSafeLoader.add_constructor(DEFAULT_MAPPING_TAG, dict_constructor) + + +def load_yaml(stream): + loader = YamlSafeLoader(stream) + try: + return loader.get_single_data() + finally: + loader.dispose() + + +def load_config(file_name): + with open(file_name, 'r') as stream: + return load_yaml(stream) diff --git a/collectors/python.d.plugin/python_modules/bases/loggers.py b/collectors/python.d.plugin/python_modules/bases/loggers.py new file mode 100644 index 0000000..47f196a --- /dev/null +++ b/collectors/python.d.plugin/python_modules/bases/loggers.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (ilyam8) +# SPDX-License-Identifier: GPL-3.0-or-later + +import logging +import traceback + +from sys import exc_info + +try: + from time import monotonic as time +except ImportError: + from time import time + +from bases.collection import on_try_except_finally, unicode_str + + +LOGGING_LEVELS = {'CRITICAL': 50, + 'ERROR': 40, + 'WARNING': 30, + 'INFO': 20, + 'DEBUG': 10, + 'NOTSET': 0} + +DEFAULT_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s : %(message)s' +DEFAULT_LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' + +PYTHON_D_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s: %(module_name)s[%(job_name)s] : %(message)s' +PYTHON_D_LOG_NAME = 'python.d' + + +def limiter(log_max_count=30, allowed_in_seconds=60): + def on_decorator(func): + + def on_call(*args): + current_time = args[0]._runtime_counters.start_mono + lc = args[0]._logger_counters + + if lc.logged and lc.logged % log_max_count == 0: + if current_time - lc.time_to_compare <= allowed_in_seconds: + lc.dropped += 1 + return + lc.time_to_compare = current_time + + lc.logged += 1 + func(*args) + + return on_call + return on_decorator + + +def add_traceback(func): + def on_call(*args): + self = args[0] + + if not self.log_traceback: + func(*args) + else: + if exc_info()[0]: + func(*args) + func(self, traceback.format_exc()) + else: + func(*args) + + return on_call + + +class LoggerCounters: + def __init__(self): + self.logged = 0 + self.dropped = 0 + self.time_to_compare = time() + + def __repr__(self): + return 'LoggerCounter(logged: {logged}, dropped: {dropped})'.format(logged=self.logged, + dropped=self.dropped) + + +class BaseLogger(object): + def __init__(self, logger_name, log_fmt=DEFAULT_LOG_LINE_FORMAT, date_fmt=DEFAULT_LOG_TIME_FORMAT, + handler=logging.StreamHandler): + """ + :param logger_name: <str> + :param log_fmt: <str> + :param date_fmt: <str> + :param handler: <logging handler> + """ + self.logger = logging.getLogger(logger_name) + if not self.has_handlers(): + self.severity = 'INFO' + self.logger.addHandler(handler()) + self.set_formatter(fmt=log_fmt, date_fmt=date_fmt) + + def __repr__(self): + return '<Logger: {name})>'.format(name=self.logger.name) + + def set_formatter(self, fmt, date_fmt=DEFAULT_LOG_TIME_FORMAT): + """ + :param fmt: <str> + :param date_fmt: <str> + :return: + """ + if self.has_handlers(): + self.logger.handlers[0].setFormatter(logging.Formatter(fmt=fmt, datefmt=date_fmt)) + + def has_handlers(self): + return self.logger.handlers + + @property + def severity(self): + return self.logger.getEffectiveLevel() + + @severity.setter + def severity(self, level): + """ + :param level: <str> or <int> + :return: + """ + if level in LOGGING_LEVELS: + self.logger.setLevel(LOGGING_LEVELS[level]) + + def debug(self, *msg, **kwargs): + self.logger.debug(' '.join(map(unicode_str, msg)), **kwargs) + + def info(self, *msg, **kwargs): + self.logger.info(' '.join(map(unicode_str, msg)), **kwargs) + + def warning(self, *msg, **kwargs): + self.logger.warning(' '.join(map(unicode_str, msg)), **kwargs) + + def error(self, *msg, **kwargs): + self.logger.error(' '.join(map(unicode_str, msg)), **kwargs) + + def alert(self, *msg, **kwargs): + self.logger.critical(' '.join(map(unicode_str, msg)), **kwargs) + + @on_try_except_finally(on_finally=(exit, 1)) + def fatal(self, *msg, **kwargs): + self.logger.critical(' '.join(map(unicode_str, msg)), **kwargs) + + +class PythonDLogger(object): + def __init__(self, logger_name=PYTHON_D_LOG_NAME, log_fmt=PYTHON_D_LOG_LINE_FORMAT): + """ + :param logger_name: <str> + :param log_fmt: <str> + """ + self.logger = BaseLogger(logger_name, log_fmt=log_fmt) + self.module_name = 'plugin' + self.job_name = 'main' + self._logger_counters = LoggerCounters() + + _LOG_TRACEBACK = False + + @property + def log_traceback(self): + return PythonDLogger._LOG_TRACEBACK + + @log_traceback.setter + def log_traceback(self, value): + PythonDLogger._LOG_TRACEBACK = value + + def debug(self, *msg): + self.logger.debug(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + def info(self, *msg): + self.logger.info(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + def warning(self, *msg): + self.logger.warning(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + @add_traceback + def error(self, *msg): + self.logger.error(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + @add_traceback + def alert(self, *msg): + self.logger.alert(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + def fatal(self, *msg): + self.logger.fatal(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + +class PythonDLimitedLogger(PythonDLogger): + @limiter() + def info(self, *msg): + PythonDLogger.info(self, *msg) + + @limiter() + def warning(self, *msg): + PythonDLogger.warning(self, *msg) + + @limiter() + def error(self, *msg): + PythonDLogger.error(self, *msg) + + @limiter() + def alert(self, *msg): + PythonDLogger.alert(self, *msg) |