diff options
Diffstat (limited to 'plugins.d/python.d.plugin')
-rwxr-xr-x | plugins.d/python.d.plugin | 382 |
1 files changed, 0 insertions, 382 deletions
diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin deleted file mode 100755 index c9b26016..00000000 --- a/plugins.d/python.d.plugin +++ /dev/null @@ -1,382 +0,0 @@ -#!/usr/bin/env bash -'''':; exec "$(command -v python || command -v python3 || command -v python2 || -echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' - -# -*- coding: utf-8 -*- -# Description: -# Author: Pawel Krupa (paulfantom) -# Author: Ilya Mashchenko (l2isbad) - -import os -import sys -import threading - -from re import sub -from sys import version_info, argv -from time import sleep - -try: - from time import monotonic as time -except ImportError: - from time import time - -PY_VERSION = version_info[:2] -PLUGIN_CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', os.path.dirname(__file__) + '/../../../../etc/netdata') + '/' -CHARTS_PY_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) + '/../python.d') + '/' -CHARTS_PY_CONFIG_DIR = PLUGIN_CONFIG_DIR + 'python.d/' -PYTHON_MODULES_DIR = CHARTS_PY_DIR + 'python_modules' - -sys.path.append(PYTHON_MODULES_DIR) - -from bases.loaders import ModuleAndConfigLoader -from bases.loggers import PythonDLogger -from bases.collection import setdefault_values, run_and_exit - -try: - from collections import OrderedDict -except ImportError: - from third_party.ordereddict import OrderedDict - -BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), - 'retries': 60, - 'priority': 60000, - 'autodetection_retry': 0, - 'chart_cleanup': 10, - 'name': str()} - - -MODULE_EXTENSION = '.chart.py' -OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log'] - - -def module_ok(m): - return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES - - -ALL_MODULES = [m for m in sorted(os.listdir(CHARTS_PY_DIR)) if module_ok(m)] - - -def parse_cmd(): - debug = 'debug' in argv[1:] - trace = 'trace' in argv[1:] - override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False) - modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES] - return debug, trace, override_update_every, modules or ALL_MODULES - - -def multi_job_check(config): - return next((True for key in config if isinstance(config[key], dict)), False) - - -class Job(object): - def __init__(self, initialized_job, job_id): - """ - :param initialized_job: instance of <Class Service> - :param job_id: <str> - """ - self.job = initialized_job - self.id = job_id # key in Modules.jobs() - self.module_name = self.job.__module__ # used in Plugin.delete_job() - self.recheck_every = self.job.configuration.pop('autodetection_retry') - self.checked = False # used in Plugin.check_job() - self.created = False # used in Plugin.create_job_charts() - if OVERRIDE_UPDATE_EVERY: - self.job.update_every = int(OVERRIDE_UPDATE_EVERY) - - def __getattr__(self, item): - return getattr(self.job, item) - - def __repr__(self): - return self.job.__repr__() - - def is_dead(self): - return bool(self.ident) and not self.is_alive() - - def not_launched(self): - return not bool(self.ident) - - def is_autodetect(self): - return self.recheck_every - - -class Module(object): - def __init__(self, service, config): - """ - :param service: <Module> - :param config: <dict> - """ - self.service = service - self.name = service.__name__ - self.config = self.jobs_configurations_builder(config) - self.jobs = OrderedDict() - self.counter = 1 - - self.initialize_jobs() - - def __repr__(self): - return "<Class Module '{name}'>".format(name=self.name) - - def __iter__(self): - return iter(OrderedDict(self.jobs).values()) - - def __getitem__(self, item): - return self.jobs[item] - - def __delitem__(self, key): - del self.jobs[key] - - def __len__(self): - return len(self.jobs) - - def __bool__(self): - return bool(self.jobs) - - def __nonzero__(self): - return self.__bool__() - - def jobs_configurations_builder(self, config): - """ - :param config: <dict> - :return: - """ - counter = 0 - job_base_config = dict() - - for attr in BASE_CONFIG: - job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr])) - - if not config: - config = {str(): dict()} - elif not multi_job_check(config): - config = {str(): config} - - for job_name in config: - if not isinstance(config[job_name], dict): - continue - - job_config = setdefault_values(config[job_name], base_dict=job_base_config) - job_name = sub(r'\s+', '_', job_name) - config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name']) - counter += 1 - job_id = 'job' + str(counter).zfill(3) - - yield job_id, job_name, job_config - - def initialize_jobs(self): - """ - :return: - """ - for job_id, job_name, job_config in self.config: - job_config['job_name'] = job_name - job_config['override_name'] = job_config.pop('name') - - try: - initialized_job = self.service.Service(configuration=job_config) - except Exception as error: - Logger.error("job initialization: '{module_name} {job_name}' " - "=> ['FAILED'] ({error})".format(module_name=self.name, - job_name=job_name, - error=error)) - continue - else: - Logger.debug("job initialization: '{module_name} {job_name}' " - "=> ['OK']".format(module_name=self.name, - job_name=job_name or self.name)) - self.jobs[job_id] = Job(initialized_job=initialized_job, - job_id=job_id) - del self.config - del self.service - - -class Plugin(object): - def __init__(self): - self.loader = ModuleAndConfigLoader() - self.modules = OrderedDict() - self.sleep_time = 1 - self.runs_counter = 0 - self.config, error = self.loader.load_config_from_file(PLUGIN_CONFIG_DIR + 'python.d.conf') - if error: - Logger.error('"python.d.conf" configuration file not found. Using defaults.') - - if not self.config.get('enabled', True): - run_and_exit(Logger.info)('DISABLED in configuration file.') - - self.load_and_initialize_modules() - if not self.modules: - run_and_exit(Logger.info)('No modules to run. Exit...') - - def __iter__(self): - return iter(OrderedDict(self.modules).values()) - - @property - def jobs(self): - return (job for mod in self for job in mod) - - @property - def dead_jobs(self): - return (job for job in self.jobs if job.is_dead()) - - @property - def autodetect_jobs(self): - return [job for job in self.jobs if job.not_launched()] - - def enabled_modules(self): - for mod in MODULES_TO_RUN: - mod_name = mod[:-len(MODULE_EXTENSION)] - mod_path = CHARTS_PY_DIR + mod - conf_path = ''.join([CHARTS_PY_CONFIG_DIR, mod_name, '.conf']) - - if DEBUG: - yield mod, mod_name, mod_path, conf_path - else: - if all([self.config.get('default_run', True), - self.config.get(mod_name, True)]): - yield mod, mod_name, mod_path, conf_path - - elif all([not self.config.get('default_run'), - self.config.get(mod_name)]): - yield mod, mod_name, mod_path, conf_path - - def load_and_initialize_modules(self): - for mod, mod_name, mod_path, conf_path in self.enabled_modules(): - - # Load module from file ------------------------------------------------------------ - loaded_module, error = self.loader.load_module_from_file(mod_name, mod_path) - log = Logger.error if error else Logger.debug - log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK', - module_name=mod_name)) - if error: - Logger.error("load source error : {0}".format(error)) - continue - - # Load module config from file ------------------------------------------------------ - loaded_config, error = self.loader.load_config_from_file(conf_path) - log = Logger.error if error else Logger.debug - log("module load config: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK', - module_name=mod_name)) - if error: - Logger.error('load config error : {0}'.format(error)) - - # Service instance initialization --------------------------------------------------- - initialized_module = Module(service=loaded_module, config=loaded_config) - Logger.debug("module status: '{module_name}' => [{status}] " - "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED', - module_name=initialized_module.name, - jobs_number=len(initialized_module))) - - if initialized_module: - self.modules[initialized_module.name] = initialized_module - - @staticmethod - def check_job(job): - """ - :param job: <Job> - :return: - """ - try: - check_ok = bool(job.check()) - except Exception as error: - job.error('check() unhandled exception: {error}'.format(error=error)) - return None - else: - return check_ok - - @staticmethod - def create_job_charts(job): - """ - :param job: <Job> - :return: - """ - try: - create_ok = job.create() - except Exception as error: - job.error('create() unhandled exception: {error}'.format(error=error)) - return False - else: - return create_ok - - def delete_job(self, job): - """ - :param job: <Job> - :return: - """ - del self.modules[job.module_name][job.id] - - def run_check(self): - checked = list() - for job in self.jobs: - if job.name in checked: - job.info('check() => [DROPPED] (already served by another job)') - self.delete_job(job) - continue - ok = self.check_job(job) - if ok: - job.info('check() => [OK]') - checked.append(job.name) - job.checked = True - continue - if not job.is_autodetect() or ok is None: - job.info('check() => [FAILED]') - self.delete_job(job) - else: - job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every)) - - def run_create(self): - for job in self.jobs: - if not job.checked: - # skip autodetection_retry jobs - continue - ok = self.create_job_charts(job) - if ok: - job.debug('create() => [OK] (charts: {0})'.format(len(job.charts))) - job.created = True - continue - job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts))) - self.delete_job(job) - - def start(self): - self.run_check() - self.run_create() - for job in self.jobs: - if job.created: - job.start() - - while True: - if threading.active_count() <= 1 and not self.autodetect_jobs: - run_and_exit(Logger.info)('FINISHED') - - sleep(self.sleep_time) - self.cleanup() - self.autodetect_retry() - - def cleanup(self): - for job in self.dead_jobs: - self.delete_job(job) - for mod in self: - if not mod: - del self.modules[mod.name] - - def autodetect_retry(self): - self.runs_counter += self.sleep_time - for job in self.autodetect_jobs: - if self.runs_counter % job.recheck_every == 0: - checked = self.check_job(job) - if checked: - created = self.create_job_charts(job) - if not created: - self.delete_job(job) - continue - job.start() - - -if __name__ == '__main__': - DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd() - Logger = PythonDLogger() - if DEBUG: - Logger.logger.severity = 'DEBUG' - if TRACE: - Logger.log_traceback = True - Logger.info('Using python {version}'.format(version=PY_VERSION[0])) - - plugin = Plugin() - plugin.start() |