diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2019-03-16 07:50:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2019-03-16 07:50:20 +0000 |
commit | b26be28df9fd4db2106cc2a557966c9d2a7345d9 (patch) | |
tree | 437e6106c0aa2e73f2dd68d0551545ae503f60d7 /collectors/python.d.plugin/python.d.plugin.in | |
parent | Adding upstream version 1.12.2. (diff) | |
download | netdata-b26be28df9fd4db2106cc2a557966c9d2a7345d9.tar.xz netdata-b26be28df9fd4db2106cc2a557966c9d2a7345d9.zip |
Adding upstream version 1.13.0.upstream/1.13.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin.in')
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin.in | 988 |
1 files changed, 638 insertions, 350 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in index 240c44e0f..dc6c5be9f 100644 --- a/collectors/python.d.plugin/python.d.plugin.in +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -8,426 +8,714 @@ echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' # Author: Ilya Mashchenko (l2isbad) # SPDX-License-Identifier: GPL-3.0-or-later + +import collections +import copy import gc +import multiprocessing import os +import re import sys +import time import threading +import types + +PY_VERSION = sys.version_info[:2] + +if PY_VERSION > (3, 1): + from importlib.machinery import SourceFileLoader +else: + from imp import load_source as SourceFileLoader + + +ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR' +ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' +ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' +ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' + + +def dirs(): + user_config = os.getenv( + ENV_NETDATA_USER_CONFIG_DIR, + '@configdir_POST@', + ) + stock_config = os.getenv( + ENV_NETDATA_STOCK_CONFIG_DIR, + '@libconfigdir_POST@', + ) + modules_user_config = os.path.join(user_config, 'python.d') + modules_stock_config = os.path.join(stock_config, 'python.d') + + modules = os.path.abspath( + os.getenv( + ENV_NETDATA_PLUGINS_DIR, + os.path.dirname(__file__), + ) + '/../python.d' + ) + pythond_packages = os.path.join(modules, 'python_modules') + + return collections.namedtuple( + 'Dirs', + [ + 'user_config', + 'stock_config', + 'modules_user_config', + 'modules_stock_config', + 'modules', + 'pythond_packages', + ] + )( + user_config, + stock_config, + modules_user_config, + modules_stock_config, + modules, + pythond_packages, + ) + + +DIRS = dirs() + +sys.path.append(DIRS.pythond_packages) + + +from bases.collection import safe_print +from bases.loggers import PythonDLogger +from bases.loaders import load_config -from re import sub -from sys import version_info, argv, stdout -from time import sleep +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict -GC_RUN = True -GC_COLLECT_EVERY = 300 -PY_VERSION = version_info[:2] +END_TASK_MARKER = None -USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@') -STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@') +IS_ATTY = sys.stdout.isatty() -PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d') -PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d') +PLUGIN_CONF_FILE = 'python.d.conf' +MODULE_SUFFIX = '.chart.py' -PLUGINS_DIR = os.path.abspath(os.getenv( - 'NETDATA_PLUGINS_DIR', - os.path.dirname(__file__)) + '/../python.d') +OBSOLETED_MODULES = ( + 'apache_cache', # replaced by web_log + 'cpuidle', # rewritten in C + 'cpufreq', # rewritten in C + 'gunicorn_log', # replaced by web_log + 'linux_power_supply', # rewritten in C + 'nginx_log', # replaced by web_log + 'mdstat', # rewritten in C + 'sslcheck', # memory leak bug https://github.com/netdata/netdata/issues/5624 +) -PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules') +AVAILABLE_MODULES = [ + m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules)) + if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES +] -sys.path.append(PYTHON_MODULES_DIR) +PLUGIN_BASE_CONF = { + 'enabled': True, + 'default_run': True, + 'gc_run': True, + 'gc_interval': 300, +} -from bases.loaders import ModuleAndConfigLoader # noqa: E402 -from bases.loggers import PythonDLogger # noqa: E402 -from bases.collection import setdefault_values, run_and_exit, safe_print # noqa: E402 +JOB_BASE_CONF = { + 'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1), + 'priority': 60000, + 'autodetection_retry': 0, + 'chart_cleanup': 10, + 'penalty': True, + 'name': str(), +} -try: - from collections import OrderedDict -except ImportError: - from third_party.ordereddict import OrderedDict -IS_ATTY = stdout.isatty() +class HeartBeat(threading.Thread): + def __init__(self, every): + threading.Thread.__init__(self) + self.daemon = True + self.every = every + + def run(self): + while True: + time.sleep(self.every) + if IS_ATTY: + continue + safe_print('\n') + + +def load_module(name): + abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX)) + module = SourceFileLoader(name, abs_path) + if isinstance(module, types.ModuleType): + return module + return module.load_module() + + +def multi_path_find(name, paths): + for path in paths: + abs_name = os.path.join(path, name) + if os.path.isfile(abs_name): + return abs_name + return '' + + +Task = collections.namedtuple( + 'Task', + [ + 'module_name', + 'explicitly_enabled', + ], +) + +Result = collections.namedtuple( + 'Result', + [ + 'module_name', + 'jobs_configs', + ], +) + + +class ModuleChecker(multiprocessing.Process): + def __init__( + self, + task_queue, + result_queue, + ): + multiprocessing.Process.__init__(self) + self.log = PythonDLogger() + self.log.job_name = 'checker' + self.task_queue = task_queue + self.result_queue = result_queue + + def run(self): + self.log.info('starting...') + HeartBeat(1).start() + while self.run_once(): + pass + self.log.info('terminating...') + + def run_once(self): + task = self.task_queue.get() + + if task is END_TASK_MARKER: + self.task_queue.task_done() + self.result_queue.put(END_TASK_MARKER) + return False + + result = self.do_task(task) + if result: + self.result_queue.put(result) + self.task_queue.task_done() -BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), - 'priority': 60000, - 'autodetection_retry': 0, - 'chart_cleanup': 10, - 'penalty': True, - 'name': str()} + return True + def do_task(self, task): + self.log.info("{0} : checking".format(task.module_name)) -MODULE_EXTENSION = '.chart.py' -OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log', 'cpufreq', 'cpuidle', 'mdstat', 'linux_power_supply'] + # LOAD SOURCE + module = Module(task.module_name) + try: + module.load_source() + except Exception as error: + self.log.warning("{0} : error on loading source : {1}, skipping module".format( + task.module_name, + error, + )) + return None + else: + self.log.info("{0} : source successfully loaded".format(task.module_name)) + if module.is_disabled_by_default() and not task.explicitly_enabled: + self.log.info("{0} : disabled by default".format(task.module_name)) + return None -def module_ok(m): - return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES + # LOAD CONFIG + paths = [ + DIRS.modules_user_config, + DIRS.modules_stock_config, + ] + conf_abs_path = multi_path_find( + name='{0}.conf'.format(task.module_name), + paths=paths, + ) -ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)] + if conf_abs_path: + self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path)) + try: + module.load_config(conf_abs_path) + except Exception as error: + self.log.warning("{0} : error on loading config : {1}, skipping module".format( + task.module_name, error)) + return None + else: + self.log.info("{0} : config was not found in '{1}', using default 1 job config".format( + task.module_name, paths)) + + # CHECK JOBS + jobs = module.create_jobs() + self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs))) + + successful_jobs_configs = list() + for job in jobs: + if job.autodetection_retry() > 0: + successful_jobs_configs.append(job.config) + self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name)) + continue + try: + job.init() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format( + task.module_name, job.name, error)) + continue -def parse_cmd(): - debug = 'debug' in argv[1:] - trace = 'trace' in argv[1:] - override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False) - modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES] - return debug, trace, override_update_every, modules or ALL_MODULES + try: + ok = job.check() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( + task.module_name, job.name, error)) + continue + if not ok: + self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name)) + continue -def multi_job_check(config): - return next((True for key in config if isinstance(config[key], dict)), False) + self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name)) + job.config['autodetection_retry'] = job.config['update_every'] + successful_jobs_configs.append(job.config) -class RawModule: - def __init__(self, name, path, explicitly_enabled=True): - self.name = name - self.path = path - self.explicitly_enabled = explicitly_enabled - - -class Job(object): - def __init__(self, initialized_job, job_id): - """ - :param initialized_job: instance of <Class Service> - :param job_id: <str> - """ - self.job = initialized_job - self.id = job_id # key in Modules.jobs() - self.module_name = self.job.__module__ # used in Plugin.delete_job() - self.recheck_every = self.job.configuration.pop('autodetection_retry') - self.checked = False # used in Plugin.check_job() - self.created = False # used in Plugin.create_job_charts() - if self.job.update_every < int(OVERRIDE_UPDATE_EVERY): - self.job.update_every = int(OVERRIDE_UPDATE_EVERY) - - def __getattr__(self, item): - return getattr(self.job, item) - - def __repr__(self): - return self.job.__repr__() - - def is_dead(self): - return bool(self.ident) and not self.is_alive() - - def not_launched(self): - return not bool(self.ident) - - def is_autodetect(self): - return self.recheck_every - - -class Module(object): - def __init__(self, service, config): - """ - :param service: <Module> - :param config: <dict> - """ + if not successful_jobs_configs: + self.log.info("{0} : all jobs failed, skipping module".format(task.module_name)) + return None + + return Result(module.source.__name__, successful_jobs_configs) + + +class JobConf(OrderedDict): + def __init__(self, *args): + OrderedDict.__init__(self, *args) + + def set_defaults_from_module(self, module): + for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]: + self[k] = getattr(module, k) + + def set_defaults_from_config(self, module_config): + for k in [k for k in JOB_BASE_CONF if k in module_config]: + self[k] = module_config[k] + + def set_job_name(self, name): + self['job_name'] = re.sub(r'\s+', '_', name) + + def set_override_name(self, name): + self['override_name'] = re.sub(r'\s+', '_', name) + + def as_dict(self): + return copy.deepcopy(OrderedDict(self)) + + +class Job: + def __init__( + self, + service, + module_name, + config, + ): self.service = service - self.name = service.__name__ - self.config = self.jobs_configurations_builder(config) - self.jobs = OrderedDict() - self.counter = 1 + self.config = config + self.module_name = module_name + self.name = config['job_name'] + self.override_name = config['override_name'] + self.wrapped = None - self.initialize_jobs() + def init(self): + self.wrapped = self.service(configuration=self.config.as_dict()) - def __repr__(self): - return "<Class Module '{name}'>".format(name=self.name) + def check(self): + return self.wrapped.check() - def __iter__(self): - return iter(OrderedDict(self.jobs).values()) + def post_check(self, min_update_every): + if self.wrapped.update_every < min_update_every: + self.wrapped.update_every = min_update_every - def __getitem__(self, item): - return self.jobs[item] + def create(self): + return self.wrapped.create() - def __delitem__(self, key): - del self.jobs[key] + def autodetection_retry(self): + return self.config['autodetection_retry'] - def __len__(self): - return len(self.jobs) + def run(self): + self.wrapped.run() - def __bool__(self): - return bool(self.jobs) - def __nonzero__(self): - return self.__bool__() +class Module: + def __init__(self, name): + self.name = name + self.source = None + self.config = dict() - def jobs_configurations_builder(self, config): - """ - :param config: <dict> - :return: - """ - counter = 0 - job_base_config = dict() + def is_disabled_by_default(self): + return bool(getattr(self.source, 'disabled_by_default', False)) - for attr in BASE_CONFIG: - job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr])) + def load_source(self): + self.source = load_module(self.name) - if not config: - config = {str(): dict()} - elif not multi_job_check(config): - config = {str(): config} + def load_config(self, abs_path): + self.config = load_config(abs_path) or dict() - for job_name in config: - if not isinstance(config[job_name], dict): - continue + def gather_jobs_configs(self): + job_names = [v for v in self.config if isinstance(self.config[v], dict)] - job_config = setdefault_values(config[job_name], base_dict=job_base_config) - job_name = sub(r'\s+', '_', job_name) - config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name']) - counter += 1 - job_id = 'job' + str(counter).zfill(3) + if len(job_names) == 0: + job_conf = JobConf(JOB_BASE_CONF) + job_conf.set_defaults_from_module(self.source) + job_conf.update(self.config) + job_conf.set_job_name(self.name) + job_conf.set_override_name(job_conf.pop('name')) + return [job_conf] - yield job_id, job_name, job_config + configs = list() + for job_name in job_names: + raw_job_conf = self.config[job_name] + job_conf = JobConf(JOB_BASE_CONF) + job_conf.set_defaults_from_module(self.source) + job_conf.set_defaults_from_config(self.config) + job_conf.update(raw_job_conf) + job_conf.set_job_name(job_name) + job_conf.set_override_name(job_conf.pop('name')) + configs.append(job_conf) - def initialize_jobs(self): - """ - :return: - """ - for job_id, job_name, job_config in self.config: - job_config['job_name'] = job_name - job_config['override_name'] = job_config.pop('name') + return configs - try: - initialized_job = self.service.Service(configuration=job_config) - except Exception as error: - Logger.error("job initialization: '{module_name} {job_name}' " - "=> ['FAILED'] ({error})".format(module_name=self.name, - job_name=job_name, - error=error)) - continue - else: - Logger.debug("job initialization: '{module_name} {job_name}' " - "=> ['OK']".format(module_name=self.name, - job_name=job_name or self.name)) - self.jobs[job_id] = Job(initialized_job=initialized_job, - job_id=job_id) - del self.config - del self.service - - -class Plugin(object): - def __init__(self): - self.loader = ModuleAndConfigLoader() - self.modules = OrderedDict() - self.sleep_time = 1 - self.runs_counter = 0 - - user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf') - stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf') - - Logger.debug("loading '{0}'".format(user_config)) - self.config, error = self.loader.load_config_from_file(user_config) - - if error: - Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error)) - Logger.debug("loading '{0}'".format(stock_config)) - self.config, error = self.loader.load_config_from_file(stock_config) - if error: - Logger.error("cannot load '{0}': {1}".format(stock_config, error)) - - self.do_gc = self.config.get("gc_run", GC_RUN) - self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY) - - if not self.config.get('enabled', True): - run_and_exit(Logger.info)('DISABLED in configuration file.') - - self.load_and_initialize_modules() - if not self.modules: - run_and_exit(Logger.info)('No modules to run. Exit...') - - def __iter__(self): - return iter(OrderedDict(self.modules).values()) - - @property - def jobs(self): - return (job for mod in self for job in mod) - - @property - def dead_jobs(self): - return (job for job in self.jobs if job.is_dead()) - - @property - def autodetect_jobs(self): - return [job for job in self.jobs if job.not_launched()] - - def enabled_modules(self): - for mod in MODULES_TO_RUN: - mod_name = mod[:-len(MODULE_EXTENSION)] - mod_path = os.path.join(PLUGINS_DIR, mod) - if any( - [ - self.config.get('default_run', True) and self.config.get(mod_name, True), - (not self.config.get('default_run')) and self.config.get(mod_name), - ] - ): - yield RawModule( - name=mod_name, - path=mod_path, - explicitly_enabled=self.config.get(mod_name), - ) - - def load_and_initialize_modules(self): - for mod in self.enabled_modules(): - - # Load module from file ------------------------------------------------------------ - loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path) - log = Logger.error if error else Logger.debug - log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK', - module_name=mod.name)) - if error: - Logger.error("load source error : {0}".format(error)) - continue + def create_jobs(self, jobs_conf=None): + return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()] - # Load module config from file ------------------------------------------------------ - user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf') - stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf') - Logger.debug("loading '{0}'".format(user_config)) - loaded_config, error = self.loader.load_config_from_file(user_config) - if error: - Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error)) - Logger.debug("loading '{0}'".format(stock_config)) - loaded_config, error = self.loader.load_config_from_file(stock_config) +class JobRunner(threading.Thread): + def __init__(self, job): + threading.Thread.__init__(self) + self.daemon = True + self.wrapped = job - if error: - Logger.error("cannot load '{0}': {1}".format(stock_config, error)) + def run(self): + self.wrapped.run() - # Skip disabled modules - if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled: - Logger.info("module '{0}' disabled by default".format(loaded_module.__name__)) - continue - # Module initialization --------------------------------------------------- +class PluginConf(dict): + def __init__(self, *args): + dict.__init__(self, *args) - initialized_module = Module(service=loaded_module, config=loaded_config) - Logger.debug("module status: '{module_name}' => [{status}] " - "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED', - module_name=initialized_module.name, - jobs_number=len(initialized_module))) - if initialized_module: - self.modules[initialized_module.name] = initialized_module + def is_module_enabled(self, module_name, explicit): + if module_name in self: + return self[module_name] + if explicit: + return False + return self['default_run'] + + +class Plugin: + def __init__( + self, + min_update_every=1, + modules_to_run=tuple(AVAILABLE_MODULES), + ): + self.log = PythonDLogger() + self.config = PluginConf(PLUGIN_BASE_CONF) + self.task_queue = multiprocessing.JoinableQueue() + self.result_queue = multiprocessing.JoinableQueue() + self.min_update_every = min_update_every + self.modules_to_run = modules_to_run + self.auto_detection_jobs = list() + self.tasks = list() + self.results = list() + self.checked_jobs = collections.defaultdict(list) + self.runs = 0 @staticmethod - def check_job(job): - """ - :param job: <Job> - :return: - """ - try: - check_ok = bool(job.check()) - except Exception as error: - job.error('check() unhandled exception: {error}'.format(error=error)) - return None - else: - return check_ok + def shutdown(): + safe_print('DISABLE') + exit(0) - @staticmethod - def create_job_charts(job): - """ - :param job: <Job> - :return: - """ + def run(self): + jobs = self.create_jobs() + if not jobs: + return + + for job in self.prepare_jobs(jobs): + self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name)) + JobRunner(job).start() + + self.serve() + + def enqueue_tasks(self): + for task in self.tasks: + self.task_queue.put(task) + self.task_queue.put(END_TASK_MARKER) + + def dequeue_results(self): + while True: + result = self.result_queue.get() + self.result_queue.task_done() + if result is END_TASK_MARKER: + break + self.results.append(result) + + def load_config(self): + paths = [ + DIRS.user_config, + DIRS.stock_config, + ] + + self.log.info("checking for config in {0}".format(paths)) + abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths) + if not abs_path: + self.log.warning('config was not found, using defaults') + return True + + self.log.info("config found, loading config '{0}'".format(abs_path)) try: - create_ok = job.create() + config = load_config(abs_path) or dict() except Exception as error: - job.error('create() unhandled exception: {error}'.format(error=error)) + self.log.error('error on loading config : {0}'.format(error)) return False - else: - return create_ok - - def delete_job(self, job): - """ - :param job: <Job> - :return: - """ - del self.modules[job.module_name][job.id] - - def run_check(self): - checked = list() - for job in self.jobs: - if job.name in checked: - job.info('check() => [DROPPED] (already served by another job)') - self.delete_job(job) + + self.log.info('config successfully loaded') + self.config.update(config) + return True + + def setup(self): + self.log.info('starting setup') + if not self.load_config(): + return False + + if not self.config['enabled']: + self.log.info('disabled in configuration file') + return False + + for mod in self.modules_to_run: + if self.config.is_module_enabled(mod, False): + task = Task(mod, self.config.is_module_enabled(mod, True)) + self.tasks.append(task) + else: + self.log.info("{0} : disabled in configuration file".format(mod)) + + if not self.tasks: + self.log.info('no modules to run') + return False + + worker = ModuleChecker(self.task_queue, self.result_queue) + self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks))) + worker.start() + + # TODO: timeouts? + self.enqueue_tasks() + self.task_queue.join() + self.dequeue_results() + self.result_queue.join() + self.task_queue.close() + self.result_queue.close() + self.log.info('stopping checker process') + worker.join() + + if not self.results: + self.log.info('no modules to run') + return False + + self.log.info("setup complete, {0} active module(s) : '{1}'".format( + len(self.results), + [v.module_name for v in self.results]) + ) + + return True + + def create_jobs(self): + jobs = list() + for result in self.results: + module = Module(result.module_name) + try: + module.load_source() + except Exception as error: + self.log.warning("{0} : error on loading module source : {1}, skipping module".format( + result.module_name, error)) continue - ok = self.check_job(job) - if ok: - job.info('check() => [OK]') - checked.append(job.name) - job.checked = True + + module_jobs = module.create_jobs(result.jobs_configs) + self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs))) + jobs.extend(module_jobs) + + return jobs + + def prepare_jobs(self, jobs): + prepared = list() + + for job in jobs: + check_name = job.override_name or job.name + if check_name in self.checked_jobs[job.module_name]: + self.log.info('{0}[{1}] : already served by another job, skipping the job'.format( + job.module_name, job.name)) continue - if not job.is_autodetect() or ok is None: - job.info('check() => [FAILED]') - self.delete_job(job) - else: - job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every)) - def run_create(self): - for job in self.jobs: - if not job.checked: - # skip autodetection_retry jobs + try: + job.init() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( + job.module_name, job.name, error)) + continue + + self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name)) + + try: + ok = job.check() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( + job.module_name, job.name, error)) continue - ok = self.create_job_charts(job) - if ok: - job.debug('create() => [OK] (charts: {0})'.format(len(job.charts))) - job.created = True + + if not ok: + self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name)) + if job.autodetection_retry() > 0: + self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format( + job.module_name, job.name, job.autodetection_retry())) + self.auto_detection_jobs.append(job) continue - job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts))) - self.delete_job(job) - def start(self): - self.run_check() - self.run_create() - for job in self.jobs: - if job.created: - job.start() + self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name)) + + job.post_check(int(self.min_update_every)) + + if not job.create(): + self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name)) + + self.checked_jobs[job.module_name].append(check_name) + prepared.append(job) + + return prepared + + def serve(self): + gc_run = self.config['gc_run'] + gc_interval = self.config['gc_interval'] while True: - if threading.active_count() <= 1 and not self.autodetect_jobs: - run_and_exit(Logger.info)('FINISHED') + self.runs += 1 + + # threads: main + heartbeat + if threading.active_count() <= 2 and not self.auto_detection_jobs: + return - sleep(self.sleep_time) - self.cleanup() - self.autodetect_retry() + time.sleep(1) - # FIXME: https://github.com/netdata/netdata/issues/3817 - if self.do_gc and self.runs_counter % self.gc_interval == 0: + if gc_run and self.runs % gc_interval == 0: v = gc.collect() - Logger.debug("GC full collection run result: {0}".format(v)) - - # for exiting on SIGPIPE - if not IS_ATTY: - safe_print('\n') - - def cleanup(self): - for job in self.dead_jobs: - self.delete_job(job) - for mod in self: - if not mod: - del self.modules[mod.name] - - def autodetect_retry(self): - self.runs_counter += self.sleep_time - for job in self.autodetect_jobs: - if self.runs_counter % job.recheck_every == 0: - checked = self.check_job(job) - if checked: - created = self.create_job_charts(job) - if not created: - self.delete_job(job) - continue - job.start() + self.log.debug('GC collection run result: {0}'.format(v)) + + self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)] + + def retry_job(self, job): + stop_retrying = True + retry_later = False + + if self.runs % job.autodetection_retry() != 0: + return retry_later + + check_name = job.override_name or job.name + if check_name in self.checked_jobs[job.module_name]: + self.log.info("{0}[{1}]: already served by another job, give up on retrying".format( + job.module_name, job.name)) + return stop_retrying + + try: + ok = job.check() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format( + job.module_name, job.name, error)) + return stop_retrying + + if not ok: + self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format( + job.module_name, job.name, job.autodetection_retry())) + return retry_later + self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name)) + + if not job.create(): + return stop_retrying + + job.post_check(int(self.min_update_every)) + self.checked_jobs[job.module_name].append(check_name) + return stop_retrying + + +def parse_cmd(): + opts = sys.argv[:][1:] + debug = False + trace = False + update_every = 1 + modules_to_run = list() + + v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None) + if v: + update_every = v + opts.remove(v) + if 'debug' in opts: + debug = True + opts.remove('debug') + if 'trace' in opts: + trace = True + opts.remove('trace') + if opts: + modules_to_run = list(opts) + + return collections.namedtuple( + 'CMD', + [ + 'update_every', + 'debug', + 'trace', + 'modules_to_run', + ], + )( + update_every, + debug, + trace, + modules_to_run, + ) + + +def main(): + cmd = parse_cmd() + logger = PythonDLogger() + + if cmd.debug: + logger.logger.severity = 'DEBUG' + if cmd.trace: + logger.log_traceback = True + + logger.info('using python v{0}'.format(PY_VERSION[0])) + + unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES) + if unknown_modules: + logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules)))) + safe_print('DISABLE') + return + + plugin = Plugin( + cmd.update_every, + cmd.modules_to_run or AVAILABLE_MODULES, + ) + + HeartBeat(1).start() + + if not plugin.setup(): + safe_print('DISABLE') + return + + plugin.run() + logger.info('exiting from main...') + plugin.shutdown() if __name__ == '__main__': - DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd() - Logger = PythonDLogger() - if DEBUG: - Logger.logger.severity = 'DEBUG' - if TRACE: - Logger.log_traceback = True - Logger.info('Using python {version}'.format(version=PY_VERSION[0])) - - plugin = Plugin() - plugin.start() + main() |