#!/usr/bin/env bash
'''':;
if [[ "$OSTYPE" == "darwin"* ]]; then
    export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
fi
exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''

# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later

import collections
import copy
import gc
import multiprocessing
import os
import re
import sys
import threading
import time
import types

PY_VERSION = sys.version_info[:2]

if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader

ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'


def dirs():
    user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '/etc/netdata',
    )
    stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '/usr/lib/netdata/conf.d',
    )
    modules_user_config = os.path.join(user_config, 'python.d')
    modules_stock_config = os.path.join(stock_config, 'python.d')
    modules = os.path.abspath(
        os.getenv(
            ENV_NETDATA_PLUGINS_DIR,
            os.path.dirname(__file__),
        ) + '/../python.d'
    )
    pythond_packages = os.path.join(modules, 'python_modules')

    return collections.namedtuple(
        'Dirs',
        [
            'user_config',
            'stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'pythond_packages',
        ]
    )(
        user_config,
        stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        pythond_packages,
    )


DIRS = dirs()

sys.path.append(DIRS.pythond_packages)

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict


END_TASK_MARKER = None

IS_ATTY = sys.stdout.isatty()

PLUGIN_CONF_FILE = 'python.d.conf'

MODULE_SUFFIX = '.chart.py'

OBSOLETED_MODULES = (
    'apache_cache',         # replaced by web_log
    'cpuidle',              # rewritten in C
    'cpufreq',              # rewritten in C
    'gunicorn_log',         # replaced by web_log
    'linux_power_supply',   # rewritten in C
    'nginx_log',            # replaced by web_log
    'mdstat',               # rewritten in C
    'sslcheck',             # memory leak bug https://github.com/netdata/netdata/issues/5624
)

AVAILABLE_MODULES = [
    m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
    if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
]

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}

JOB_BASE_CONF = {
    # environment variables are always strings, so cast explicitly
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}


def heartbeat():
    if IS_ATTY:
        return
    # an empty line on stdout works as a keep-alive when not running in a terminal
    safe_print('\n')


class HeartBeat(threading.Thread):
    def __init__(self, every):
        threading.Thread.__init__(self)
        self.daemon = True
        self.every = every

    def run(self):
        while True:
            time.sleep(self.every)
            heartbeat()


def load_module(name):
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader(name, abs_path)
    # py2's load_source returns the module itself, py3's SourceFileLoader
    # returns a loader that still has to be asked to load it
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()


def multi_path_find(name, paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return ''


Task = collections.namedtuple(
    'Task',
    [
        'module_name',
        'explicitly_enabled',
    ],
)

Result = collections.namedtuple(
    'Result',
    [
        'module_name',
        'jobs_configs',
    ],
)
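
# How the check phase is wired: the main process puts one Task per candidate
# module on a JoinableQueue, the ModuleChecker child process loads and checks
# each module in isolation (so a misbehaving module cannot take down the whole
# plugin), and sends back a Result carrying the job configs that passed.
# END_TASK_MARKER (None) is the sentinel telling each side the queue is drained.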


class ModuleChecker(multiprocessing.Process):
    def __init__(
            self,
            task_queue,
            result_queue,
    ):
        multiprocessing.Process.__init__(self)
        self.log = PythonDLogger()
        self.log.job_name = 'checker'
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.log.info('starting...')
        HeartBeat(1).start()
        while self.run_once():
            pass
        self.log.info('terminating...')

    def run_once(self):
        task = self.task_queue.get()

        if task is END_TASK_MARKER:
            # TODO: find better solution, understand why heartbeat thread doesn't work
            heartbeat()
            self.task_queue.task_done()
            self.result_queue.put(END_TASK_MARKER)
            return False

        result = self.do_task(task)
        if result:
            self.result_queue.put(result)
        self.task_queue.task_done()

        return True

    def do_task(self, task):
        self.log.info("{0} : checking".format(task.module_name))

        # LOAD SOURCE
        module = Module(task.module_name)
        try:
            module.load_source()
        except Exception as error:
            self.log.warning("{0} : error on loading source : {1}, skipping module".format(
                task.module_name, error))
            return None
        else:
            self.log.info("{0} : source successfully loaded".format(task.module_name))

        if module.is_disabled_by_default() and not task.explicitly_enabled:
            self.log.info("{0} : disabled by default".format(task.module_name))
            return None

        # LOAD CONFIG
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        conf_abs_path = multi_path_find(
            name='{0}.conf'.format(task.module_name),
            paths=paths,
        )

        if conf_abs_path:
            self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
            try:
                module.load_config(conf_abs_path)
            except Exception as error:
                self.log.warning("{0} : error on loading config : {1}, skipping module".format(
                    task.module_name, error))
                return None
        else:
            self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
                task.module_name, paths))

        # CHECK JOBS
        jobs = module.create_jobs()
        self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))

        successful_jobs_configs = list()
        for job in jobs:
            if job.autodetection_retry() > 0:
                successful_jobs_configs.append(job.config)
                self.log.info("{0}[{1}] : autodetection job, will be checked in main".format(
                    task.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            if not ok:
                self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
                continue

            self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))

            # the check succeeded here, so if the recheck in the main process
            # fails transiently, retry it every update_every seconds instead of dropping it
            job.config['autodetection_retry'] = job.config['update_every']
            successful_jobs_configs.append(job.config)

        if not successful_jobs_configs:
            self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
            return None

        return Result(module.source.__name__, successful_jobs_configs)


class JobConf(OrderedDict):
    def __init__(self, *args):
        OrderedDict.__init__(self, *args)

    def set_defaults_from_module(self, module):
        for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
            self[k] = getattr(module, k)

    def set_defaults_from_config(self, module_config):
        for k in [k for k in JOB_BASE_CONF if k in module_config]:
            self[k] = module_config[k]

    def set_job_name(self, name):
        self['job_name'] = re.sub(r'\s+', '_', name)

    def set_override_name(self, name):
        self['override_name'] = re.sub(r'\s+', '_', name)

    def as_dict(self):
        return copy.deepcopy(OrderedDict(self))
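
# A module's config file is plain YAML: top-level scalar keys override the
# JOB_BASE_CONF defaults for every job, and each top-level mapping defines one
# job. A hypothetical example.conf illustrating the shape consumed by
# Module.gather_jobs_configs() below (all names and values are made up):
#
#   update_every: 5          # global: applied to all jobs
#
#   local:                   # one job, named 'local'
#     url: 'http://127.0.0.1/stub_status'
#
#   remote:                  # a second job
#     name: 'backend'        # becomes override_name
#     url: 'http://10.0.0.2/stub_status'
#     update_every: 10       # per-job override wins over the global value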


class Job:
    def __init__(
            self,
            service,
            module_name,
            config,
    ):
        self.service = service
        self.config = config
        self.module_name = module_name
        self.name = config['job_name']
        self.override_name = config['override_name']
        self.wrapped = None

    def init(self):
        self.wrapped = self.service(configuration=self.config.as_dict())

    def check(self):
        return self.wrapped.check()

    def post_check(self, min_update_every):
        if self.wrapped.update_every < min_update_every:
            self.wrapped.update_every = min_update_every

    def create(self):
        return self.wrapped.create()

    def autodetection_retry(self):
        return self.config['autodetection_retry']

    def run(self):
        self.wrapped.run()


class Module:
    def __init__(self, name):
        self.name = name
        self.source = None
        self.config = dict()

    def is_disabled_by_default(self):
        return bool(getattr(self.source, 'disabled_by_default', False))

    def load_source(self):
        self.source = load_module(self.name)

    def load_config(self, abs_path):
        self.config = load_config(abs_path) or dict()

    def gather_jobs_configs(self):
        job_names = [v for v in self.config if isinstance(self.config[v], dict)]

        if len(job_names) == 0:
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.update(self.config)
            job_conf.set_job_name(self.name)
            job_conf.set_override_name(job_conf.pop('name'))
            return [job_conf]

        configs = list()
        for job_name in job_names:
            raw_job_conf = self.config[job_name]
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.set_defaults_from_config(self.config)
            job_conf.update(raw_job_conf)
            job_conf.set_job_name(job_name)
            job_conf.set_override_name(job_conf.pop('name'))
            configs.append(job_conf)

        return configs

    def create_jobs(self, jobs_conf=None):
        return [Job(self.source.Service, self.name, conf)
                for conf in jobs_conf or self.gather_jobs_configs()]


class JobRunner(threading.Thread):
    def __init__(self, job):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wrapped = job

    def run(self):
        self.wrapped.run()


class PluginConf(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']
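
# The Service contract assumed by Job above. Based only on how it is used in
# this file, every <name>.chart.py module must export a Service class that
# looks roughly like (a sketch, not a definitive spec):
#
#   class Service(...):
#       update_every = 1                          # read/written by Job.post_check()
#
#       def __init__(self, configuration=None):   # receives the merged JobConf as a dict
#           ...
#
#       def check(self):    # True if the data source is reachable
#       def create(self):   # emits chart definitions, truthy on success
#       def run(self):      # the collection loop
#
# The framework base classes ship in the python_modules package appended to
# sys.path at the top of this file.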


class Plugin:
    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        self.auto_detection_jobs = list()
        self.tasks = list()
        self.results = list()
        self.checked_jobs = collections.defaultdict(list)
        self.runs = 0

    @staticmethod
    def shutdown():
        safe_print('DISABLE')
        sys.exit(0)

    def run(self):
        jobs = self.create_jobs()
        if not jobs:
            return

        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()

        self.serve()

    def enqueue_tasks(self):
        for task in self.tasks:
            self.task_queue.put(task)
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]
        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)

        if not abs_path:
            self.log.warning('config was not found, using defaults')
            return True

        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False

        self.log.info('config successfully loaded')
        self.config.update(config)
        return True

    def setup(self):
        self.log.info('starting setup')
        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False

        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))

        if not self.tasks:
            self.log.info('no modules to run')
            return False

        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()

        # TODO: timeouts?
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()

        self.log.info('stopping checker process')
        worker.join()

        if not self.results:
            self.log.info('no modules to run')
            return False

        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )
        return True

    def create_jobs(self):
        jobs = list()
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue

            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)

        return jobs

    def prepare_jobs(self, jobs):
        prepared = list()

        for job in jobs:
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue

            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))

            job.post_check(int(self.min_update_every))

            if not job.create():
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))

            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)

        return prepared
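
    # serve() is the plugin's main loop: every prepared job already runs in its
    # own daemon JobRunner thread, so the main thread only keeps the process
    # alive, periodically forces garbage collection when gc_run is enabled, and
    # re-checks the jobs that asked for autodetection retries. Once every job
    # thread has died (active_count() drops to main + heartbeat) and nothing is
    # left to retry, it returns, which leads to DISABLE being sent to netdata.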
    def serve(self):
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']

        while True:
            self.runs += 1

            # threads: main + heartbeat
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return

            time.sleep(1)

            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))

            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        stop_retrying = True
        retry_later = False

        if self.runs % job.autodetection_retry() != 0:
            return retry_later

        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}] : already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying

        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying

        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later

        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))

        if not job.create():
            return stop_retrying

        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        JobRunner(job).start()

        return stop_retrying


def parse_cmd():
    opts = sys.argv[1:]
    debug = False
    trace = False
    update_every = 1
    modules_to_run = list()

    v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
    if v:
        # isdigit() matched a string, cast before use
        update_every = int(v)
        opts.remove(v)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if opts:
        modules_to_run = list(opts)

    return collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'modules_to_run',
        ],
    )(
        update_every,
        debug,
        trace,
        modules_to_run,
    )


def main():
    cmd = parse_cmd()
    logger = PythonDLogger()

    if cmd.debug:
        logger.logger.severity = 'DEBUG'
    if cmd.trace:
        logger.log_traceback = True

    logger.info('using python v{0}'.format(PY_VERSION[0]))

    unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown_modules:
        logger.error('unknown modules : {0}'.format(sorted(unknown_modules)))
        safe_print('DISABLE')
        return

    plugin = Plugin(
        cmd.update_every,
        cmd.modules_to_run or AVAILABLE_MODULES,
    )

    HeartBeat(1).start()

    if not plugin.setup():
        safe_print('DISABLE')
        return

    plugin.run()
    logger.info('exiting from main...')
    plugin.shutdown()


if __name__ == '__main__':
    main()
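
# Usage note: netdata invokes this script directly. parse_cmd() accepts, in any
# order, an integer (the minimum update_every in seconds), the words 'debug'
# and 'trace', and an optional list of module names. A hypothetical manual run
# for troubleshooting a single module might look like:
#
#   ./python.d.plugin 1 debug trace example
#
# where 'example' stands in for any module name listed in AVAILABLE_MODULES.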