#!/usr/bin/env bash
'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''

# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later

import gc
import os
import sys
import threading

from re import sub
from sys import version_info, argv
from time import sleep

GC_RUN = True
GC_COLLECT_EVERY = 300

PY_VERSION = version_info[:2]

USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')

PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')

PLUGINS_DIR = os.path.abspath(os.getenv(
    'NETDATA_PLUGINS_DIR',
    os.path.dirname(__file__)) + '/../python.d')

PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')

sys.path.append(PYTHON_MODULES_DIR)

from bases.loaders import ModuleAndConfigLoader  # noqa: E402
from bases.loggers import PythonDLogger  # noqa: E402
from bases.collection import setdefault_values, run_and_exit  # noqa: E402

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict

BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
               'retries': 60,
               'priority': 60000,
               'autodetection_retry': 0,
               'chart_cleanup': 10,
               'name': str()}

MODULE_EXTENSION = '.chart.py'
OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log', 'cpufreq']


def module_ok(m):
    return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES


ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)]


def parse_cmd():
    debug = 'debug' in argv[1:]
    trace = 'trace' in argv[1:]
    override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
    modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
    return debug, trace, override_update_every, modules or ALL_MODULES


def multi_job_check(config):
    return next((True for key in config if isinstance(config[key], dict)), False)


class RawModule:
    def __init__(self, name, path, explicitly_enabled=True):
        self.name = name
        self.path = path
        self.explicitly_enabled = explicitly_enabled


class Job(object):
    def __init__(self, initialized_job, job_id):
        """
        :param initialized_job: instance of the module's Service class
        :param job_id: str, key under which the job is stored in Module.jobs
        """
        self.job = initialized_job
        self.id = job_id  # key in Module.jobs
        self.module_name = self.job.__module__  # used in Plugin.delete_job()
        self.recheck_every = self.job.configuration.pop('autodetection_retry')
        self.checked = False  # used in Plugin.check_job()
        self.created = False  # used in Plugin.create_job_charts()
        if self.job.update_every < int(OVERRIDE_UPDATE_EVERY):
            self.job.update_every = int(OVERRIDE_UPDATE_EVERY)

    def __getattr__(self, item):
        return getattr(self.job, item)

    def __repr__(self):
        return self.job.__repr__()

    def is_dead(self):
        return bool(self.ident) and not self.is_alive()

    def not_launched(self):
        return not bool(self.ident)

    def is_autodetect(self):
        return self.recheck_every
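
# NOTE: illustrative sketch only (hypothetical module and values, not taken from any
# shipped config). The per-module .conf files normalized by
# Module.jobs_configurations_builder() below come in two shapes:
#
#   # single-job form: no top-level value is a dict, so multi_job_check() is False
#   # and the whole mapping is wrapped as one anonymous job
#   update_every : 3
#   url          : 'http://127.0.0.1/stub_status'
#
#   # multi-job form: each top-level key whose value is a dict becomes one job;
#   # keys from BASE_CONFIG (update_every, retries, priority, autodetection_retry,
#   # chart_cleanup, name) are filled in from module/plugin defaults when omitted
#   local:
#     url : 'http://127.0.0.1/stub_status'
#   remote:
#     url : 'http://10.0.0.2/stub_status'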


class Module(object):
    def __init__(self, service, config):
        """
        :param service: loaded module object (provides the Service class and defaults)
        :param config: dict loaded from the module's .conf file
        """
        self.service = service
        self.name = service.__name__
        self.config = self.jobs_configurations_builder(config)
        self.jobs = OrderedDict()
        self.counter = 1

        self.initialize_jobs()

    def __repr__(self):
        return "<Module: {name}>".format(name=self.name)

    def __iter__(self):
        return iter(OrderedDict(self.jobs).values())

    def __getitem__(self, item):
        return self.jobs[item]

    def __delitem__(self, key):
        del self.jobs[key]

    def __len__(self):
        return len(self.jobs)

    def __bool__(self):
        return bool(self.jobs)

    def __nonzero__(self):
        return self.__bool__()

    def jobs_configurations_builder(self, config):
        """
        Normalize a raw module config into per-job configurations.
        :param config: dict, raw configuration loaded from the module's .conf file
        :return: generator of (job_id, job_name, job_config) tuples
        """
        counter = 0
        job_base_config = dict()

        for attr in BASE_CONFIG:
            job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))

        if not config:
            config = {str(): dict()}
        elif not multi_job_check(config):
            config = {str(): config}

        for job_name in config:
            if not isinstance(config[job_name], dict):
                continue

            job_config = setdefault_values(config[job_name], base_dict=job_base_config)
            job_name = sub(r'\s+', '_', job_name)
            config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
            counter += 1
            job_id = 'job' + str(counter).zfill(3)

            yield job_id, job_name, job_config

    def initialize_jobs(self):
        """
        Instantiate a Service (wrapped in Job) for every job configuration.
        :return:
        """
        for job_id, job_name, job_config in self.config:
            job_config['job_name'] = job_name
            job_config['override_name'] = job_config.pop('name')

            try:
                initialized_job = self.service.Service(configuration=job_config)
            except Exception as error:
                Logger.error("job initialization: '{module_name} {job_name}' "
                             "=> ['FAILED'] ({error})".format(module_name=self.name, job_name=job_name, error=error))
                continue
            else:
                Logger.debug("job initialization: '{module_name} {job_name}' "
                             "=> ['OK']".format(module_name=self.name, job_name=job_name or self.name))
                self.jobs[job_id] = Job(initialized_job=initialized_job, job_id=job_id)

        del self.config
        del self.service


class Plugin(object):
    def __init__(self):
        self.loader = ModuleAndConfigLoader()
        self.modules = OrderedDict()
        self.sleep_time = 1
        self.runs_counter = 0

        user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
        stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')

        Logger.debug("loading '{0}'".format(user_config))
        self.config, error = self.loader.load_config_from_file(user_config)
        if error:
            Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
            Logger.debug("loading '{0}'".format(stock_config))
            self.config, error = self.loader.load_config_from_file(stock_config)
            if error:
                Logger.error("cannot load '{0}': {1}".format(stock_config, error))

        self.do_gc = self.config.get("gc_run", GC_RUN)
        self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)

        if not self.config.get('enabled', True):
            run_and_exit(Logger.info)('DISABLED in configuration file.')

        self.load_and_initialize_modules()

        if not self.modules:
            run_and_exit(Logger.info)('No modules to run. Exit...')
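
    # The top-level python.d.conf loaded in __init__ is a flat mapping; the keys this
    # plugin actually reads are sketched below (hypothetical values, not a complete
    # reference for the shipped file):
    #
    #   enabled: yes          # 'no' makes the whole plugin exit immediately
    #   default_run: yes      # run modules unless explicitly disabled (see enabled_modules())
    #   gc_run: yes           # periodically force a full garbage collection (GC_RUN default)
    #   gc_interval: 300      # run-counter ticks (roughly seconds) between forced collections
    #   nginx: yes            # per-module switches, keyed by module name
    #   example: no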

    def __iter__(self):
        return iter(OrderedDict(self.modules).values())

    @property
    def jobs(self):
        return (job for mod in self for job in mod)

    @property
    def dead_jobs(self):
        return (job for job in self.jobs if job.is_dead())

    @property
    def autodetect_jobs(self):
        return [job for job in self.jobs if job.not_launched()]

    def enabled_modules(self):
        for mod in MODULES_TO_RUN:
            mod_name = mod[:-len(MODULE_EXTENSION)]
            mod_path = os.path.join(PLUGINS_DIR, mod)
            if any(
                    [
                        self.config.get('default_run', True) and self.config.get(mod_name, True),
                        (not self.config.get('default_run')) and self.config.get(mod_name),
                    ]
            ):
                yield RawModule(
                    name=mod_name,
                    path=mod_path,
                    explicitly_enabled=self.config.get(mod_name),
                )

    def load_and_initialize_modules(self):
        for mod in self.enabled_modules():

            # Load module from file ------------------------------------------------------------
            loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
            log = Logger.error if error else Logger.debug
            log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
                                                                           module_name=mod.name))
            if error:
                Logger.error("load source error : {0}".format(error))
                continue

            # Load module config from file ------------------------------------------------------
            user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
            stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')

            Logger.debug("loading '{0}'".format(user_config))
            loaded_config, error = self.loader.load_config_from_file(user_config)
            if error:
                Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
                Logger.debug("loading '{0}'".format(stock_config))
                loaded_config, error = self.loader.load_config_from_file(stock_config)
                if error:
                    Logger.error("cannot load '{0}': {1}".format(stock_config, error))

            # Skip disabled modules
            if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
                Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
                continue

            # Module initialization ---------------------------------------------------
            initialized_module = Module(service=loaded_module, config=loaded_config)
            Logger.debug("module status: '{module_name}' => [{status}] "
                         "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
                                                        module_name=initialized_module.name,
                                                        jobs_number=len(initialized_module)))
            if initialized_module:
                self.modules[initialized_module.name] = initialized_module

    @staticmethod
    def check_job(job):
        """
        Run the job's check(); return its boolean result, or None on an unhandled exception.
        :param job: Job
        :return: bool or None
        """
        try:
            check_ok = bool(job.check())
        except Exception as error:
            job.error('check() unhandled exception: {error}'.format(error=error))
            return None
        else:
            return check_ok

    @staticmethod
    def create_job_charts(job):
        """
        Run the job's create() to emit its chart definitions.
        :param job: Job
        :return: bool
        """
        try:
            create_ok = job.create()
        except Exception as error:
            job.error('create() unhandled exception: {error}'.format(error=error))
            return False
        else:
            return create_ok

    def delete_job(self, job):
        """
        Remove the job from its module.
        :param job: Job
        :return:
        """
        del self.modules[job.module_name][job.id]

    def run_check(self):
        checked = list()
        for job in self.jobs:
            if job.name in checked:
                job.info('check() => [DROPPED] (already served by another job)')
                self.delete_job(job)
                continue

            ok = self.check_job(job)
            if ok:
                job.info('check() => [OK]')
                checked.append(job.name)
                job.checked = True
                continue

            if not job.is_autodetect() or ok is None:
                job.info('check() => [FAILED]')
                self.delete_job(job)
            else:
                job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
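
    # Job lifecycle (descriptive note): run_check() above validates every job once,
    # run_create() below emits chart definitions for the jobs that passed, and start()
    # launches each surviving job in its own thread; jobs with autodetection_retry set
    # that failed check() are re-tried later from autodetect_retry().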

    def run_create(self):
        for job in self.jobs:
            if not job.checked:
                # skip autodetection_retry jobs
                continue

            ok = self.create_job_charts(job)
            if ok:
                job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
                job.created = True
                continue
            job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
            self.delete_job(job)

    def start(self):
        self.run_check()
        self.run_create()

        for job in self.jobs:
            if job.created:
                job.start()

        while True:
            if threading.active_count() <= 1 and not self.autodetect_jobs:
                run_and_exit(Logger.info)('FINISHED')

            sleep(self.sleep_time)
            self.cleanup()
            self.autodetect_retry()

            # FIXME: https://github.com/netdata/netdata/issues/3817
            if self.do_gc and self.runs_counter % self.gc_interval == 0:
                v = gc.collect()
                Logger.debug("GC full collection run result: {0}".format(v))

    def cleanup(self):
        for job in self.dead_jobs:
            self.delete_job(job)

        for mod in self:
            if not mod:
                del self.modules[mod.name]

    def autodetect_retry(self):
        self.runs_counter += self.sleep_time
        for job in self.autodetect_jobs:
            if self.runs_counter % job.recheck_every == 0:
                checked = self.check_job(job)
                if checked:
                    created = self.create_job_charts(job)
                    if not created:
                        self.delete_job(job)
                        continue
                    job.start()


if __name__ == '__main__':
    DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
    Logger = PythonDLogger()
    if DEBUG:
        Logger.logger.severity = 'DEBUG'
    if TRACE:
        Logger.log_traceback = True
    Logger.info('Using python {version}'.format(version=PY_VERSION[0]))

    plugin = Plugin()
    plugin.start()
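
# Illustrative invocations based on parse_cmd() (hypothetical paths/modules; under
# netdata this plugin is normally launched by the plugins.d supervisor, not by hand):
#
#   ./python.d.plugin                  # run every enabled module
#   ./python.d.plugin debug trace      # verbose logging plus exception tracebacks
#   ./python.d.plugin 5 nginx mysql    # raise each job's update_every to at least 5
#                                      # and run only the nginx and mysql modules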