diff options
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin')
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin | 733 |
1 files changed, 733 insertions, 0 deletions
#!/usr/bin/env bash
'''':;
if [[ "$OSTYPE" == "darwin"* ]]; then
    export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
fi
exec "$(command -v python || command -v python3 || command -v python2 ||
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''

# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later


import collections
import copy
import gc
import multiprocessing
import os
import re
import sys
import time
import threading
import types

PY_VERSION = sys.version_info[:2]

# 'imp' was superseded by 'importlib'; keep the old loader as a fallback
# for python <= 3.1.
if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader


ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'


def dirs():
    """Resolve the netdata directory layout from the environment.

    Falls back to the stock install locations when the NETDATA_* variables
    are not set.  Returns a 'Dirs' namedtuple with absolute paths.
    """
    Dirs = collections.namedtuple(
        'Dirs',
        [
            'user_config',
            'stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'pythond_packages',
        ]
    )

    user_config = os.getenv(ENV_NETDATA_USER_CONFIG_DIR, '/etc/netdata')
    stock_config = os.getenv(ENV_NETDATA_STOCK_CONFIG_DIR, '/usr/lib/netdata/conf.d')

    # chart modules live next to this plugin, in ../python.d
    plugins_dir = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    modules = os.path.abspath(plugins_dir + '/../python.d')

    return Dirs(
        user_config=user_config,
        stock_config=stock_config,
        modules_user_config=os.path.join(user_config, 'python.d'),
        modules_stock_config=os.path.join(stock_config, 'python.d'),
        modules=modules,
        pythond_packages=os.path.join(modules, 'python_modules'),
    )

DIRS = dirs()

sys.path.append(DIRS.pythond_packages)


from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict


# Sentinel pushed through the task/result queues to signal "no more items".
END_TASK_MARKER = None

IS_ATTY = sys.stdout.isatty()

PLUGIN_CONF_FILE = 'python.d.conf'

MODULE_SUFFIX = '.chart.py'

OBSOLETED_MODULES = (
    'apache_cache',  # replaced by web_log
    'cpuidle',  # rewritten in C
    'cpufreq',  # rewritten in C
    'gunicorn_log',  # replaced by web_log
    'linux_power_supply',  # rewritten in C
    'nginx_log',  # replaced by web_log
    'mdstat',  # rewritten in C
    'sslcheck',  # memory leak bug https://github.com/netdata/netdata/issues/5624
)


AVAILABLE_MODULES = [
    m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
    if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
]

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}

JOB_BASE_CONF = {
    # BUGFIX: os.getenv() returns a *string* when the variable is set, but
    # 'update_every' is used in arithmetic and comparisons downstream
    # (post_check(), autodetection_retry scheduling) — cast it to int.
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': '',
}


def heartbeat():
    """Emit an empty line so netdata knows the plugin is alive.

    Suppressed on a tty (interactive/debug runs) to keep output readable.
    """
    if IS_ATTY:
        return
    safe_print('\n')


class HeartBeat(threading.Thread):
    """Daemon thread that emits a heartbeat every `every` seconds."""

    def __init__(self, every):
        threading.Thread.__init__(self)
        self.daemon = True
        self.every = every

    def run(self):
        while True:
            time.sleep(self.every)
            heartbeat()


def load_module(name):
    """Load the chart module `name` from DIRS.modules and return it.

    On python <= 3.1 the 'imp'-based loader already returns a module object;
    on newer versions SourceFileLoader must be asked to load it.
    """
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader(name, abs_path)
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()


def multi_path_find(name, paths):
    """Return the first existing file `name` found in `paths`, or ''."""
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return ''


# A module check request sent to the checker process.
Task = collections.namedtuple(
    'Task',
    [
        'module_name',
        'explicitly_enabled',
    ],
)

# A successful check outcome: the module plus its viable job configs.
Result = collections.namedtuple(
    'Result',
    [
        'module_name',
        'jobs_configs',
    ],
)


class ModuleChecker(multiprocessing.Process):
    """Separate process that loads, configures and checks chart modules.

    Runs module check() calls in isolation so a misbehaving module cannot
    take down the main plugin process.
    """

    def __init__(
            self,
            task_queue,
            result_queue,
    ):
        multiprocessing.Process.__init__(self)
        self.log = PythonDLogger()
        self.log.job_name = 'checker'
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.log.info('starting...')
        HeartBeat(1).start()
        while self.run_once():
            pass
        self.log.info('terminating...')

    def run_once(self):
        """Process one task; return False when the end marker is seen."""
        task = self.task_queue.get()

        if task is END_TASK_MARKER:
            # TODO: find better solution, understand why heartbeat thread doesn't work
            heartbeat()
            self.task_queue.task_done()
            self.result_queue.put(END_TASK_MARKER)
            return False

        result = self.do_task(task)
        if result:
            self.result_queue.put(result)
        self.task_queue.task_done()

        return True

    def do_task(self, task):
        """Load, configure and check one module; return a Result or None."""
        self.log.info("{0} : checking".format(task.module_name))

        # LOAD SOURCE
        module = Module(task.module_name)
        try:
            module.load_source()
        except Exception as error:
            self.log.warning("{0} : error on loading source : {1}, skipping module".format(
                task.module_name,
                error,
            ))
            return None
        else:
            self.log.info("{0} : source successfully loaded".format(task.module_name))

        if module.is_disabled_by_default() and not task.explicitly_enabled:
            self.log.info("{0} : disabled by default".format(task.module_name))
            return None

        # LOAD CONFIG
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        conf_abs_path = multi_path_find(
            name='{0}.conf'.format(task.module_name),
            paths=paths,
        )

        if conf_abs_path:
            self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
            try:
                module.load_config(conf_abs_path)
            except Exception as error:
                self.log.warning("{0} : error on loading config : {1}, skipping module".format(
                    task.module_name, error))
                return None
        else:
            self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
                task.module_name, paths))

        # CHECK JOBS
        jobs = module.create_jobs()
        self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))

        successful_jobs_configs = []
        for job in jobs:
            if job.autodetection_retry() > 0:
                # autodetection jobs are rechecked periodically in the main
                # process, so don't fail them here
                successful_jobs_configs.append(job.config)
                self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
                    task.module_name, job.name, error))
                continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            if not ok:
                self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
                continue

            self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))

            # checked jobs are re-initialized in the main process, so mark
            # them for a fast recheck there
            job.config['autodetection_retry'] = job.config['update_every']
            successful_jobs_configs.append(job.config)

        if not successful_jobs_configs:
            self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
            return None

        return Result(module.source.__name__, successful_jobs_configs)


class JobConf(OrderedDict):
    """Ordered job configuration with layered defaults.

    Precedence (lowest to highest): JOB_BASE_CONF -> module attributes ->
    module-level config keys -> per-job config keys.
    """

    def __init__(self, *args):
        OrderedDict.__init__(self, *args)

    def set_defaults_from_module(self, module):
        # module-level attributes (e.g. `update_every = 5` in a chart module)
        # override the base defaults
        for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
            self[k] = getattr(module, k)

    def set_defaults_from_config(self, module_config):
        # top-level keys from the module's .conf file
        for k in [k for k in JOB_BASE_CONF if k in module_config]:
            self[k] = module_config[k]

    def set_job_name(self, name):
        self['job_name'] = re.sub(r'\s+', '_', name)

    def set_override_name(self, name):
        self['override_name'] = re.sub(r'\s+', '_', name)

    def as_dict(self):
        return copy.deepcopy(OrderedDict(self))


class Job:
    """A single configured instance of a chart module's Service."""

    def __init__(
            self,
            service,
            module_name,
            config,
    ):
        self.service = service
        self.config = config
        self.module_name = module_name
        self.name = config['job_name']
        self.override_name = config['override_name']
        self.wrapped = None  # Service instance, created lazily in init()

    def init(self):
        self.wrapped = self.service(configuration=self.config.as_dict())

    def check(self):
        return self.wrapped.check()

    def post_check(self, min_update_every):
        # clamp the service's update interval to the plugin-wide minimum
        if self.wrapped.update_every < min_update_every:
            self.wrapped.update_every = min_update_every

    def create(self):
        return self.wrapped.create()

    def autodetection_retry(self):
        return self.config['autodetection_retry']

    def run(self):
        self.wrapped.run()


class Module:
    """A chart module: its loaded source plus its parsed configuration."""

    def __init__(self, name):
        self.name = name
        self.source = None  # loaded python module, set by load_source()
        self.config = dict()

    def is_disabled_by_default(self):
        return bool(getattr(self.source, 'disabled_by_default', False))

    def load_source(self):
        self.source = load_module(self.name)

    def load_config(self, abs_path):
        self.config = load_config(abs_path) or dict()

    def gather_jobs_configs(self):
        """Build one JobConf per job defined in the config.

        Dict-valued config keys are job definitions; when there are none,
        a single default job is created from the top-level config.
        """
        job_names = [v for v in self.config if isinstance(self.config[v], dict)]

        if len(job_names) == 0:
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.update(self.config)
            job_conf.set_job_name(self.name)
            job_conf.set_override_name(job_conf.pop('name'))
            return [job_conf]

        configs = []
        for job_name in job_names:
            raw_job_conf = self.config[job_name]
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.set_defaults_from_config(self.config)
            job_conf.update(raw_job_conf)
            job_conf.set_job_name(job_name)
            job_conf.set_override_name(job_conf.pop('name'))
            configs.append(job_conf)

        return configs

    def create_jobs(self, jobs_conf=None):
        return [Job(self.source.Service, self.name, conf)
                for conf in jobs_conf or self.gather_jobs_configs()]


class JobRunner(threading.Thread):
    """Daemon thread that runs one job's main loop."""

    def __init__(self, job):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wrapped = job

    def run(self):
        self.wrapped.run()


class PluginConf(dict):
    """python.d.conf contents (module enable flags + plugin settings)."""

    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        # not mentioned in the config: with `explicit` the caller asks
        # "was it explicitly enabled?" — the answer is no
        if explicit:
            return False
        return self['default_run']


class Plugin:
    """The python.d orchestrator.

    Setup phase: checks all candidate modules in a helper process.
    Run phase: starts a thread per healthy job and keeps retrying
    autodetection jobs until they succeed or are given up on.
    """

    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        self.auto_detection_jobs = []
        self.tasks = []
        self.results = []
        # module_name -> list of names already served by a running job
        self.checked_jobs = collections.defaultdict(list)
        self.runs = 0

    @staticmethod
    def shutdown():
        safe_print('DISABLE')
        # BUGFIX(idiom): exit() is the site-module helper and may be absent
        # when run without site; sys.exit() is the reliable form.
        sys.exit(0)

    def run(self):
        jobs = self.create_jobs()
        if not jobs:
            return

        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()

        self.serve()

    def enqueue_tasks(self):
        for task in self.tasks:
            self.task_queue.put(task)
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        """Load python.d.conf; return False only on a parse error."""
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]

        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
        if not abs_path:
            # missing config is fine — run with the built-in defaults
            self.log.warning('config was not found, using defaults')
            return True

        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False

        self.log.info('config successfully loaded')
        self.config.update(config)
        return True

    def setup(self):
        """Check all modules via the checker process; return True if any survive."""
        self.log.info('starting setup')
        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False

        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))

        if not self.tasks:
            self.log.info('no modules to run')
            return False

        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()

        # TODO: timeouts?
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()
        self.log.info('stopping checker process')
        worker.join()

        if not self.results:
            self.log.info('no modules to run')
            return False

        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )

        return True

    def create_jobs(self):
        """Re-create Job objects in this process from the checker's results."""
        jobs = []
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue

            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)

        return jobs

    def prepare_jobs(self, jobs):
        """Init/check/create jobs; return those ready to run in threads.

        Failed jobs with autodetection_retry > 0 are parked for periodic
        rechecking in serve().
        """
        prepared = []

        for job in jobs:
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue

            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))

            job.post_check(int(self.min_update_every))

            if not job.create():
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))

            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)

        return prepared

    def serve(self):
        """Main loop: keep the process alive, GC periodically, retry jobs."""
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']

        while True:
            self.runs += 1

            # threads: main + heartbeat — all job threads have died and
            # nothing is left to retry, so exit
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return

            time.sleep(1)

            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))

            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        """Recheck one autodetection job; return True to stop retrying it."""
        stop_retrying = True
        retry_later = False

        if self.runs % job.autodetection_retry() != 0:
            return retry_later

        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying

        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying

        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later
        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))

        if not job.create():
            return stop_retrying

        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        JobRunner(job).start()

        return stop_retrying


def parse_cmd():
    """Parse the plugin's positional CLI args.

    Recognized: a positive integer (update_every), 'debug', 'trace';
    anything else is treated as an explicit module list.
    """
    opts = sys.argv[1:]
    debug = False
    trace = False
    update_every = 1
    modules_to_run = []

    v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
    if v:
        # BUGFIX: the option arrives as a string; keep update_every an int
        # so downstream numeric comparisons work.
        update_every = int(v)
        opts.remove(v)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if opts:
        modules_to_run = list(opts)

    return collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'modules_to_run',
        ],
    )(
        update_every,
        debug,
        trace,
        modules_to_run,
    )


def main():
    """Entry point: parse args, set up the plugin, run it."""
    cmd = parse_cmd()
    logger = PythonDLogger()

    if cmd.debug:
        logger.logger.severity = 'DEBUG'
    if cmd.trace:
        logger.log_traceback = True

    logger.info('using python v{0}'.format(PY_VERSION[0]))

    unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown_modules:
        logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
        safe_print('DISABLE')
        return

    plugin = Plugin(
        cmd.update_every,
        cmd.modules_to_run or AVAILABLE_MODULES,
    )

    HeartBeat(1).start()

    if not plugin.setup():
        safe_print('DISABLE')
        return

    plugin.run()
    logger.info('exiting from main...')
    plugin.shutdown()


if __name__ == '__main__':
    main()