diff options
author | Federico Ceratto <federico.ceratto@gmail.com> | 2017-12-19 23:39:21 +0000 |
---|---|---|
committer | Federico Ceratto <federico.ceratto@gmail.com> | 2017-12-19 23:39:21 +0000 |
commit | 61aedf201c2c4bf0e5aa4db32e74f4d860b88593 (patch) | |
tree | bcf4f9a0cd8bc2daf38b2ff9f29bfcc1e5ed8968 /plugins.d/python.d.plugin | |
parent | New upstream version 1.8.0+dfsg (diff) | |
download | netdata-61aedf201c2c4bf0e5aa4db32e74f4d860b88593.tar.xz netdata-61aedf201c2c4bf0e5aa4db32e74f4d860b88593.zip |
New upstream version 1.9.0+dfsgupstream/1.9.0+dfsg
Diffstat (limited to '')
-rwxr-xr-x | plugins.d/python.d.plugin | 869 |
1 files changed, 326 insertions, 543 deletions
diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin index 03c156f4..855080e8 100755 --- a/plugins.d/python.d.plugin +++ b/plugins.d/python.d.plugin @@ -1,599 +1,382 @@ #!/usr/bin/env bash -'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' -# -*- coding: utf-8 -*- +'''':; exec "$(command -v python || command -v python3 || command -v python2 || +echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' -# Description: netdata python modules supervisor +# -*- coding: utf-8 -*- +# Description: # Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (l2isbad) import os import sys -import time import threading + from re import sub +from sys import version_info, argv +from time import sleep + +try: + from time import monotonic as time +except ImportError: + from time import time + +PY_VERSION = version_info[:2] +PLUGIN_CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', os.path.dirname(__file__) + '/../../../../etc/netdata') + '/' +CHARTS_PY_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) + '/../python.d') + '/' +CHARTS_PY_CONFIG_DIR = PLUGIN_CONFIG_DIR + 'python.d/' +PYTHON_MODULES_DIR = CHARTS_PY_DIR + 'python_modules' + +sys.path.append(PYTHON_MODULES_DIR) + +from bases.loaders import ModuleAndConfigLoader +from bases.loggers import PythonDLogger +from bases.collection import setdefault_values, run_and_exit + +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict -# ----------------------------------------------------------------------------- -# globals & environment setup -# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables -MODULE_EXTENSION = ".chart.py" BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), - 'priority': 90000, - 'retries': 10} + 'retries': 60, + 'priority': 60000, + 'autodetection_retry': 0, + 'chart_cleanup': 10, + 'name': str()} -MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', - os.path.dirname(__file__)) + "/../python.d") + "/" -CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', - os.path.dirname(__file__) + "/../../../../etc/netdata") +MODULE_EXTENSION = '.chart.py' +OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log'] -# directories should end with '/' -if CONFIG_DIR[-1] != "/": - CONFIG_DIR += "/" -sys.path.append(MODULES_DIR + "python_modules") -PROGRAM = os.path.basename(__file__).replace(".plugin", "") -DEBUG_FLAG = False -TRACE_FLAG = False -OVERRIDE_UPDATE_EVERY = False +def module_ok(m): + return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES -# ----------------------------------------------------------------------------- -# custom, third party and version specific python modules management -import msg -try: - assert sys.version_info >= (3, 1) - import importlib.machinery - PY_VERSION = 3 - # change this hack below if we want PY_VERSION to be used in modules - # import builtins - # builtins.PY_VERSION = 3 - msg.info('Using python v3') -except (AssertionError, ImportError): - try: - import imp - - # change this hack below if we want PY_VERSION to be used in modules - # import __builtin__ - # __builtin__.PY_VERSION = 2 - PY_VERSION = 2 - msg.info('Using python v2') - except ImportError: - msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') -# try: -# import yaml -# except ImportError: -# msg.fatal('Cannot find yaml library') -try: - if PY_VERSION == 3: - import pyyaml3 as yaml - else: - import pyyaml2 as yaml -except ImportError: - msg.fatal('Cannot find yaml library') +ALL_MODULES = [m for m in sorted(os.listdir(CHARTS_PY_DIR)) if module_ok(m)] -try: - from collections import OrderedDict - ORDERED = True - DICT = OrderedDict - msg.info('YAML output is ordered') -except ImportError: - try: - from ordereddict import OrderedDict - ORDERED = True - DICT = OrderedDict - msg.info('YAML output is ordered') - except ImportError: - ORDERED = False - DICT = dict - msg.info('YAML output is unordered') -if ORDERED: - def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict): - class OrderedLoader(Loader): - pass - - def construct_mapping(loader, node): - loader.flatten_mapping(node) - return object_pairs_hook(loader.construct_pairs(node)) - OrderedLoader.add_constructor( - yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, - construct_mapping) - return yaml.load(stream, OrderedLoader) - - -class PythonCharts(object): - """ - Main class used to control every python module. - """ - - def __init__(self, - modules=None, - modules_path='../python.d/', - modules_configs='../conf.d/', - modules_disabled=None, - modules_enabled=None, - default_run=None): + +def parse_cmd(): + debug = 'debug' in argv[1:] + trace = 'trace' in argv[1:] + override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False) + modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES] + return debug, trace, override_update_every, modules or ALL_MODULES + + +def multi_job_check(config): + return next((True for key in config if isinstance(config[key], dict)), False) + + +class Job(object): + def __init__(self, initialized_job, job_id): """ - :param modules: list - :param modules_path: str - :param modules_configs: str - :param modules_disabled: list - :param modules_enabled: list - :param default_run: bool + :param initialized_job: instance of <Class Service> + :param job_id: <str> """ + self.job = initialized_job + self.id = job_id # key in Modules.jobs() + self.module_name = self.job.__module__ # used in Plugin.delete_job() + self.recheck_every = self.job.configuration.pop('autodetection_retry') + self.checked = False # used in Plugin.check_job() + self.created = False # used in Plugin.create_job_charts() + if OVERRIDE_UPDATE_EVERY: + self.job.update_every = int(OVERRIDE_UPDATE_EVERY) - if modules is None: - modules = [] - if modules_disabled is None: - modules_disabled = [] + def __getattr__(self, item): + return getattr(self.job, item) - self.first_run = True - # set configuration directory - self.configs = modules_configs + def __repr__(self): + return self.job.__repr__() - # load modules - loaded_modules = self._load_modules(modules_path, modules, modules_disabled, modules_enabled, default_run) + def is_dead(self): + return bool(self.ident) and not self.is_alive() - # load configuration files - configured_modules = self._load_configs(loaded_modules) + def not_launched(self): + return not bool(self.ident) - # good economy and prosperity: - self.jobs = self._create_jobs(configured_modules) # type <list> + def is_autodetect(self): + return self.recheck_every - # enable timetable override like `python.d.plugin mysql debug 1` - if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY: - for job in self.jobs: - job.create_timetable(BASE_CONFIG['update_every']) - @staticmethod - def _import_module(path, name=None): +class Module(object): + def __init__(self, service, config): """ - Try to import module using only its path. - :param path: str - :param name: str - :return: object + :param service: <Module> + :param config: <dict> """ + self.service = service + self.name = service.__name__ + self.config = self.jobs_configurations_builder(config) + self.jobs = OrderedDict() + self.counter = 1 - if name is None: - name = path.split('/')[-1] - if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION: - return None - name = name[:-len(MODULE_EXTENSION)] - try: - if PY_VERSION == 3: - return importlib.machinery.SourceFileLoader(name, path).load_module() - else: - return imp.load_source(name, path) - except Exception as e: - msg.error("Problem loading", name, str(e)) - return None + self.initialize_jobs() + + def __repr__(self): + return "<Class Module '{name}'>".format(name=self.name) + + def __iter__(self): + return iter(OrderedDict(self.jobs).values()) + + def __getitem__(self, item): + return self.jobs[item] + + def __delitem__(self, key): + del self.jobs[key] + + def __len__(self): + return len(self.jobs) + + def __bool__(self): + return bool(self.jobs) - def _load_modules(self, path, modules, disabled, enabled, default_run): + def __nonzero__(self): + return self.__bool__() + + def jobs_configurations_builder(self, config): """ - Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files) - :param path: str - :param modules: list - :param disabled: list - :return: list + :param config: <dict> + :return: """ + counter = 0 + job_base_config = dict() - # check if plugin directory exists - if not os.path.isdir(path): - msg.fatal("cannot find charts directory ", path) - - # load modules - loaded = [] - if len(modules) > 0: - for m in modules: - if m in disabled: - continue - mod = self._import_module(path + m + MODULE_EXTENSION) - if mod is not None: - loaded.append(mod) - else: # exit if plugin is not found - msg.fatal('no modules found.') - else: - # scan directory specified in path and load all modules from there - if default_run is False: - names = [module for module in os.listdir(path) if module[:-9] in enabled] - else: - names = os.listdir(path) - for mod in names: - if mod.replace(MODULE_EXTENSION, "") in disabled: - msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) - continue - m = self._import_module(path + mod) - if m is not None: - msg.debug(mod + ": loading module '" + path + mod + "'") - loaded.append(m) - return loaded - - def _load_configs(self, modules): + for attr in BASE_CONFIG: + job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr])) + + if not config: + config = {str(): dict()} + elif not multi_job_check(config): + config = {str(): config} + + for job_name in config: + if not isinstance(config[job_name], dict): + continue + + job_config = setdefault_values(config[job_name], base_dict=job_base_config) + job_name = sub(r'\s+', '_', job_name) + config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name']) + counter += 1 + job_id = 'job' + str(counter).zfill(3) + + yield job_id, job_name, job_config + + def initialize_jobs(self): """ - Append configuration in list named `config` to every module. - For multi-job modules `config` list is created in _parse_config, - otherwise it is created here based on BASE_CONFIG prototype with None as identifier. - :param modules: list - :return: list + :return: """ - for mod in modules: - configfile = self.configs + mod.__name__ + ".conf" - if os.path.isfile(configfile): - msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") - try: - if not hasattr(mod, 'config'): - mod.config = {} - setattr(mod, - 'config', - self._parse_config(mod, read_config(configfile))) - except Exception as e: - msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) + for job_id, job_name, job_config in self.config: + job_config['job_name'] = job_name + job_config['override_name'] = job_config.pop('name') + + try: + initialized_job = self.service.Service(configuration=job_config) + except Exception as error: + Logger.error("job initialization: '{module_name} {job_name}' " + "=> ['FAILED'] ({error})".format(module_name=self.name, + job_name=job_name, + error=error)) + continue + else: + Logger.debug("job initialization: '{module_name} {job_name}' " + "=> ['OK']".format(module_name=self.name, + job_name=job_name or self.name)) + self.jobs[job_id] = Job(initialized_job=initialized_job, + job_id=job_id) + del self.config + del self.service + + +class Plugin(object): + def __init__(self): + self.loader = ModuleAndConfigLoader() + self.modules = OrderedDict() + self.sleep_time = 1 + self.runs_counter = 0 + self.config, error = self.loader.load_config_from_file(PLUGIN_CONFIG_DIR + 'python.d.conf') + if error: + run_and_exit(Logger.error)(error) + + if not self.config.get('enabled', True): + run_and_exit(Logger.info)('DISABLED in configuration file.') + + self.load_and_initialize_modules() + if not self.modules: + run_and_exit(Logger.info)('No modules to run. Exit...') + + def __iter__(self): + return iter(OrderedDict(self.modules).values()) + + @property + def jobs(self): + return (job for mod in self for job in mod) + + @property + def dead_jobs(self): + return (job for job in self.jobs if job.is_dead()) + + @property + def autodetect_jobs(self): + return [job for job in self.jobs if job.not_launched()] + + def enabled_modules(self): + for mod in MODULES_TO_RUN: + mod_name = mod[:-len(MODULE_EXTENSION)] + mod_path = CHARTS_PY_DIR + mod + conf_path = ''.join([CHARTS_PY_CONFIG_DIR, mod_name, '.conf']) + + if DEBUG: + yield mod, mod_name, mod_path, conf_path else: - msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") - # set config if not found - if not hasattr(mod, 'config'): - msg.debug(mod.__name__ + ": setting configuration for only one job") - mod.config = {None: {}} - for var in BASE_CONFIG: - try: - mod.config[None][var] = getattr(mod, var) - except AttributeError: - mod.config[None][var] = BASE_CONFIG[var] - return modules + if all([self.config.get('default_run', True), + self.config.get(mod_name, True)]): + yield mod, mod_name, mod_path, conf_path + + elif all([not self.config.get('default_run'), + self.config.get(mod_name)]): + yield mod, mod_name, mod_path, conf_path + + def load_and_initialize_modules(self): + for mod, mod_name, mod_path, conf_path in self.enabled_modules(): + + # Load module from file ------------------------------------------------------------ + loaded_module, error = self.loader.load_module_from_file(mod_name, mod_path) + log = Logger.error if error else Logger.debug + log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK', + module_name=mod_name)) + if error: + Logger.error("load source error : {0}".format(error)) + continue + + # Load module config from file ------------------------------------------------------ + loaded_config, error = self.loader.load_config_from_file(conf_path) + log = Logger.error if error else Logger.debug + log("module load config: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK', + module_name=mod_name)) + if error: + Logger.error('load config error : {0}'.format(error)) + + # Service instance initialization --------------------------------------------------- + initialized_module = Module(service=loaded_module, config=loaded_config) + Logger.debug("module status: '{module_name}' => [{status}] " + "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED', + module_name=initialized_module.name, + jobs_number=len(initialized_module))) + + if initialized_module: + self.modules[initialized_module.name] = initialized_module @staticmethod - def _parse_config(module, config): + def check_job(job): """ - Parse configuration file or extract configuration from module file. - Example of returned dictionary: - config = {'name': { - 'update_every': 2, - 'retries': 3, - 'priority': 30000 - 'other_val': 123}} - :param module: object - :param config: dict - :return: dict + :param job: <Job> + :return: """ - if config is None: - config = {} - # get default values - defaults = {} - msg.debug(module.__name__ + ": reading configuration") - for key in BASE_CONFIG: - try: - # get defaults from module config - defaults[key] = int(config.pop(key)) - except (KeyError, ValueError): - try: - # get defaults from module source code - defaults[key] = getattr(module, key) - except (KeyError, ValueError, AttributeError): - # if above failed, get defaults from global dict - defaults[key] = BASE_CONFIG[key] - - # check if there are dict in config dict - many_jobs = False - for name in config: - if isinstance(config[name], DICT): - many_jobs = True - break - - # assign variables needed by supervisor to every job configuration - if many_jobs: - for name in config: - for key in defaults: - if key not in config[name]: - config[name][key] = defaults[key] - # if only one job is needed, values doesn't have to be in dict (in YAML) + try: + check_ok = bool(job.check()) + except Exception as error: + job.error('check() unhandled exception: {error}'.format(error=error)) + return None else: - config = {None: config.copy()} - config[None].update(defaults) - - # return dictionary of jobs where every job has BASE_CONFIG variables - return config + return check_ok @staticmethod - def _create_jobs(modules): - """ - Create jobs based on module.config dictionary and module.Service class definition. - :param modules: list - :return: list + def create_job_charts(job): """ - jobs = [] - for module in modules: - for name in module.config: - # register a new job - conf = module.config[name] - try: - job = module.Service(configuration=conf, name=name) - except Exception as e: - msg.error(module.__name__ + - ("/" + str(name) if name is not None else "") + - ": cannot start job: '" + - str(e)) - continue - else: - # set chart_name (needed to plot run time graphs) - job.chart_name = module.__name__ - if name is not None: - job.chart_name += "_" + name - jobs.append(job) - msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") - - return [j for j in jobs if j is not None] - - def _stop(self, job, reason=None): + :param job: <Job> + :return: """ - Stop specified job and remove it from self.jobs list - Also notifies user about job failure if DEBUG_FLAG is set - :param job: object - :param reason: str - """ - prefix = job.__module__ - if job.name is not None and len(job.name) != 0: - prefix += "/" + job.name try: - msg.error("DISABLED:", prefix) - self.jobs.remove(job) - except Exception as e: - msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e)) - - # TODO remove section below and remove `reason`. - prefix += ": " - if reason is None: - return - elif reason[:3] == "no ": - msg.error(prefix + - "does not seem to have " + - reason[3:] + - "() function. Disabling it.") - elif reason[:7] == "failed ": - msg.error(prefix + - reason[7:] + - "() function reports failure.") - elif reason[:13] == "configuration": - msg.error(prefix + - "configuration file '" + - self.configs + - job.__module__ + - ".conf' not found. Using defaults.") - elif reason[:11] == "misbehaving": - msg.error(prefix + "is " + reason) - - def check(self): - """ - Tries to execute check() on every job. - This cannot fail thus it is catching every exception - If job.check() fails job is stopped - """ - i = 0 - overridden = [] - msg.debug("all job objects", str(self.jobs)) - while i < len(self.jobs): - job = self.jobs[i] - try: - if not job.check(): - msg.error(job.chart_name, "check() failed - disabling job") - self._stop(job) - else: - msg.info("CHECKED OK:", job.chart_name) - i += 1 - try: - if job.override_name is not None: - new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name) - if new_name in overridden: - msg.info("DROPPED:", job.name, ", job '" + job.override_name + - "' is already served by another job.") - self._stop(job) - i -= 1 - else: - job.name = job.override_name - msg.info("RENAMED:", new_name, ", from " + job.chart_name) - job.chart_name = new_name - overridden.append(job.chart_name) - except Exception: - pass - except AttributeError as e: - self._stop(job) - msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.") - msg.debug(str(e)) - except (UnboundLocalError, Exception) as e: - msg.error(job.chart_name, str(e)) - self._stop(job) - msg.debug("overridden job names:", str(overridden)) - msg.debug("all remaining job objects:", str(self.jobs)) - - def create(self): - """ - Tries to execute create() on every job. - This cannot fail thus it is catching every exception. - If job.create() fails job is stopped. - This is also creating job run time chart. - """ - i = 0 - while i < len(self.jobs): - job = self.jobs[i] - try: - if not job.create(): - msg.error(job.chart_name, "create function failed.") - self._stop(job) - else: - chart = job.chart_name - sys.stdout.write( - "CHART netdata.plugin_pythond_" + - chart + - " '' 'Execution time for " + - chart + - " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " + - str(job.timetable['freq']) + - '\n') - sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") - msg.debug("created charts for", job.chart_name) - # sys.stdout.flush() - i += 1 - except AttributeError: - msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.") - self._stop(job) - except (UnboundLocalError, Exception) as e: - msg.error(job.chart_name, str(e)) - self._stop(job) - - def update(self): + create_ok = job.create() + except Exception as error: + job.error('create() unhandled exception: {error}'.format(error=error)) + return False + else: + return create_ok + + def delete_job(self, job): """ - Creates and supervises every job thread. - This will stay forever and ever and ever forever and ever it'll be the one... + :param job: <Job> + :return: """ - for job in self.jobs: - job.start() + del self.modules[job.module_name][job.id] - while True: - if threading.active_count() <= 1: - msg.fatal("no more jobs") - time.sleep(1) - - -def read_config(path): - """ - Read YAML configuration from specified file - :param path: str - :return: dict - """ - try: - with open(path, 'r') as stream: - if ORDERED: - config = ordered_load(stream, yaml.SafeLoader) + def run_check(self): + checked = list() + for job in self.jobs: + if job.name in checked: + job.info('check() => [DROPPED] (already served by another job)') + self.delete_job(job) + continue + ok = self.check_job(job) + if ok: + job.info('check() => [OK]') + checked.append(job.name) + job.checked = True + continue + if not job.is_autodetect() or ok is None: + job.error('check() => [FAILED]') + self.delete_job(job) else: - config = yaml.load(stream) - except (OSError, IOError) as error: - msg.error(str(path), 'reading error:', str(error)) - return None - except yaml.YAMLError as error: - msg.error(str(path), "is malformed:", str(error)) - return None - return config - - -def parse_cmdline(directory, *commands): - """ - Parse parameters from command line. - :param directory: str - :param commands: list of str - :return: dict - """ - global DEBUG_FLAG, TRACE_FLAG - global OVERRIDE_UPDATE_EVERY - global BASE_CONFIG - - changed_update = False - mods = [] - for cmd in commands[1:]: - if cmd == "check": - pass - elif cmd == "debug" or cmd == "all": - DEBUG_FLAG = True - # redirect stderr to stdout? - elif cmd == "trace" or cmd == "all": - TRACE_FLAG = True - elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd): - # DEBUG_FLAG = True - mods.append(cmd.replace(".chart.py", "")) - else: - try: - BASE_CONFIG['update_every'] = int(cmd) - changed_update = True - except ValueError: - pass - if changed_update and DEBUG_FLAG: - OVERRIDE_UPDATE_EVERY = True - msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) - - msg.debug("started from", commands[0], "with options:", *commands[1:]) - - return mods - - -# if __name__ == '__main__': -def run(): - """ - Main program. - """ - global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG - - # read configuration file - disabled = ['nginx_log', 'gunicorn_log', 'apache_cache'] - enabled = list() - default_run = True - configfile = CONFIG_DIR + "python.d.conf" - msg.PROGRAM = PROGRAM - msg.info("reading configuration file:", configfile) - log_throttle = 200 - log_interval = 3600 - - conf = read_config(configfile) - if conf is not None: - try: - # exit the whole plugin when 'enabled: no' is set in 'python.d.conf' - if conf['enabled'] is False: - msg.fatal('disabled in configuration file.\n') - except (KeyError, TypeError): - pass - - try: - for param in BASE_CONFIG: - BASE_CONFIG[param] = conf[param] - except (KeyError, TypeError): - pass # use default update_every from NETDATA_UPDATE_EVERY - - try: - DEBUG_FLAG = conf['debug'] - except (KeyError, TypeError): - pass - - try: - TRACE_FLAG = conf['trace'] - except (KeyError, TypeError): - pass - - try: - log_throttle = conf['logs_per_interval'] - except (KeyError, TypeError): - pass + job.error('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every)) - try: - log_interval = conf['log_interval'] - except (KeyError, TypeError): - pass + def run_create(self): + for job in self.jobs: + if not job.checked: + # skip autodetection_retry jobs + continue + ok = self.create_job_charts(job) + if ok: + job.debug('create() => [OK] (charts: {0})'.format(len(job.charts))) + job.created = True + continue + job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts))) + self.delete_job(job) - default_run = True if ('default_run' not in conf or conf.get('default_run')) else False + def start(self): + self.run_check() + self.run_create() + for job in self.jobs: + if job.created: + job.start() - for k, v in conf.items(): - if k in ("update_every", "debug", "enabled", "default_run"): - continue - if default_run: - if v is False: - disabled.append(k) - else: - if v is True: - enabled.append(k) - # parse passed command line arguments - modules = parse_cmdline(MODULES_DIR, *sys.argv) - msg.DEBUG_FLAG = DEBUG_FLAG - msg.TRACE_FLAG = TRACE_FLAG - msg.LOG_THROTTLE = log_throttle - msg.LOG_INTERVAL = log_interval - msg.LOG_COUNTER = 0 - msg.LOG_NEXT_CHECK = 0 - msg.info("MODULES_DIR='" + MODULES_DIR + - "', CONFIG_DIR='" + CONFIG_DIR + - "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + - ", ONLY_MODULES=" + str(modules)) - - # run plugins - charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled, enabled, default_run) - charts.check() - charts.create() - charts.update() - msg.fatal("finished") + while True: + if threading.active_count() <= 1 and not self.autodetect_jobs: + run_and_exit(Logger.info)('FINISHED') + + sleep(self.sleep_time) + self.cleanup() + self.autodetect_retry() + + def cleanup(self): + for job in self.dead_jobs: + self.delete_job(job) + for mod in self: + if not mod: + del self.modules[mod.name] + + def autodetect_retry(self): + self.runs_counter += self.sleep_time + for job in self.autodetect_jobs: + if self.runs_counter % job.recheck_every == 0: + checked = self.check_job(job) + if checked: + created = self.create_job_charts(job) + if not created: + self.delete_job(job) + continue + job.start() if __name__ == '__main__': - run() + DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd() + Logger = PythonDLogger() + if DEBUG: + Logger.logger.severity = 'DEBUG' + if TRACE: + Logger.log_traceback = True + Logger.info('Using python {version}'.format(version=PY_VERSION[0])) + + plugin = Plugin() + plugin.start() |