diff options
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin.in')
-rwxr-xr-x | collectors/python.d.plugin/python.d.plugin.in | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in new file mode 100755 index 000000000..7ac03fd99 --- /dev/null +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -0,0 +1,427 @@ +#!/usr/bin/env bash +'''':; exec "$(command -v python || command -v python3 || command -v python2 || +echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' + +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (l2isbad) +# SPDX-License-Identifier: GPL-3.0-or-later + +import gc +import os +import sys +import threading + +from re import sub +from sys import version_info, argv +from time import sleep + +GC_RUN = True +GC_COLLECT_EVERY = 300 + +PY_VERSION = version_info[:2] + +USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@') +STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@') + +PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d') +PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d') + + +PLUGINS_DIR = os.path.abspath(os.getenv( + 'NETDATA_PLUGINS_DIR', + os.path.dirname(__file__)) + '/../python.d') + + +PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules') + +sys.path.append(PYTHON_MODULES_DIR) + +from bases.loaders import ModuleAndConfigLoader # noqa: E402 +from bases.loggers import PythonDLogger # noqa: E402 +from bases.collection import setdefault_values, run_and_exit # noqa: E402 + +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict + +BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), + 'retries': 60, + 'priority': 60000, + 'autodetection_retry': 0, + 'chart_cleanup': 10, + 'name': str()} + + +MODULE_EXTENSION = '.chart.py' +OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log'] + + +def module_ok(m): + return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES + + +ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)] + + +def parse_cmd(): + debug = 'debug' in argv[1:] + trace = 'trace' in argv[1:] + override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False) + modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES] + return debug, trace, override_update_every, modules or ALL_MODULES + + +def multi_job_check(config): + return next((True for key in config if isinstance(config[key], dict)), False) + + +class RawModule: + def __init__(self, name, path, explicitly_enabled=True): + self.name = name + self.path = path + self.explicitly_enabled = explicitly_enabled + + +class Job(object): + def __init__(self, initialized_job, job_id): + """ + :param initialized_job: instance of <Class Service> + :param job_id: <str> + """ + self.job = initialized_job + self.id = job_id # key in Modules.jobs() + self.module_name = self.job.__module__ # used in Plugin.delete_job() + self.recheck_every = self.job.configuration.pop('autodetection_retry') + self.checked = False # used in Plugin.check_job() + self.created = False # used in Plugin.create_job_charts() + if self.job.update_every < int(OVERRIDE_UPDATE_EVERY): + self.job.update_every = int(OVERRIDE_UPDATE_EVERY) + + def __getattr__(self, item): + return getattr(self.job, item) + + def __repr__(self): + return self.job.__repr__() + + def is_dead(self): + return bool(self.ident) and not self.is_alive() + + def not_launched(self): + return not bool(self.ident) + + def is_autodetect(self): + return self.recheck_every + + +class Module(object): + def __init__(self, service, config): + """ + :param service: <Module> + :param config: <dict> + """ + self.service = service + self.name = service.__name__ + self.config = self.jobs_configurations_builder(config) + self.jobs = OrderedDict() + self.counter = 1 + + self.initialize_jobs() + + def __repr__(self): + return "<Class Module '{name}'>".format(name=self.name) + + def __iter__(self): + return iter(OrderedDict(self.jobs).values()) + + def __getitem__(self, item): + return self.jobs[item] + + def __delitem__(self, key): + del self.jobs[key] + + def __len__(self): + return len(self.jobs) + + def __bool__(self): + return bool(self.jobs) + + def __nonzero__(self): + return self.__bool__() + + def jobs_configurations_builder(self, config): + """ + :param config: <dict> + :return: + """ + counter = 0 + job_base_config = dict() + + for attr in BASE_CONFIG: + job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr])) + + if not config: + config = {str(): dict()} + elif not multi_job_check(config): + config = {str(): config} + + for job_name in config: + if not isinstance(config[job_name], dict): + continue + + job_config = setdefault_values(config[job_name], base_dict=job_base_config) + job_name = sub(r'\s+', '_', job_name) + config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name']) + counter += 1 + job_id = 'job' + str(counter).zfill(3) + + yield job_id, job_name, job_config + + def initialize_jobs(self): + """ + :return: + """ + for job_id, job_name, job_config in self.config: + job_config['job_name'] = job_name + job_config['override_name'] = job_config.pop('name') + + try: + initialized_job = self.service.Service(configuration=job_config) + except Exception as error: + Logger.error("job initialization: '{module_name} {job_name}' " + "=> ['FAILED'] ({error})".format(module_name=self.name, + job_name=job_name, + error=error)) + continue + else: + Logger.debug("job initialization: '{module_name} {job_name}' " + "=> ['OK']".format(module_name=self.name, + job_name=job_name or self.name)) + self.jobs[job_id] = Job(initialized_job=initialized_job, + job_id=job_id) + del self.config + del self.service + + +class Plugin(object): + def __init__(self): + self.loader = ModuleAndConfigLoader() + self.modules = OrderedDict() + self.sleep_time = 1 + self.runs_counter = 0 + + user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf') + stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf') + + Logger.debug("loading '{0}'".format(user_config)) + self.config, error = self.loader.load_config_from_file(user_config) + + if error: + Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error)) + Logger.debug("loading '{0}'".format(stock_config)) + self.config, error = self.loader.load_config_from_file(stock_config) + if error: + Logger.error("cannot load '{0}': {1}".format(stock_config, error)) + + self.do_gc = self.config.get("gc_run", GC_RUN) + self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY) + + if not self.config.get('enabled', True): + run_and_exit(Logger.info)('DISABLED in configuration file.') + + self.load_and_initialize_modules() + if not self.modules: + run_and_exit(Logger.info)('No modules to run. Exit...') + + def __iter__(self): + return iter(OrderedDict(self.modules).values()) + + @property + def jobs(self): + return (job for mod in self for job in mod) + + @property + def dead_jobs(self): + return (job for job in self.jobs if job.is_dead()) + + @property + def autodetect_jobs(self): + return [job for job in self.jobs if job.not_launched()] + + def enabled_modules(self): + for mod in MODULES_TO_RUN: + mod_name = mod[:-len(MODULE_EXTENSION)] + mod_path = os.path.join(PLUGINS_DIR, mod) + if any( + [ + self.config.get('default_run', True) and self.config.get(mod_name, True), + (not self.config.get('default_run')) and self.config.get(mod_name), + ] + ): + yield RawModule( + name=mod_name, + path=mod_path, + explicitly_enabled=self.config.get(mod_name), + ) + + def load_and_initialize_modules(self): + for mod in self.enabled_modules(): + + # Load module from file ------------------------------------------------------------ + loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path) + log = Logger.error if error else Logger.debug + log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK', + module_name=mod.name)) + if error: + Logger.error("load source error : {0}".format(error)) + continue + + # Load module config from file ------------------------------------------------------ + user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf') + stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf') + + Logger.debug("loading '{0}'".format(user_config)) + loaded_config, error = self.loader.load_config_from_file(user_config) + if error: + Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error)) + Logger.debug("loading '{0}'".format(stock_config)) + loaded_config, error = self.loader.load_config_from_file(stock_config) + + if error: + Logger.error("cannot load '{0}': {1}".format(stock_config, error)) + + # Skip disabled modules + if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled: + Logger.info("module '{0}' disabled by default".format(loaded_module.__name__)) + continue + + # Module initialization --------------------------------------------------- + + initialized_module = Module(service=loaded_module, config=loaded_config) + Logger.debug("module status: '{module_name}' => [{status}] " + "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED', + module_name=initialized_module.name, + jobs_number=len(initialized_module))) + if initialized_module: + self.modules[initialized_module.name] = initialized_module + + @staticmethod + def check_job(job): + """ + :param job: <Job> + :return: + """ + try: + check_ok = bool(job.check()) + except Exception as error: + job.error('check() unhandled exception: {error}'.format(error=error)) + return None + else: + return check_ok + + @staticmethod + def create_job_charts(job): + """ + :param job: <Job> + :return: + """ + try: + create_ok = job.create() + except Exception as error: + job.error('create() unhandled exception: {error}'.format(error=error)) + return False + else: + return create_ok + + def delete_job(self, job): + """ + :param job: <Job> + :return: + """ + del self.modules[job.module_name][job.id] + + def run_check(self): + checked = list() + for job in self.jobs: + if job.name in checked: + job.info('check() => [DROPPED] (already served by another job)') + self.delete_job(job) + continue + ok = self.check_job(job) + if ok: + job.info('check() => [OK]') + checked.append(job.name) + job.checked = True + continue + if not job.is_autodetect() or ok is None: + job.info('check() => [FAILED]') + self.delete_job(job) + else: + job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every)) + + def run_create(self): + for job in self.jobs: + if not job.checked: + # skip autodetection_retry jobs + continue + ok = self.create_job_charts(job) + if ok: + job.debug('create() => [OK] (charts: {0})'.format(len(job.charts))) + job.created = True + continue + job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts))) + self.delete_job(job) + + def start(self): + self.run_check() + self.run_create() + for job in self.jobs: + if job.created: + job.start() + + while True: + if threading.active_count() <= 1 and not self.autodetect_jobs: + run_and_exit(Logger.info)('FINISHED') + + sleep(self.sleep_time) + self.cleanup() + self.autodetect_retry() + + # FIXME: https://github.com/netdata/netdata/issues/3817 + if self.do_gc and self.runs_counter % self.gc_interval == 0: + v = gc.collect() + Logger.debug("GC full collection run result: {0}".format(v)) + + def cleanup(self): + for job in self.dead_jobs: + self.delete_job(job) + for mod in self: + if not mod: + del self.modules[mod.name] + + def autodetect_retry(self): + self.runs_counter += self.sleep_time + for job in self.autodetect_jobs: + if self.runs_counter % job.recheck_every == 0: + checked = self.check_job(job) + if checked: + created = self.create_job_charts(job) + if not created: + self.delete_job(job) + continue + job.start() + + +if __name__ == '__main__': + DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd() + Logger = PythonDLogger() + if DEBUG: + Logger.logger.severity = 'DEBUG' + if TRACE: + Logger.log_traceback = True + Logger.info('Using python {version}'.format(version=PY_VERSION[0])) + + plugin = Plugin() + plugin.start() |