diff options
Diffstat (limited to 'plugins.d/python.d.plugin')
-rwxr-xr-x | plugins.d/python.d.plugin | 533 |
1 files changed, 533 insertions, 0 deletions
diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin new file mode 100755 index 000000000..5e81fb263 --- /dev/null +++ b/plugins.d/python.d.plugin @@ -0,0 +1,533 @@ +#!/usr/bin/env bash +'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' +# -*- coding: utf-8 -*- + +# Description: netdata python modules supervisor +# Author: Pawel Krupa (paulfantom) + +import os +import sys +import time +import threading + +# ----------------------------------------------------------------------------- +# globals & environment setup +# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables +MODULE_EXTENSION = ".chart.py" +BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), + 'priority': 90000, + 'retries': 10} + +MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', + os.path.dirname(__file__)) + "/../python.d") + "/" +CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/") +# directories should end with '/' +if CONFIG_DIR[-1] != "/": + CONFIG_DIR += "/" +sys.path.append(MODULES_DIR + "python_modules") + +PROGRAM = os.path.basename(__file__).replace(".plugin", "") +DEBUG_FLAG = False +OVERRIDE_UPDATE_EVERY = False + +# ----------------------------------------------------------------------------- +# custom, third party and version specific python modules management +import msg + +try: + assert sys.version_info >= (3, 1) + import importlib.machinery + PY_VERSION = 3 + # change this hack below if we want PY_VERSION to be used in modules + # import builtins + # builtins.PY_VERSION = 3 + msg.info('Using python v3') +except (AssertionError, ImportError): + try: + import imp + + # change this hack below if we want PY_VERSION to be used in modules + # import __builtin__ + # __builtin__.PY_VERSION = 2 + PY_VERSION = 2 + msg.info('Using python v2') + except ImportError: + msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') +# try: +# import yaml +# except ImportError: +# msg.fatal('Cannot find yaml library') +try: + if PY_VERSION == 3: + import pyyaml3 as yaml + else: + import pyyaml2 as yaml +except ImportError: + msg.fatal('Cannot find yaml library') + + +class PythonCharts(object): + """ + Main class used to control every python module. + """ + + def __init__(self, + modules=None, + modules_path='../python.d/', + modules_configs='../conf.d/', + modules_disabled=None): + """ + :param modules: list + :param modules_path: str + :param modules_configs: str + :param modules_disabled: list + """ + + if modules is None: + modules = [] + if modules_disabled is None: + modules_disabled = [] + + self.first_run = True + # set configuration directory + self.configs = modules_configs + + # load modules + loaded_modules = self._load_modules(modules_path, modules, modules_disabled) + + # load configuration files + configured_modules = self._load_configs(loaded_modules) + + # good economy and prosperity: + self.jobs = self._create_jobs(configured_modules) # type: list + + # enable timetable override like `python.d.plugin mysql debug 1` + if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY: + for job in self.jobs: + job.create_timetable(BASE_CONFIG['update_every']) + + @staticmethod + def _import_module(path, name=None): + """ + Try to import module using only its path. + :param path: str + :param name: str + :return: object + """ + + if name is None: + name = path.split('/')[-1] + if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION: + return None + name = name[:-len(MODULE_EXTENSION)] + try: + if PY_VERSION == 3: + return importlib.machinery.SourceFileLoader(name, path).load_module() + else: + return imp.load_source(name, path) + except Exception as e: + msg.error("Problem loading", name, str(e)) + return None + + def _load_modules(self, path, modules, disabled): + """ + Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files) + :param path: str + :param modules: list + :param disabled: list + :return: list + """ + + # check if plugin directory exists + if not os.path.isdir(path): + msg.fatal("cannot find charts directory ", path) + + # load modules + loaded = [] + if len(modules) > 0: + for m in modules: + if m in disabled: + continue + mod = self._import_module(path + m + MODULE_EXTENSION) + if mod is not None: + loaded.append(mod) + else: # exit if plugin is not found + msg.fatal('no modules found.') + else: + # scan directory specified in path and load all modules from there + names = os.listdir(path) + for mod in names: + if mod.replace(MODULE_EXTENSION, "") in disabled: + msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) + continue + m = self._import_module(path + mod) + if m is not None: + msg.debug(mod + ": loading module '" + path + mod + "'") + loaded.append(m) + return loaded + + def _load_configs(self, modules): + """ + Append configuration in list named `config` to every module. + For multi-job modules `config` list is created in _parse_config, + otherwise it is created here based on BASE_CONFIG prototype with None as identifier. + :param modules: list + :return: list + """ + for mod in modules: + configfile = self.configs + mod.__name__ + ".conf" + if os.path.isfile(configfile): + msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") + try: + if not hasattr(mod, 'config'): + mod.config = {} + setattr(mod, + 'config', + self._parse_config(mod, read_config(configfile))) + except Exception as e: + msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) + else: + msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") + # set config if not found + if not hasattr(mod, 'config'): + msg.debug(mod.__name__ + ": setting configuration for only one job") + mod.config = {None: {}} + for var in BASE_CONFIG: + try: + mod.config[None][var] = getattr(mod, var) + except AttributeError: + mod.config[None][var] = BASE_CONFIG[var] + return modules + + @staticmethod + def _parse_config(module, config): + """ + Parse configuration file or extract configuration from module file. + Example of returned dictionary: + config = {'name': { + 'update_every': 2, + 'retries': 3, + 'priority': 30000 + 'other_val': 123}} + :param module: object + :param config: dict + :return: dict + """ + if config is None: + config = {} + # get default values + defaults = {} + msg.debug(module.__name__ + ": reading configuration") + for key in BASE_CONFIG: + try: + # get defaults from module config + defaults[key] = int(config.pop(key)) + except (KeyError, ValueError): + try: + # get defaults from module source code + defaults[key] = getattr(module, key) + except (KeyError, ValueError, AttributeError): + # if above failed, get defaults from global dict + defaults[key] = BASE_CONFIG[key] + + # check if there are dict in config dict + many_jobs = False + for name in config: + if type(config[name]) is dict: + many_jobs = True + break + + # assign variables needed by supervisor to every job configuration + if many_jobs: + for name in config: + for key in defaults: + if key not in config[name]: + config[name][key] = defaults[key] + # if only one job is needed, values doesn't have to be in dict (in YAML) + else: + config = {None: config.copy()} + config[None].update(defaults) + + # return dictionary of jobs where every job has BASE_CONFIG variables + return config + + @staticmethod + def _create_jobs(modules): + """ + Create jobs based on module.config dictionary and module.Service class definition. + :param modules: list + :return: list + """ + jobs = [] + for module in modules: + for name in module.config: + # register a new job + conf = module.config[name] + try: + job = module.Service(configuration=conf, name=name) + except Exception as e: + msg.error(module.__name__ + + ("/" + str(name) if name is not None else "") + + ": cannot start job: '" + + str(e)) + return None + else: + # set chart_name (needed to plot run time graphs) + job.chart_name = module.__name__ + if name is not None: + job.chart_name += "_" + name + jobs.append(job) + msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") + + return [j for j in jobs if j is not None] + + def _stop(self, job, reason=None): + """ + Stop specified job and remove it from self.jobs list + Also notifies user about job failure if DEBUG_FLAG is set + :param job: object + :param reason: str + """ + prefix = job.__module__ + if job.name is not None and len(job.name) != 0: + prefix += "/" + job.name + try: + self.jobs.remove(job) + msg.info("Disabled", prefix) + except Exception as e: + msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e)) + + # TODO remove section below and remove `reason`. + prefix += ": " + if reason is None: + return + elif reason[:3] == "no ": + msg.error(prefix + + "does not seem to have " + + reason[3:] + + "() function. Disabling it.") + elif reason[:7] == "failed ": + msg.error(prefix + + reason[7:] + + "() function reports failure.") + elif reason[:13] == "configuration": + msg.error(prefix + + "configuration file '" + + self.configs + + job.__module__ + + ".conf' not found. Using defaults.") + elif reason[:11] == "misbehaving": + msg.error(prefix + "is " + reason) + + def check(self): + """ + Tries to execute check() on every job. + This cannot fail thus it is catching every exception + If job.check() fails job is stopped + """ + i = 0 + overridden = [] + msg.debug("all job objects", str(self.jobs)) + while i < len(self.jobs): + job = self.jobs[i] + try: + if not job.check(): + msg.error(job.chart_name, "check function failed.") + self._stop(job) + else: + msg.debug(job.chart_name, "check succeeded") + i += 1 + try: + if job.override_name is not None: + new_name = job.__module__ + '_' + job.override_name + if new_name in overridden: + msg.error(job.override_name + " already exists. Stopping '" + job.name + "'") + self._stop(job) + i -= 1 + else: + job.name = job.override_name + msg.debug(job.chart_name + " changing chart name to: '" + new_name + "'") + job.chart_name = new_name + overridden.append(job.chart_name) + except Exception: + pass + except AttributeError as e: + self._stop(job) + msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.") + msg.debug(str(e)) + except (UnboundLocalError, Exception) as e: + msg.error(job.chart_name, str(e)) + self._stop(job) + msg.debug("overridden job names:", str(overridden)) + msg.debug("all remaining job objects:", str(self.jobs)) + + def create(self): + """ + Tries to execute create() on every job. + This cannot fail thus it is catching every exception. + If job.create() fails job is stopped. + This is also creating job run time chart. + """ + i = 0 + while i < len(self.jobs): + job = self.jobs[i] + try: + if not job.create(): + msg.error(job.chart_name, "create function failed.") + self._stop(job) + else: + chart = job.chart_name + sys.stdout.write( + "CHART netdata.plugin_pythond_" + + chart + + " '' 'Execution time for " + + chart + + " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " + + str(job.timetable['freq']) + + '\n') + sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") + msg.debug("created charts for", job.chart_name) + # sys.stdout.flush() + i += 1 + except AttributeError: + msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.") + self._stop(job) + except (UnboundLocalError, Exception) as e: + msg.error(job.chart_name, str(e)) + self._stop(job) + + def update(self): + """ + Creates and supervises every job thread. + This will stay forever and ever and ever forever and ever it'll be the one... + """ + for job in self.jobs: + job.start() + + while True: + if threading.active_count() <= 1: + msg.fatal("no more jobs") + time.sleep(1) + + +def read_config(path): + """ + Read YAML configuration from specified file + :param path: str + :return: dict + """ + try: + with open(path, 'r') as stream: + config = yaml.load(stream) + except (OSError, IOError): + msg.error(str(path), "is not a valid configuration file") + return None + except yaml.YAMLError as e: + msg.error(str(path), "is malformed:", e) + return None + return config + + +def parse_cmdline(directory, *commands): + """ + Parse parameters from command line. + :param directory: str + :param commands: list of str + :return: dict + """ + global DEBUG_FLAG + global OVERRIDE_UPDATE_EVERY + global BASE_CONFIG + + changed_update = False + mods = [] + for cmd in commands[1:]: + if cmd == "check": + pass + elif cmd == "debug" or cmd == "all": + DEBUG_FLAG = True + # redirect stderr to stdout? + elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd): + #DEBUG_FLAG = True + mods.append(cmd.replace(".chart.py", "")) + else: + try: + BASE_CONFIG['update_every'] = int(cmd) + changed_update = True + except ValueError: + pass + if changed_update and DEBUG_FLAG: + OVERRIDE_UPDATE_EVERY = True + msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) + + msg.debug("started from", commands[0], "with options:", *commands[1:]) + + return mods + + +# if __name__ == '__main__': +def run(): + """ + Main program. + """ + global DEBUG_FLAG, BASE_CONFIG + + # read configuration file + disabled = [] + configfile = CONFIG_DIR + "python.d.conf" + msg.PROGRAM = PROGRAM + msg.info("reading configuration file:", configfile) + log_counter = 200 + log_interval = 3600 + + conf = read_config(configfile) + if conf is not None: + try: + # exit the whole plugin when 'enabled: no' is set in 'python.d.conf' + if conf['enabled'] is False: + msg.fatal('disabled in configuration file.\n') + except (KeyError, TypeError): + pass + try: + for param in BASE_CONFIG: + BASE_CONFIG[param] = conf[param] + except (KeyError, TypeError): + pass # use default update_every from NETDATA_UPDATE_EVERY + try: + DEBUG_FLAG = conf['debug'] + except (KeyError, TypeError): + pass + try: + log_counter = conf['logs_per_interval'] + except (KeyError, TypeError): + pass + try: + log_interval = conf['log_interval'] + except (KeyError, TypeError): + pass + for k, v in conf.items(): + if k in ("update_every", "debug", "enabled"): + continue + if v is False: + disabled.append(k) + + # parse passed command line arguments + modules = parse_cmdline(MODULES_DIR, *sys.argv) + msg.DEBUG_FLAG = DEBUG_FLAG + msg.LOG_COUNTER = log_counter + msg.LOG_INTERVAL = log_interval + msg.info("MODULES_DIR='" + MODULES_DIR + + "', CONFIG_DIR='" + CONFIG_DIR + + "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + + ", ONLY_MODULES=" + str(modules)) + + # run plugins + charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled) + charts.check() + charts.create() + charts.update() + msg.fatal("finished") + + +if __name__ == '__main__': + run() |