summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python.d.plugin.in
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2019-03-16 07:50:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2019-03-16 07:50:20 +0000
commitb26be28df9fd4db2106cc2a557966c9d2a7345d9 (patch)
tree437e6106c0aa2e73f2dd68d0551545ae503f60d7 /collectors/python.d.plugin/python.d.plugin.in
parentAdding upstream version 1.12.2. (diff)
downloadnetdata-b26be28df9fd4db2106cc2a557966c9d2a7345d9.tar.xz
netdata-b26be28df9fd4db2106cc2a557966c9d2a7345d9.zip
Adding upstream version 1.13.0.upstream/1.13.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin.in')
-rw-r--r--collectors/python.d.plugin/python.d.plugin.in988
1 files changed, 638 insertions, 350 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in
index 240c44e0f..dc6c5be9f 100644
--- a/collectors/python.d.plugin/python.d.plugin.in
+++ b/collectors/python.d.plugin/python.d.plugin.in
@@ -8,426 +8,714 @@ echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
+
+import collections
+import copy
import gc
+import multiprocessing
import os
+import re
import sys
+import time
import threading
+import types
+
+PY_VERSION = sys.version_info[:2]
+
+if PY_VERSION > (3, 1):
+ from importlib.machinery import SourceFileLoader
+else:
+ from imp import load_source as SourceFileLoader
+
+
+ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
+ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
+ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
+ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
+
+
+def dirs():
+ user_config = os.getenv(
+ ENV_NETDATA_USER_CONFIG_DIR,
+ '@configdir_POST@',
+ )
+ stock_config = os.getenv(
+ ENV_NETDATA_STOCK_CONFIG_DIR,
+ '@libconfigdir_POST@',
+ )
+ modules_user_config = os.path.join(user_config, 'python.d')
+ modules_stock_config = os.path.join(stock_config, 'python.d')
+
+ modules = os.path.abspath(
+ os.getenv(
+ ENV_NETDATA_PLUGINS_DIR,
+ os.path.dirname(__file__),
+ ) + '/../python.d'
+ )
+ pythond_packages = os.path.join(modules, 'python_modules')
+
+ return collections.namedtuple(
+ 'Dirs',
+ [
+ 'user_config',
+ 'stock_config',
+ 'modules_user_config',
+ 'modules_stock_config',
+ 'modules',
+ 'pythond_packages',
+ ]
+ )(
+ user_config,
+ stock_config,
+ modules_user_config,
+ modules_stock_config,
+ modules,
+ pythond_packages,
+ )
+
+
+DIRS = dirs()
+
+sys.path.append(DIRS.pythond_packages)
+
+
+from bases.collection import safe_print
+from bases.loggers import PythonDLogger
+from bases.loaders import load_config
-from re import sub
-from sys import version_info, argv, stdout
-from time import sleep
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
-GC_RUN = True
-GC_COLLECT_EVERY = 300
-PY_VERSION = version_info[:2]
+END_TASK_MARKER = None
-USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
-STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')
+IS_ATTY = sys.stdout.isatty()
-PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
-PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')
+PLUGIN_CONF_FILE = 'python.d.conf'
+MODULE_SUFFIX = '.chart.py'
-PLUGINS_DIR = os.path.abspath(os.getenv(
- 'NETDATA_PLUGINS_DIR',
- os.path.dirname(__file__)) + '/../python.d')
+OBSOLETED_MODULES = (
+ 'apache_cache', # replaced by web_log
+ 'cpuidle', # rewritten in C
+ 'cpufreq', # rewritten in C
+ 'gunicorn_log', # replaced by web_log
+ 'linux_power_supply', # rewritten in C
+ 'nginx_log', # replaced by web_log
+ 'mdstat', # rewritten in C
+ 'sslcheck', # memory leak bug https://github.com/netdata/netdata/issues/5624
+)
-PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')
+AVAILABLE_MODULES = [
+ m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
+ if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
+]
-sys.path.append(PYTHON_MODULES_DIR)
+PLUGIN_BASE_CONF = {
+ 'enabled': True,
+ 'default_run': True,
+ 'gc_run': True,
+ 'gc_interval': 300,
+}
-from bases.loaders import ModuleAndConfigLoader # noqa: E402
-from bases.loggers import PythonDLogger # noqa: E402
-from bases.collection import setdefault_values, run_and_exit, safe_print # noqa: E402
+JOB_BASE_CONF = {
+ 'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1),
+ 'priority': 60000,
+ 'autodetection_retry': 0,
+ 'chart_cleanup': 10,
+ 'penalty': True,
+ 'name': str(),
+}
-try:
- from collections import OrderedDict
-except ImportError:
- from third_party.ordereddict import OrderedDict
-IS_ATTY = stdout.isatty()
+class HeartBeat(threading.Thread):
+ def __init__(self, every):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.every = every
+
+ def run(self):
+ while True:
+ time.sleep(self.every)
+ if IS_ATTY:
+ continue
+ safe_print('\n')
+
+
+def load_module(name):
+ abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
+ module = SourceFileLoader(name, abs_path)
+ if isinstance(module, types.ModuleType):
+ return module
+ return module.load_module()
+
+
+def multi_path_find(name, paths):
+ for path in paths:
+ abs_name = os.path.join(path, name)
+ if os.path.isfile(abs_name):
+ return abs_name
+ return ''
+
+
+Task = collections.namedtuple(
+ 'Task',
+ [
+ 'module_name',
+ 'explicitly_enabled',
+ ],
+)
+
+Result = collections.namedtuple(
+ 'Result',
+ [
+ 'module_name',
+ 'jobs_configs',
+ ],
+)
+
+
+class ModuleChecker(multiprocessing.Process):
+ def __init__(
+ self,
+ task_queue,
+ result_queue,
+ ):
+ multiprocessing.Process.__init__(self)
+ self.log = PythonDLogger()
+ self.log.job_name = 'checker'
+ self.task_queue = task_queue
+ self.result_queue = result_queue
+
+ def run(self):
+ self.log.info('starting...')
+ HeartBeat(1).start()
+ while self.run_once():
+ pass
+ self.log.info('terminating...')
+
+ def run_once(self):
+ task = self.task_queue.get()
+
+ if task is END_TASK_MARKER:
+ self.task_queue.task_done()
+ self.result_queue.put(END_TASK_MARKER)
+ return False
+
+ result = self.do_task(task)
+ if result:
+ self.result_queue.put(result)
+ self.task_queue.task_done()
-BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
- 'priority': 60000,
- 'autodetection_retry': 0,
- 'chart_cleanup': 10,
- 'penalty': True,
- 'name': str()}
+ return True
+ def do_task(self, task):
+ self.log.info("{0} : checking".format(task.module_name))
-MODULE_EXTENSION = '.chart.py'
-OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log', 'cpufreq', 'cpuidle', 'mdstat', 'linux_power_supply']
+ # LOAD SOURCE
+ module = Module(task.module_name)
+ try:
+ module.load_source()
+ except Exception as error:
+ self.log.warning("{0} : error on loading source : {1}, skipping module".format(
+ task.module_name,
+ error,
+ ))
+ return None
+ else:
+ self.log.info("{0} : source successfully loaded".format(task.module_name))
+ if module.is_disabled_by_default() and not task.explicitly_enabled:
+ self.log.info("{0} : disabled by default".format(task.module_name))
+ return None
-def module_ok(m):
- return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES
+ # LOAD CONFIG
+ paths = [
+ DIRS.modules_user_config,
+ DIRS.modules_stock_config,
+ ]
+ conf_abs_path = multi_path_find(
+ name='{0}.conf'.format(task.module_name),
+ paths=paths,
+ )
-ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)]
+ if conf_abs_path:
+ self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
+ try:
+ module.load_config(conf_abs_path)
+ except Exception as error:
+ self.log.warning("{0} : error on loading config : {1}, skipping module".format(
+ task.module_name, error))
+ return None
+ else:
+ self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
+ task.module_name, paths))
+
+ # CHECK JOBS
+ jobs = module.create_jobs()
+ self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))
+
+ successful_jobs_configs = list()
+ for job in jobs:
+ if job.autodetection_retry() > 0:
+ successful_jobs_configs.append(job.config)
+ self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
+ continue
+ try:
+ job.init()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
+ task.module_name, job.name, error))
+ continue
-def parse_cmd():
- debug = 'debug' in argv[1:]
- trace = 'trace' in argv[1:]
- override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
- modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
- return debug, trace, override_update_every, modules or ALL_MODULES
+ try:
+ ok = job.check()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
+ task.module_name, job.name, error))
+ continue
+ if not ok:
+ self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
+ continue
-def multi_job_check(config):
- return next((True for key in config if isinstance(config[key], dict)), False)
+ self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))
+ job.config['autodetection_retry'] = job.config['update_every']
+ successful_jobs_configs.append(job.config)
-class RawModule:
- def __init__(self, name, path, explicitly_enabled=True):
- self.name = name
- self.path = path
- self.explicitly_enabled = explicitly_enabled
-
-
-class Job(object):
- def __init__(self, initialized_job, job_id):
- """
- :param initialized_job: instance of <Class Service>
- :param job_id: <str>
- """
- self.job = initialized_job
- self.id = job_id # key in Modules.jobs()
- self.module_name = self.job.__module__ # used in Plugin.delete_job()
- self.recheck_every = self.job.configuration.pop('autodetection_retry')
- self.checked = False # used in Plugin.check_job()
- self.created = False # used in Plugin.create_job_charts()
- if self.job.update_every < int(OVERRIDE_UPDATE_EVERY):
- self.job.update_every = int(OVERRIDE_UPDATE_EVERY)
-
- def __getattr__(self, item):
- return getattr(self.job, item)
-
- def __repr__(self):
- return self.job.__repr__()
-
- def is_dead(self):
- return bool(self.ident) and not self.is_alive()
-
- def not_launched(self):
- return not bool(self.ident)
-
- def is_autodetect(self):
- return self.recheck_every
-
-
-class Module(object):
- def __init__(self, service, config):
- """
- :param service: <Module>
- :param config: <dict>
- """
+ if not successful_jobs_configs:
+ self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
+ return None
+
+ return Result(module.source.__name__, successful_jobs_configs)
+
+
+class JobConf(OrderedDict):
+ def __init__(self, *args):
+ OrderedDict.__init__(self, *args)
+
+ def set_defaults_from_module(self, module):
+ for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
+ self[k] = getattr(module, k)
+
+ def set_defaults_from_config(self, module_config):
+ for k in [k for k in JOB_BASE_CONF if k in module_config]:
+ self[k] = module_config[k]
+
+ def set_job_name(self, name):
+ self['job_name'] = re.sub(r'\s+', '_', name)
+
+ def set_override_name(self, name):
+ self['override_name'] = re.sub(r'\s+', '_', name)
+
+ def as_dict(self):
+ return copy.deepcopy(OrderedDict(self))
+
+
+class Job:
+ def __init__(
+ self,
+ service,
+ module_name,
+ config,
+ ):
self.service = service
- self.name = service.__name__
- self.config = self.jobs_configurations_builder(config)
- self.jobs = OrderedDict()
- self.counter = 1
+ self.config = config
+ self.module_name = module_name
+ self.name = config['job_name']
+ self.override_name = config['override_name']
+ self.wrapped = None
- self.initialize_jobs()
+ def init(self):
+ self.wrapped = self.service(configuration=self.config.as_dict())
- def __repr__(self):
- return "<Class Module '{name}'>".format(name=self.name)
+ def check(self):
+ return self.wrapped.check()
- def __iter__(self):
- return iter(OrderedDict(self.jobs).values())
+ def post_check(self, min_update_every):
+ if self.wrapped.update_every < min_update_every:
+ self.wrapped.update_every = min_update_every
- def __getitem__(self, item):
- return self.jobs[item]
+ def create(self):
+ return self.wrapped.create()
- def __delitem__(self, key):
- del self.jobs[key]
+ def autodetection_retry(self):
+ return self.config['autodetection_retry']
- def __len__(self):
- return len(self.jobs)
+ def run(self):
+ self.wrapped.run()
- def __bool__(self):
- return bool(self.jobs)
- def __nonzero__(self):
- return self.__bool__()
+class Module:
+ def __init__(self, name):
+ self.name = name
+ self.source = None
+ self.config = dict()
- def jobs_configurations_builder(self, config):
- """
- :param config: <dict>
- :return:
- """
- counter = 0
- job_base_config = dict()
+ def is_disabled_by_default(self):
+ return bool(getattr(self.source, 'disabled_by_default', False))
- for attr in BASE_CONFIG:
- job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))
+ def load_source(self):
+ self.source = load_module(self.name)
- if not config:
- config = {str(): dict()}
- elif not multi_job_check(config):
- config = {str(): config}
+ def load_config(self, abs_path):
+ self.config = load_config(abs_path) or dict()
- for job_name in config:
- if not isinstance(config[job_name], dict):
- continue
+ def gather_jobs_configs(self):
+ job_names = [v for v in self.config if isinstance(self.config[v], dict)]
- job_config = setdefault_values(config[job_name], base_dict=job_base_config)
- job_name = sub(r'\s+', '_', job_name)
- config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
- counter += 1
- job_id = 'job' + str(counter).zfill(3)
+ if len(job_names) == 0:
+ job_conf = JobConf(JOB_BASE_CONF)
+ job_conf.set_defaults_from_module(self.source)
+ job_conf.update(self.config)
+ job_conf.set_job_name(self.name)
+ job_conf.set_override_name(job_conf.pop('name'))
+ return [job_conf]
- yield job_id, job_name, job_config
+ configs = list()
+ for job_name in job_names:
+ raw_job_conf = self.config[job_name]
+ job_conf = JobConf(JOB_BASE_CONF)
+ job_conf.set_defaults_from_module(self.source)
+ job_conf.set_defaults_from_config(self.config)
+ job_conf.update(raw_job_conf)
+ job_conf.set_job_name(job_name)
+ job_conf.set_override_name(job_conf.pop('name'))
+ configs.append(job_conf)
- def initialize_jobs(self):
- """
- :return:
- """
- for job_id, job_name, job_config in self.config:
- job_config['job_name'] = job_name
- job_config['override_name'] = job_config.pop('name')
+ return configs
- try:
- initialized_job = self.service.Service(configuration=job_config)
- except Exception as error:
- Logger.error("job initialization: '{module_name} {job_name}' "
- "=> ['FAILED'] ({error})".format(module_name=self.name,
- job_name=job_name,
- error=error))
- continue
- else:
- Logger.debug("job initialization: '{module_name} {job_name}' "
- "=> ['OK']".format(module_name=self.name,
- job_name=job_name or self.name))
- self.jobs[job_id] = Job(initialized_job=initialized_job,
- job_id=job_id)
- del self.config
- del self.service
-
-
-class Plugin(object):
- def __init__(self):
- self.loader = ModuleAndConfigLoader()
- self.modules = OrderedDict()
- self.sleep_time = 1
- self.runs_counter = 0
-
- user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
- stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')
-
- Logger.debug("loading '{0}'".format(user_config))
- self.config, error = self.loader.load_config_from_file(user_config)
-
- if error:
- Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
- Logger.debug("loading '{0}'".format(stock_config))
- self.config, error = self.loader.load_config_from_file(stock_config)
- if error:
- Logger.error("cannot load '{0}': {1}".format(stock_config, error))
-
- self.do_gc = self.config.get("gc_run", GC_RUN)
- self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)
-
- if not self.config.get('enabled', True):
- run_and_exit(Logger.info)('DISABLED in configuration file.')
-
- self.load_and_initialize_modules()
- if not self.modules:
- run_and_exit(Logger.info)('No modules to run. Exit...')
-
- def __iter__(self):
- return iter(OrderedDict(self.modules).values())
-
- @property
- def jobs(self):
- return (job for mod in self for job in mod)
-
- @property
- def dead_jobs(self):
- return (job for job in self.jobs if job.is_dead())
-
- @property
- def autodetect_jobs(self):
- return [job for job in self.jobs if job.not_launched()]
-
- def enabled_modules(self):
- for mod in MODULES_TO_RUN:
- mod_name = mod[:-len(MODULE_EXTENSION)]
- mod_path = os.path.join(PLUGINS_DIR, mod)
- if any(
- [
- self.config.get('default_run', True) and self.config.get(mod_name, True),
- (not self.config.get('default_run')) and self.config.get(mod_name),
- ]
- ):
- yield RawModule(
- name=mod_name,
- path=mod_path,
- explicitly_enabled=self.config.get(mod_name),
- )
-
- def load_and_initialize_modules(self):
- for mod in self.enabled_modules():
-
- # Load module from file ------------------------------------------------------------
- loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
- log = Logger.error if error else Logger.debug
- log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
- module_name=mod.name))
- if error:
- Logger.error("load source error : {0}".format(error))
- continue
+ def create_jobs(self, jobs_conf=None):
+ return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()]
- # Load module config from file ------------------------------------------------------
- user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
- stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')
- Logger.debug("loading '{0}'".format(user_config))
- loaded_config, error = self.loader.load_config_from_file(user_config)
- if error:
- Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
- Logger.debug("loading '{0}'".format(stock_config))
- loaded_config, error = self.loader.load_config_from_file(stock_config)
+class JobRunner(threading.Thread):
+ def __init__(self, job):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.wrapped = job
- if error:
- Logger.error("cannot load '{0}': {1}".format(stock_config, error))
+ def run(self):
+ self.wrapped.run()
- # Skip disabled modules
- if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
- Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
- continue
- # Module initialization ---------------------------------------------------
+class PluginConf(dict):
+ def __init__(self, *args):
+ dict.__init__(self, *args)
- initialized_module = Module(service=loaded_module, config=loaded_config)
- Logger.debug("module status: '{module_name}' => [{status}] "
- "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
- module_name=initialized_module.name,
- jobs_number=len(initialized_module)))
- if initialized_module:
- self.modules[initialized_module.name] = initialized_module
+ def is_module_enabled(self, module_name, explicit):
+ if module_name in self:
+ return self[module_name]
+ if explicit:
+ return False
+ return self['default_run']
+
+
+class Plugin:
+ def __init__(
+ self,
+ min_update_every=1,
+ modules_to_run=tuple(AVAILABLE_MODULES),
+ ):
+ self.log = PythonDLogger()
+ self.config = PluginConf(PLUGIN_BASE_CONF)
+ self.task_queue = multiprocessing.JoinableQueue()
+ self.result_queue = multiprocessing.JoinableQueue()
+ self.min_update_every = min_update_every
+ self.modules_to_run = modules_to_run
+ self.auto_detection_jobs = list()
+ self.tasks = list()
+ self.results = list()
+ self.checked_jobs = collections.defaultdict(list)
+ self.runs = 0
@staticmethod
- def check_job(job):
- """
- :param job: <Job>
- :return:
- """
- try:
- check_ok = bool(job.check())
- except Exception as error:
- job.error('check() unhandled exception: {error}'.format(error=error))
- return None
- else:
- return check_ok
+ def shutdown():
+ safe_print('DISABLE')
+ exit(0)
- @staticmethod
- def create_job_charts(job):
- """
- :param job: <Job>
- :return:
- """
+ def run(self):
+ jobs = self.create_jobs()
+ if not jobs:
+ return
+
+ for job in self.prepare_jobs(jobs):
+ self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
+ JobRunner(job).start()
+
+ self.serve()
+
+ def enqueue_tasks(self):
+ for task in self.tasks:
+ self.task_queue.put(task)
+ self.task_queue.put(END_TASK_MARKER)
+
+ def dequeue_results(self):
+ while True:
+ result = self.result_queue.get()
+ self.result_queue.task_done()
+ if result is END_TASK_MARKER:
+ break
+ self.results.append(result)
+
+ def load_config(self):
+ paths = [
+ DIRS.user_config,
+ DIRS.stock_config,
+ ]
+
+ self.log.info("checking for config in {0}".format(paths))
+ abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
+ if not abs_path:
+ self.log.warning('config was not found, using defaults')
+ return True
+
+ self.log.info("config found, loading config '{0}'".format(abs_path))
try:
- create_ok = job.create()
+ config = load_config(abs_path) or dict()
except Exception as error:
- job.error('create() unhandled exception: {error}'.format(error=error))
+ self.log.error('error on loading config : {0}'.format(error))
return False
- else:
- return create_ok
-
- def delete_job(self, job):
- """
- :param job: <Job>
- :return:
- """
- del self.modules[job.module_name][job.id]
-
- def run_check(self):
- checked = list()
- for job in self.jobs:
- if job.name in checked:
- job.info('check() => [DROPPED] (already served by another job)')
- self.delete_job(job)
+
+ self.log.info('config successfully loaded')
+ self.config.update(config)
+ return True
+
+ def setup(self):
+ self.log.info('starting setup')
+ if not self.load_config():
+ return False
+
+ if not self.config['enabled']:
+ self.log.info('disabled in configuration file')
+ return False
+
+ for mod in self.modules_to_run:
+ if self.config.is_module_enabled(mod, False):
+ task = Task(mod, self.config.is_module_enabled(mod, True))
+ self.tasks.append(task)
+ else:
+ self.log.info("{0} : disabled in configuration file".format(mod))
+
+ if not self.tasks:
+ self.log.info('no modules to run')
+ return False
+
+ worker = ModuleChecker(self.task_queue, self.result_queue)
+ self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
+ worker.start()
+
+ # TODO: timeouts?
+ self.enqueue_tasks()
+ self.task_queue.join()
+ self.dequeue_results()
+ self.result_queue.join()
+ self.task_queue.close()
+ self.result_queue.close()
+ self.log.info('stopping checker process')
+ worker.join()
+
+ if not self.results:
+ self.log.info('no modules to run')
+ return False
+
+ self.log.info("setup complete, {0} active module(s) : '{1}'".format(
+ len(self.results),
+ [v.module_name for v in self.results])
+ )
+
+ return True
+
+ def create_jobs(self):
+ jobs = list()
+ for result in self.results:
+ module = Module(result.module_name)
+ try:
+ module.load_source()
+ except Exception as error:
+ self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
+ result.module_name, error))
continue
- ok = self.check_job(job)
- if ok:
- job.info('check() => [OK]')
- checked.append(job.name)
- job.checked = True
+
+ module_jobs = module.create_jobs(result.jobs_configs)
+ self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
+ jobs.extend(module_jobs)
+
+ return jobs
+
+ def prepare_jobs(self, jobs):
+ prepared = list()
+
+ for job in jobs:
+ check_name = job.override_name or job.name
+ if check_name in self.checked_jobs[job.module_name]:
+ self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
+ job.module_name, job.name))
continue
- if not job.is_autodetect() or ok is None:
- job.info('check() => [FAILED]')
- self.delete_job(job)
- else:
- job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
- def run_create(self):
- for job in self.jobs:
- if not job.checked:
- # skip autodetection_retry jobs
+ try:
+ job.init()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
+ job.module_name, job.name, error))
+ continue
+
+ self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))
+
+ try:
+ ok = job.check()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
+ job.module_name, job.name, error))
continue
- ok = self.create_job_charts(job)
- if ok:
- job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
- job.created = True
+
+ if not ok:
+ self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
+ if job.autodetection_retry() > 0:
+ self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
+ job.module_name, job.name, job.autodetection_retry()))
+ self.auto_detection_jobs.append(job)
continue
- job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
- self.delete_job(job)
- def start(self):
- self.run_check()
- self.run_create()
- for job in self.jobs:
- if job.created:
- job.start()
+ self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))
+
+ job.post_check(int(self.min_update_every))
+
+ if not job.create():
+ self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))
+
+ self.checked_jobs[job.module_name].append(check_name)
+ prepared.append(job)
+
+ return prepared
+
+ def serve(self):
+ gc_run = self.config['gc_run']
+ gc_interval = self.config['gc_interval']
while True:
- if threading.active_count() <= 1 and not self.autodetect_jobs:
- run_and_exit(Logger.info)('FINISHED')
+ self.runs += 1
+
+ # threads: main + heartbeat
+ if threading.active_count() <= 2 and not self.auto_detection_jobs:
+ return
- sleep(self.sleep_time)
- self.cleanup()
- self.autodetect_retry()
+ time.sleep(1)
- # FIXME: https://github.com/netdata/netdata/issues/3817
- if self.do_gc and self.runs_counter % self.gc_interval == 0:
+ if gc_run and self.runs % gc_interval == 0:
v = gc.collect()
- Logger.debug("GC full collection run result: {0}".format(v))
-
- # for exiting on SIGPIPE
- if not IS_ATTY:
- safe_print('\n')
-
- def cleanup(self):
- for job in self.dead_jobs:
- self.delete_job(job)
- for mod in self:
- if not mod:
- del self.modules[mod.name]
-
- def autodetect_retry(self):
- self.runs_counter += self.sleep_time
- for job in self.autodetect_jobs:
- if self.runs_counter % job.recheck_every == 0:
- checked = self.check_job(job)
- if checked:
- created = self.create_job_charts(job)
- if not created:
- self.delete_job(job)
- continue
- job.start()
+ self.log.debug('GC collection run result: {0}'.format(v))
+
+ self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]
+
+ def retry_job(self, job):
+ stop_retrying = True
+ retry_later = False
+
+ if self.runs % job.autodetection_retry() != 0:
+ return retry_later
+
+ check_name = job.override_name or job.name
+ if check_name in self.checked_jobs[job.module_name]:
+ self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
+ job.module_name, job.name))
+ return stop_retrying
+
+ try:
+ ok = job.check()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
+ job.module_name, job.name, error))
+ return stop_retrying
+
+ if not ok:
+ self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
+ job.module_name, job.name, job.autodetection_retry()))
+ return retry_later
+ self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))
+
+ if not job.create():
+ return stop_retrying
+
+ job.post_check(int(self.min_update_every))
+ self.checked_jobs[job.module_name].append(check_name)
+ return stop_retrying
+
+
+def parse_cmd():
+ opts = sys.argv[:][1:]
+ debug = False
+ trace = False
+ update_every = 1
+ modules_to_run = list()
+
+ v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
+ if v:
+ update_every = v
+ opts.remove(v)
+ if 'debug' in opts:
+ debug = True
+ opts.remove('debug')
+ if 'trace' in opts:
+ trace = True
+ opts.remove('trace')
+ if opts:
+ modules_to_run = list(opts)
+
+ return collections.namedtuple(
+ 'CMD',
+ [
+ 'update_every',
+ 'debug',
+ 'trace',
+ 'modules_to_run',
+ ],
+ )(
+ update_every,
+ debug,
+ trace,
+ modules_to_run,
+ )
+
+
+def main():
+ cmd = parse_cmd()
+ logger = PythonDLogger()
+
+ if cmd.debug:
+ logger.logger.severity = 'DEBUG'
+ if cmd.trace:
+ logger.log_traceback = True
+
+ logger.info('using python v{0}'.format(PY_VERSION[0]))
+
+ unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
+ if unknown_modules:
+ logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
+ safe_print('DISABLE')
+ return
+
+ plugin = Plugin(
+ cmd.update_every,
+ cmd.modules_to_run or AVAILABLE_MODULES,
+ )
+
+ HeartBeat(1).start()
+
+ if not plugin.setup():
+ safe_print('DISABLE')
+ return
+
+ plugin.run()
+ logger.info('exiting from main...')
+ plugin.shutdown()
if __name__ == '__main__':
- DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
- Logger = PythonDLogger()
- if DEBUG:
- Logger.logger.severity = 'DEBUG'
- if TRACE:
- Logger.log_traceback = True
- Logger.info('Using python {version}'.format(version=PY_VERSION[0]))
-
- plugin = Plugin()
- plugin.start()
+ main()