summaryrefslogtreecommitdiffstats
path: root/python.d/python_modules
diff options
context:
space:
mode:
Diffstat (limited to 'python.d/python_modules')
-rw-r--r--python.d/python_modules/base.py1126
-rw-r--r--python.d/python_modules/bases/FrameworkServices/ExecutableService.py85
-rw-r--r--python.d/python_modules/bases/FrameworkServices/LogService.py78
-rw-r--r--python.d/python_modules/bases/FrameworkServices/MySQLService.py158
-rw-r--r--python.d/python_modules/bases/FrameworkServices/SimpleService.py252
-rw-r--r--python.d/python_modules/bases/FrameworkServices/SocketService.py250
-rw-r--r--python.d/python_modules/bases/FrameworkServices/UrlService.py115
-rw-r--r--python.d/python_modules/bases/FrameworkServices/__init__.py0
-rw-r--r--python.d/python_modules/bases/__init__.py0
-rw-r--r--python.d/python_modules/bases/charts.py376
-rw-r--r--python.d/python_modules/bases/collection.py144
-rw-r--r--python.d/python_modules/bases/loaders.py66
-rw-r--r--python.d/python_modules/bases/loggers.py205
-rw-r--r--python.d/python_modules/msg.py101
-rw-r--r--python.d/python_modules/third_party/__init__.py0
-rw-r--r--python.d/python_modules/third_party/lm_sensors.py (renamed from python.d/python_modules/lm_sensors.py)0
-rw-r--r--python.d/python_modules/third_party/ordereddict.py128
17 files changed, 1865 insertions, 1219 deletions
diff --git a/python.d/python_modules/base.py b/python.d/python_modules/base.py
index 1d5417ec..7c6e1d2f 100644
--- a/python.d/python_modules/base.py
+++ b/python.d/python_modules/base.py
@@ -1,1119 +1,9 @@
# -*- coding: utf-8 -*-
-# Description: netdata python modules framework
-# Author: Pawel Krupa (paulfantom)
-
-# Remember:
-# ALL CODE NEEDS TO BE COMPATIBLE WITH Python > 2.7 and Python > 3.1
-# Follow PEP8 as much as it is possible
-# "check" and "create" CANNOT be blocking.
-# "update" CAN be blocking
-# "update" function needs to be fast, so follow:
-# https://wiki.python.org/moin/PythonSpeed/PerformanceTips
-# basically:
-# - use local variables wherever it is possible
-# - avoid dots in expressions that are executed many times
-# - use "join()" instead of "+"
-# - use "import" only at the beginning
-#
-# using ".encode()" in one thread can block other threads as well (only in python2)
-
-import os
-import re
-import socket
-import time
-import threading
-
-import urllib3
-
-from glob import glob
-from subprocess import Popen, PIPE
-from sys import exc_info
-
-try:
- import MySQLdb
- PY_MYSQL = True
-except ImportError:
- try:
- import pymysql as MySQLdb
- PY_MYSQL = True
- except ImportError:
- PY_MYSQL = False
-
-import msg
-
-
-PATH = os.getenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin').split(':')
-try:
- urllib3.disable_warnings()
-except AttributeError:
- msg.error('urllib3: warnings were not disabled')
-
-
-# class BaseService(threading.Thread):
-class SimpleService(threading.Thread):
- """
- Prototype of Service class.
- Implemented basic functionality to run jobs by `python.d.plugin`
- """
- def __init__(self, configuration=None, name=None):
- """
- This needs to be initialized in child classes
- :param configuration: dict
- :param name: str
- """
- threading.Thread.__init__(self)
- self._data_stream = ""
- self.daemon = True
- self.retries = 0
- self.retries_left = 0
- self.priority = 140000
- self.update_every = 1
- self.name = name
- self.override_name = None
- self.chart_name = ""
- self._dimensions = []
- self._charts = []
- self.__chart_set = False
- self.__first_run = True
- self.order = []
- self.definitions = {}
- self._data_from_check = dict()
- if configuration is None:
- self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
- raise RuntimeError
- else:
- self._extract_base_config(configuration)
- self.timetable = {}
- self.create_timetable()
-
- # --- BASIC SERVICE CONFIGURATION ---
-
- def _extract_base_config(self, config):
- """
- Get basic parameters to run service
- Minimum config:
- config = {'update_every':1,
- 'priority':100000,
- 'retries':0}
- :param config: dict
- """
- pop = config.pop
- try:
- self.override_name = pop('name')
- except KeyError:
- pass
- self.update_every = int(pop('update_every'))
- self.priority = int(pop('priority'))
- self.retries = int(pop('retries'))
- self.retries_left = self.retries
- self.configuration = config
-
- def create_timetable(self, freq=None):
- """
- Create service timetable.
- `freq` is optional
- Example:
- timetable = {'last': 1466370091.3767564,
- 'next': 1466370092,
- 'freq': 1}
- :param freq: int
- """
- if freq is None:
- freq = self.update_every
- now = time.time()
- self.timetable = {'last': now,
- 'next': now - (now % freq) + freq,
- 'freq': freq}
-
- # --- THREAD CONFIGURATION ---
-
- def _run_once(self):
- """
- Executes self.update(interval) and draws run time chart.
- Return value presents exit status of update()
- :return: boolean
- """
- t_start = float(time.time())
- chart_name = self.chart_name
-
- since_last = int((t_start - self.timetable['last']) * 1000000)
- if self.__first_run:
- since_last = 0
-
- if not self.update(since_last):
- self.error("update function failed.")
- return False
-
- # draw performance graph
- run_time = int((time.time() - t_start) * 1000)
- print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
- (self.chart_name, str(since_last), str(run_time)))
-
- self.debug(chart_name, "updated in", str(run_time), "ms")
- self.timetable['last'] = t_start
- self.__first_run = False
- return True
-
- def run(self):
- """
- Runs job in thread. Handles retries.
- Exits when job failed or timed out.
- :return: None
- """
- step = float(self.timetable['freq'])
- penalty = 0
- self.timetable['last'] = float(time.time() - step)
- self.debug("starting data collection - update frequency:", str(step), " retries allowed:", str(self.retries))
- while True: # run forever, unless something is wrong
- now = float(time.time())
- next = self.timetable['next'] = now - (now % step) + step + penalty
-
- # it is important to do this in a loop
- # sleep() is interruptable
- while now < next:
- self.debug("sleeping for", str(next - now), "secs to reach frequency of",
- str(step), "secs, now:", str(now), " next:", str(next), " penalty:", str(penalty))
- time.sleep(next - now)
- now = float(time.time())
-
- # do the job
- try:
- status = self._run_once()
- except Exception:
- status = False
-
- if status:
- # it is good
- self.retries_left = self.retries
- penalty = 0
- else:
- # it failed
- self.retries_left -= 1
- if self.retries_left <= 0:
- if penalty == 0:
- penalty = float(self.retries * step) / 2
- else:
- penalty *= 1.5
-
- if penalty > 600:
- penalty = 600
-
- self.retries_left = self.retries
- self.alert("failed to collect data for " + str(self.retries) +
- " times - increasing penalty to " + str(penalty) + " sec and trying again")
-
- else:
- self.error("failed to collect data - " + str(self.retries_left)
- + " retries left - penalty: " + str(penalty) + " sec")
-
- # --- CHART ---
-
- @staticmethod
- def _format(*args):
- """
- Escape and convert passed arguments.
- :param args: anything
- :return: list
- """
- params = []
- append = params.append
- for p in args:
- if p is None:
- append(p)
- continue
- if type(p) is not str:
- p = str(p)
- if ' ' in p:
- p = "'" + p + "'"
- append(p)
- return params
-
- def _line(self, instruction, *params):
- """
- Converts *params to string and joins them with one space between every one.
- Result is appended to self._data_stream
- :param params: str/int/float
- """
- tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params))
- self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp)))
-
- def chart(self, type_id, name="", title="", units="", family="",
- category="", chart_type="line", priority="", update_every=""):
- """
- Defines a new chart.
- :param type_id: str
- :param name: str
- :param title: str
- :param units: str
- :param family: str
- :param category: str
- :param chart_type: str
- :param priority: int/str
- :param update_every: int/str
- """
- self._charts.append(type_id)
-
- p = self._format(type_id, name, title, units, family, category, chart_type, priority, update_every)
- self._line("CHART", *p)
-
- def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
- """
- Defines a new dimension for the chart
- :param id: str
- :param name: str
- :param algorithm: str
- :param multiplier: int/str
- :param divisor: int/str
- :param hidden: boolean
- :return:
- """
- try:
- int(multiplier)
- except TypeError:
- self.error("malformed dimension: multiplier is not a number:", multiplier)
- multiplier = 1
- try:
- int(divisor)
- except TypeError:
- self.error("malformed dimension: divisor is not a number:", divisor)
- divisor = 1
- if name is None:
- name = id
- if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
- algorithm = "absolute"
-
- self._dimensions.append(str(id))
- if hidden:
- p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
- else:
- p = self._format(id, name, algorithm, multiplier, divisor)
-
- self._line("DIMENSION", *p)
-
- def begin(self, type_id, microseconds=0):
- """
- Begin data set
- :param type_id: str
- :param microseconds: int
- :return: boolean
- """
- if type_id not in self._charts:
- self.error("wrong chart type_id:", type_id)
- return False
- try:
- int(microseconds)
- except TypeError:
- self.error("malformed begin statement: microseconds are not a number:", microseconds)
- microseconds = ""
-
- self._line("BEGIN", type_id, str(microseconds))
- return True
-
- def set(self, id, value):
- """
- Set value to dimension
- :param id: str
- :param value: int/float
- :return: boolean
- """
- if id not in self._dimensions:
- self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
- return False
- try:
- value = str(int(value))
- except TypeError:
- self.error("cannot set non-numeric value:", str(value))
- return False
- self._line("SET", id, "=", str(value))
- self.__chart_set = True
- return True
-
- def end(self):
- if self.__chart_set:
- self._line("END")
- self.__chart_set = False
- else:
- pos = self._data_stream.rfind("BEGIN")
- self._data_stream = self._data_stream[:pos]
-
- def commit(self):
- """
- Upload new data to netdata.
- """
- try:
- print(self._data_stream)
- except Exception as e:
- msg.fatal('cannot send data to netdata:', str(e))
- self._data_stream = ""
-
- # --- ERROR HANDLING ---
-
- def error(self, *params):
- """
- Show error message on stderr
- """
- msg.error(self.chart_name, *params)
-
- def alert(self, *params):
- """
- Show error message on stderr
- """
- msg.alert(self.chart_name, *params)
-
- def debug(self, *params):
- """
- Show debug message on stderr
- """
- msg.debug(self.chart_name, *params)
-
- def info(self, *params):
- """
- Show information message on stderr
- """
- msg.info(self.chart_name, *params)
-
- # --- MAIN METHODS ---
-
- def _get_data(self):
- """
- Get some data
- :return: dict
- """
- return {}
-
- def check(self):
- """
- check() prototype
- :return: boolean
- """
- self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.")
- data = self._get_data()
-
- if data is None:
- self.debug("failed to receive data during check().")
- return False
-
- if len(data) == 0:
- self.debug("empty data during check().")
- return False
-
- self.debug("successfully received data during check(): '" + str(data) + "'")
- return True
-
- def create(self):
- """
- Create charts
- :return: boolean
- """
- data = self._data_from_check or self._get_data()
- if data is None:
- self.debug("failed to receive data during create().")
- return False
-
- idx = 0
- for name in self.order:
- options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
- self.chart(self.chart_name + "." + name, *options)
- # check if server has this datapoint
- for line in self.definitions[name]['lines']:
- if line[0] in data:
- self.dimension(*line)
- idx += 1
-
- self.commit()
- return True
-
- def update(self, interval):
- """
- Update charts
- :param interval: int
- :return: boolean
- """
- data = self._get_data()
- if data is None:
- self.debug("failed to receive data during update().")
- return False
-
- updated = False
- for chart in self.order:
- if self.begin(self.chart_name + "." + chart, interval):
- updated = True
- for dim in self.definitions[chart]['lines']:
- try:
- self.set(dim[0], data[dim[0]])
- except KeyError:
- pass
- self.end()
-
- self.commit()
- if not updated:
- self.error("no charts to update")
-
- return updated
-
- @staticmethod
- def find_binary(binary):
- try:
- if isinstance(binary, str):
- binary = os.path.basename(binary)
- return next(('/'.join([p, binary]) for p in PATH
- if os.path.isfile('/'.join([p, binary]))
- and os.access('/'.join([p, binary]), os.X_OK)))
- return None
- except StopIteration:
- return None
-
- def _add_new_dimension(self, dimension_id, chart_name, dimension=None, algorithm='incremental',
- multiplier=1, divisor=1, priority=65000):
- """
- :param dimension_id:
- :param chart_name:
- :param dimension:
- :param algorithm:
- :param multiplier:
- :param divisor:
- :param priority:
- :return:
- """
- if not all([dimension_id not in self._dimensions,
- chart_name in self.order,
- chart_name in self.definitions]):
- return
- self._dimensions.append(dimension_id)
- dimension_list = list(map(str, [dimension_id,
- dimension if dimension else dimension_id,
- algorithm,
- multiplier,
- divisor]))
- self.definitions[chart_name]['lines'].append(dimension_list)
- add_to_name = self.override_name or self.name
- job_name = ('_'.join([self.__module__, re.sub('\s+', '_', add_to_name)])
- if add_to_name != 'None' else self.__module__)
- chart = 'CHART {0}.{1} '.format(job_name, chart_name)
- options = '"" "{0}" {1} "{2}" {3} {4} '.format(*self.definitions[chart_name]['options'][1:6])
- other = '{0} {1}\n'.format(priority, self.update_every)
- new_dimension = "DIMENSION {0}\n".format(' '.join(dimension_list))
- print(chart + options + other + new_dimension)
-
-
-class UrlService(SimpleService):
- def __init__(self, configuration=None, name=None):
- SimpleService.__init__(self, configuration=configuration, name=name)
- self.url = self.configuration.get('url')
- self.user = self.configuration.get('user')
- self.password = self.configuration.get('pass')
- self.proxy_user = self.configuration.get('proxy_user')
- self.proxy_password = self.configuration.get('proxy_pass')
- self.proxy_url = self.configuration.get('proxy_url')
- self._manager = None
-
- def __make_headers(self, **header_kw):
- user = header_kw.get('user') or self.user
- password = header_kw.get('pass') or self.password
- proxy_user = header_kw.get('proxy_user') or self.proxy_user
- proxy_password = header_kw.get('proxy_pass') or self.proxy_password
- header_params = dict(keep_alive=True)
- proxy_header_params = dict()
- if user and password:
- header_params['basic_auth'] = '{user}:{password}'.format(user=user,
- password=password)
- if proxy_user and proxy_password:
- proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user,
- password=proxy_password)
- try:
- return urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params)
- except TypeError as error:
- self.error('build_header() error: {error}'.format(error=error))
- return None, None
-
- def _build_manager(self, **header_kw):
- header, proxy_header = self.__make_headers(**header_kw)
- if header is None or proxy_header is None:
- return None
- proxy_url = header_kw.get('proxy_url') or self.proxy_url
- if proxy_url:
- manager = urllib3.ProxyManager
- params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header)
- else:
- manager = urllib3.PoolManager
- params = dict(headers=header)
- try:
- return manager(**params)
- except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error:
- self.error('build_manager() error:', str(error))
- return None
-
- def _get_raw_data(self, url=None, manager=None):
- """
- Get raw data from http request
- :return: str
- """
- try:
- url = url or self.url
- manager = manager or self._manager
- # TODO: timeout, retries and method hardcoded..
- response = manager.request(method='GET',
- url=url,
- timeout=1,
- retries=1,
- headers=manager.headers)
- except (urllib3.exceptions.HTTPError, TypeError, AttributeError) as error:
- self.error('Url: {url}. Error: {error}'.format(url=url, error=error))
- return None
- if response.status == 200:
- return response.data.decode()
- self.debug('Url: {url}. Http response status code: {code}'.format(url=url, code=response.status))
- return None
-
- def check(self):
- """
- Format configuration data and try to connect to server
- :return: boolean
- """
- if not (self.url and isinstance(self.url, str)):
- self.error('URL is not defined or type is not <str>')
- return False
-
- self._manager = self._build_manager()
- if not self._manager:
- return False
-
- try:
- data = self._get_data()
- except Exception as error:
- self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error))
- return False
-
- if isinstance(data, dict) and data:
- self._data_from_check = data
- return True
- self.error('_get_data() returned no data or type is not <dict>')
- return False
-
-
-class SocketService(SimpleService):
- def __init__(self, configuration=None, name=None):
- self._sock = None
- self._keep_alive = False
- self.host = "localhost"
- self.port = None
- self.unix_socket = None
- self.request = ""
- self.__socket_config = None
- self.__empty_request = "".encode()
- SimpleService.__init__(self, configuration=configuration, name=name)
-
- def _socketerror(self, message=None):
- if self.unix_socket is not None:
- self.error("unix socket '" + self.unix_socket + "':", message)
- else:
- if self.__socket_config is not None:
- af, socktype, proto, canonname, sa = self.__socket_config
- self.error("socket to '" + str(sa[0]) + "' port " + str(sa[1]) + ":", message)
- else:
- self.error("unknown socket:", message)
-
- def _connect2socket(self, res=None):
- """
- Connect to a socket, passing the result of getaddrinfo()
- :return: boolean
- """
- if res is None:
- res = self.__socket_config
- if res is None:
- self.error("Cannot create socket to 'None':")
- return False
-
- af, socktype, proto, canonname, sa = res
- try:
- self.debug("creating socket to '" + str(sa[0]) + "', port " + str(sa[1]))
- self._sock = socket.socket(af, socktype, proto)
- except socket.error as e:
- self.error("Failed to create socket to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e))
- self._sock = None
- self.__socket_config = None
- return False
-
- try:
- self.debug("connecting socket to '" + str(sa[0]) + "', port " + str(sa[1]))
- self._sock.connect(sa)
- except socket.error as e:
- self.error("Failed to connect to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e))
- self._disconnect()
- self.__socket_config = None
- return False
-
- self.debug("connected to '" + str(sa[0]) + "', port " + str(sa[1]))
- self.__socket_config = res
- return True
-
- def _connect2unixsocket(self):
- """
- Connect to a unix socket, given its filename
- :return: boolean
- """
- if self.unix_socket is None:
- self.error("cannot connect to unix socket 'None'")
- return False
-
- try:
- self.debug("attempting DGRAM unix socket '" + str(self.unix_socket) + "'")
- self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- self._sock.connect(self.unix_socket)
- self.debug("connected DGRAM unix socket '" + str(self.unix_socket) + "'")
- return True
- except socket.error as e:
- self.debug("Failed to connect DGRAM unix socket '" + str(self.unix_socket) + "':", str(e))
-
- try:
- self.debug("attempting STREAM unix socket '" + str(self.unix_socket) + "'")
- self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self._sock.connect(self.unix_socket)
- self.debug("connected STREAM unix socket '" + str(self.unix_socket) + "'")
- return True
- except socket.error as e:
- self.debug("Failed to connect STREAM unix socket '" + str(self.unix_socket) + "':", str(e))
- self.error("Failed to connect to unix socket '" + str(self.unix_socket) + "':", str(e))
- self._sock = None
- return False
-
- def _connect(self):
- """
- Recreate socket and connect to it since sockets cannot be reused after closing
- Available configurations are IPv6, IPv4 or UNIX socket
- :return:
- """
- try:
- if self.unix_socket is not None:
- self._connect2unixsocket()
-
- else:
- if self.__socket_config is not None:
- self._connect2socket()
- else:
- for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
- if self._connect2socket(res): break
-
- except Exception as e:
- self._sock = None
- self.__socket_config = None
-
- if self._sock is not None:
- self._sock.setblocking(0)
- self._sock.settimeout(5)
- self.debug("set socket timeout to: " + str(self._sock.gettimeout()))
-
- def _disconnect(self):
- """
- Close socket connection
- :return:
- """
- if self._sock is not None:
- try:
- self.debug("closing socket")
- self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all
- self._sock.close()
- except Exception:
- pass
- self._sock = None
-
- def _send(self):
- """
- Send request.
- :return: boolean
- """
- # Send request if it is needed
- if self.request != self.__empty_request:
- try:
- self.debug("sending request:", str(self.request))
- self._sock.send(self.request)
- except Exception as e:
- self._socketerror("error sending request:" + str(e))
- self._disconnect()
- return False
- return True
-
- def _receive(self):
- """
- Receive data from socket
- :return: str
- """
- data = ""
- while True:
- self.debug("receiving response")
- try:
- buf = self._sock.recv(4096)
- except Exception as e:
- self._socketerror("failed to receive response:" + str(e))
- self._disconnect()
- break
-
- if buf is None or len(buf) == 0: # handle server disconnect
- if data == "":
- self._socketerror("unexpectedly disconnected")
- else:
- self.debug("server closed the connection")
- self._disconnect()
- break
-
- self.debug("received data:", str(buf))
- data += buf.decode('utf-8', 'ignore')
- if self._check_raw_data(data):
- break
-
- self.debug("final response:", str(data))
- return data
-
- def _get_raw_data(self):
- """
- Get raw data with low-level "socket" module.
- :return: str
- """
- if self._sock is None:
- self._connect()
- if self._sock is None:
- return None
-
- # Send request if it is needed
- if not self._send():
- return None
-
- data = self._receive()
-
- if not self._keep_alive:
- self._disconnect()
-
- return data
-
- def _check_raw_data(self, data):
- """
- Check if all data has been gathered from socket
- :param data: str
- :return: boolean
- """
- return True
-
- def _parse_config(self):
- """
- Parse configuration data
- :return: boolean
- """
- if self.name is None or self.name == str(None):
- self.name = ""
- else:
- self.name = str(self.name)
-
- try:
- self.unix_socket = str(self.configuration['socket'])
- except (KeyError, TypeError):
- self.debug("No unix socket specified. Trying TCP/IP socket.")
- self.unix_socket = None
- try:
- self.host = str(self.configuration['host'])
- except (KeyError, TypeError):
- self.debug("No host specified. Using: '" + self.host + "'")
- try:
- self.port = int(self.configuration['port'])
- except (KeyError, TypeError):
- self.debug("No port specified. Using: '" + str(self.port) + "'")
-
- try:
- self.request = str(self.configuration['request'])
- except (KeyError, TypeError):
- self.debug("No request specified. Using: '" + str(self.request) + "'")
-
- self.request = self.request.encode()
-
- def check(self):
- self._parse_config()
- return SimpleService.check(self)
-
-
-class LogService(SimpleService):
- def __init__(self, configuration=None, name=None):
- SimpleService.__init__(self, configuration=configuration, name=name)
- self.log_path = self.configuration.get('path')
- self.__glob_path = self.log_path
- self._last_position = 0
- self.retries = 100000 # basically always retry
- self.__re_find = dict(current=0, run=0, maximum=60)
-
- def _get_raw_data(self):
- """
- Get log lines since last poll
- :return: list
- """
- lines = list()
- try:
- if self.__re_find['current'] == self.__re_find['run']:
- self._find_recent_log_file()
- size = os.path.getsize(self.log_path)
- if size == self._last_position:
- self.__re_find['current'] += 1
- return list() # return empty list if nothing has changed
- elif size < self._last_position:
- self._last_position = 0 # read from beginning if file has shrunk
-
- with open(self.log_path) as fp:
- fp.seek(self._last_position)
- for line in fp:
- lines.append(line)
- self._last_position = fp.tell()
- self.__re_find['current'] = 0
- except (OSError, IOError) as error:
- self.__re_find['current'] += 1
- self.error(str(error))
-
- return lines or None
-
- def _find_recent_log_file(self):
- """
- :return:
- """
- self.__re_find['run'] = self.__re_find['maximum']
- self.__re_find['current'] = 0
- self.__glob_path = self.__glob_path or self.log_path # workaround for modules w/o config files
- path_list = glob(self.__glob_path)
- if path_list:
- self.log_path = max(path_list)
- return True
- return False
-
- def check(self):
- """
- Parse basic configuration and check if log file exists
- :return: boolean
- """
- if not self.log_path:
- self.error("No path to log specified")
- return None
-
- if all([self._find_recent_log_file(),
- os.access(self.log_path, os.R_OK),
- os.path.isfile(self.log_path)]):
- return True
- self.error("Cannot access %s" % self.log_path)
- return False
-
- def create(self):
- # set cursor at last byte of log file
- self._last_position = os.path.getsize(self.log_path)
- status = SimpleService.create(self)
- # self._last_position = 0
- return status
-
-
-class ExecutableService(SimpleService):
-
- def __init__(self, configuration=None, name=None):
- SimpleService.__init__(self, configuration=configuration, name=name)
- self.command = None
-
- def _get_raw_data(self, stderr=False):
- """
- Get raw data from executed command
- :return: <list>
- """
- try:
- p = Popen(self.command, stdout=PIPE, stderr=PIPE)
- except Exception as error:
- self.error("Executing command", " ".join(self.command), "resulted in error:", str(error))
- return None
- data = list()
- std = p.stderr if stderr else p.stdout
- for line in std.readlines():
- data.append(line.decode())
-
- return data or None
-
- def check(self):
- """
- Parse basic configuration, check if command is whitelisted and is returning values
- :return: <boolean>
- """
- # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified)
- if 'command' in self.configuration:
- self.command = self.configuration['command']
-
- # "command" must be: 1.not None 2. type <str>
- if not (self.command and isinstance(self.command, str)):
- self.error('Command is not defined or command type is not <str>')
- return False
-
- # Split "command" into: 1. command <str> 2. options <list>
- command, opts = self.command.split()[0], self.command.split()[1:]
-
- # Check for "bad" symbols in options. No pipes, redirects etc. TODO: what is missing?
- bad_opts = set(''.join(opts)) & set(['&', '|', ';', '>', '<'])
- if bad_opts:
- self.error("Bad command argument(s): %s" % bad_opts)
- return False
-
- # Find absolute path ('echo' => '/bin/echo')
- if '/' not in command:
- command = self.find_binary(command)
- if not command:
- self.error('Can\'t locate "%s" binary in PATH(%s)' % (self.command, PATH))
- return False
- # Check if binary exist and executable
- else:
- if not (os.path.isfile(command) and os.access(command, os.X_OK)):
- self.error('"%s" is not a file or not executable' % command)
- return False
-
- self.command = [command] + opts if opts else [command]
-
- try:
- data = self._get_data()
- except Exception as error:
- self.error('_get_data() failed. Command: %s. Error: %s' % (self.command, error))
- return False
-
- if isinstance(data, dict) and data:
- # We need this for create() method. No reason to execute get_data() again if result is not empty dict()
- self._data_from_check = data
- return True
- else:
- self.error("Command", str(self.command), "returned no data")
- return False
-
-
-class MySQLService(SimpleService):
-
- def __init__(self, configuration=None, name=None):
- SimpleService.__init__(self, configuration=configuration, name=name)
- self.__connection = None
- self.__conn_properties = dict()
- self.extra_conn_properties = dict()
- self.__queries = self.configuration.get('queries', dict())
- self.queries = dict()
-
- def __connect(self):
- try:
- connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties)
- except (MySQLdb.MySQLError, TypeError, AttributeError) as error:
- return None, str(error)
- else:
- return connection, None
-
- def check(self):
- def get_connection_properties(conf, extra_conf):
- properties = dict()
- if conf.get('user'):
- properties['user'] = conf['user']
- if conf.get('pass'):
- properties['passwd'] = conf['pass']
- if conf.get('socket'):
- properties['unix_socket'] = conf['socket']
- elif conf.get('host'):
- properties['host'] = conf['host']
- properties['port'] = int(conf.get('port', 3306))
- elif conf.get('my.cnf'):
- if MySQLdb.__name__ == 'pymysql':
- self.error('"my.cnf" parsing is not working for pymysql')
- else:
- properties['read_default_file'] = conf['my.cnf']
- if isinstance(extra_conf, dict) and extra_conf:
- properties.update(extra_conf)
-
- return properties or None
-
- def is_valid_queries_dict(raw_queries, log_error):
- """
- :param raw_queries: dict:
- :param log_error: function:
- :return: dict or None
-
- raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries)
- """
- def is_valid_query(query):
- return all([isinstance(query, str),
- query.startswith(('SELECT', 'select', 'SHOW', 'show'))])
-
- if hasattr(raw_queries, 'keys') and raw_queries:
- valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)])
- bad_queries = set(raw_queries) - set(valid_queries)
-
- if bad_queries:
- log_error('Removed query(s): %s' % bad_queries)
- return valid_queries
- else:
- log_error('Unsupported "queries" format. Must be not empty <dict>')
- return None
-
- if not PY_MYSQL:
- self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin')
- return False
-
- # Preference: 1. "queries" from the configuration file 2. "queries" from the module
- self.queries = self.__queries or self.queries
- # Check if "self.queries" exist, not empty and all queries are in valid format
- self.queries = is_valid_queries_dict(self.queries, self.error)
- if not self.queries:
- return None
-
- # Get connection properties
- self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties)
- if not self.__conn_properties:
- self.error('Connection properties are missing')
- return False
-
- # Create connection to the database
- self.__connection, error = self.__connect()
- if error:
- self.error('Can\'t establish connection to MySQL: %s' % error)
- return False
-
- try:
- data = self._get_data()
- except Exception as error:
- self.error('_get_data() failed. Error: %s' % error)
- return False
-
- if isinstance(data, dict) and data:
- # We need this for create() method
- self._data_from_check = data
- return True
- else:
- self.error("_get_data() returned no data or type is not <dict>")
- return False
-
- def _get_raw_data(self, description=None):
- """
- Get raw data from MySQL server
- :return: dict: fetchall() or (fetchall(), description)
- """
-
- if not self.__connection:
- self.__connection, error = self.__connect()
- if error:
- return None
-
- raw_data = dict()
- queries = dict(self.queries)
- try:
- with self.__connection as cursor:
- for name, query in queries.items():
- try:
- cursor.execute(query)
- except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error:
- if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)):
- raise RuntimeError
- self.error('Removed query: %s[%s]. Error: %s'
- % (name, query, error))
- self.queries.pop(name)
- continue
- else:
- raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall()
- self.__connection.commit()
- except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError):
- self.__connection.close()
- self.__connection = None
- return None
- else:
- return raw_data or None
-
- @staticmethod
- def __is_error_critical(err_class, err_text):
- return err_class == MySQLdb.OperationalError and all(['denied' not in err_text,
- 'Unknown column' not in err_text])
+# Description: backward compatibility with old version
+
+from bases.FrameworkServices.SimpleService import SimpleService
+from bases.FrameworkServices.UrlService import UrlService
+from bases.FrameworkServices.SocketService import SocketService
+from bases.FrameworkServices.LogService import LogService
+from bases.FrameworkServices.ExecutableService import ExecutableService
+from bases.FrameworkServices.MySQLService import MySQLService
diff --git a/python.d/python_modules/bases/FrameworkServices/ExecutableService.py b/python.d/python_modules/bases/FrameworkServices/ExecutableService.py
new file mode 100644
index 00000000..b6d7871f
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/ExecutableService.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
+
+import os
+
+from subprocess import Popen, PIPE
+
+from bases.FrameworkServices.SimpleService import SimpleService
+from bases.collection import find_binary
+
+
+class ExecutableService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.command = None
+
+ def _get_raw_data(self, stderr=False):
+ """
+ Get raw data from executed command
+ :return: <list>
+ """
+ try:
+ p = Popen(self.command, stdout=PIPE, stderr=PIPE)
+ except Exception as error:
+ self.error('Executing command {command} resulted in error: {error}'.format(command=self.command,
+ error=error))
+ return None
+ data = list()
+ std = p.stderr if stderr else p.stdout
+ for line in std:
+ data.append(line.decode())
+
+ return data or None
+
+ def check(self):
+ """
+ Parse basic configuration, check if command is whitelisted and is returning values
+ :return: <boolean>
+ """
+ # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified)
+ if 'command' in self.configuration:
+ self.command = self.configuration['command']
+
+ # "command" must be: 1.not None 2. type <str>
+ if not (self.command and isinstance(self.command, str)):
+ self.error('Command is not defined or command type is not <str>')
+ return False
+
+ # Split "command" into: 1. command <str> 2. options <list>
+ command, opts = self.command.split()[0], self.command.split()[1:]
+
+ # Check for "bad" symbols in options. No pipes, redirects etc.
+ opts_list = ['&', '|', ';', '>', '<']
+ bad_opts = set(''.join(opts)) & set(opts_list)
+ if bad_opts:
+ self.error("Bad command argument(s): {opts}".format(opts=bad_opts))
+ return False
+
+ # Find absolute path ('echo' => '/bin/echo')
+ if '/' not in command:
+ command = find_binary(command)
+ if not command:
+ self.error('Can\'t locate "{command}" binary'.format(command=self.command))
+ return False
+ # Check if binary exist and executable
+ else:
+ if not os.access(command, os.X_OK):
+ self.error('"{binary}" is not executable'.format(binary=command))
+ return False
+
+ self.command = [command] + opts if opts else [command]
+
+ try:
+ data = self._get_data()
+ except Exception as error:
+ self.error('_get_data() failed. Command: {command}. Error: {error}'.format(command=self.command,
+ error=error))
+ return False
+
+ if isinstance(data, dict) and data:
+ return True
+ self.error('Command "{command}" returned no data'.format(command=self.command))
+ return False
diff --git a/python.d/python_modules/bases/FrameworkServices/LogService.py b/python.d/python_modules/bases/FrameworkServices/LogService.py
new file mode 100644
index 00000000..45daa244
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/LogService.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+
+from glob import glob
+import os
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+
+class LogService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.log_path = self.configuration.get('path')
+ self.__glob_path = self.log_path
+ self._last_position = 0
+ self.__re_find = dict(current=0, run=0, maximum=60)
+
+ def _get_raw_data(self):
+ """
+ Get log lines since last poll
+ :return: list
+ """
+ lines = list()
+ try:
+ if self.__re_find['current'] == self.__re_find['run']:
+ self._find_recent_log_file()
+ size = os.path.getsize(self.log_path)
+ if size == self._last_position:
+ self.__re_find['current'] += 1
+ return list() # return empty list if nothing has changed
+ elif size < self._last_position:
+ self._last_position = 0 # read from beginning if file has shrunk
+
+ with open(self.log_path) as fp:
+ fp.seek(self._last_position)
+ for line in fp:
+ lines.append(line)
+ self._last_position = fp.tell()
+ self.__re_find['current'] = 0
+ except (OSError, IOError) as error:
+ self.__re_find['current'] += 1
+ self.error(str(error))
+
+ return lines or None
+
+ def _find_recent_log_file(self):
+ """
+ :return:
+ """
+ self.__re_find['run'] = self.__re_find['maximum']
+ self.__re_find['current'] = 0
+ self.__glob_path = self.__glob_path or self.log_path # workaround for modules w/o config files
+ path_list = glob(self.__glob_path)
+ if path_list:
+ self.log_path = max(path_list)
+ return True
+ return False
+
+ def check(self):
+ """
+ Parse basic configuration and check if log file exists
+ :return: boolean
+ """
+ if not self.log_path:
+ self.error('No path to log specified')
+ return None
+
+ if self._find_recent_log_file() and os.access(self.log_path, os.R_OK) and os.path.isfile(self.log_path):
+ return True
+ self.error('Cannot access {0}'.format(self.log_path))
+ return False
+
+ def create(self):
+ # set cursor at last byte of log file
+ self._last_position = os.path.getsize(self.log_path)
+ status = SimpleService.create(self)
+ return status
diff --git a/python.d/python_modules/bases/FrameworkServices/MySQLService.py b/python.d/python_modules/bases/FrameworkServices/MySQLService.py
new file mode 100644
index 00000000..3acc5b10
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/MySQLService.py
@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (l2isbad)
+
+from sys import exc_info
+
+try:
+ import MySQLdb
+
+ PY_MYSQL = True
+except ImportError:
+ try:
+ import pymysql as MySQLdb
+
+ PY_MYSQL = True
+ except ImportError:
+ PY_MYSQL = False
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+
+class MySQLService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.__connection = None
+ self.__conn_properties = dict()
+ self.extra_conn_properties = dict()
+ self.__queries = self.configuration.get('queries', dict())
+ self.queries = dict()
+
+ def __connect(self):
+ try:
+ connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties)
+ except (MySQLdb.MySQLError, TypeError, AttributeError) as error:
+ return None, str(error)
+ else:
+ return connection, None
+
+ def check(self):
+ def get_connection_properties(conf, extra_conf):
+ properties = dict()
+ if conf.get('user'):
+ properties['user'] = conf['user']
+ if conf.get('pass'):
+ properties['passwd'] = conf['pass']
+ if conf.get('socket'):
+ properties['unix_socket'] = conf['socket']
+ elif conf.get('host'):
+ properties['host'] = conf['host']
+ properties['port'] = int(conf.get('port', 3306))
+ elif conf.get('my.cnf'):
+ if MySQLdb.__name__ == 'pymysql':
+ self.error('"my.cnf" parsing is not working for pymysql')
+ else:
+ properties['read_default_file'] = conf['my.cnf']
+ if isinstance(extra_conf, dict) and extra_conf:
+ properties.update(extra_conf)
+
+ return properties or None
+
+ def is_valid_queries_dict(raw_queries, log_error):
+ """
+ :param raw_queries: dict:
+ :param log_error: function:
+ :return: dict or None
+
+ raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries)
+ """
+
+ def is_valid_query(query):
+ return all([isinstance(query, str),
+ query.startswith(('SELECT', 'select', 'SHOW', 'show'))])
+
+ if hasattr(raw_queries, 'keys') and raw_queries:
+ valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)])
+ bad_queries = set(raw_queries) - set(valid_queries)
+
+ if bad_queries:
+ log_error('Removed query(s): {queries}'.format(queries=bad_queries))
+ return valid_queries
+ else:
+ log_error('Unsupported "queries" format. Must be not empty <dict>')
+ return None
+
+ if not PY_MYSQL:
+ self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin')
+ return False
+
+ # Preference: 1. "queries" from the configuration file 2. "queries" from the module
+ self.queries = self.__queries or self.queries
+ # Check if "self.queries" exist, not empty and all queries are in valid format
+ self.queries = is_valid_queries_dict(self.queries, self.error)
+ if not self.queries:
+ return None
+
+ # Get connection properties
+ self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties)
+ if not self.__conn_properties:
+ self.error('Connection properties are missing')
+ return False
+
+ # Create connection to the database
+ self.__connection, error = self.__connect()
+ if error:
+ self.error('Can\'t establish connection to MySQL: {error}'.format(error=error))
+ return False
+
+ try:
+ data = self._get_data()
+ except Exception as error:
+ self.error('_get_data() failed. Error: {error}'.format(error=error))
+ return False
+
+ if isinstance(data, dict) and data:
+ return True
+ self.error("_get_data() returned no data or type is not <dict>")
+ return False
+
+ def _get_raw_data(self, description=None):
+ """
+ Get raw data from MySQL server
+ :return: dict: fetchall() or (fetchall(), description)
+ """
+
+ if not self.__connection:
+ self.__connection, error = self.__connect()
+ if error:
+ return None
+
+ raw_data = dict()
+ queries = dict(self.queries)
+ try:
+ with self.__connection as cursor:
+ for name, query in queries.items():
+ try:
+ cursor.execute(query)
+ except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error:
+ if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)):
+ raise RuntimeError
+ self.error('Removed query: {name}[{query}]. Error: error'.format(name=name,
+ query=query,
+ error=error))
+ self.queries.pop(name)
+ continue
+ else:
+ raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall()
+ self.__connection.commit()
+ except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError):
+ self.__connection.close()
+ self.__connection = None
+ return None
+ else:
+ return raw_data or None
+
+ @staticmethod
+ def __is_error_critical(err_class, err_text):
+ return err_class == MySQLdb.OperationalError and all(['denied' not in err_text,
+ 'Unknown column' not in err_text])
diff --git a/python.d/python_modules/bases/FrameworkServices/SimpleService.py b/python.d/python_modules/bases/FrameworkServices/SimpleService.py
new file mode 100644
index 00000000..14c83910
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/SimpleService.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
+
+from threading import Thread
+
+try:
+ from time import sleep, monotonic as time
+except ImportError:
+ from time import sleep, time
+
+from bases.charts import Charts, ChartError, create_runtime_chart
+from bases.collection import OldVersionCompatibility, safe_print
+from bases.loggers import PythonDLimitedLogger
+
+RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \
+ 'SET run_time = {elapsed}\n' \
+ 'END\n'
+
+
+class RuntimeCounters:
+ def __init__(self, configuration):
+ """
+ :param configuration: <dict>
+ """
+ self.FREQ = int(configuration.pop('update_every'))
+ self.START_RUN = 0
+ self.NEXT_RUN = 0
+ self.PREV_UPDATE = 0
+ self.SINCE_UPDATE = 0
+ self.ELAPSED = 0
+ self.RETRIES = 0
+ self.RETRIES_MAX = configuration.pop('retries')
+ self.PENALTY = 0
+
+ def is_sleep_time(self):
+ return self.START_RUN < self.NEXT_RUN
+
+
+class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, object):
+ """
+ Prototype of Service class.
+ Implemented basic functionality to run jobs by `python.d.plugin`
+ """
+ def __init__(self, configuration, name=''):
+ """
+ :param configuration: <dict>
+ :param name: <str>
+ """
+ Thread.__init__(self)
+ self.daemon = True
+ PythonDLimitedLogger.__init__(self)
+ OldVersionCompatibility.__init__(self)
+ self.configuration = configuration
+ self.order = list()
+ self.definitions = dict()
+
+ self.module_name = self.__module__
+ self.job_name = configuration.pop('job_name')
+ self.override_name = configuration.pop('override_name')
+ self.fake_name = None
+
+ self._runtime_counters = RuntimeCounters(configuration=configuration)
+ self.charts = Charts(job_name=self.actual_name,
+ priority=configuration.pop('priority'),
+ cleanup=configuration.pop('chart_cleanup'),
+ get_update_every=self.get_update_every,
+ module_name=self.module_name)
+
+ def __repr__(self):
+ return '<{cls_bases}: {name}>'.format(cls_bases=', '.join(c.__name__ for c in self.__class__.__bases__),
+ name=self.name)
+
+ @property
+ def name(self):
+ if self.job_name:
+ return '_'.join([self.module_name, self.override_name or self.job_name])
+ return self.module_name
+
+ def actual_name(self):
+ return self.fake_name or self.name
+
+ @property
+ def update_every(self):
+ return self._runtime_counters.FREQ
+
+ @update_every.setter
+ def update_every(self, value):
+ """
+ :param value: <int>
+ :return:
+ """
+ self._runtime_counters.FREQ = value
+
+ def get_update_every(self):
+ return self.update_every
+
+ def check(self):
+ """
+ check() prototype
+ :return: boolean
+ """
+ self.debug("job doesn't implement check() method. Using default which simply invokes get_data().")
+ data = self.get_data()
+ if data and isinstance(data, dict):
+ return True
+ self.debug('returned value is wrong: {0}'.format(data))
+ return False
+
+ @create_runtime_chart
+ def create(self):
+ for chart_name in self.order:
+ chart_config = self.definitions.get(chart_name)
+
+ if not chart_config:
+ self.debug("create() => [NOT ADDED] chart '{chart_name}' not in definitions. "
+ "Skipping it.".format(chart_name=chart_name))
+ continue
+
+ # create chart
+ chart_params = [chart_name] + chart_config['options']
+ try:
+ self.charts.add_chart(params=chart_params)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (chart '{chart}': {error})".format(chart=chart_name,
+ error=error))
+ continue
+
+ # add dimensions to chart
+ for dimension in chart_config['lines']:
+ try:
+ self.charts[chart_name].add_dimension(dimension)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (dimension '{dimension}': {error})".format(dimension=dimension,
+ error=error))
+ continue
+
+ # add variables to chart
+ if 'variables' in chart_config:
+ for variable in chart_config['variables']:
+ try:
+ self.charts[chart_name].add_variable(variable)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (variable '{var}': {error})".format(var=variable,
+ error=error))
+ continue
+
+ del self.order
+ del self.definitions
+
+ # True if job has at least 1 chart else False
+ return bool(self.charts)
+
+ def run(self):
+ """
+ Runs job in thread. Handles retries.
+ Exits when job failed or timed out.
+ :return: None
+ """
+ job = self._runtime_counters
+ self.debug('started, update frequency: {freq}, '
+ 'retries: {retries}'.format(freq=job.FREQ, retries=job.RETRIES_MAX - job.RETRIES))
+
+ while True:
+ job.START_RUN = time()
+
+ job.NEXT_RUN = job.START_RUN - (job.START_RUN % job.FREQ) + job.FREQ + job.PENALTY
+
+ self.sleep_until_next_run()
+
+ if job.PREV_UPDATE:
+ job.SINCE_UPDATE = int((job.START_RUN - job.PREV_UPDATE) * 1e6)
+
+ try:
+ updated = self.update(interval=job.SINCE_UPDATE)
+ except Exception as error:
+ self.error('update() unhandled exception: {error}'.format(error=error))
+ updated = False
+
+ if not updated:
+ if not self.manage_retries():
+ return
+ else:
+ job.ELAPSED = int((time() - job.START_RUN) * 1e3)
+ job.PREV_UPDATE = job.START_RUN
+ job.RETRIES, job.PENALTY = 0, 0
+ safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name,
+ since_last=job.SINCE_UPDATE,
+ elapsed=job.ELAPSED))
+ self.debug('update => [{status}] (elapsed time: {elapsed}, '
+ 'retries left: {retries})'.format(status='OK' if updated else 'FAILED',
+ elapsed=job.ELAPSED if updated else '-',
+ retries=job.RETRIES_MAX - job.RETRIES))
+
+ def update(self, interval):
+ """
+ :return:
+ """
+ data = self.get_data()
+ if not data:
+ self.debug('get_data() returned no data')
+ return False
+ elif not isinstance(data, dict):
+ self.debug('get_data() returned incorrect type data')
+ return False
+
+ updated = False
+
+ for chart in self.charts:
+ if chart.flags.obsoleted:
+ continue
+ elif self.charts.cleanup and chart.penalty >= self.charts.cleanup:
+ chart.obsolete()
+ self.error("chart '{0}' was suppressed due to non updating".format(chart.name))
+ continue
+
+ ok = chart.update(data, interval)
+ if ok:
+ updated = True
+
+ if not updated:
+ self.debug('none of the charts has been updated')
+
+ return updated
+
+ def manage_retries(self):
+ rc = self._runtime_counters
+ rc.RETRIES += 1
+ if rc.RETRIES % 5 == 0:
+ rc.PENALTY = int(rc.RETRIES * self.update_every / 2)
+ if rc.RETRIES >= rc.RETRIES_MAX:
+ self.error('stopped after {0} data collection failures in a row'.format(rc.RETRIES_MAX))
+ return False
+ return True
+
+ def sleep_until_next_run(self):
+ job = self._runtime_counters
+
+ # sleep() is interruptable
+ while job.is_sleep_time():
+ sleep_time = job.NEXT_RUN - job.START_RUN
+ self.debug('sleeping for {sleep_time} to reach frequency of {freq} sec'.format(sleep_time=sleep_time,
+ freq=job.FREQ + job.PENALTY))
+ sleep(sleep_time)
+ job.START_RUN = time()
+
+ def get_data(self):
+ return self._get_data()
+
+ def _get_data(self):
+ raise NotImplementedError
diff --git a/python.d/python_modules/bases/FrameworkServices/SocketService.py b/python.d/python_modules/bases/FrameworkServices/SocketService.py
new file mode 100644
index 00000000..90631df1
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/SocketService.py
@@ -0,0 +1,250 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+
+import socket
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+
+class SocketService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ self._sock = None
+ self._keep_alive = False
+ self.host = 'localhost'
+ self.port = None
+ self.unix_socket = None
+ self.request = ''
+ self.__socket_config = None
+ self.__empty_request = "".encode()
+ SimpleService.__init__(self, configuration=configuration, name=name)
+
+ def _socket_error(self, message=None):
+ if self.unix_socket is not None:
+ self.error('unix socket "{socket}": {message}'.format(socket=self.unix_socket,
+ message=message))
+ else:
+ if self.__socket_config is not None:
+ af, sock_type, proto, canon_name, sa = self.__socket_config
+ self.error('socket to "{address}" port {port}: {message}'.format(address=sa[0],
+ port=sa[1],
+ message=message))
+ else:
+ self.error('unknown socket: {0}'.format(message))
+
+ def _connect2socket(self, res=None):
+ """
+ Connect to a socket, passing the result of getaddrinfo()
+ :return: boolean
+ """
+ if res is None:
+ res = self.__socket_config
+ if res is None:
+ self.error("Cannot create socket to 'None':")
+ return False
+
+ af, sock_type, proto, canon_name, sa = res
+ try:
+ self.debug('Creating socket to "{address}", port {port}'.format(address=sa[0], port=sa[1]))
+ self._sock = socket.socket(af, sock_type, proto)
+ except socket.error as error:
+ self.error('Failed to create socket "{address}", port {port}, error: {error}'.format(address=sa[0],
+ port=sa[1],
+ error=error))
+ self._sock = None
+ self.__socket_config = None
+ return False
+
+ try:
+ self.debug('connecting socket to "{address}", port {port}'.format(address=sa[0], port=sa[1]))
+ self._sock.connect(sa)
+ except socket.error as error:
+ self.error('Failed to connect to "{address}", port {port}, error: {error}'.format(address=sa[0],
+ port=sa[1],
+ error=error))
+ self._disconnect()
+ self.__socket_config = None
+ return False
+
+ self.debug('connected to "{address}", port {port}'.format(address=sa[0], port=sa[1]))
+ self.__socket_config = res
+ return True
+
+ def _connect2unixsocket(self):
+ """
+ Connect to a unix socket, given its filename
+ :return: boolean
+ """
+ if self.unix_socket is None:
+ self.error("cannot connect to unix socket 'None'")
+ return False
+
+ try:
+ self.debug('attempting DGRAM unix socket "{0}"'.format(self.unix_socket))
+ self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self._sock.connect(self.unix_socket)
+ self.debug('connected DGRAM unix socket "{0}"'.format(self.unix_socket))
+ return True
+ except socket.error as error:
+ self.debug('Failed to connect DGRAM unix socket "{socket}": {error}'.format(socket=self.unix_socket,
+ error=error))
+
+ try:
+ self.debug('attempting STREAM unix socket "{0}"'.format(self.unix_socket))
+ self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self._sock.connect(self.unix_socket)
+ self.debug('connected STREAM unix socket "{0}"'.format(self.unix_socket))
+ return True
+ except socket.error as error:
+ self.debug('Failed to connect STREAM unix socket "{socket}": {error}'.format(socket=self.unix_socket,
+ error=error))
+ self._sock = None
+ return False
+
+ def _connect(self):
+ """
+ Recreate socket and connect to it since sockets cannot be reused after closing
+ Available configurations are IPv6, IPv4 or UNIX socket
+ :return:
+ """
+ try:
+ if self.unix_socket is not None:
+ self._connect2unixsocket()
+
+ else:
+ if self.__socket_config is not None:
+ self._connect2socket()
+ else:
+ for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+ if self._connect2socket(res):
+ break
+
+ except Exception:
+ self._sock = None
+ self.__socket_config = None
+
+ if self._sock is not None:
+ self._sock.setblocking(0)
+ self._sock.settimeout(5)
+ self.debug('set socket timeout to: {0}'.format(self._sock.gettimeout()))
+
+ def _disconnect(self):
+ """
+ Close socket connection
+ :return:
+ """
+ if self._sock is not None:
+ try:
+ self.debug('closing socket')
+ self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all
+ self._sock.close()
+ except Exception:
+ pass
+ self._sock = None
+
+ def _send(self):
+ """
+ Send request.
+ :return: boolean
+ """
+ # Send request if it is needed
+ if self.request != self.__empty_request:
+ try:
+ self.debug('sending request: {0}'.format(self.request))
+ self._sock.send(self.request)
+ except Exception as error:
+ self._socket_error('error sending request: {0}'.format(error))
+ self._disconnect()
+ return False
+ return True
+
+ def _receive(self):
+ """
+ Receive data from socket
+ :return: str
+ """
+ data = ""
+ while True:
+ self.debug('receiving response')
+ try:
+ buf = self._sock.recv(4096)
+ except Exception as error:
+ self._socket_error('failed to receive response: {0}'.format(error))
+ self._disconnect()
+ break
+
+ if buf is None or len(buf) == 0: # handle server disconnect
+ if data == "":
+ self._socket_error('unexpectedly disconnected')
+ else:
+ self.debug('server closed the connection')
+ self._disconnect()
+ break
+
+ self.debug('received data')
+ data += buf.decode('utf-8', 'ignore')
+ if self._check_raw_data(data):
+ break
+
+ self.debug('final response: {0}'.format(data))
+ return data
+
+ def _get_raw_data(self):
+ """
+ Get raw data with low-level "socket" module.
+ :return: str
+ """
+ if self._sock is None:
+ self._connect()
+ if self._sock is None:
+ return None
+
+ # Send request if it is needed
+ if not self._send():
+ return None
+
+ data = self._receive()
+
+ if not self._keep_alive:
+ self._disconnect()
+
+ return data
+
+ @staticmethod
+ def _check_raw_data(data):
+ """
+ Check if all data has been gathered from socket
+ :param data: str
+ :return: boolean
+ """
+ return bool(data)
+
+ def _parse_config(self):
+ """
+ Parse configuration data
+ :return: boolean
+ """
+ try:
+ self.unix_socket = str(self.configuration['socket'])
+ except (KeyError, TypeError):
+ self.debug('No unix socket specified. Trying TCP/IP socket.')
+ self.unix_socket = None
+ try:
+ self.host = str(self.configuration['host'])
+ except (KeyError, TypeError):
+ self.debug('No host specified. Using: "{0}"'.format(self.host))
+ try:
+ self.port = int(self.configuration['port'])
+ except (KeyError, TypeError):
+ self.debug('No port specified. Using: "{0}"'.format(self.port))
+
+ try:
+ self.request = str(self.configuration['request'])
+ except (KeyError, TypeError):
+ self.debug('No request specified. Using: "{0}"'.format(self.request))
+
+ self.request = self.request.encode()
+
+ def check(self):
+ self._parse_config()
+ return SimpleService.check(self)
diff --git a/python.d/python_modules/bases/FrameworkServices/UrlService.py b/python.d/python_modules/bases/FrameworkServices/UrlService.py
new file mode 100644
index 00000000..0941ab16
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/UrlService.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
+
+import urllib3
+
+from bases.FrameworkServices.SimpleService import SimpleService
+
+try:
+ urllib3.disable_warnings()
+except AttributeError:
+ pass
+
+
+class UrlService(SimpleService):
+ def __init__(self, configuration=None, name=None):
+ SimpleService.__init__(self, configuration=configuration, name=name)
+ self.url = self.configuration.get('url')
+ self.user = self.configuration.get('user')
+ self.password = self.configuration.get('pass')
+ self.proxy_user = self.configuration.get('proxy_user')
+ self.proxy_password = self.configuration.get('proxy_pass')
+ self.proxy_url = self.configuration.get('proxy_url')
+ self.header = self.configuration.get('header')
+ self.request_timeout = self.configuration.get('timeout', 1)
+ self._manager = None
+
+ def __make_headers(self, **header_kw):
+ user = header_kw.get('user') or self.user
+ password = header_kw.get('pass') or self.password
+ proxy_user = header_kw.get('proxy_user') or self.proxy_user
+ proxy_password = header_kw.get('proxy_pass') or self.proxy_password
+ custom_header = header_kw.get('header') or self.header
+ header_params = dict(keep_alive=True)
+ proxy_header_params = dict()
+ if user and password:
+ header_params['basic_auth'] = '{user}:{password}'.format(user=user,
+ password=password)
+ if proxy_user and proxy_password:
+ proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user,
+ password=proxy_password)
+ try:
+ header, proxy_header = urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params)
+ except TypeError as error:
+ self.error('build_header() error: {error}'.format(error=error))
+ return None, None
+ else:
+ header.update(custom_header or dict())
+ return header, proxy_header
+
+ def _build_manager(self, **header_kw):
+ header, proxy_header = self.__make_headers(**header_kw)
+ if header is None or proxy_header is None:
+ return None
+ proxy_url = header_kw.get('proxy_url') or self.proxy_url
+ if proxy_url:
+ manager = urllib3.ProxyManager
+ params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header)
+ else:
+ manager = urllib3.PoolManager
+ params = dict(headers=header)
+ try:
+ url = header_kw.get('url') or self.url
+ if url.startswith('https'):
+ return manager(assert_hostname=False, cert_reqs='CERT_NONE', **params)
+ return manager(**params)
+ except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error:
+ self.error('build_manager() error:', str(error))
+ return None
+
+ def _get_raw_data(self, url=None, manager=None):
+ """
+ Get raw data from http request
+ :return: str
+ """
+ try:
+ url = url or self.url
+ manager = manager or self._manager
+ response = manager.request(method='GET',
+ url=url,
+ timeout=self.request_timeout,
+ retries=1,
+ headers=manager.headers)
+ except (urllib3.exceptions.HTTPError, TypeError, AttributeError) as error:
+ self.error('Url: {url}. Error: {error}'.format(url=url, error=error))
+ return None
+ if response.status == 200:
+ return response.data.decode()
+ self.debug('Url: {url}. Http response status code: {code}'.format(url=url, code=response.status))
+ return None
+
+ def check(self):
+ """
+ Format configuration data and try to connect to server
+ :return: boolean
+ """
+ if not (self.url and isinstance(self.url, str)):
+ self.error('URL is not defined or type is not <str>')
+ return False
+
+ self._manager = self._build_manager()
+ if not self._manager:
+ return False
+
+ try:
+ data = self._get_data()
+ except Exception as error:
+ self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error))
+ return False
+
+ if isinstance(data, dict) and data:
+ return True
+ self.error('_get_data() returned no data or type is not <dict>')
+ return False
diff --git a/python.d/python_modules/bases/FrameworkServices/__init__.py b/python.d/python_modules/bases/FrameworkServices/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python.d/python_modules/bases/FrameworkServices/__init__.py
diff --git a/python.d/python_modules/bases/__init__.py b/python.d/python_modules/bases/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python.d/python_modules/bases/__init__.py
diff --git a/python.d/python_modules/bases/charts.py b/python.d/python_modules/bases/charts.py
new file mode 100644
index 00000000..1e9348e5
--- /dev/null
+++ b/python.d/python_modules/bases/charts.py
@@ -0,0 +1,376 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (l2isbad)
+
+from bases.collection import safe_print
+
+CHART_PARAMS = ['type', 'id', 'name', 'title', 'units', 'family', 'context', 'chart_type']
+DIMENSION_PARAMS = ['id', 'name', 'algorithm', 'multiplier', 'divisor', 'hidden']
+VARIABLE_PARAMS = ['id', 'value']
+
+CHART_TYPES = ['line', 'area', 'stacked']
+DIMENSION_ALGORITHMS = ['absolute', 'incremental', 'percentage-of-absolute-row', 'percentage-of-incremental-row']
+
+CHART_BEGIN = 'BEGIN {type}.{id} {since_last}\n'
+CHART_CREATE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \
+ "{chart_type} {priority} {update_every} '' 'python.d.plugin' '{module_name}'\n"
+CHART_OBSOLETE = "CHART {type}.{id} '{name}' '{title}' '{units}' '{family}' '{context}' " \
+ "{chart_type} {priority} {update_every} 'obsolete'\n"
+
+
+DIMENSION_CREATE = "DIMENSION '{id}' '{name}' {algorithm} {multiplier} {divisor} '{hidden}'\n"
+DIMENSION_SET = "SET '{id}' = {value}\n"
+
+CHART_VARIABLE_SET = "VARIABLE CHART '{id}' = {value}\n"
+
+RUNTIME_CHART_CREATE = "CHART netdata.runtime_{job_name} '' 'Execution time for {job_name}' 'ms' 'python.d' " \
+ "netdata.pythond_runtime line 145000 {update_every}\n" \
+ "DIMENSION run_time 'run time' absolute 1 1\n"
+
+
+def create_runtime_chart(func):
+ """
+ Calls a wrapped function, then prints runtime chart to stdout.
+
+ Used as a decorator for SimpleService.create() method.
+ The whole point of making 'create runtime chart' functionality as a decorator was
+ to help users who re-implements create() in theirs classes.
+
+ :param func: class method
+ :return:
+ """
+ def wrapper(*args, **kwargs):
+ self = args[0]
+ ok = func(*args, **kwargs)
+ if ok:
+ safe_print(RUNTIME_CHART_CREATE.format(job_name=self.name,
+ update_every=self._runtime_counters.FREQ))
+ return ok
+ return wrapper
+
+
+class ChartError(Exception):
+ """Base-class for all exceptions raised by this module"""
+
+
+class DuplicateItemError(ChartError):
+ """Occurs when user re-adds a chart or a dimension that has already been added"""
+
+
+class ItemTypeError(ChartError):
+ """Occurs when user passes value of wrong type to Chart, Dimension or ChartVariable class"""
+
+
+class ItemValueError(ChartError):
+ """Occurs when user passes inappropriate value to Chart, Dimension or ChartVariable class"""
+
+
+class Charts:
+ """Represent a collection of charts
+
+ All charts stored in a dict.
+ Chart is a instance of Chart class.
+ Charts adding must be done using Charts.add_chart() method only"""
+ def __init__(self, job_name, priority, cleanup, get_update_every, module_name):
+ """
+ :param job_name: <bound method>
+ :param priority: <int>
+ :param get_update_every: <bound method>
+ """
+ self.job_name = job_name
+ self.priority = priority
+ self.cleanup = cleanup
+ self.get_update_every = get_update_every
+ self.module_name = module_name
+ self.charts = dict()
+
+ def __len__(self):
+ return len(self.charts)
+
+ def __iter__(self):
+ return iter(self.charts.values())
+
+ def __repr__(self):
+ return 'Charts({0})'.format(self)
+
+ def __str__(self):
+ return str([chart for chart in self.charts])
+
+ def __contains__(self, item):
+ return item in self.charts
+
+ def __getitem__(self, item):
+ return self.charts[item]
+
+ def __delitem__(self, key):
+ del self.charts[key]
+
+ def __bool__(self):
+ return bool(self.charts)
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def add_chart(self, params):
+ """
+ Create Chart instance and add it to the dict
+
+ Manually adds job name, priority and update_every to params.
+ :param params: <list>
+ :return:
+ """
+ params = [self.job_name()] + params
+ new_chart = Chart(params)
+
+ new_chart.params['update_every'] = self.get_update_every()
+ new_chart.params['priority'] = self.priority
+ new_chart.params['module_name'] = self.module_name
+
+ self.priority += 1
+ self.charts[new_chart.id] = new_chart
+
+ return new_chart
+
+ def active_charts(self):
+ return [chart.id for chart in self if not chart.flags.obsoleted]
+
+
+class Chart:
+ """Represent a chart"""
+ def __init__(self, params):
+ """
+ :param params: <list>
+ """
+ if not isinstance(params, list):
+ raise ItemTypeError("'chart' must be a list type")
+ if not len(params) >= 8:
+ raise ItemValueError("invalid value for 'chart', must be {0}".format(CHART_PARAMS))
+
+ self.params = dict(zip(CHART_PARAMS, (p or str() for p in params)))
+ self.name = '{type}.{id}'.format(type=self.params['type'],
+ id=self.params['id'])
+ if self.params.get('chart_type') not in CHART_TYPES:
+ self.params['chart_type'] = 'absolute'
+
+ self.dimensions = list()
+ self.variables = set()
+ self.flags = ChartFlags()
+ self.penalty = 0
+
+ def __getattr__(self, item):
+ try:
+ return self.params[item]
+ except KeyError:
+ raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self),
+ attr=item))
+
+ def __repr__(self):
+ return 'Chart({0})'.format(self.id)
+
+ def __str__(self):
+ return self.id
+
+ def __iter__(self):
+ return iter(self.dimensions)
+
+ def __contains__(self, item):
+ return item in [dimension.id for dimension in self.dimensions]
+
+ def add_variable(self, variable):
+ """
+ :param variable: <list>
+ :return:
+ """
+ self.variables.add(ChartVariable(variable))
+
+ def add_dimension(self, dimension):
+ """
+ :param dimension: <list>
+ :return:
+ """
+ dim = Dimension(dimension)
+
+ if dim.id in self:
+ raise DuplicateItemError("'{dimension}' already in '{chart}' dimensions".format(dimension=dim.id,
+ chart=self.name))
+ self.refresh()
+ self.dimensions.append(dim)
+ return dim
+
+ def hide_dimension(self, dimension_id, reverse=False):
+ if dimension_id in self:
+ idx = self.dimensions.index(dimension_id)
+ dimension = self.dimensions[idx]
+ dimension.params['hidden'] = 'hidden' if not reverse else str()
+ self.refresh()
+
+ def create(self):
+ """
+ :return:
+ """
+ chart = CHART_CREATE.format(**self.params)
+ dimensions = ''.join([dimension.create() for dimension in self.dimensions])
+ variables = ''.join([var.set(var.value) for var in self.variables if var])
+
+ self.flags.push = False
+ self.flags.created = True
+
+ safe_print(chart + dimensions + variables)
+
+ def update(self, data, interval):
+ updated_dimensions, updated_variables = str(), str()
+
+ for dim in self.dimensions:
+ value = dim.get_value(data)
+ if value is not None:
+ updated_dimensions += dim.set(value)
+
+ for var in self.variables:
+ value = var.get_value(data)
+ if value is not None:
+ updated_variables += var.set(value)
+
+ if updated_dimensions:
+ since_last = interval if self.flags.updated else 0
+
+ if self.flags.push:
+ self.create()
+
+ chart_begin = CHART_BEGIN.format(type=self.type, id=self.id, since_last=since_last)
+ safe_print(chart_begin, updated_dimensions, updated_variables, 'END\n')
+
+ self.flags.updated = True
+ self.penalty = 0
+ else:
+ self.penalty += 1
+ self.flags.updated = False
+
+ return bool(updated_dimensions)
+
+ def obsolete(self):
+ self.flags.obsoleted = True
+ if self.flags.created:
+ safe_print(CHART_OBSOLETE.format(**self.params))
+
+ def refresh(self):
+ self.penalty = 0
+ self.flags.push = True
+ self.flags.obsoleted = False
+
+
+class Dimension:
+ """Represent a dimension"""
+ def __init__(self, params):
+ """
+ :param params: <list>
+ """
+ if not isinstance(params, list):
+ raise ItemTypeError("'dimension' must be a list type")
+ if not params:
+ raise ItemValueError("invalid value for 'dimension', must be {0}".format(DIMENSION_PARAMS))
+
+ self.params = dict(zip(DIMENSION_PARAMS, (p or str() for p in params)))
+ self.params['name'] = self.params.get('name') or self.params['id']
+
+ if self.params.get('algorithm') not in DIMENSION_ALGORITHMS:
+ self.params['algorithm'] = 'absolute'
+ if not isinstance(self.params.get('multiplier'), int):
+ self.params['multiplier'] = 1
+ if not isinstance(self.params.get('divisor'), int):
+ self.params['divisor'] = 1
+ self.params.setdefault('hidden', '')
+
+ def __getattr__(self, item):
+ try:
+ return self.params[item]
+ except KeyError:
+ raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self),
+ attr=item))
+
+ def __repr__(self):
+ return 'Dimension({0})'.format(self.id)
+
+ def __str__(self):
+ return self.id
+
+ def __eq__(self, other):
+ if not isinstance(other, Dimension):
+ return self.id == other
+ return self.id == other.id
+
+ def create(self):
+ return DIMENSION_CREATE.format(**self.params)
+
+ def set(self, value):
+ """
+ :param value: <str>: must be a digit
+ :return:
+ """
+ return DIMENSION_SET.format(id=self.id,
+ value=value)
+
+ def get_value(self, data):
+ try:
+ return int(data[self.id])
+ except (KeyError, TypeError):
+ return None
+
+
+class ChartVariable:
+ """Represent a chart variable"""
+ def __init__(self, params):
+ """
+ :param params: <list>
+ """
+ if not isinstance(params, list):
+ raise ItemTypeError("'variable' must be a list type")
+ if not params:
+ raise ItemValueError("invalid value for 'variable' must be: {0}".format(VARIABLE_PARAMS))
+
+ self.params = dict(zip(VARIABLE_PARAMS, params))
+ self.params.setdefault('value', None)
+
+ def __getattr__(self, item):
+ try:
+ return self.params[item]
+ except KeyError:
+ raise AttributeError("'{instance}' has no attribute '{attr}'".format(instance=repr(self),
+ attr=item))
+
+ def __bool__(self):
+ return self.value is not None
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def __repr__(self):
+ return 'ChartVariable({0})'.format(self.id)
+
+ def __str__(self):
+ return self.id
+
+ def __eq__(self, other):
+ if isinstance(other, ChartVariable):
+ return self.id == other.id
+ return False
+
+ def __hash__(self):
+ return hash(repr(self))
+
+ def set(self, value):
+ return CHART_VARIABLE_SET.format(id=self.id,
+ value=value)
+
+ def get_value(self, data):
+ try:
+ return int(data[self.id])
+ except (KeyError, TypeError):
+ return None
+
+
+class ChartFlags:
+ def __init__(self):
+ self.push = True
+ self.created = False
+ self.updated = False
+ self.obsoleted = False
diff --git a/python.d/python_modules/bases/collection.py b/python.d/python_modules/bases/collection.py
new file mode 100644
index 00000000..e03b4f58
--- /dev/null
+++ b/python.d/python_modules/bases/collection.py
@@ -0,0 +1,144 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (l2isbad)
+
+import os
+
+PATH = os.getenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin').split(':')
+
+CHART_BEGIN = 'BEGIN {0} {1}\n'
+CHART_CREATE = "CHART {0} '{1}' '{2}' '{3}' '{4}' '{5}' {6} {7} {8}\n"
+DIMENSION_CREATE = "DIMENSION '{0}' '{1}' {2} {3} {4} '{5}'\n"
+DIMENSION_SET = "SET '{0}' = {1}\n"
+
+
+def setdefault_values(config, base_dict):
+ for key, value in base_dict.items():
+ config.setdefault(key, value)
+ return config
+
+
+def run_and_exit(func):
+ def wrapper(*args, **kwargs):
+ func(*args, **kwargs)
+ exit(1)
+ return wrapper
+
+
+def on_try_except_finally(on_except=(None, ), on_finally=(None, )):
+ except_func = on_except[0]
+ finally_func = on_finally[0]
+
+ def decorator(func):
+ def wrapper(*args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except Exception:
+ if except_func:
+ except_func(*on_except[1:])
+ finally:
+ if finally_func:
+ finally_func(*on_finally[1:])
+ return wrapper
+ return decorator
+
+
+def static_vars(**kwargs):
+ def decorate(func):
+ for k in kwargs:
+ setattr(func, k, kwargs[k])
+ return func
+ return decorate
+
+
+@on_try_except_finally(on_except=(exit, 1))
+def safe_print(*msg):
+ """
+ :param msg:
+ :return:
+ """
+ print(''.join(msg))
+
+
+def find_binary(binary):
+ """
+ :param binary: <str>
+ :return:
+ """
+ for directory in PATH:
+ binary_name = '/'.join([directory, binary])
+ if os.path.isfile(binary_name) and os.access(binary_name, os.X_OK):
+ return binary_name
+ return None
+
+
+def read_last_line(f):
+ with open(f, 'rb') as opened:
+ opened.seek(-2, 2)
+ while opened.read(1) != b'\n':
+ opened.seek(-2, 1)
+ if opened.tell() == 0:
+ break
+ result = opened.readline()
+ return result.decode()
+
+
+class OldVersionCompatibility:
+
+ def __init__(self):
+ self._data_stream = str()
+
+ def begin(self, type_id, microseconds=0):
+ """
+ :param type_id: <str>
+ :param microseconds: <str> or <int>: must be a digit
+ :return:
+ """
+ self._data_stream += CHART_BEGIN.format(type_id, microseconds)
+
+ def set(self, dim_id, value):
+ """
+ :param dim_id: <str>
+ :param value: <int> or <str>: must be a digit
+ :return:
+ """
+ self._data_stream += DIMENSION_SET.format(dim_id, value)
+
+ def end(self):
+ self._data_stream += 'END\n'
+
+ def chart(self, type_id, name='', title='', units='', family='', category='', chart_type='line',
+ priority='', update_every=''):
+ """
+ :param type_id: <str>
+ :param name: <str>
+ :param title: <str>
+ :param units: <str>
+ :param family: <str>
+ :param category: <str>
+ :param chart_type: <str>
+ :param priority: <str> or <int>
+ :param update_every: <str> or <int>
+ :return:
+ """
+ self._data_stream += CHART_CREATE.format(type_id, name, title, units,
+ family, category, chart_type,
+ priority, update_every)
+
+ def dimension(self, dim_id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
+ """
+ :param dim_id: <str>
+ :param name: <str> or None
+ :param algorithm: <str>
+ :param multiplier: <str> or <int>: must be a digit
+ :param divisor: <str> or <int>: must be a digit
+ :param hidden: <str>: literally "hidden" or ""
+ :return:
+ """
+ self._data_stream += DIMENSION_CREATE.format(dim_id, name or dim_id, algorithm,
+ multiplier, divisor, hidden or str())
+
+ @on_try_except_finally(on_except=(exit, 1))
+ def commit(self):
+ print(self._data_stream)
+ self._data_stream = str()
diff --git a/python.d/python_modules/bases/loaders.py b/python.d/python_modules/bases/loaders.py
new file mode 100644
index 00000000..d18b9dcd
--- /dev/null
+++ b/python.d/python_modules/bases/loaders.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (l2isbad)
+
+import types
+from sys import version_info
+
+PY_VERSION = version_info[:2]
+
+if PY_VERSION > (3, 1):
+ from pyyaml3 import SafeLoader as YamlSafeLoader
+ from importlib.machinery import SourceFileLoader
+ DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'
+else:
+ from pyyaml2 import SafeLoader as YamlSafeLoader
+ from imp import load_source as SourceFileLoader
+ DEFAULT_MAPPING_TAG = u'tag:yaml.org,2002:map'
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
+
+
+def dict_constructor(loader, node):
+ return OrderedDict(loader.construct_pairs(node))
+
+
+YamlSafeLoader.add_constructor(DEFAULT_MAPPING_TAG, dict_constructor)
+
+
+class YamlOrderedLoader:
+ @staticmethod
+ def load_config_from_file(file_name):
+ opened, loaded = False, False
+ try:
+ stream = open(file_name, 'r')
+ opened = True
+ loader = YamlSafeLoader(stream)
+ loaded = True
+ parsed = loader.get_single_data() or dict()
+ except Exception as error:
+ return dict(), error
+ else:
+ return parsed, None
+ finally:
+ if opened:
+ stream.close()
+ if loaded:
+ loader.dispose()
+
+
+class SourceLoader:
+ @staticmethod
+ def load_module_from_file(name, path):
+ try:
+ loaded = SourceFileLoader(name, path)
+ if isinstance(loaded, types.ModuleType):
+ return loaded, None
+ return loaded.load_module(), None
+ except Exception as error:
+ return None, error
+
+
+class ModuleAndConfigLoader(YamlOrderedLoader, SourceLoader):
+ pass
diff --git a/python.d/python_modules/bases/loggers.py b/python.d/python_modules/bases/loggers.py
new file mode 100644
index 00000000..fc40b83d
--- /dev/null
+++ b/python.d/python_modules/bases/loggers.py
@@ -0,0 +1,205 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Ilya Mashchenko (l2isbad)
+
+import logging
+import traceback
+
+from sys import exc_info
+
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time
+
+from bases.collection import on_try_except_finally
+
+
+LOGGING_LEVELS = {'CRITICAL': 50,
+ 'ERROR': 40,
+ 'WARNING': 30,
+ 'INFO': 20,
+ 'DEBUG': 10,
+ 'NOTSET': 0}
+
+DEFAULT_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s : %(message)s'
+DEFAULT_LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
+
+PYTHON_D_LOG_LINE_FORMAT = '%(asctime)s: %(name)s %(levelname)s: %(module_name)s: %(job_name)s: %(message)s'
+PYTHON_D_LOG_NAME = 'python.d'
+
+
+def limiter(log_max_count=30, allowed_in_seconds=60):
+ def on_decorator(func):
+
+ def on_call(*args):
+ current_time = args[0]._runtime_counters.START_RUN
+ lc = args[0]._logger_counters
+
+ if lc.logged and lc.logged % log_max_count == 0:
+ if current_time - lc.time_to_compare <= allowed_in_seconds:
+ lc.dropped += 1
+ return
+ lc.time_to_compare = current_time
+
+ lc.logged += 1
+ func(*args)
+
+ return on_call
+ return on_decorator
+
+
+def add_traceback(func):
+ def on_call(*args):
+ self = args[0]
+
+ if not self.log_traceback:
+ func(*args)
+ else:
+ if exc_info()[0]:
+ func(*args)
+ func(self, traceback.format_exc())
+ else:
+ func(*args)
+
+ return on_call
+
+
+class LoggerCounters:
+ def __init__(self):
+ self.logged = 0
+ self.dropped = 0
+ self.time_to_compare = time()
+
+ def __repr__(self):
+ return 'LoggerCounter(logged: {logged}, dropped: {dropped})'.format(logged=self.logged,
+ dropped=self.dropped)
+
+
+class BaseLogger(object):
+ def __init__(self, logger_name, log_fmt=DEFAULT_LOG_LINE_FORMAT, date_fmt=DEFAULT_LOG_TIME_FORMAT,
+ handler=logging.StreamHandler):
+ """
+ :param logger_name: <str>
+ :param log_fmt: <str>
+ :param date_fmt: <str>
+ :param handler: <logging handler>
+ """
+ self.logger = logging.getLogger(logger_name)
+ if not self.has_handlers():
+ self.severity = 'INFO'
+ self.logger.addHandler(handler())
+ self.set_formatter(fmt=log_fmt, date_fmt=date_fmt)
+
+ def __repr__(self):
+ return '<Logger: {name})>'.format(name=self.logger.name)
+
+ def set_formatter(self, fmt, date_fmt=DEFAULT_LOG_TIME_FORMAT):
+ """
+ :param fmt: <str>
+ :param date_fmt: <str>
+ :return:
+ """
+ if self.has_handlers():
+ self.logger.handlers[0].setFormatter(logging.Formatter(fmt=fmt, datefmt=date_fmt))
+
+ def has_handlers(self):
+ return self.logger.handlers
+
+ @property
+ def severity(self):
+ return self.logger.getEffectiveLevel()
+
+ @severity.setter
+ def severity(self, level):
+ """
+ :param level: <str> or <int>
+ :return:
+ """
+ if level in LOGGING_LEVELS:
+ self.logger.setLevel(LOGGING_LEVELS[level])
+
+ def debug(self, *msg, **kwargs):
+ self.logger.debug(' '.join(map(str, msg)), **kwargs)
+
+ def info(self, *msg, **kwargs):
+ self.logger.info(' '.join(map(str, msg)), **kwargs)
+
+ def warning(self, *msg, **kwargs):
+ self.logger.warning(' '.join(map(str, msg)), **kwargs)
+
+ def error(self, *msg, **kwargs):
+ self.logger.error(' '.join(map(str, msg)), **kwargs)
+
+ def alert(self, *msg, **kwargs):
+ self.logger.critical(' '.join(map(str, msg)), **kwargs)
+
+ @on_try_except_finally(on_finally=(exit, 1))
+ def fatal(self, *msg, **kwargs):
+ self.logger.critical(' '.join(map(str, msg)), **kwargs)
+
+
+class PythonDLogger(object):
+ def __init__(self, logger_name=PYTHON_D_LOG_NAME, log_fmt=PYTHON_D_LOG_LINE_FORMAT):
+ """
+ :param logger_name: <str>
+ :param log_fmt: <str>
+ """
+ self.logger = BaseLogger(logger_name, log_fmt=log_fmt)
+ self.module_name = 'plugin'
+ self.job_name = 'main'
+ self._logger_counters = LoggerCounters()
+
+ _LOG_TRACEBACK = False
+
+ @property
+ def log_traceback(self):
+ return PythonDLogger._LOG_TRACEBACK
+
+ @log_traceback.setter
+ def log_traceback(self, value):
+ PythonDLogger._LOG_TRACEBACK = value
+
+ def debug(self, *msg):
+ self.logger.debug(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ def info(self, *msg):
+ self.logger.info(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ def warning(self, *msg):
+ self.logger.warning(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ @add_traceback
+ def error(self, *msg):
+ self.logger.error(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ @add_traceback
+ def alert(self, *msg):
+ self.logger.alert(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+ def fatal(self, *msg):
+ self.logger.fatal(*msg, extra={'module_name': self.module_name,
+ 'job_name': self.job_name or self.module_name})
+
+
+class PythonDLimitedLogger(PythonDLogger):
+ @limiter()
+ def info(self, *msg):
+ PythonDLogger.info(self, *msg)
+
+ @limiter()
+ def warning(self, *msg):
+ PythonDLogger.warning(self, *msg)
+
+ @limiter()
+ def error(self, *msg):
+ PythonDLogger.error(self, *msg)
+
+ @limiter()
+ def alert(self, *msg):
+ PythonDLogger.alert(self, *msg)
diff --git a/python.d/python_modules/msg.py b/python.d/python_modules/msg.py
deleted file mode 100644
index 74716770..00000000
--- a/python.d/python_modules/msg.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# -*- coding: utf-8 -*-
-# Description: logging for netdata python.d modules
-
-import traceback
-import sys
-from time import time, strftime
-
-DEBUG_FLAG = False
-TRACE_FLAG = False
-PROGRAM = ""
-LOG_COUNTER = 0
-LOG_THROTTLE = 10000 # has to be too big during init
-LOG_INTERVAL = 1 # has to be too low during init
-LOG_NEXT_CHECK = 0
-
-WRITE = sys.stderr.write
-FLUSH = sys.stderr.flush
-
-
-def log_msg(msg_type, *args):
- """
- Print message on stderr.
- :param msg_type: str
- """
- global LOG_COUNTER
- global LOG_THROTTLE
- global LOG_INTERVAL
- global LOG_NEXT_CHECK
- now = time()
-
- if not DEBUG_FLAG:
- LOG_COUNTER += 1
-
- # WRITE("COUNTER " + str(LOG_COUNTER) + " THROTTLE " + str(LOG_THROTTLE) + " INTERVAL " + str(LOG_INTERVAL) + " NOW " + str(now) + " NEXT " + str(LOG_NEXT_CHECK) + "\n")
-
- if LOG_COUNTER <= LOG_THROTTLE or msg_type == "FATAL" or msg_type == "ALERT":
- timestamp = strftime('%Y-%m-%d %X')
- msg = "%s: %s %s: %s" % (timestamp, PROGRAM, str(msg_type), " ".join(args))
- WRITE(msg + "\n")
- FLUSH()
- elif LOG_COUNTER == LOG_THROTTLE + 1:
- timestamp = strftime('%Y-%m-%d %X')
- msg = "%s: python.d.plugin: throttling further log messages for %s seconds" % (timestamp, str(int(LOG_NEXT_CHECK + 0.5) - int(now)))
- WRITE(msg + "\n")
- FLUSH()
-
- if LOG_NEXT_CHECK <= now:
- if LOG_COUNTER >= LOG_THROTTLE:
- timestamp = strftime('%Y-%m-%d %X')
- msg = "%s: python.d.plugin: Prevented %s log messages from displaying" % (timestamp, str(LOG_COUNTER - LOG_THROTTLE))
- WRITE(msg + "\n")
- FLUSH()
- LOG_NEXT_CHECK = now - (now % LOG_INTERVAL) + LOG_INTERVAL
- LOG_COUNTER = 0
-
- if TRACE_FLAG:
- if msg_type == "FATAL" or msg_type == "ERROR" or msg_type == "ALERT":
- traceback.print_exc()
-
-
-def debug(*args):
- """
- Print debug message on stderr.
- """
- if not DEBUG_FLAG:
- return
-
- log_msg("DEBUG", *args)
-
-
-def error(*args):
- """
- Print message on stderr.
- """
- log_msg("ERROR", *args)
-
-
-def alert(*args):
- """
- Print message on stderr.
- """
- log_msg("ALERT", *args)
-
-
-def info(*args):
- """
- Print message on stderr.
- """
- log_msg("INFO", *args)
-
-
-def fatal(*args):
- """
- Print message on stderr and exit.
- """
- try:
- log_msg("FATAL", *args)
- print('DISABLE')
- except:
- pass
- sys.exit(1)
diff --git a/python.d/python_modules/third_party/__init__.py b/python.d/python_modules/third_party/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python.d/python_modules/third_party/__init__.py
diff --git a/python.d/python_modules/lm_sensors.py b/python.d/python_modules/third_party/lm_sensors.py
index 1d868f0e..1d868f0e 100644
--- a/python.d/python_modules/lm_sensors.py
+++ b/python.d/python_modules/third_party/lm_sensors.py
diff --git a/python.d/python_modules/third_party/ordereddict.py b/python.d/python_modules/third_party/ordereddict.py
new file mode 100644
index 00000000..d0b97d47
--- /dev/null
+++ b/python.d/python_modules/third_party/ordereddict.py
@@ -0,0 +1,128 @@
+# Copyright (c) 2009 Raymond Hettinger
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+from UserDict import DictMixin
+
+
+class OrderedDict(dict, DictMixin):
+
+ def __init__(self, *args, **kwds):
+ if len(args) > 1:
+ raise TypeError('expected at most 1 arguments, got %d' % len(args))
+ try:
+ self.__end
+ except AttributeError:
+ self.clear()
+ self.update(*args, **kwds)
+
+ def clear(self):
+ self.__end = end = []
+ end += [None, end, end] # sentinel node for doubly linked list
+ self.__map = {} # key --> [key, prev, next]
+ dict.clear(self)
+
+ def __setitem__(self, key, value):
+ if key not in self:
+ end = self.__end
+ curr = end[1]
+ curr[2] = end[1] = self.__map[key] = [key, curr, end]
+ dict.__setitem__(self, key, value)
+
+ def __delitem__(self, key):
+ dict.__delitem__(self, key)
+ key, prev, next = self.__map.pop(key)
+ prev[2] = next
+ next[1] = prev
+
+ def __iter__(self):
+ end = self.__end
+ curr = end[2]
+ while curr is not end:
+ yield curr[0]
+ curr = curr[2]
+
+ def __reversed__(self):
+ end = self.__end
+ curr = end[1]
+ while curr is not end:
+ yield curr[0]
+ curr = curr[1]
+
+ def popitem(self, last=True):
+ if not self:
+ raise KeyError('dictionary is empty')
+ if last:
+ key = reversed(self).next()
+ else:
+ key = iter(self).next()
+ value = self.pop(key)
+ return key, value
+
+ def __reduce__(self):
+ items = [[k, self[k]] for k in self]
+ tmp = self.__map, self.__end
+ del self.__map, self.__end
+ inst_dict = vars(self).copy()
+ self.__map, self.__end = tmp
+ if inst_dict:
+ return self.__class__, (items,), inst_dict
+ return self.__class__, (items,)
+
+ def keys(self):
+ return list(self)
+
+ setdefault = DictMixin.setdefault
+ update = DictMixin.update
+ pop = DictMixin.pop
+ values = DictMixin.values
+ items = DictMixin.items
+ iterkeys = DictMixin.iterkeys
+ itervalues = DictMixin.itervalues
+ iteritems = DictMixin.iteritems
+
+ def __repr__(self):
+ if not self:
+ return '%s()' % (self.__class__.__name__,)
+ return '%s(%r)' % (self.__class__.__name__, self.items())
+
+ def copy(self):
+ return self.__class__(self)
+
+ @classmethod
+ def fromkeys(cls, iterable, value=None):
+ d = cls()
+ for key in iterable:
+ d[key] = value
+ return d
+
+ def __eq__(self, other):
+ if isinstance(other, OrderedDict):
+ if len(self) != len(other):
+ return False
+ for p, q in zip(self.items(), other.items()):
+ if p != q:
+ return False
+ return True
+ return dict.__eq__(self, other)
+
+ def __ne__(self, other):
+ return not self == other