summaryrefslogtreecommitdiffstats
path: root/plugins.d/python.d.plugin
diff options
context:
space:
mode:
Diffstat (limited to 'plugins.d/python.d.plugin')
-rwxr-xr-xplugins.d/python.d.plugin533
1 files changed, 533 insertions, 0 deletions
diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin
new file mode 100755
index 000000000..5e81fb263
--- /dev/null
+++ b/plugins.d/python.d.plugin
@@ -0,0 +1,533 @@
+#!/usr/bin/env bash
+'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
+# -*- coding: utf-8 -*-
+
+# Description: netdata python modules supervisor
+# Author: Pawel Krupa (paulfantom)
+
+import os
+import sys
+import time
+import threading
+
+# -----------------------------------------------------------------------------
+# globals & environment setup
+# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
+MODULE_EXTENSION = ".chart.py"
+BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
+ 'priority': 90000,
+ 'retries': 10}
+
+MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
+ os.path.dirname(__file__)) + "/../python.d") + "/"
+CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
+# directories should end with '/'
+if CONFIG_DIR[-1] != "/":
+ CONFIG_DIR += "/"
+sys.path.append(MODULES_DIR + "python_modules")
+
+PROGRAM = os.path.basename(__file__).replace(".plugin", "")
+DEBUG_FLAG = False
+OVERRIDE_UPDATE_EVERY = False
+
+# -----------------------------------------------------------------------------
+# custom, third party and version specific python modules management
+import msg
+
+try:
+ assert sys.version_info >= (3, 1)
+ import importlib.machinery
+ PY_VERSION = 3
+ # change this hack below if we want PY_VERSION to be used in modules
+ # import builtins
+ # builtins.PY_VERSION = 3
+ msg.info('Using python v3')
+except (AssertionError, ImportError):
+ try:
+ import imp
+
+ # change this hack below if we want PY_VERSION to be used in modules
+ # import __builtin__
+ # __builtin__.PY_VERSION = 2
+ PY_VERSION = 2
+ msg.info('Using python v2')
+ except ImportError:
+ msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+# try:
+# import yaml
+# except ImportError:
+# msg.fatal('Cannot find yaml library')
+try:
+ if PY_VERSION == 3:
+ import pyyaml3 as yaml
+ else:
+ import pyyaml2 as yaml
+except ImportError:
+ msg.fatal('Cannot find yaml library')
+
+
+class PythonCharts(object):
+ """
+ Main class used to control every python module.
+ """
+
+ def __init__(self,
+ modules=None,
+ modules_path='../python.d/',
+ modules_configs='../conf.d/',
+ modules_disabled=None):
+ """
+ :param modules: list
+ :param modules_path: str
+ :param modules_configs: str
+ :param modules_disabled: list
+ """
+
+ if modules is None:
+ modules = []
+ if modules_disabled is None:
+ modules_disabled = []
+
+ self.first_run = True
+ # set configuration directory
+ self.configs = modules_configs
+
+ # load modules
+ loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
+
+ # load configuration files
+ configured_modules = self._load_configs(loaded_modules)
+
+ # good economy and prosperity:
+ self.jobs = self._create_jobs(configured_modules) # type: list
+
+ # enable timetable override like `python.d.plugin mysql debug 1`
+ if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
+ for job in self.jobs:
+ job.create_timetable(BASE_CONFIG['update_every'])
+
+ @staticmethod
+ def _import_module(path, name=None):
+ """
+ Try to import module using only its path.
+ :param path: str
+ :param name: str
+ :return: object
+ """
+
+ if name is None:
+ name = path.split('/')[-1]
+ if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
+ return None
+ name = name[:-len(MODULE_EXTENSION)]
+ try:
+ if PY_VERSION == 3:
+ return importlib.machinery.SourceFileLoader(name, path).load_module()
+ else:
+ return imp.load_source(name, path)
+ except Exception as e:
+ msg.error("Problem loading", name, str(e))
+ return None
+
+ def _load_modules(self, path, modules, disabled):
+ """
+ Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
+ :param path: str
+ :param modules: list
+ :param disabled: list
+ :return: list
+ """
+
+ # check if plugin directory exists
+ if not os.path.isdir(path):
+ msg.fatal("cannot find charts directory ", path)
+
+ # load modules
+ loaded = []
+ if len(modules) > 0:
+ for m in modules:
+ if m in disabled:
+ continue
+ mod = self._import_module(path + m + MODULE_EXTENSION)
+ if mod is not None:
+ loaded.append(mod)
+ else: # exit if plugin is not found
+ msg.fatal('no modules found.')
+ else:
+ # scan directory specified in path and load all modules from there
+ names = os.listdir(path)
+ for mod in names:
+ if mod.replace(MODULE_EXTENSION, "") in disabled:
+ msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
+ continue
+ m = self._import_module(path + mod)
+ if m is not None:
+ msg.debug(mod + ": loading module '" + path + mod + "'")
+ loaded.append(m)
+ return loaded
+
+ def _load_configs(self, modules):
+ """
+ Append configuration in list named `config` to every module.
+ For multi-job modules `config` list is created in _parse_config,
+ otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
+ :param modules: list
+ :return: list
+ """
+ for mod in modules:
+ configfile = self.configs + mod.__name__ + ".conf"
+ if os.path.isfile(configfile):
+ msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
+ try:
+ if not hasattr(mod, 'config'):
+ mod.config = {}
+ setattr(mod,
+ 'config',
+ self._parse_config(mod, read_config(configfile)))
+ except Exception as e:
+ msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
+ else:
+ msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
+ # set config if not found
+ if not hasattr(mod, 'config'):
+ msg.debug(mod.__name__ + ": setting configuration for only one job")
+ mod.config = {None: {}}
+ for var in BASE_CONFIG:
+ try:
+ mod.config[None][var] = getattr(mod, var)
+ except AttributeError:
+ mod.config[None][var] = BASE_CONFIG[var]
+ return modules
+
+ @staticmethod
+ def _parse_config(module, config):
+ """
+ Parse configuration file or extract configuration from module file.
+ Example of returned dictionary:
+ config = {'name': {
+ 'update_every': 2,
+ 'retries': 3,
+ 'priority': 30000
+ 'other_val': 123}}
+ :param module: object
+ :param config: dict
+ :return: dict
+ """
+ if config is None:
+ config = {}
+ # get default values
+ defaults = {}
+ msg.debug(module.__name__ + ": reading configuration")
+ for key in BASE_CONFIG:
+ try:
+ # get defaults from module config
+ defaults[key] = int(config.pop(key))
+ except (KeyError, ValueError):
+ try:
+ # get defaults from module source code
+ defaults[key] = getattr(module, key)
+ except (KeyError, ValueError, AttributeError):
+ # if above failed, get defaults from global dict
+ defaults[key] = BASE_CONFIG[key]
+
+ # check if there are dict in config dict
+ many_jobs = False
+ for name in config:
+ if type(config[name]) is dict:
+ many_jobs = True
+ break
+
+ # assign variables needed by supervisor to every job configuration
+ if many_jobs:
+ for name in config:
+ for key in defaults:
+ if key not in config[name]:
+ config[name][key] = defaults[key]
+ # if only one job is needed, values doesn't have to be in dict (in YAML)
+ else:
+ config = {None: config.copy()}
+ config[None].update(defaults)
+
+ # return dictionary of jobs where every job has BASE_CONFIG variables
+ return config
+
+ @staticmethod
+ def _create_jobs(modules):
+ """
+ Create jobs based on module.config dictionary and module.Service class definition.
+ :param modules: list
+ :return: list
+ """
+ jobs = []
+ for module in modules:
+ for name in module.config:
+ # register a new job
+ conf = module.config[name]
+ try:
+ job = module.Service(configuration=conf, name=name)
+ except Exception as e:
+ msg.error(module.__name__ +
+ ("/" + str(name) if name is not None else "") +
+ ": cannot start job: '" +
+ str(e))
+ return None
+ else:
+ # set chart_name (needed to plot run time graphs)
+ job.chart_name = module.__name__
+ if name is not None:
+ job.chart_name += "_" + name
+ jobs.append(job)
+ msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
+
+ return [j for j in jobs if j is not None]
+
+ def _stop(self, job, reason=None):
+ """
+ Stop specified job and remove it from self.jobs list
+ Also notifies user about job failure if DEBUG_FLAG is set
+ :param job: object
+ :param reason: str
+ """
+ prefix = job.__module__
+ if job.name is not None and len(job.name) != 0:
+ prefix += "/" + job.name
+ try:
+ self.jobs.remove(job)
+ msg.info("Disabled", prefix)
+ except Exception as e:
+ msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
+
+ # TODO remove section below and remove `reason`.
+ prefix += ": "
+ if reason is None:
+ return
+ elif reason[:3] == "no ":
+ msg.error(prefix +
+ "does not seem to have " +
+ reason[3:] +
+ "() function. Disabling it.")
+ elif reason[:7] == "failed ":
+ msg.error(prefix +
+ reason[7:] +
+ "() function reports failure.")
+ elif reason[:13] == "configuration":
+ msg.error(prefix +
+ "configuration file '" +
+ self.configs +
+ job.__module__ +
+ ".conf' not found. Using defaults.")
+ elif reason[:11] == "misbehaving":
+ msg.error(prefix + "is " + reason)
+
+ def check(self):
+ """
+ Tries to execute check() on every job.
+ This cannot fail thus it is catching every exception
+ If job.check() fails job is stopped
+ """
+ i = 0
+ overridden = []
+ msg.debug("all job objects", str(self.jobs))
+ while i < len(self.jobs):
+ job = self.jobs[i]
+ try:
+ if not job.check():
+ msg.error(job.chart_name, "check function failed.")
+ self._stop(job)
+ else:
+ msg.debug(job.chart_name, "check succeeded")
+ i += 1
+ try:
+ if job.override_name is not None:
+ new_name = job.__module__ + '_' + job.override_name
+ if new_name in overridden:
+ msg.error(job.override_name + " already exists. Stopping '" + job.name + "'")
+ self._stop(job)
+ i -= 1
+ else:
+ job.name = job.override_name
+ msg.debug(job.chart_name + " changing chart name to: '" + new_name + "'")
+ job.chart_name = new_name
+ overridden.append(job.chart_name)
+ except Exception:
+ pass
+ except AttributeError as e:
+ self._stop(job)
+ msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
+ msg.debug(str(e))
+ except (UnboundLocalError, Exception) as e:
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
+ msg.debug("overridden job names:", str(overridden))
+ msg.debug("all remaining job objects:", str(self.jobs))
+
+ def create(self):
+ """
+ Tries to execute create() on every job.
+ This cannot fail thus it is catching every exception.
+ If job.create() fails job is stopped.
+ This is also creating job run time chart.
+ """
+ i = 0
+ while i < len(self.jobs):
+ job = self.jobs[i]
+ try:
+ if not job.create():
+ msg.error(job.chart_name, "create function failed.")
+ self._stop(job)
+ else:
+ chart = job.chart_name
+ sys.stdout.write(
+ "CHART netdata.plugin_pythond_" +
+ chart +
+ " '' 'Execution time for " +
+ chart +
+ " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
+ str(job.timetable['freq']) +
+ '\n')
+ sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
+ msg.debug("created charts for", job.chart_name)
+ # sys.stdout.flush()
+ i += 1
+ except AttributeError:
+ msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
+ self._stop(job)
+ except (UnboundLocalError, Exception) as e:
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
+
+ def update(self):
+ """
+ Creates and supervises every job thread.
+ This will stay forever and ever and ever forever and ever it'll be the one...
+ """
+ for job in self.jobs:
+ job.start()
+
+ while True:
+ if threading.active_count() <= 1:
+ msg.fatal("no more jobs")
+ time.sleep(1)
+
+
+def read_config(path):
+ """
+ Read YAML configuration from specified file
+ :param path: str
+ :return: dict
+ """
+ try:
+ with open(path, 'r') as stream:
+ config = yaml.load(stream)
+ except (OSError, IOError):
+ msg.error(str(path), "is not a valid configuration file")
+ return None
+ except yaml.YAMLError as e:
+ msg.error(str(path), "is malformed:", e)
+ return None
+ return config
+
+
+def parse_cmdline(directory, *commands):
+ """
+ Parse parameters from command line.
+ :param directory: str
+ :param commands: list of str
+ :return: dict
+ """
+ global DEBUG_FLAG
+ global OVERRIDE_UPDATE_EVERY
+ global BASE_CONFIG
+
+ changed_update = False
+ mods = []
+ for cmd in commands[1:]:
+ if cmd == "check":
+ pass
+ elif cmd == "debug" or cmd == "all":
+ DEBUG_FLAG = True
+ # redirect stderr to stdout?
+ elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
+ #DEBUG_FLAG = True
+ mods.append(cmd.replace(".chart.py", ""))
+ else:
+ try:
+ BASE_CONFIG['update_every'] = int(cmd)
+ changed_update = True
+ except ValueError:
+ pass
+ if changed_update and DEBUG_FLAG:
+ OVERRIDE_UPDATE_EVERY = True
+ msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
+
+ msg.debug("started from", commands[0], "with options:", *commands[1:])
+
+ return mods
+
+
+# if __name__ == '__main__':
+def run():
+ """
+ Main program.
+ """
+ global DEBUG_FLAG, BASE_CONFIG
+
+ # read configuration file
+ disabled = []
+ configfile = CONFIG_DIR + "python.d.conf"
+ msg.PROGRAM = PROGRAM
+ msg.info("reading configuration file:", configfile)
+ log_counter = 200
+ log_interval = 3600
+
+ conf = read_config(configfile)
+ if conf is not None:
+ try:
+ # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
+ if conf['enabled'] is False:
+ msg.fatal('disabled in configuration file.\n')
+ except (KeyError, TypeError):
+ pass
+ try:
+ for param in BASE_CONFIG:
+ BASE_CONFIG[param] = conf[param]
+ except (KeyError, TypeError):
+ pass # use default update_every from NETDATA_UPDATE_EVERY
+ try:
+ DEBUG_FLAG = conf['debug']
+ except (KeyError, TypeError):
+ pass
+ try:
+ log_counter = conf['logs_per_interval']
+ except (KeyError, TypeError):
+ pass
+ try:
+ log_interval = conf['log_interval']
+ except (KeyError, TypeError):
+ pass
+ for k, v in conf.items():
+ if k in ("update_every", "debug", "enabled"):
+ continue
+ if v is False:
+ disabled.append(k)
+
+ # parse passed command line arguments
+ modules = parse_cmdline(MODULES_DIR, *sys.argv)
+ msg.DEBUG_FLAG = DEBUG_FLAG
+ msg.LOG_COUNTER = log_counter
+ msg.LOG_INTERVAL = log_interval
+ msg.info("MODULES_DIR='" + MODULES_DIR +
+ "', CONFIG_DIR='" + CONFIG_DIR +
+ "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
+ ", ONLY_MODULES=" + str(modules))
+
+ # run plugins
+ charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
+ charts.check()
+ charts.create()
+ charts.update()
+ msg.fatal("finished")
+
+
+if __name__ == '__main__':
+ run()