diff options
author | Federico Ceratto <federico.ceratto@gmail.com> | 2017-12-19 23:39:21 +0000 |
---|---|---|
committer | Federico Ceratto <federico.ceratto@gmail.com> | 2017-12-19 23:39:21 +0000 |
commit | 61aedf201c2c4bf0e5aa4db32e74f4d860b88593 (patch) | |
tree | bcf4f9a0cd8bc2daf38b2ff9f29bfcc1e5ed8968 /python.d/python_modules | |
parent | New upstream version 1.8.0+dfsg (diff) | |
download | netdata-61aedf201c2c4bf0e5aa4db32e74f4d860b88593.tar.xz netdata-61aedf201c2c4bf0e5aa4db32e74f4d860b88593.zip |
New upstream version 1.9.0+dfsgupstream/1.9.0+dfsg
Diffstat (limited to 'python.d/python_modules')
17 files changed, 1865 insertions, 1219 deletions
diff --git a/python.d/python_modules/base.py b/python.d/python_modules/base.py index 1d5417ec2..7c6e1d2f2 100644 --- a/python.d/python_modules/base.py +++ b/python.d/python_modules/base.py @@ -1,1119 +1,9 @@ # -*- coding: utf-8 -*- -# Description: netdata python modules framework -# Author: Pawel Krupa (paulfantom) - -# Remember: -# ALL CODE NEEDS TO BE COMPATIBLE WITH Python > 2.7 and Python > 3.1 -# Follow PEP8 as much as it is possible -# "check" and "create" CANNOT be blocking. -# "update" CAN be blocking -# "update" function needs to be fast, so follow: -# https://wiki.python.org/moin/PythonSpeed/PerformanceTips -# basically: -# - use local variables wherever it is possible -# - avoid dots in expressions that are executed many times -# - use "join()" instead of "+" -# - use "import" only at the beginning -# -# using ".encode()" in one thread can block other threads as well (only in python2) - -import os -import re -import socket -import time -import threading - -import urllib3 - -from glob import glob -from subprocess import Popen, PIPE -from sys import exc_info - -try: - import MySQLdb - PY_MYSQL = True -except ImportError: - try: - import pymysql as MySQLdb - PY_MYSQL = True - except ImportError: - PY_MYSQL = False - -import msg - - -PATH = os.getenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin').split(':') -try: - urllib3.disable_warnings() -except AttributeError: - msg.error('urllib3: warnings were not disabled') - - -# class BaseService(threading.Thread): -class SimpleService(threading.Thread): - """ - Prototype of Service class. - Implemented basic functionality to run jobs by `python.d.plugin` - """ - def __init__(self, configuration=None, name=None): - """ - This needs to be initialized in child classes - :param configuration: dict - :param name: str - """ - threading.Thread.__init__(self) - self._data_stream = "" - self.daemon = True - self.retries = 0 - self.retries_left = 0 - self.priority = 140000 - self.update_every = 1 - self.name = name - self.override_name = None - self.chart_name = "" - self._dimensions = [] - self._charts = [] - self.__chart_set = False - self.__first_run = True - self.order = [] - self.definitions = {} - self._data_from_check = dict() - if configuration is None: - self.error("BaseService: no configuration parameters supplied. Cannot create Service.") - raise RuntimeError - else: - self._extract_base_config(configuration) - self.timetable = {} - self.create_timetable() - - # --- BASIC SERVICE CONFIGURATION --- - - def _extract_base_config(self, config): - """ - Get basic parameters to run service - Minimum config: - config = {'update_every':1, - 'priority':100000, - 'retries':0} - :param config: dict - """ - pop = config.pop - try: - self.override_name = pop('name') - except KeyError: - pass - self.update_every = int(pop('update_every')) - self.priority = int(pop('priority')) - self.retries = int(pop('retries')) - self.retries_left = self.retries - self.configuration = config - - def create_timetable(self, freq=None): - """ - Create service timetable. - `freq` is optional - Example: - timetable = {'last': 1466370091.3767564, - 'next': 1466370092, - 'freq': 1} - :param freq: int - """ - if freq is None: - freq = self.update_every - now = time.time() - self.timetable = {'last': now, - 'next': now - (now % freq) + freq, - 'freq': freq} - - # --- THREAD CONFIGURATION --- - - def _run_once(self): - """ - Executes self.update(interval) and draws run time chart. - Return value presents exit status of update() - :return: boolean - """ - t_start = float(time.time()) - chart_name = self.chart_name - - since_last = int((t_start - self.timetable['last']) * 1000000) - if self.__first_run: - since_last = 0 - - if not self.update(since_last): - self.error("update function failed.") - return False - - # draw performance graph - run_time = int((time.time() - t_start) * 1000) - print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % - (self.chart_name, str(since_last), str(run_time))) - - self.debug(chart_name, "updated in", str(run_time), "ms") - self.timetable['last'] = t_start - self.__first_run = False - return True - - def run(self): - """ - Runs job in thread. Handles retries. - Exits when job failed or timed out. - :return: None - """ - step = float(self.timetable['freq']) - penalty = 0 - self.timetable['last'] = float(time.time() - step) - self.debug("starting data collection - update frequency:", str(step), " retries allowed:", str(self.retries)) - while True: # run forever, unless something is wrong - now = float(time.time()) - next = self.timetable['next'] = now - (now % step) + step + penalty - - # it is important to do this in a loop - # sleep() is interruptable - while now < next: - self.debug("sleeping for", str(next - now), "secs to reach frequency of", - str(step), "secs, now:", str(now), " next:", str(next), " penalty:", str(penalty)) - time.sleep(next - now) - now = float(time.time()) - - # do the job - try: - status = self._run_once() - except Exception: - status = False - - if status: - # it is good - self.retries_left = self.retries - penalty = 0 - else: - # it failed - self.retries_left -= 1 - if self.retries_left <= 0: - if penalty == 0: - penalty = float(self.retries * step) / 2 - else: - penalty *= 1.5 - - if penalty > 600: - penalty = 600 - - self.retries_left = self.retries - self.alert("failed to collect data for " + str(self.retries) + - " times - increasing penalty to " + str(penalty) + " sec and trying again") - - else: - self.error("failed to collect data - " + str(self.retries_left) - + " retries left - penalty: " + str(penalty) + " sec") - - # --- CHART --- - - @staticmethod - def _format(*args): - """ - Escape and convert passed arguments. - :param args: anything - :return: list - """ - params = [] - append = params.append - for p in args: - if p is None: - append(p) - continue - if type(p) is not str: - p = str(p) - if ' ' in p: - p = "'" + p + "'" - append(p) - return params - - def _line(self, instruction, *params): - """ - Converts *params to string and joins them with one space between every one. - Result is appended to self._data_stream - :param params: str/int/float - """ - tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params)) - self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp))) - - def chart(self, type_id, name="", title="", units="", family="", - category="", chart_type="line", priority="", update_every=""): - """ - Defines a new chart. - :param type_id: str - :param name: str - :param title: str - :param units: str - :param family: str - :param category: str - :param chart_type: str - :param priority: int/str - :param update_every: int/str - """ - self._charts.append(type_id) - - p = self._format(type_id, name, title, units, family, category, chart_type, priority, update_every) - self._line("CHART", *p) - - def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False): - """ - Defines a new dimension for the chart - :param id: str - :param name: str - :param algorithm: str - :param multiplier: int/str - :param divisor: int/str - :param hidden: boolean - :return: - """ - try: - int(multiplier) - except TypeError: - self.error("malformed dimension: multiplier is not a number:", multiplier) - multiplier = 1 - try: - int(divisor) - except TypeError: - self.error("malformed dimension: divisor is not a number:", divisor) - divisor = 1 - if name is None: - name = id - if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"): - algorithm = "absolute" - - self._dimensions.append(str(id)) - if hidden: - p = self._format(id, name, algorithm, multiplier, divisor, "hidden") - else: - p = self._format(id, name, algorithm, multiplier, divisor) - - self._line("DIMENSION", *p) - - def begin(self, type_id, microseconds=0): - """ - Begin data set - :param type_id: str - :param microseconds: int - :return: boolean - """ - if type_id not in self._charts: - self.error("wrong chart type_id:", type_id) - return False - try: - int(microseconds) - except TypeError: - self.error("malformed begin statement: microseconds are not a number:", microseconds) - microseconds = "" - - self._line("BEGIN", type_id, str(microseconds)) - return True - - def set(self, id, value): - """ - Set value to dimension - :param id: str - :param value: int/float - :return: boolean - """ - if id not in self._dimensions: - self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions) - return False - try: - value = str(int(value)) - except TypeError: - self.error("cannot set non-numeric value:", str(value)) - return False - self._line("SET", id, "=", str(value)) - self.__chart_set = True - return True - - def end(self): - if self.__chart_set: - self._line("END") - self.__chart_set = False - else: - pos = self._data_stream.rfind("BEGIN") - self._data_stream = self._data_stream[:pos] - - def commit(self): - """ - Upload new data to netdata. - """ - try: - print(self._data_stream) - except Exception as e: - msg.fatal('cannot send data to netdata:', str(e)) - self._data_stream = "" - - # --- ERROR HANDLING --- - - def error(self, *params): - """ - Show error message on stderr - """ - msg.error(self.chart_name, *params) - - def alert(self, *params): - """ - Show error message on stderr - """ - msg.alert(self.chart_name, *params) - - def debug(self, *params): - """ - Show debug message on stderr - """ - msg.debug(self.chart_name, *params) - - def info(self, *params): - """ - Show information message on stderr - """ - msg.info(self.chart_name, *params) - - # --- MAIN METHODS --- - - def _get_data(self): - """ - Get some data - :return: dict - """ - return {} - - def check(self): - """ - check() prototype - :return: boolean - """ - self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.") - data = self._get_data() - - if data is None: - self.debug("failed to receive data during check().") - return False - - if len(data) == 0: - self.debug("empty data during check().") - return False - - self.debug("successfully received data during check(): '" + str(data) + "'") - return True - - def create(self): - """ - Create charts - :return: boolean - """ - data = self._data_from_check or self._get_data() - if data is None: - self.debug("failed to receive data during create().") - return False - - idx = 0 - for name in self.order: - options = self.definitions[name]['options'] + [self.priority + idx, self.update_every] - self.chart(self.chart_name + "." + name, *options) - # check if server has this datapoint - for line in self.definitions[name]['lines']: - if line[0] in data: - self.dimension(*line) - idx += 1 - - self.commit() - return True - - def update(self, interval): - """ - Update charts - :param interval: int - :return: boolean - """ - data = self._get_data() - if data is None: - self.debug("failed to receive data during update().") - return False - - updated = False - for chart in self.order: - if self.begin(self.chart_name + "." + chart, interval): - updated = True - for dim in self.definitions[chart]['lines']: - try: - self.set(dim[0], data[dim[0]]) - except KeyError: - pass - self.end() - - self.commit() - if not updated: - self.error("no charts to update") - - return updated - - @staticmethod - def find_binary(binary): - try: - if isinstance(binary, str): - binary = os.path.basename(binary) - return next(('/'.join([p, binary]) for p in PATH - if os.path.isfile('/'.join([p, binary])) - and os.access('/'.join([p, binary]), os.X_OK))) - return None - except StopIteration: - return None - - def _add_new_dimension(self, dimension_id, chart_name, dimension=None, algorithm='incremental', - multiplier=1, divisor=1, priority=65000): - """ - :param dimension_id: - :param chart_name: - :param dimension: - :param algorithm: - :param multiplier: - :param divisor: - :param priority: - :return: - """ - if not all([dimension_id not in self._dimensions, - chart_name in self.order, - chart_name in self.definitions]): - return - self._dimensions.append(dimension_id) - dimension_list = list(map(str, [dimension_id, - dimension if dimension else dimension_id, - algorithm, - multiplier, - divisor])) - self.definitions[chart_name]['lines'].append(dimension_list) - add_to_name = self.override_name or self.name - job_name = ('_'.join([self.__module__, re.sub('\s+', '_', add_to_name)]) - if add_to_name != 'None' else self.__module__) - chart = 'CHART {0}.{1} '.format(job_name, chart_name) - options = '"" "{0}" {1} "{2}" {3} {4} '.format(*self.definitions[chart_name]['options'][1:6]) - other = '{0} {1}\n'.format(priority, self.update_every) - new_dimension = "DIMENSION {0}\n".format(' '.join(dimension_list)) - print(chart + options + other + new_dimension) - - -class UrlService(SimpleService): - def __init__(self, configuration=None, name=None): - SimpleService.__init__(self, configuration=configuration, name=name) - self.url = self.configuration.get('url') - self.user = self.configuration.get('user') - self.password = self.configuration.get('pass') - self.proxy_user = self.configuration.get('proxy_user') - self.proxy_password = self.configuration.get('proxy_pass') - self.proxy_url = self.configuration.get('proxy_url') - self._manager = None - - def __make_headers(self, **header_kw): - user = header_kw.get('user') or self.user - password = header_kw.get('pass') or self.password - proxy_user = header_kw.get('proxy_user') or self.proxy_user - proxy_password = header_kw.get('proxy_pass') or self.proxy_password - header_params = dict(keep_alive=True) - proxy_header_params = dict() - if user and password: - header_params['basic_auth'] = '{user}:{password}'.format(user=user, - password=password) - if proxy_user and proxy_password: - proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user, - password=proxy_password) - try: - return urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params) - except TypeError as error: - self.error('build_header() error: {error}'.format(error=error)) - return None, None - - def _build_manager(self, **header_kw): - header, proxy_header = self.__make_headers(**header_kw) - if header is None or proxy_header is None: - return None - proxy_url = header_kw.get('proxy_url') or self.proxy_url - if proxy_url: - manager = urllib3.ProxyManager - params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header) - else: - manager = urllib3.PoolManager - params = dict(headers=header) - try: - return manager(**params) - except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error: - self.error('build_manager() error:', str(error)) - return None - - def _get_raw_data(self, url=None, manager=None): - """ - Get raw data from http request - :return: str - """ - try: - url = url or self.url - manager = manager or self._manager - # TODO: timeout, retries and method hardcoded.. - response = manager.request(method='GET', - url=url, - timeout=1, - retries=1, - headers=manager.headers) - except (urllib3.exceptions.HTTPError, TypeError, AttributeError) as error: - self.error('Url: {url}. Error: {error}'.format(url=url, error=error)) - return None - if response.status == 200: - return response.data.decode() - self.debug('Url: {url}. Http response status code: {code}'.format(url=url, code=response.status)) - return None - - def check(self): - """ - Format configuration data and try to connect to server - :return: boolean - """ - if not (self.url and isinstance(self.url, str)): - self.error('URL is not defined or type is not <str>') - return False - - self._manager = self._build_manager() - if not self._manager: - return False - - try: - data = self._get_data() - except Exception as error: - self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error)) - return False - - if isinstance(data, dict) and data: - self._data_from_check = data - return True - self.error('_get_data() returned no data or type is not <dict>') - return False - - -class SocketService(SimpleService): - def __init__(self, configuration=None, name=None): - self._sock = None - self._keep_alive = False - self.host = "localhost" - self.port = None - self.unix_socket = None - self.request = "" - self.__socket_config = None - self.__empty_request = "".encode() - SimpleService.__init__(self, configuration=configuration, name=name) - - def _socketerror(self, message=None): - if self.unix_socket is not None: - self.error("unix socket '" + self.unix_socket + "':", message) - else: - if self.__socket_config is not None: - af, socktype, proto, canonname, sa = self.__socket_config - self.error("socket to '" + str(sa[0]) + "' port " + str(sa[1]) + ":", message) - else: - self.error("unknown socket:", message) - - def _connect2socket(self, res=None): - """ - Connect to a socket, passing the result of getaddrinfo() - :return: boolean - """ - if res is None: - res = self.__socket_config - if res is None: - self.error("Cannot create socket to 'None':") - return False - - af, socktype, proto, canonname, sa = res - try: - self.debug("creating socket to '" + str(sa[0]) + "', port " + str(sa[1])) - self._sock = socket.socket(af, socktype, proto) - except socket.error as e: - self.error("Failed to create socket to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e)) - self._sock = None - self.__socket_config = None - return False - - try: - self.debug("connecting socket to '" + str(sa[0]) + "', port " + str(sa[1])) - self._sock.connect(sa) - except socket.error as e: - self.error("Failed to connect to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e)) - self._disconnect() - self.__socket_config = None - return False - - self.debug("connected to '" + str(sa[0]) + "', port " + str(sa[1])) - self.__socket_config = res - return True - - def _connect2unixsocket(self): - """ - Connect to a unix socket, given its filename - :return: boolean - """ - if self.unix_socket is None: - self.error("cannot connect to unix socket 'None'") - return False - - try: - self.debug("attempting DGRAM unix socket '" + str(self.unix_socket) + "'") - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self._sock.connect(self.unix_socket) - self.debug("connected DGRAM unix socket '" + str(self.unix_socket) + "'") - return True - except socket.error as e: - self.debug("Failed to connect DGRAM unix socket '" + str(self.unix_socket) + "':", str(e)) - - try: - self.debug("attempting STREAM unix socket '" + str(self.unix_socket) + "'") - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._sock.connect(self.unix_socket) - self.debug("connected STREAM unix socket '" + str(self.unix_socket) + "'") - return True - except socket.error as e: - self.debug("Failed to connect STREAM unix socket '" + str(self.unix_socket) + "':", str(e)) - self.error("Failed to connect to unix socket '" + str(self.unix_socket) + "':", str(e)) - self._sock = None - return False - - def _connect(self): - """ - Recreate socket and connect to it since sockets cannot be reused after closing - Available configurations are IPv6, IPv4 or UNIX socket - :return: - """ - try: - if self.unix_socket is not None: - self._connect2unixsocket() - - else: - if self.__socket_config is not None: - self._connect2socket() - else: - for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): - if self._connect2socket(res): break - - except Exception as e: - self._sock = None - self.__socket_config = None - - if self._sock is not None: - self._sock.setblocking(0) - self._sock.settimeout(5) - self.debug("set socket timeout to: " + str(self._sock.gettimeout())) - - def _disconnect(self): - """ - Close socket connection - :return: - """ - if self._sock is not None: - try: - self.debug("closing socket") - self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all - self._sock.close() - except Exception: - pass - self._sock = None - - def _send(self): - """ - Send request. - :return: boolean - """ - # Send request if it is needed - if self.request != self.__empty_request: - try: - self.debug("sending request:", str(self.request)) - self._sock.send(self.request) - except Exception as e: - self._socketerror("error sending request:" + str(e)) - self._disconnect() - return False - return True - - def _receive(self): - """ - Receive data from socket - :return: str - """ - data = "" - while True: - self.debug("receiving response") - try: - buf = self._sock.recv(4096) - except Exception as e: - self._socketerror("failed to receive response:" + str(e)) - self._disconnect() - break - - if buf is None or len(buf) == 0: # handle server disconnect - if data == "": - self._socketerror("unexpectedly disconnected") - else: - self.debug("server closed the connection") - self._disconnect() - break - - self.debug("received data:", str(buf)) - data += buf.decode('utf-8', 'ignore') - if self._check_raw_data(data): - break - - self.debug("final response:", str(data)) - return data - - def _get_raw_data(self): - """ - Get raw data with low-level "socket" module. - :return: str - """ - if self._sock is None: - self._connect() - if self._sock is None: - return None - - # Send request if it is needed - if not self._send(): - return None - - data = self._receive() - - if not self._keep_alive: - self._disconnect() - - return data - - def _check_raw_data(self, data): - """ - Check if all data has been gathered from socket - :param data: str - :return: boolean - """ - return True - - def _parse_config(self): - """ - Parse configuration data - :return: boolean - """ - if self.name is None or self.name == str(None): - self.name = "" - else: - self.name = str(self.name) - - try: - self.unix_socket = str(self.configuration['socket']) - except (KeyError, TypeError): - self.debug("No unix socket specified. Trying TCP/IP socket.") - self.unix_socket = None - try: - self.host = str(self.configuration['host']) - except (KeyError, TypeError): - self.debug("No host specified. Using: '" + self.host + "'") - try: - self.port = int(self.configuration['port']) - except (KeyError, TypeError): - self.debug("No port specified. Using: '" + str(self.port) + "'") - - try: - self.request = str(self.configuration['request']) - except (KeyError, TypeError): - self.debug("No request specified. Using: '" + str(self.request) + "'") - - self.request = self.request.encode() - - def check(self): - self._parse_config() - return SimpleService.check(self) - - -class LogService(SimpleService): - def __init__(self, configuration=None, name=None): - SimpleService.__init__(self, configuration=configuration, name=name) - self.log_path = self.configuration.get('path') - self.__glob_path = self.log_path - self._last_position = 0 - self.retries = 100000 # basically always retry - self.__re_find = dict(current=0, run=0, maximum=60) - - def _get_raw_data(self): - """ - Get log lines since last poll - :return: list - """ - lines = list() - try: - if self.__re_find['current'] == self.__re_find['run']: - self._find_recent_log_file() - size = os.path.getsize(self.log_path) - if size == self._last_position: - self.__re_find['current'] += 1 - return list() # return empty list if nothing has changed - elif size < self._last_position: - self._last_position = 0 # read from beginning if file has shrunk - - with open(self.log_path) as fp: - fp.seek(self._last_position) - for line in fp: - lines.append(line) - self._last_position = fp.tell() - self.__re_find['current'] = 0 - except (OSError, IOError) as error: - self.__re_find['current'] += 1 - self.error(str(error)) - - return lines or None - - def _find_recent_log_file(self): - """ - :return: - """ - self.__re_find['run'] = self.__re_find['maximum'] - self.__re_find['current'] = 0 - self.__glob_path = self.__glob_path or self.log_path # workaround for modules w/o config files - path_list = glob(self.__glob_path) - if path_list: - self.log_path = max(path_list) - return True - return False - - def check(self): - """ - Parse basic configuration and check if log file exists - :return: boolean - """ - if not self.log_path: - self.error("No path to log specified") - return None - - if all([self._find_recent_log_file(), - os.access(self.log_path, os.R_OK), - os.path.isfile(self.log_path)]): - return True - self.error("Cannot access %s" % self.log_path) - return False - - def create(self): - # set cursor at last byte of log file - self._last_position = os.path.getsize(self.log_path) - status = SimpleService.create(self) - # self._last_position = 0 - return status - - -class ExecutableService(SimpleService): - - def __init__(self, configuration=None, name=None): - SimpleService.__init__(self, configuration=configuration, name=name) - self.command = None - - def _get_raw_data(self, stderr=False): - """ - Get raw data from executed command - :return: <list> - """ - try: - p = Popen(self.command, stdout=PIPE, stderr=PIPE) - except Exception as error: - self.error("Executing command", " ".join(self.command), "resulted in error:", str(error)) - return None - data = list() - std = p.stderr if stderr else p.stdout - for line in std.readlines(): - data.append(line.decode()) - - return data or None - - def check(self): - """ - Parse basic configuration, check if command is whitelisted and is returning values - :return: <boolean> - """ - # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified) - if 'command' in self.configuration: - self.command = self.configuration['command'] - - # "command" must be: 1.not None 2. type <str> - if not (self.command and isinstance(self.command, str)): - self.error('Command is not defined or command type is not <str>') - return False - - # Split "command" into: 1. command <str> 2. options <list> - command, opts = self.command.split()[0], self.command.split()[1:] - - # Check for "bad" symbols in options. No pipes, redirects etc. TODO: what is missing? - bad_opts = set(''.join(opts)) & set(['&', '|', ';', '>', '<']) - if bad_opts: - self.error("Bad command argument(s): %s" % bad_opts) - return False - - # Find absolute path ('echo' => '/bin/echo') - if '/' not in command: - command = self.find_binary(command) - if not command: - self.error('Can\'t locate "%s" binary in PATH(%s)' % (self.command, PATH)) - return False - # Check if binary exist and executable - else: - if not (os.path.isfile(command) and os.access(command, os.X_OK)): - self.error('"%s" is not a file or not executable' % command) - return False - - self.command = [command] + opts if opts else [command] - - try: - data = self._get_data() - except Exception as error: - self.error('_get_data() failed. Command: %s. Error: %s' % (self.command, error)) - return False - - if isinstance(data, dict) and data: - # We need this for create() method. No reason to execute get_data() again if result is not empty dict() - self._data_from_check = data - return True - else: - self.error("Command", str(self.command), "returned no data") - return False - - -class MySQLService(SimpleService): - - def __init__(self, configuration=None, name=None): - SimpleService.__init__(self, configuration=configuration, name=name) - self.__connection = None - self.__conn_properties = dict() - self.extra_conn_properties = dict() - self.__queries = self.configuration.get('queries', dict()) - self.queries = dict() - - def __connect(self): - try: - connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties) - except (MySQLdb.MySQLError, TypeError, AttributeError) as error: - return None, str(error) - else: - return connection, None - - def check(self): - def get_connection_properties(conf, extra_conf): - properties = dict() - if conf.get('user'): - properties['user'] = conf['user'] - if conf.get('pass'): - properties['passwd'] = conf['pass'] - if conf.get('socket'): - properties['unix_socket'] = conf['socket'] - elif conf.get('host'): - properties['host'] = conf['host'] - properties['port'] = int(conf.get('port', 3306)) - elif conf.get('my.cnf'): - if MySQLdb.__name__ == 'pymysql': - self.error('"my.cnf" parsing is not working for pymysql') - else: - properties['read_default_file'] = conf['my.cnf'] - if isinstance(extra_conf, dict) and extra_conf: - properties.update(extra_conf) - - return properties or None - - def is_valid_queries_dict(raw_queries, log_error): - """ - :param raw_queries: dict: - :param log_error: function: - :return: dict or None - - raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries) - """ - def is_valid_query(query): - return all([isinstance(query, str), - query.startswith(('SELECT', 'select', 'SHOW', 'show'))]) - - if hasattr(raw_queries, 'keys') and raw_queries: - valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)]) - bad_queries = set(raw_queries) - set(valid_queries) - - if bad_queries: - log_error('Removed query(s): %s' % bad_queries) - return valid_queries - else: - log_error('Unsupported "queries" format. Must be not empty <dict>') - return None - - if not PY_MYSQL: - self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin') - return False - - # Preference: 1. "queries" from the configuration file 2. "queries" from the module - self.queries = self.__queries or self.queries - # Check if "self.queries" exist, not empty and all queries are in valid format - self.queries = is_valid_queries_dict(self.queries, self.error) - if not self.queries: - return None - - # Get connection properties - self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties) - if not self.__conn_properties: - self.error('Connection properties are missing') - return False - - # Create connection to the database - self.__connection, error = self.__connect() - if error: - self.error('Can\'t establish connection to MySQL: %s' % error) - return False - - try: - data = self._get_data() - except Exception as error: - self.error('_get_data() failed. Error: %s' % error) - return False - - if isinstance(data, dict) and data: - # We need this for create() method - self._data_from_check = data - return True - else: - self.error("_get_data() returned no data or type is not <dict>") - return False - - def _get_raw_data(self, description=None): - """ - Get raw data from MySQL server - :return: dict: fetchall() or (fetchall(), description) - """ - - if not self.__connection: - self.__connection, error = self.__connect() - if error: - return None - - raw_data = dict() - queries = dict(self.queries) - try: - with self.__connection as cursor: - for name, query in queries.items(): - try: - cursor.execute(query) - except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error: - if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)): - raise RuntimeError - self.error('Removed query: %s[%s]. Error: %s' - % (name, query, error)) - self.queries.pop(name) - continue - else: - raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall() - self.__connection.commit() - except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError): - self.__connection.close() - self.__connection = None - return None - else: - return raw_data or None - - @staticmethod - def __is_error_critical(err_class, err_text): - return err_class == MySQLdb.OperationalError and all(['denied' not in err_text, - 'Unknown column' not in err_text]) +# Description: backward compatibility with old version + +from bases.FrameworkServices.SimpleService import SimpleService +from bases.FrameworkServices.UrlService import UrlService +from bases.FrameworkServices.SocketService import SocketService +from bases.FrameworkServices.LogService import LogService +from bases.FrameworkServices.ExecutableService import ExecutableService +from bases.FrameworkServices.MySQLService import MySQLService diff --git a/python.d/python_modules/bases/FrameworkServices/ExecutableService.py b/python.d/python_modules/bases/FrameworkServices/ExecutableService.py new file mode 100644 index 000000000..b6d7871fa --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/ExecutableService.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (l2isbad) + +import os + +from subprocess import Popen, PIPE + +from bases.FrameworkServices.SimpleService import SimpleService +from bases.collection import find_binary + + +class ExecutableService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.command = None + + def _get_raw_data(self, stderr=False): + """ + Get raw data from executed command + :return: <list> + """ + try: + p = Popen(self.command, stdout=PIPE, stderr=PIPE) + except Exception as error: + self.error('Executing command {command} resulted in error: {error}'.format(command=self.command, + error=error)) + return None + data = list() + std = p.stderr if stderr else p.stdout + for line in std: + data.append(line.decode()) + + return data or None + + def check(self): + """ + Parse basic configuration, check if command is whitelisted and is returning values + :return: <boolean> + """ + # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified) + if 'command' in self.configuration: + self.command = self.configuration['command'] + + # "command" must be: 1.not None 2. type <str> + if not (self.command and isinstance(self.command, str)): + self.error('Command is not defined or command type is not <str>') + return False + + # Split "command" into: 1. command <str> 2. options <list> + command, opts = self.command.split()[0], self.command.split()[1:] + + # Check for "bad" symbols in options. No pipes, redirects etc. + opts_list = ['&', '|', ';', '>', '<'] + bad_opts = set(''.join(opts)) & set(opts_list) + if bad_opts: + self.error("Bad command argument(s): {opts}".format(opts=bad_opts)) + return False + + # Find absolute path ('echo' => '/bin/echo') + if '/' not in command: + command = find_binary(command) + if not command: + self.error('Can\'t locate "{command}" binary'.format(command=self.command)) + return False + # Check if binary exist and executable + else: + if not os.access(command, os.X_OK): + self.error('"{binary}" is not executable'.format(binary=command)) + return False + + self.command = [command] + opts if opts else [command] + + try: + data = self._get_data() + except Exception as error: + self.error('_get_data() failed. Command: {command}. Error: {error}'.format(command=self.command, + error=error)) + return False + + if isinstance(data, dict) and data: + return True + self.error('Command "{command}" returned no data'.format(command=self.command)) + return False diff --git a/python.d/python_modules/bases/FrameworkServices/LogService.py b/python.d/python_modules/bases/FrameworkServices/LogService.py new file mode 100644 index 000000000..45daa2446 --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/LogService.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) + +from glob import glob +import os + +from bases.FrameworkServices.SimpleService import SimpleService + + +class LogService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.log_path = self.configuration.get('path') + self.__glob_path = self.log_path + self._last_position = 0 + self.__re_find = dict(current=0, run=0, maximum=60) + + def _get_raw_data(self): + """ + Get log lines since last poll + :return: list + """ + lines = list() + try: + if self.__re_find['current'] == self.__re_find['run']: + self._find_recent_log_file() + size = os.path.getsize(self.log_path) + if size == self._last_position: + self.__re_find['current'] += 1 + return list() # return empty list if nothing has changed + elif size < self._last_position: + self._last_position = 0 # read from beginning if file has shrunk + + with open(self.log_path) as fp: + fp.seek(self._last_position) + for line in fp: + lines.append(line) + self._last_position = fp.tell() + self.__re_find['current'] = 0 + except (OSError, IOError) as error: + self.__re_find['current'] += 1 + self.error(str(error)) + + return lines or None + + def _find_recent_log_file(self): + """ + :return: + """ + self.__re_find['run'] = self.__re_find['maximum'] + self.__re_find['current'] = 0 + self.__glob_path = self.__glob_path or self.log_path # workaround for modules w/o config files + path_list = glob(self.__glob_path) + if path_list: + self.log_path = max(path_list) + return True + return False + + def check(self): + """ + Parse basic configuration and check if log file exists + :return: boolean + """ + if not self.log_path: + self.error('No path to log specified') + return None + + if self._find_recent_log_file() and os.access(self.log_path, os.R_OK) and os.path.isfile(self.log_path): + return True + self.error('Cannot access {0}'.format(self.log_path)) + return False + + def create(self): + # set cursor at last byte of log file + self._last_position = os.path.getsize(self.log_path) + status = SimpleService.create(self) + return status diff --git a/python.d/python_modules/bases/FrameworkServices/MySQLService.py b/python.d/python_modules/bases/FrameworkServices/MySQLService.py new file mode 100644 index 000000000..3acc5b109 --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/MySQLService.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (l2isbad) + +from sys import exc_info + +try: + import MySQLdb + + PY_MYSQL = True +except ImportError: + try: + import pymysql as MySQLdb + + PY_MYSQL = True + except ImportError: + PY_MYSQL = False + +from bases.FrameworkServices.SimpleService import SimpleService + + +class MySQLService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.__connection = None + self.__conn_properties = dict() + self.extra_conn_properties = dict() + self.__queries = self.configuration.get('queries', dict()) + self.queries = dict() + + def __connect(self): + try: + connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties) + except (MySQLdb.MySQLError, TypeError, AttributeError) as error: + return None, str(error) + else: + return connection, None + + def check(self): + def get_connection_properties(conf, extra_conf): + properties = dict() + if conf.get('user'): + properties['user'] = conf['user'] + if conf.get('pass'): + properties['passwd'] = conf['pass'] + if conf.get('socket'): + properties['unix_socket'] = conf['socket'] + elif conf.get('host'): + properties['host'] = conf['host'] + properties['port'] = int(conf.get('port', 3306)) + elif conf.get('my.cnf'): + if MySQLdb.__name__ == 'pymysql': + self.error('"my.cnf" parsing is not working for pymysql') + else: + properties['read_default_file'] = conf['my.cnf'] + if isinstance(extra_conf, dict) and extra_conf: + properties.update(extra_conf) + + return properties or None + + def is_valid_queries_dict(raw_queries, log_error): + """ + :param raw_queries: dict: + :param log_error: function: + :return: dict or None + + raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries) + """ + + def is_valid_query(query): + return all([isinstance(query, str), + query.startswith(('SELECT', 'select', 'SHOW', 'show'))]) + + if hasattr(raw_queries, 'keys') and raw_queries: + valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)]) + bad_queries = set(raw_queries) - set(valid_queries) + + if bad_queries: + log_error('Removed query(s): {queries}'.format(queries=bad_queries)) + return valid_queries + else: + log_error('Unsupported "queries" format. Must be not empty <dict>') + return None + + if not PY_MYSQL: + self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin') + return False + + # Preference: 1. "queries" from the configuration file 2. "queries" from the module + self.queries = self.__queries or self.queries + # Check if "self.queries" exist, not empty and all queries are in valid format + self.queries = is_valid_queries_dict(self.queries, self.error) + if not self.queries: + return None + + # Get connection properties + self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties) + if not self.__conn_properties: + self.error('Connection properties are missing') + return False + + # Create connection to the database + self.__connection, error = self.__connect() + if error: + self.error('Can\'t establish connection to MySQL: {error}'.format(error=error)) + return False + + try: + data = self._get_data() + except Exception as error: + self.error('_get_data() failed. Error: {error}'.format(error=error)) + return False + + if isinstance(data, dict) and data: + return True + self.error("_get_data() returned no data or type is not <dict>") + return False + + def _get_raw_data(self, description=None): + """ + Get raw data from MySQL server + :return: dict: fetchall() or (fetchall(), description) + """ + + if not self.__connection: + self.__connection, error = self.__connect() + if error: + return None + + raw_data = dict() + queries = dict(self.queries) + try: + with self.__connection as cursor: + for name, query in queries.items(): + try: + cursor.execute(query) + except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error: + if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)): + raise RuntimeError + self.error('Removed query: {name}[{query}]. Error: error'.format(name=name, + query=query, + error=error)) + self.queries.pop(name) + continue + else: + raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall() + self.__connection.commit() + except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError): + self.__connection.close() + self.__connection = None + return None + else: + return raw_data or None + + @staticmethod + def __is_error_critical(err_class, err_text): + return err_class == MySQLdb.OperationalError and all(['denied' not in err_text, + 'Unknown column' not in err_text]) diff --git a/python.d/python_modules/bases/FrameworkServices/SimpleService.py b/python.d/python_modules/bases/FrameworkServices/SimpleService.py new file mode 100644 index 000000000..14c839101 --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/SimpleService.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (l2isbad) + +from threading import Thread + +try: + from time import sleep, monotonic as time +except ImportError: + from time import sleep, time + +from bases.charts import Charts, ChartError, create_runtime_chart +from bases.collection import OldVersionCompatibility, safe_print +from bases.loggers import PythonDLimitedLogger + +RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \ + 'SET run_time = {elapsed}\n' \ + 'END\n' + + +class RuntimeCounters: + def __init__(self, configuration): + """ + :param configuration: <dict> + """ + self.FREQ = int(configuration.pop('update_every')) + self.START_RUN = 0 + self.NEXT_RUN = 0 + self.PREV_UPDATE = 0 + self.SINCE_UPDATE = 0 + self.ELAPSED = 0 + self.RETRIES = 0 + self.RETRIES_MAX = configuration.pop('retries') + self.PENALTY = 0 + + def is_sleep_time(self): + return self.START_RUN < self.NEXT_RUN + + +class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, object): + """ + Prototype of Service class. + Implemented basic functionality to run jobs by `python.d.plugin` + """ + def __init__(self, configuration, name=''): + """ + :param configuration: <dict> + :param name: <str> + """ + Thread.__init__(self) + self.daemon = True + PythonDLimitedLogger.__init__(self) + OldVersionCompatibility.__init__(self) + self.configuration = configuration + self.order = list() + self.definitions = dict() + + self.module_name = self.__module__ + self.job_name = configuration.pop('job_name') + self.override_name = configuration.pop('override_name') + self.fake_name = None + + self._runtime_counters = RuntimeCounters(configuration=configuration) + self.charts = Charts(job_name=self.actual_name, + priority=configuration.pop('priority'), + cleanup=configuration.pop('chart_cleanup'), + get_update_every=self.get_update_every, + module_name=self.module_name) + + def __repr__(self): + return '<{cls_bases}: {name}>'.format(cls_bases=', '.join(c.__name__ for c in self.__class__.__bases__), + name=self.name) + + @property + def name(self): + if self.job_name: + return '_'.join([self.module_name, self.override_name or self.job_name]) + return self.module_name + + def actual_name(self): + return self.fake_name or self.name + + @property + def update_every(self): + return self._runtime_counters.FREQ + + @update_every.setter + def update_every(self, value): + """ + :param value: <int> + :return: + """ + self._runtime_counters.FREQ = value + + def get_update_every(self): + return self.update_every + + def check(self): + """ + check() prototype + :return: boolean + """ + self.debug("job doesn't implement check() method. Using default which simply invokes get_data().") + data = self.get_data() + if data and isinstance(data, dict): + return True + self.debug('returned value is wrong: {0}'.format(data)) + return False + + @create_runtime_chart + def create(self): + for chart_name in self.order: + chart_config = self.definitions.get(chart_name) + + if not chart_config: + self.debug("create() => [NOT ADDED] chart '{chart_name}' not in definitions. " + "Skipping it.".format(chart_name=chart_name)) + continue + + # create chart + chart_params = [chart_name] + chart_config['options'] + try: + self.charts.add_chart(params=chart_params) + except ChartError as error: + self.error("create() => [NOT ADDED] (chart '{chart}': {error})".format(chart=chart_name, + error=error)) + continue + + # add dimensions to chart + for dimension in chart_config['lines']: + try: + self.charts[chart_name].add_dimension(dimension) + except ChartError as error: + self.error("create() => [NOT ADDED] (dimension '{dimension}': {error})".format(dimension=dimension, + error=error)) + continue + + # add variables to chart + if 'variables' in chart_config: + for variable in chart_config['variables']: + try: + self.charts[chart_name].add_variable(variable) + except ChartError as error: + self.error("create() => [NOT ADDED] (variable '{var}': {error})".format(var=variable, + error=error)) + continue + + del self.order + del self.definitions + + # True if job has at least 1 chart else False + return bool(self.charts) + + def run(self): + """ + Runs job in thread. Handles retries. + Exits when job failed or timed out. + :return: None + """ + job = self._runtime_counters + self.debug('started, update frequency: {freq}, ' + 'retries: {retries}'.format(freq=job.FREQ, retries=job.RETRIES_MAX - job.RETRIES)) + + while True: + job.START_RUN = time() + + job.NEXT_RUN = job.START_RUN - (job.START_RUN % job.FREQ) + job.FREQ + job.PENALTY + + self.sleep_until_next_run() + + if job.PREV_UPDATE: + job.SINCE_UPDATE = int((job.START_RUN - job.PREV_UPDATE) * 1e6) + + try: + updated = self.update(interval=job.SINCE_UPDATE) + except Exception as error: + self.error('update() unhandled exception: {error}'.format(error=error)) + updated = False + + if not updated: + if not self.manage_retries(): + return + else: + job.ELAPSED = int((time() - job.START_RUN) * 1e3) + job.PREV_UPDATE = job.START_RUN + job.RETRIES, job.PENALTY = 0, 0 + safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name, + since_last=job.SINCE_UPDATE, + elapsed=job.ELAPSED)) + self.debug('update => [{status}] (elapsed time: {elapsed}, ' + 'retries left: {retries})'.format(status='OK' if updated else 'FAILED', + elapsed=job.ELAPSED if updated else '-', + retries=job.RETRIES_MAX - job.RETRIES)) + + def update(self, interval): + """ + :return: + """ + data = self.get_data() + if not data: + self.debug('get_data() returned no data') + return False + elif not isinstance(data, dict): + self.debug('get_data() returned incorrect type data') + return False + + updated = False + + for chart in self.charts: + if chart.flags.obsoleted: + continue + elif self.charts.cleanup and chart.penalty >= self.charts.cleanup: + chart.obsolete() + self.error("chart '{0}' was suppressed due to non updating".format(chart.name)) + continue + + ok = chart.update(data, interval) + if ok: + updated = True + + if not updated: + self.debug('none of the charts has been updated') + + return updated + + def manage_retries(self): + rc = self._runtime_counters + rc.RETRIES += 1 + if rc.RETRIES % 5 == 0: + rc.PENALTY = int(rc.RETRIES * self.update_every / 2) + if rc.RETRIES >= rc.RETRIES_MAX: + self.error('stopped after {0} data collection failures in a row'.format(rc.RETRIES_MAX)) + return False + return True + + def sleep_until_next_run(self): + job = self._runtime_counters + + # sleep() is interruptable + while job.is_sleep_time(): + sleep_time = job.NEXT_RUN - job.START_RUN + self.debug('sleeping for {sleep_time} to reach frequency of {freq} sec'.format(sleep_time=sleep_time, + freq=job.FREQ + job.PENALTY)) + sleep(sleep_time) + job.START_RUN = time() + + def get_data(self): + return self._get_data() + + def _get_data(self): + raise NotImplementedError diff --git a/python.d/python_modules/bases/FrameworkServices/SocketService.py b/python.d/python_modules/bases/FrameworkServices/SocketService.py new file mode 100644 index 000000000..90631df16 --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/SocketService.py @@ -0,0 +1,250 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) + +import socket + +from bases.FrameworkServices.SimpleService import SimpleService + + +class SocketService(SimpleService): + def __init__(self, configuration=None, name=None): + self._sock = None + self._keep_alive = False + self.host = 'localhost' + self.port = None + self.unix_socket = None + self.request = '' + self.__socket_config = None + self.__empty_request = "".encode() + SimpleService.__init__(self, configuration=configuration, name=name) + + def _socket_error(self, message=None): + if self.unix_socket is not None: + self.error('unix socket "{socket}": {message}'.format(socket=self.unix_socket, + message=message)) + else: + if self.__socket_config is not None: + af, sock_type, proto, canon_name, sa = self.__socket_config + self.error('socket to "{address}" port {port}: {message}'.format(address=sa[0], + port=sa[1], + message=message)) + else: + self.error('unknown socket: {0}'.format(message)) + + def _connect2socket(self, res=None): + """ + Connect to a socket, passing the result of getaddrinfo() + :return: boolean + """ + if res is None: + res = self.__socket_config + if res is None: + self.error("Cannot create socket to 'None':") + return False + + af, sock_type, proto, canon_name, sa = res + try: + self.debug('Creating socket to "{address}", port {port}'.format(address=sa[0], port=sa[1])) + self._sock = socket.socket(af, sock_type, proto) + except socket.error as error: + self.error('Failed to create socket "{address}", port {port}, error: {error}'.format(address=sa[0], + port=sa[1], + error=error)) + self._sock = None + self.__socket_config = None + return False + + try: + self.debug('connecting socket to "{address}", port {port}'.format(address=sa[0], port=sa[1])) + self._sock.connect(sa) + except socket.error as error: + self.error('Failed to connect to "{address}", port {port}, error: {error}'.format(address=sa[0], + port=sa[1], + error=error)) + self._disconnect() + self.__socket_config = None + return False + + self.debug('connected to "{address}", port {port}'.format(address=sa[0], port=sa[1])) + self.__socket_config = res + return True + + def _connect2unixsocket(self): + """ + Connect to a unix socket, given its filename + :return: boolean + """ + if self.unix_socket is None: + self.error("cannot connect to unix socket 'None'") + return False + + try: + self.debug('attempting DGRAM unix socket "{0}"'.format(self.unix_socket)) + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.connect(self.unix_socket) + self.debug('connected DGRAM unix socket "{0}"'.format(self.unix_socket)) + return True + except socket.error as error: + self.debug('Failed to connect DGRAM unix socket "{socket}": {error}'.format(socket=self.unix_socket, + error=error)) + + try: + self.debug('attempting STREAM unix socket "{0}"'.format(self.unix_socket)) + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.connect(self.unix_socket) + self.debug('connected STREAM unix socket "{0}"'.format(self.unix_socket)) + return True + except socket.error as error: + self.debug('Failed to connect STREAM unix socket "{socket}": {error}'.format(socket=self.unix_socket, + error=error)) + self._sock = None + return False + + def _connect(self): + """ + Recreate socket and connect to it since sockets cannot be reused after closing + Available configurations are IPv6, IPv4 or UNIX socket + :return: + """ + try: + if self.unix_socket is not None: + self._connect2unixsocket() + + else: + if self.__socket_config is not None: + self._connect2socket() + else: + for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): + if self._connect2socket(res): + break + + except Exception: + self._sock = None + self.__socket_config = None + + if self._sock is not None: + self._sock.setblocking(0) + self._sock.settimeout(5) + self.debug('set socket timeout to: {0}'.format(self._sock.gettimeout())) + + def _disconnect(self): + """ + Close socket connection + :return: + """ + if self._sock is not None: + try: + self.debug('closing socket') + self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all + self._sock.close() + except Exception: + pass + self._sock = None + + def _send(self): + """ + Send request. + :return: boolean + """ + # Send request if it is needed + if self.request != self.__empty_request: + try: + self.debug('sending request: {0}'.format(self.request)) + self._sock.send(self.request) + except Exception as error: + self._socket_error('error sending request: {0}'.format(error)) + self._disconnect() + return False + return True + + def _receive(self): + """ + Receive data from socket + :return: str + """ + data = "" + while True: + self.debug('receiving response') + try: + buf = self._sock.recv(4096) + except Exception as error: + self._socket_error('failed to receive response: {0}'.format(error)) + self._disconnect() + break + + if buf is None or len(buf) == 0: # handle server disconnect + if data == "": + self._socket_error('unexpectedly disconnected') + else: + self.debug('server closed the connection') + self._disconnect() + break + + self.debug('received data') + data += buf.decode('utf-8', 'ignore') + if self._check_raw_data(data): + break + + self.debug('final response: {0}'.format(data)) + return data + + def _get_raw_data(self): + """ + Get raw data with low-level "socket" module. + :return: str + """ + if self._sock is None: + self._connect() + if self._sock is None: + return None + + # Send request if it is needed + if not self._send(): + return None + + data = self._receive() + + if not self._keep_alive: + self._disconnect() + + return data + + @staticmethod + def _check_raw_data(data): + """ + Check if all data has been gathered from socket + :param data: str + :return: boolean + """ + return bool(data) + + def _parse_config(self): + """ + Parse configuration data + :return: boolean + """ + try: + self.unix_socket = str(self.configuration['socket']) + except (KeyError, TypeError): + self.debug('No unix socket specified. Trying TCP/IP socket.') + self.unix_socket = None + try: + self.host = str(self.configuration['host']) + except (KeyError, TypeError): + self.debug('No host specified. Using: "{0}"'.format(self.host)) + try: + self.port = int(self.configuration['port']) + except (KeyError, TypeError): + self.debug('No port specified. Using: "{0}"'.format(self.port)) + + try: + self.request = str(self.configuration['request']) + except (KeyError, TypeError): + self.debug('No request specified. Using: "{0}"'.format(self.request)) + + self.request = self.request.encode() + + def check(self): + self._parse_config() + return SimpleService.check(self) diff --git a/python.d/python_modules/bases/FrameworkServices/UrlService.py b/python.d/python_modules/bases/FrameworkServices/UrlService.py new file mode 100644 index 000000000..0941ab168 --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/UrlService.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (l2isbad) + +import urllib3 + +from bases.FrameworkServices.SimpleService import SimpleService + +try: + urllib3.disable_warnings() +except AttributeError: + pass + + +class UrlService(SimpleService): + def __init__(self, configuration=None, name=None): + SimpleService.__init__(self, configuration=configuration, name=name) + self.url = self.configuration.get('url') + self.user = self.configuration.get('user') + self.password = self.configuration.get('pass') + self.proxy_user = self.configuration.get('proxy_user') + self.proxy_password = self.configuration.get('proxy_pass') + self.proxy_url = self.configuration.get('proxy_url') + self.header = self.configuration.get('header') + self.request_timeout = self.configuration.get('timeout', 1) + self._manager = None + + def __make_headers(self, **header_kw): + user = header_kw.get('user') or self.user + password = header_kw.get('pass') or self.password + proxy_user = header_kw.get('proxy_user') or self.proxy_user + proxy_password = header_kw.get('proxy_pass') or self.proxy_password + custom_header = header_kw.get('header') or self.header + header_params = dict(keep_alive=True) + proxy_header_params = dict() + if user and password: + header_params['basic_auth'] = '{user}:{password}'.format(user=user, + password=password) + if proxy_user and proxy_password: + proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user, + password=proxy_password) + try: + header, proxy_header = urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params) + except TypeError as error: + self.error('build_header() error: {error}'.format(error=error)) + return None, None + else: + header.update(custom_header or dict()) + return header, proxy_header + + def _build_manager(self, **header_kw): + header, proxy_header = self.__make_headers(**header_kw) + if header is None or proxy_header is None: + return None + proxy_url = header_kw.get('proxy_url') or self.proxy_url + if proxy_url: + manager = urllib3.ProxyManager + params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header) + else: + manager = urllib3.PoolManager + params = dict(headers=header) + try: + url = header_kw.get('url') or self.url + if url.startswith('https'): + return manager(assert_hostname=False, cert_reqs='CERT_NONE', **params) + return manager(**params) + except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error: + self.error('build_manager() error:', str(error)) + return None + + def _get_raw_data(self, url=None, manager=None): + """ + Get raw data from http request + :return: str + """ + try: + url = url or self.url + manager = manager or self._manager + response = manager.request(method='GET', + url=url, + timeout=self.request_timeout, + retries=1, + headers=manager.headers) + except (urllib3.exceptions.HTTPError, TypeError, AttributeError) as error: + self.error('Url: {url}. Error: {error}'.format(url=url, error=error)) + return None + if response.status == 200: + return response.data.decode() + self.debug('Url: {url}. Http response status code: {code}'.format(url=url, code=response.status)) + return None + + def check(self): + """ + Format configuration data and try to connect to server + :return: boolean + """ + if not (self.url and isinstance(self.url, str)): + self.error('URL is not defined or type is not <str>') + return False + + self._manager = self._build_manager() + if not self._manager: + return False + + try: + data = self._get_data() + except Exception as error: + self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error)) + return False + + if isinstance(data, dict) and data: + return True + self.error('_get_data() returned no data or type is not <dict>') + return False diff --git a/python.d/python_modules/bases/FrameworkServices/__init__.py b/python.d/python_modules/bases/FrameworkServices/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python.d/python_modules/bases/FrameworkServices/__init__.py diff --git a/python.d/python_modules/bases/__init__.py b/python.d/python_modules/bases/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python.d/python_modules/bases/__init__.py diff --git a/python.d/python_modules/bases/charts.py b/python.d/python_modules/bases/charts.py new file mode 100644 index 000000000..1e9348e59 --- /dev/null +++ b/python.d/python_modules/bases/charts.py @@ -0,0 +1,376 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (l2isbad) + +from bases.collection import safe_print + +CHART_PARAMS = ['type', 'id', 'name', 'title', 'units', 'family', 'context', 'chart_type'] +DIMENSION_PARAMS = ['id', 'name', 'algorithm', 'multiplier', 'divisor', 'hidden'] +VARIABLE_PARAMS = ['id', 'value'] + +CHART_TYPES = ['line', 'area', 'stacked'] +DIMENSION_ALGORITHMS = ['absolute', 'incremental', 'percentage-of-absolute-row', 'percentage-of-incremental-row'] + +CHART_BEGIN = 'BEGIN {type}.{id} {since_last}\n' +CHART_CREATE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \ + "{chart_type} {priority} {update_every} '' 'python.d.plugin' '{module_name}'\n" +CHART_OBSOLETE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \ + "{chart_type} {priority} {update_every} 'obsolete'\n" + + +DIMENSION_CREATE = "DIMENSION '{id}' '{name}' {algorithm} {multiplier} {divisor} '{hidden}'\n" +DIMENSION_SET = "SET '{id}' = {value}\n" + +CHART_VARIABLE_SET = "VARIABLE CHART '{id}' = {value}\n" + +RUNTIME_CHART_CREATE = "CHART netdata.runtime_{job_name} '' 'Execution time for {job_name}' 'ms' 'python.d' " \ + "netdata.pythond_runtime line 145000 {update_every}\n" \ + "DIMENSION run_time 'run time' absolute 1 1\n" + + +def create_runtime_chart(func): + """ + Calls a wrapped function, then prints runtime chart to stdout. + + Used as a decorator for SimpleService.create() method. + The whole point of making 'create runtime chart' functionality as a decorator was + to help users who re-implements create() in theirs classes. + + :param func: class method + :return: + """ + def wrapper(*args, **kwargs): + self = args[0] + ok = func(*args, **kwargs) + if ok: + safe_print(RUNTIME_CHART_CREATE.format(job_name=self.name, + update_every=self._runtime_counters.FREQ)) + return ok + return wrapper + + +class ChartError(Exception): + """Base-class for all exceptions raised by this module""" + + +class DuplicateItemError(ChartError): + """Occurs when user re-adds a chart or a dimension that has already been added""" + + +class ItemTypeError(ChartError): + """Occurs when user passes value of wrong type to Chart, Dimension or ChartVariable class""" + + +class ItemValueError(ChartError): + """Occurs when user passes inappropriate value to Chart, Dimension or ChartVariable class""" + + +class Charts: + """Represent a collection of charts + + All charts stored in a dict. + Chart is a instance of Chart class. + Charts adding must be done using Charts.add_chart() method only""" + def __init__(self, job_name, priority, cleanup, get_update_every, module_name): + """ + :param job_name: <bound method> + :param priority: <int> + :param get_update_every: <bound method> + """ + self.job_name = job_name + self.priority = priority + self.cleanup = cleanup + self.get_update_every = get_update_every + self.module_name = module_name + self.charts = dict() + + def __len__(self): + return len(self.charts) + + def __iter__(self): + return iter(self.charts.values()) + + def __repr__(self): + return 'Charts({0})'.format(self) + + def __str__(self): + return str([chart for chart in self.charts]) + + def __contains__(self, item): + return item in self.charts + + def __getitem__(self, item): + return self.charts[item] + + def __delitem__(self, key): + del self.charts[key] + + def __bool__(self): + return bool(self.charts) + + def __nonzero__(self): + return self.__bool__() + + def add_chart(self, params): + """ + Create Chart instance and add it to the dict + + Manually adds job name, priority and update_every to params. + :param params: <list> + :return: + """ + params = [self.job_name()] + params + new_chart = Chart(params) + + new_chart.params['update_every'] = self.get_update_every() + new_chart.params['priority'] = self.priority + new_chart.params['module_name'] = self.module_name + + self.priority += 1 + self.charts[new_chart.id] = new_chart + + return new_chart + + def active_charts(self): + return [chart.id for chart in self if not chart.flags.obsoleted] + + +class Chart: + """Represent a chart""" + def __init__(self, params): + """ + :param params: <list> + """ + if not isinstance(params, list): + raise ItemTypeError("'chart' must be a list type") + if not len(params) >= 8: + raise ItemValueError("invalid value for 'chart', must be {0}".format(CHART_PARAMS)) + + self.params = dict(zip(CHART_PARAMS, (p or str() for p in params))) + self.name = '{type}.{id}'.format(type=self.params['type'], + id=self.params['id']) + if self.params.get('chart_type') not in CHART_TYPES: + self.params['chart_type'] = 'absolute' + + self.dimensions = list() + self.variables = set() + self.flags = ChartFlags() + self.penalty = 0 + + def __getattr__(self, item): + try: + return self.params[item] + except KeyError: + raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self), + attr=item)) + + def __repr__(self): + return 'Chart({0})'.format(self.id) + + def __str__(self): + return self.id + + def __iter__(self): + return iter(self.dimensions) + + def __contains__(self, item): + return item in [dimension.id for dimension in self.dimensions] + + def add_variable(self, variable): + """ + :param variable: <list> + :return: + """ + self.variables.add(ChartVariable(variable)) + + def add_dimension(self, dimension): + """ + :param dimension: <list> + :return: + """ + dim = Dimension(dimension) + + if dim.id in self: + raise DuplicateItemError("'{dimension}' already in '{chart}' dimensions".format(dimension=dim.id, + chart=self.name)) + self.refresh() + self.dimensions.append(dim) + return dim + + def hide_dimension(self, dimension_id, reverse=False): + if dimension_id in self: + idx = self.dimensions.index(dimension_id) + dimension = self.dimensions[idx] + dimension.params['hidden'] = 'hidden' if not reverse else str() + self.refresh() + + def create(self): + """ + :return: + """ + chart = CHART_CREATE.format(**self.params) + dimensions = ''.join([dimension.create() for dimension in self.dimensions]) + variables = ''.join([var.set(var.value) for var in self.variables if var]) + + self.flags.push = False + self.flags.created = True + + safe_print(chart + dimensions + variables) + + def update(self, data, interval): + updated_dimensions, updated_variables = str(), str() + + for dim in self.dimensions: + value = dim.get_value(data) + if value is not None: + updated_dimensions += dim.set(value) + + for var in self.variables: + value = var.get_value(data) + if value is not None: + updated_variables += var.set(value) + + if updated_dimensions: + since_last = interval if self.flags.updated else 0 + + if self.flags.push: + self.create() + + chart_begin = CHART_BEGIN.format(type=self.type, id=self.id, since_last=since_last) + safe_print(chart_begin, updated_dimensions, updated_variables, 'END\n') + + self.flags.updated = True + self.penalty = 0 + else: + self.penalty += 1 + self.flags.updated = False + + return bool(updated_dimensions) + + def obsolete(self): + self.flags.obsoleted = True + if self.flags.created: + safe_print(CHART_OBSOLETE.format(**self.params)) + + def refresh(self): + self.penalty = 0 + self.flags.push = True + self.flags.obsoleted = False + + +class Dimension: + """Represent a dimension""" + def __init__(self, params): + """ + :param params: <list> + """ + if not isinstance(params, list): + raise ItemTypeError("'dimension' must be a list type") + if not params: + raise ItemValueError("invalid value for 'dimension', must be {0}".format(DIMENSION_PARAMS)) + + self.params = dict(zip(DIMENSION_PARAMS, (p or str() for p in params))) + self.params['name'] = self.params.get('name') or self.params['id'] + + if self.params.get('algorithm') not in DIMENSION_ALGORITHMS: + self.params['algorithm'] = 'absolute' + if not isinstance(self.params.get('multiplier'), int): + self.params['multiplier'] = 1 + if not isinstance(self.params.get('divisor'), int): + self.params['divisor'] = 1 + self.params.setdefault('hidden', '') + + def __getattr__(self, item): + try: + return self.params[item] + except KeyError: + raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self), + attr=item)) + + def __repr__(self): + return 'Dimension({0})'.format(self.id) + + def __str__(self): + return self.id + + def __eq__(self, other): + if not isinstance(other, Dimension): + return self.id == other + return self.id == other.id + + def create(self): + return DIMENSION_CREATE.format(**self.params) + + def set(self, value): + """ + :param value: <str>: must be a digit + :return: + """ + return DIMENSION_SET.format(id=self.id, + value=value) + + def get_value(self, data): + try: + return int(data[self.id]) + except (KeyError, TypeError): + return None + + +class ChartVariable: + """Represent a chart variable""" + def __init__(self, params): + """ + :param params: <list> + """ + if not isinstance(params, list): + raise ItemTypeError("'variable' must be a list type") + if not params: + raise ItemValueError("invalid value for 'variable' must be: {0}".format(VARIABLE_PARAMS)) + + self.params = dict(zip(VARIABLE_PARAMS, params)) + self.params.setdefault('value', None) + + def __getattr__(self, item): + try: + return self.params[item] + except KeyError: + raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self), + attr=item)) + + def __bool__(self): + return self.value is not None + + def __nonzero__(self): + return self.__bool__() + + def __repr__(self): + return 'ChartVariable({0})'.format(self.id) + + def __str__(self): + return self.id + + def __eq__(self, other): + if isinstance(other, ChartVariable): + return self.id == other.id + return False + + def __hash__(self): + return hash(repr(self)) + + def set(self, value): + return CHART_VARIABLE_SET.format(id=self.id, + value=value) + + def get_value(self, data): + try: + return int(data[self.id]) + except (KeyError, TypeError): + return None + + +class ChartFlags: + def __init__(self): + self.push = True + self.created = False + self.updated = False + self.obsoleted = False diff --git a/python.d/python_modules/bases/collection.py b/python.d/python_modules/bases/collection.py new file mode 100644 index 000000000..e03b4f58e --- /dev/null +++ b/python.d/python_modules/bases/collection.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (l2isbad) + +import os + +PATH = os.getenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin').split(':') + +CHART_BEGIN = 'BEGIN {0} {1}\n' +CHART_CREATE = "CHART {0} '{1}' '{2}' '{3}' '{4}' '{5}' {6} {7} {8}\n" +DIMENSION_CREATE = "DIMENSION '{0}' '{1}' {2} {3} {4} '{5}'\n" +DIMENSION_SET = "SET '{0}' = {1}\n" + + +def setdefault_values(config, base_dict): + for key, value in base_dict.items(): + config.setdefault(key, value) + return config + + +def run_and_exit(func): + def wrapper(*args, **kwargs): + func(*args, **kwargs) + exit(1) + return wrapper + + +def on_try_except_finally(on_except=(None, ), on_finally=(None, )): + except_func = on_except[0] + finally_func = on_finally[0] + + def decorator(func): + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except Exception: + if except_func: + except_func(*on_except[1:]) + finally: + if finally_func: + finally_func(*on_finally[1:]) + return wrapper + return decorator + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +@on_try_except_finally(on_except=(exit, 1)) +def safe_print(*msg): + """ + :param msg: + :return: + """ + print(''.join(msg)) + + +def find_binary(binary): + """ + :param binary: <str> + :return: + """ + for directory in PATH: + binary_name = '/'.join([directory, binary]) + if os.path.isfile(binary_name) and os.access(binary_name, os.X_OK): + return binary_name + return None + + +def read_last_line(f): + with open(f, 'rb') as opened: + opened.seek(-2, 2) + while opened.read(1) != b'\n': + opened.seek(-2, 1) + if opened.tell() == 0: + break + result = opened.readline() + return result.decode() + + +class OldVersionCompatibility: + + def __init__(self): + self._data_stream = str() + + def begin(self, type_id, microseconds=0): + """ + :param type_id: <str> + :param microseconds: <str> or <int>: must be a digit + :return: + """ + self._data_stream += CHART_BEGIN.format(type_id, microseconds) + + def set(self, dim_id, value): + """ + :param dim_id: <str> + :param value: <int> or <str>: must be a digit + :return: + """ + self._data_stream += DIMENSION_SET.format(dim_id, value) + + def end(self): + self._data_stream += 'END\n' + + def chart(self, type_id, name='', title='', units='', family='', category='', chart_type='line', + priority='', update_every=''): + """ + :param type_id: <str> + :param name: <str> + :param title: <str> + :param units: <str> + :param family: <str> + :param category: <str> + :param chart_type: <str> + :param priority: <str> or <int> + :param update_every: <str> or <int> + :return: + """ + self._data_stream += CHART_CREATE.format(type_id, name, title, units, + family, category, chart_type, + priority, update_every) + + def dimension(self, dim_id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False): + """ + :param dim_id: <str> + :param name: <str> or None + :param algorithm: <str> + :param multiplier: <str> or <int>: must be a digit + :param divisor: <str> or <int>: must be a digit + :param hidden: <str>: literally "hidden" or "" + :return: + """ + self._data_stream += DIMENSION_CREATE.format(dim_id, name or dim_id, algorithm, + multiplier, divisor, hidden or str()) + + @on_try_except_finally(on_except=(exit, 1)) + def commit(self): + print(self._data_stream) + self._data_stream = str() diff --git a/python.d/python_modules/bases/loaders.py b/python.d/python_modules/bases/loaders.py new file mode 100644 index 000000000..d18b9dcd0 --- /dev/null +++ b/python.d/python_modules/bases/loaders.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (l2isbad) + +import types +from sys import version_info + +PY_VERSION = version_info[:2] + +if PY_VERSION > (3, 1): + from pyyaml3 import SafeLoader as YamlSafeLoader + from importlib.machinery import SourceFileLoader + DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map' +else: + from pyyaml2 import SafeLoader as YamlSafeLoader + from imp import load_source as SourceFileLoader + DEFAULT_MAPPING_TAG = u'tag:yaml.org,2002:map' + +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict + + +def dict_constructor(loader, node): + return OrderedDict(loader.construct_pairs(node)) + + +YamlSafeLoader.add_constructor(DEFAULT_MAPPING_TAG, dict_constructor) + + +class YamlOrderedLoader: + @staticmethod + def load_config_from_file(file_name): + opened, loaded = False, False + try: + stream = open(file_name, 'r') + opened = True + loader = YamlSafeLoader(stream) + loaded = True + parsed = loader.get_single_data() or dict() + except Exception as error: + return dict(), error + else: + return parsed, None + finally: + if opened: + stream.close() + if loaded: + loader.dispose() + + +class SourceLoader: + @staticmethod + def load_module_from_file(name, path): + try: + loaded = SourceFileLoader(name, path) + if isinstance(loaded, types.ModuleType): + return loaded, None + return loaded.load_module(), None + except Exception as error: + return None, error + + +class ModuleAndConfigLoader(YamlOrderedLoader, SourceLoader): + pass diff --git a/python.d/python_modules/bases/loggers.py b/python.d/python_modules/bases/loggers.py new file mode 100644 index 000000000..fc40b83d3 --- /dev/null +++ b/python.d/python_modules/bases/loggers.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- +# Description: +# Author: Ilya Mashchenko (l2isbad) + +import logging +import traceback + +from sys import exc_info + +try: + from time import monotonic as time +except ImportError: + from time import time + +from bases.collection import on_try_except_finally + + +LOGGING_LEVELS = {'CRITICAL': 50, + 'ERROR': 40, + 'WARNING': 30, + 'INFO': 20, + 'DEBUG': 10, + 'NOTSET': 0} + +DEFAULT_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s : %(message)s' +DEFAULT_LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' + +PYTHON_D_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s: %(module_name)s: %(job_name)s: %(message)s' +PYTHON_D_LOG_NAME = 'python.d' + + +def limiter(log_max_count=30, allowed_in_seconds=60): + def on_decorator(func): + + def on_call(*args): + current_time = args[0]._runtime_counters.START_RUN + lc = args[0]._logger_counters + + if lc.logged and lc.logged % log_max_count == 0: + if current_time - lc.time_to_compare <= allowed_in_seconds: + lc.dropped += 1 + return + lc.time_to_compare = current_time + + lc.logged += 1 + func(*args) + + return on_call + return on_decorator + + +def add_traceback(func): + def on_call(*args): + self = args[0] + + if not self.log_traceback: + func(*args) + else: + if exc_info()[0]: + func(*args) + func(self, traceback.format_exc()) + else: + func(*args) + + return on_call + + +class LoggerCounters: + def __init__(self): + self.logged = 0 + self.dropped = 0 + self.time_to_compare = time() + + def __repr__(self): + return 'LoggerCounter(logged: {logged}, dropped: {dropped})'.format(logged=self.logged, + dropped=self.dropped) + + +class BaseLogger(object): + def __init__(self, logger_name, log_fmt=DEFAULT_LOG_LINE_FORMAT, date_fmt=DEFAULT_LOG_TIME_FORMAT, + handler=logging.StreamHandler): + """ + :param logger_name: <str> + :param log_fmt: <str> + :param date_fmt: <str> + :param handler: <logging handler> + """ + self.logger = logging.getLogger(logger_name) + if not self.has_handlers(): + self.severity = 'INFO' + self.logger.addHandler(handler()) + self.set_formatter(fmt=log_fmt, date_fmt=date_fmt) + + def __repr__(self): + return '<Logger: {name})>'.format(name=self.logger.name) + + def set_formatter(self, fmt, date_fmt=DEFAULT_LOG_TIME_FORMAT): + """ + :param fmt: <str> + :param date_fmt: <str> + :return: + """ + if self.has_handlers(): + self.logger.handlers[0].setFormatter(logging.Formatter(fmt=fmt, datefmt=date_fmt)) + + def has_handlers(self): + return self.logger.handlers + + @property + def severity(self): + return self.logger.getEffectiveLevel() + + @severity.setter + def severity(self, level): + """ + :param level: <str> or <int> + :return: + """ + if level in LOGGING_LEVELS: + self.logger.setLevel(LOGGING_LEVELS[level]) + + def debug(self, *msg, **kwargs): + self.logger.debug(' '.join(map(str, msg)), **kwargs) + + def info(self, *msg, **kwargs): + self.logger.info(' '.join(map(str, msg)), **kwargs) + + def warning(self, *msg, **kwargs): + self.logger.warning(' '.join(map(str, msg)), **kwargs) + + def error(self, *msg, **kwargs): + self.logger.error(' '.join(map(str, msg)), **kwargs) + + def alert(self, *msg, **kwargs): + self.logger.critical(' '.join(map(str, msg)), **kwargs) + + @on_try_except_finally(on_finally=(exit, 1)) + def fatal(self, *msg, **kwargs): + self.logger.critical(' '.join(map(str, msg)), **kwargs) + + +class PythonDLogger(object): + def __init__(self, logger_name=PYTHON_D_LOG_NAME, log_fmt=PYTHON_D_LOG_LINE_FORMAT): + """ + :param logger_name: <str> + :param log_fmt: <str> + """ + self.logger = BaseLogger(logger_name, log_fmt=log_fmt) + self.module_name = 'plugin' + self.job_name = 'main' + self._logger_counters = LoggerCounters() + + _LOG_TRACEBACK = False + + @property + def log_traceback(self): + return PythonDLogger._LOG_TRACEBACK + + @log_traceback.setter + def log_traceback(self, value): + PythonDLogger._LOG_TRACEBACK = value + + def debug(self, *msg): + self.logger.debug(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + def info(self, *msg): + self.logger.info(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + def warning(self, *msg): + self.logger.warning(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + @add_traceback + def error(self, *msg): + self.logger.error(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + @add_traceback + def alert(self, *msg): + self.logger.alert(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + def fatal(self, *msg): + self.logger.fatal(*msg, extra={'module_name': self.module_name, + 'job_name': self.job_name or self.module_name}) + + +class PythonDLimitedLogger(PythonDLogger): + @limiter() + def info(self, *msg): + PythonDLogger.info(self, *msg) + + @limiter() + def warning(self, *msg): + PythonDLogger.warning(self, *msg) + + @limiter() + def error(self, *msg): + PythonDLogger.error(self, *msg) + + @limiter() + def alert(self, *msg): + PythonDLogger.alert(self, *msg) diff --git a/python.d/python_modules/msg.py b/python.d/python_modules/msg.py deleted file mode 100644 index 74716770c..000000000 --- a/python.d/python_modules/msg.py +++ /dev/null @@ -1,101 +0,0 @@ -# -*- coding: utf-8 -*- -# Description: logging for netdata python.d modules - -import traceback -import sys -from time import time, strftime - -DEBUG_FLAG = False -TRACE_FLAG = False -PROGRAM = "" -LOG_COUNTER = 0 -LOG_THROTTLE = 10000 # has to be too big during init -LOG_INTERVAL = 1 # has to be too low during init -LOG_NEXT_CHECK = 0 - -WRITE = sys.stderr.write -FLUSH = sys.stderr.flush - - -def log_msg(msg_type, *args): - """ - Print message on stderr. - :param msg_type: str - """ - global LOG_COUNTER - global LOG_THROTTLE - global LOG_INTERVAL - global LOG_NEXT_CHECK - now = time() - - if not DEBUG_FLAG: - LOG_COUNTER += 1 - - # WRITE("COUNTER " + str(LOG_COUNTER) + " THROTTLE " + str(LOG_THROTTLE) + " INTERVAL " + str(LOG_INTERVAL) + " NOW " + str(now) + " NEXT " + str(LOG_NEXT_CHECK) + "\n") - - if LOG_COUNTER <= LOG_THROTTLE or msg_type == "FATAL" or msg_type == "ALERT": - timestamp = strftime('%Y-%m-%d %X') - msg = "%s: %s %s: %s" % (timestamp, PROGRAM, str(msg_type), " ".join(args)) - WRITE(msg + "\n") - FLUSH() - elif LOG_COUNTER == LOG_THROTTLE + 1: - timestamp = strftime('%Y-%m-%d %X') - msg = "%s: python.d.plugin: throttling further log messages for %s seconds" % (timestamp, str(int(LOG_NEXT_CHECK + 0.5) - int(now))) - WRITE(msg + "\n") - FLUSH() - - if LOG_NEXT_CHECK <= now: - if LOG_COUNTER >= LOG_THROTTLE: - timestamp = strftime('%Y-%m-%d %X') - msg = "%s: python.d.plugin: Prevented %s log messages from displaying" % (timestamp, str(LOG_COUNTER - LOG_THROTTLE)) - WRITE(msg + "\n") - FLUSH() - LOG_NEXT_CHECK = now - (now % LOG_INTERVAL) + LOG_INTERVAL - LOG_COUNTER = 0 - - if TRACE_FLAG: - if msg_type == "FATAL" or msg_type == "ERROR" or msg_type == "ALERT": - traceback.print_exc() - - -def debug(*args): - """ - Print debug message on stderr. - """ - if not DEBUG_FLAG: - return - - log_msg("DEBUG", *args) - - -def error(*args): - """ - Print message on stderr. - """ - log_msg("ERROR", *args) - - -def alert(*args): - """ - Print message on stderr. - """ - log_msg("ALERT", *args) - - -def info(*args): - """ - Print message on stderr. - """ - log_msg("INFO", *args) - - -def fatal(*args): - """ - Print message on stderr and exit. - """ - try: - log_msg("FATAL", *args) - print('DISABLE') - except: - pass - sys.exit(1) diff --git a/python.d/python_modules/third_party/__init__.py b/python.d/python_modules/third_party/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python.d/python_modules/third_party/__init__.py diff --git a/python.d/python_modules/lm_sensors.py b/python.d/python_modules/third_party/lm_sensors.py index 1d868f0e2..1d868f0e2 100644 --- a/python.d/python_modules/lm_sensors.py +++ b/python.d/python_modules/third_party/lm_sensors.py diff --git a/python.d/python_modules/third_party/ordereddict.py b/python.d/python_modules/third_party/ordereddict.py new file mode 100644 index 000000000..d0b97d47c --- /dev/null +++ b/python.d/python_modules/third_party/ordereddict.py @@ -0,0 +1,128 @@ +# Copyright (c) 2009 Raymond Hettinger +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +from UserDict import DictMixin + + +class OrderedDict(dict, DictMixin): + + def __init__(self, *args, **kwds): + if len(args) > 1: + raise TypeError('expected at most 1 arguments, got %d' % len(args)) + try: + self.__end + except AttributeError: + self.clear() + self.update(*args, **kwds) + + def clear(self): + self.__end = end = [] + end += [None, end, end] # sentinel node for doubly linked list + self.__map = {} # key --> [key, prev, next] + dict.clear(self) + + def __setitem__(self, key, value): + if key not in self: + end = self.__end + curr = end[1] + curr[2] = end[1] = self.__map[key] = [key, curr, end] + dict.__setitem__(self, key, value) + + def __delitem__(self, key): + dict.__delitem__(self, key) + key, prev, next = self.__map.pop(key) + prev[2] = next + next[1] = prev + + def __iter__(self): + end = self.__end + curr = end[2] + while curr is not end: + yield curr[0] + curr = curr[2] + + def __reversed__(self): + end = self.__end + curr = end[1] + while curr is not end: + yield curr[0] + curr = curr[1] + + def popitem(self, last=True): + if not self: + raise KeyError('dictionary is empty') + if last: + key = reversed(self).next() + else: + key = iter(self).next() + value = self.pop(key) + return key, value + + def __reduce__(self): + items = [[k, self[k]] for k in self] + tmp = self.__map, self.__end + del self.__map, self.__end + inst_dict = vars(self).copy() + self.__map, self.__end = tmp + if inst_dict: + return self.__class__, (items,), inst_dict + return self.__class__, (items,) + + def keys(self): + return list(self) + + setdefault = DictMixin.setdefault + update = DictMixin.update + pop = DictMixin.pop + values = DictMixin.values + items = DictMixin.items + iterkeys = DictMixin.iterkeys + itervalues = DictMixin.itervalues + iteritems = DictMixin.iteritems + + def __repr__(self): + if not self: + return '%s()' % (self.__class__.__name__,) + return '%s(%r)' % (self.__class__.__name__, self.items()) + + def copy(self): + return self.__class__(self) + + @classmethod + def fromkeys(cls, iterable, value=None): + d = cls() + for key in iterable: + d[key] = value + return d + + def __eq__(self, other): + if isinstance(other, OrderedDict): + if len(self) != len(other): + return False + for p, q in zip(self.items(), other.items()): + if p != q: + return False + return True + return dict.__eq__(self, other) + + def __ne__(self, other): + return not self == other |