summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python.d.plugin.in
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2018-11-07 12:19:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2018-11-07 12:20:17 +0000
commita64a253794ac64cb40befee54db53bde17dd0d49 (patch)
treec1024acc5f6e508814b944d99f112259bb28b1be /collectors/python.d.plugin/python.d.plugin.in
parentNew upstream version 1.10.0+dfsg (diff)
downloadnetdata-a64a253794ac64cb40befee54db53bde17dd0d49.tar.xz
netdata-a64a253794ac64cb40befee54db53bde17dd0d49.zip
New upstream version 1.11.0+dfsgupstream/1.11.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rwxr-xr-xcollectors/python.d.plugin/python.d.plugin.in427
1 files changed, 427 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in
new file mode 100755
index 000000000..7ac03fd99
--- /dev/null
+++ b/collectors/python.d.plugin/python.d.plugin.in
@@ -0,0 +1,427 @@
+#!/usr/bin/env bash
+'''':; exec "$(command -v python || command -v python3 || command -v python2 ||
+echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
+
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import gc
+import os
+import sys
+import threading
+
+from re import sub
+from sys import version_info, argv
+from time import sleep
+
+GC_RUN = True
+GC_COLLECT_EVERY = 300
+
+PY_VERSION = version_info[:2]
+
+USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
+STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')
+
+PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
+PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')
+
+
+PLUGINS_DIR = os.path.abspath(os.getenv(
+ 'NETDATA_PLUGINS_DIR',
+ os.path.dirname(__file__)) + '/../python.d')
+
+
+PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')
+
+sys.path.append(PYTHON_MODULES_DIR)
+
+from bases.loaders import ModuleAndConfigLoader # noqa: E402
+from bases.loggers import PythonDLogger # noqa: E402
+from bases.collection import setdefault_values, run_and_exit # noqa: E402
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
+
+BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
+ 'retries': 60,
+ 'priority': 60000,
+ 'autodetection_retry': 0,
+ 'chart_cleanup': 10,
+ 'name': str()}
+
+
+MODULE_EXTENSION = '.chart.py'
+OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log']
+
+
+def module_ok(m):
+ return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES
+
+
+ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)]
+
+
+def parse_cmd():
+ debug = 'debug' in argv[1:]
+ trace = 'trace' in argv[1:]
+ override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
+ modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
+ return debug, trace, override_update_every, modules or ALL_MODULES
+
+
+def multi_job_check(config):
+ return next((True for key in config if isinstance(config[key], dict)), False)
+
+
+class RawModule:
+ def __init__(self, name, path, explicitly_enabled=True):
+ self.name = name
+ self.path = path
+ self.explicitly_enabled = explicitly_enabled
+
+
+class Job(object):
+ def __init__(self, initialized_job, job_id):
+ """
+ :param initialized_job: instance of <Class Service>
+ :param job_id: <str>
+ """
+ self.job = initialized_job
+ self.id = job_id # key in Modules.jobs()
+ self.module_name = self.job.__module__ # used in Plugin.delete_job()
+ self.recheck_every = self.job.configuration.pop('autodetection_retry')
+ self.checked = False # used in Plugin.check_job()
+ self.created = False # used in Plugin.create_job_charts()
+ if self.job.update_every < int(OVERRIDE_UPDATE_EVERY):
+ self.job.update_every = int(OVERRIDE_UPDATE_EVERY)
+
+ def __getattr__(self, item):
+ return getattr(self.job, item)
+
+ def __repr__(self):
+ return self.job.__repr__()
+
+ def is_dead(self):
+ return bool(self.ident) and not self.is_alive()
+
+ def not_launched(self):
+ return not bool(self.ident)
+
+ def is_autodetect(self):
+ return self.recheck_every
+
+
+class Module(object):
+ def __init__(self, service, config):
+ """
+ :param service: <Module>
+ :param config: <dict>
+ """
+ self.service = service
+ self.name = service.__name__
+ self.config = self.jobs_configurations_builder(config)
+ self.jobs = OrderedDict()
+ self.counter = 1
+
+ self.initialize_jobs()
+
+ def __repr__(self):
+ return "<Class Module '{name}'>".format(name=self.name)
+
+ def __iter__(self):
+ return iter(OrderedDict(self.jobs).values())
+
+ def __getitem__(self, item):
+ return self.jobs[item]
+
+ def __delitem__(self, key):
+ del self.jobs[key]
+
+ def __len__(self):
+ return len(self.jobs)
+
+ def __bool__(self):
+ return bool(self.jobs)
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def jobs_configurations_builder(self, config):
+ """
+ :param config: <dict>
+ :return:
+ """
+ counter = 0
+ job_base_config = dict()
+
+ for attr in BASE_CONFIG:
+ job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))
+
+ if not config:
+ config = {str(): dict()}
+ elif not multi_job_check(config):
+ config = {str(): config}
+
+ for job_name in config:
+ if not isinstance(config[job_name], dict):
+ continue
+
+ job_config = setdefault_values(config[job_name], base_dict=job_base_config)
+ job_name = sub(r'\s+', '_', job_name)
+ config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
+ counter += 1
+ job_id = 'job' + str(counter).zfill(3)
+
+ yield job_id, job_name, job_config
+
+ def initialize_jobs(self):
+ """
+ :return:
+ """
+ for job_id, job_name, job_config in self.config:
+ job_config['job_name'] = job_name
+ job_config['override_name'] = job_config.pop('name')
+
+ try:
+ initialized_job = self.service.Service(configuration=job_config)
+ except Exception as error:
+ Logger.error("job initialization: '{module_name} {job_name}' "
+ "=> ['FAILED'] ({error})".format(module_name=self.name,
+ job_name=job_name,
+ error=error))
+ continue
+ else:
+ Logger.debug("job initialization: '{module_name} {job_name}' "
+ "=> ['OK']".format(module_name=self.name,
+ job_name=job_name or self.name))
+ self.jobs[job_id] = Job(initialized_job=initialized_job,
+ job_id=job_id)
+ del self.config
+ del self.service
+
+
+class Plugin(object):
+ def __init__(self):
+ self.loader = ModuleAndConfigLoader()
+ self.modules = OrderedDict()
+ self.sleep_time = 1
+ self.runs_counter = 0
+
+ user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
+ stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')
+
+ Logger.debug("loading '{0}'".format(user_config))
+ self.config, error = self.loader.load_config_from_file(user_config)
+
+ if error:
+ Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
+ Logger.debug("loading '{0}'".format(stock_config))
+ self.config, error = self.loader.load_config_from_file(stock_config)
+ if error:
+ Logger.error("cannot load '{0}': {1}".format(stock_config, error))
+
+ self.do_gc = self.config.get("gc_run", GC_RUN)
+ self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)
+
+ if not self.config.get('enabled', True):
+ run_and_exit(Logger.info)('DISABLED in configuration file.')
+
+ self.load_and_initialize_modules()
+ if not self.modules:
+ run_and_exit(Logger.info)('No modules to run. Exit...')
+
+ def __iter__(self):
+ return iter(OrderedDict(self.modules).values())
+
+ @property
+ def jobs(self):
+ return (job for mod in self for job in mod)
+
+ @property
+ def dead_jobs(self):
+ return (job for job in self.jobs if job.is_dead())
+
+ @property
+ def autodetect_jobs(self):
+ return [job for job in self.jobs if job.not_launched()]
+
+ def enabled_modules(self):
+ for mod in MODULES_TO_RUN:
+ mod_name = mod[:-len(MODULE_EXTENSION)]
+ mod_path = os.path.join(PLUGINS_DIR, mod)
+ if any(
+ [
+ self.config.get('default_run', True) and self.config.get(mod_name, True),
+ (not self.config.get('default_run')) and self.config.get(mod_name),
+ ]
+ ):
+ yield RawModule(
+ name=mod_name,
+ path=mod_path,
+ explicitly_enabled=self.config.get(mod_name),
+ )
+
+ def load_and_initialize_modules(self):
+ for mod in self.enabled_modules():
+
+ # Load module from file ------------------------------------------------------------
+ loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
+ log = Logger.error if error else Logger.debug
+ log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
+ module_name=mod.name))
+ if error:
+ Logger.error("load source error : {0}".format(error))
+ continue
+
+ # Load module config from file ------------------------------------------------------
+ user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
+ stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')
+
+ Logger.debug("loading '{0}'".format(user_config))
+ loaded_config, error = self.loader.load_config_from_file(user_config)
+ if error:
+ Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
+ Logger.debug("loading '{0}'".format(stock_config))
+ loaded_config, error = self.loader.load_config_from_file(stock_config)
+
+ if error:
+ Logger.error("cannot load '{0}': {1}".format(stock_config, error))
+
+ # Skip disabled modules
+ if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
+ Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
+ continue
+
+ # Module initialization ---------------------------------------------------
+
+ initialized_module = Module(service=loaded_module, config=loaded_config)
+ Logger.debug("module status: '{module_name}' => [{status}] "
+ "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
+ module_name=initialized_module.name,
+ jobs_number=len(initialized_module)))
+ if initialized_module:
+ self.modules[initialized_module.name] = initialized_module
+
+ @staticmethod
+ def check_job(job):
+ """
+ :param job: <Job>
+ :return:
+ """
+ try:
+ check_ok = bool(job.check())
+ except Exception as error:
+ job.error('check() unhandled exception: {error}'.format(error=error))
+ return None
+ else:
+ return check_ok
+
+ @staticmethod
+ def create_job_charts(job):
+ """
+ :param job: <Job>
+ :return:
+ """
+ try:
+ create_ok = job.create()
+ except Exception as error:
+ job.error('create() unhandled exception: {error}'.format(error=error))
+ return False
+ else:
+ return create_ok
+
+ def delete_job(self, job):
+ """
+ :param job: <Job>
+ :return:
+ """
+ del self.modules[job.module_name][job.id]
+
+ def run_check(self):
+ checked = list()
+ for job in self.jobs:
+ if job.name in checked:
+ job.info('check() => [DROPPED] (already served by another job)')
+ self.delete_job(job)
+ continue
+ ok = self.check_job(job)
+ if ok:
+ job.info('check() => [OK]')
+ checked.append(job.name)
+ job.checked = True
+ continue
+ if not job.is_autodetect() or ok is None:
+ job.info('check() => [FAILED]')
+ self.delete_job(job)
+ else:
+ job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
+
+ def run_create(self):
+ for job in self.jobs:
+ if not job.checked:
+ # skip autodetection_retry jobs
+ continue
+ ok = self.create_job_charts(job)
+ if ok:
+ job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
+ job.created = True
+ continue
+ job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
+ self.delete_job(job)
+
+ def start(self):
+ self.run_check()
+ self.run_create()
+ for job in self.jobs:
+ if job.created:
+ job.start()
+
+ while True:
+ if threading.active_count() <= 1 and not self.autodetect_jobs:
+ run_and_exit(Logger.info)('FINISHED')
+
+ sleep(self.sleep_time)
+ self.cleanup()
+ self.autodetect_retry()
+
+ # FIXME: https://github.com/netdata/netdata/issues/3817
+ if self.do_gc and self.runs_counter % self.gc_interval == 0:
+ v = gc.collect()
+ Logger.debug("GC full collection run result: {0}".format(v))
+
+ def cleanup(self):
+ for job in self.dead_jobs:
+ self.delete_job(job)
+ for mod in self:
+ if not mod:
+ del self.modules[mod.name]
+
+ def autodetect_retry(self):
+ self.runs_counter += self.sleep_time
+ for job in self.autodetect_jobs:
+ if self.runs_counter % job.recheck_every == 0:
+ checked = self.check_job(job)
+ if checked:
+ created = self.create_job_charts(job)
+ if not created:
+ self.delete_job(job)
+ continue
+ job.start()
+
+
+if __name__ == '__main__':
+ DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
+ Logger = PythonDLogger()
+ if DEBUG:
+ Logger.logger.severity = 'DEBUG'
+ if TRACE:
+ Logger.log_traceback = True
+ Logger.info('Using python {version}'.format(version=PY_VERSION[0]))
+
+ plugin = Plugin()
+ plugin.start()