diff options
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin')
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin | 733 |
1 file changed, 0 insertions, 733 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin b/collectors/python.d.plugin/python.d.plugin deleted file mode 100644 index 468df13a4..000000000 --- a/collectors/python.d.plugin/python.d.plugin +++ /dev/null @@ -1,733 +0,0 @@ -#!/usr/bin/env bash -'''':; -if [[ "$OSTYPE" == "darwin"* ]]; then - export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES -fi -exec "$(command -v python || command -v python3 || command -v python2 || -echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' - -# -*- coding: utf-8 -*- -# Description: -# Author: Pawel Krupa (paulfantom) -# Author: Ilya Mashchenko (l2isbad) -# SPDX-License-Identifier: GPL-3.0-or-later - - -import collections -import copy -import gc -import multiprocessing -import os -import re -import sys -import time -import threading -import types - -PY_VERSION = sys.version_info[:2] - -if PY_VERSION > (3, 1): - from importlib.machinery import SourceFileLoader -else: - from imp import load_source as SourceFileLoader - - -ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR' -ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' -ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' -ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' - - -def dirs(): - user_config = os.getenv( - ENV_NETDATA_USER_CONFIG_DIR, - '/etc/netdata', - ) - stock_config = os.getenv( - ENV_NETDATA_STOCK_CONFIG_DIR, - '/usr/lib/netdata/conf.d', - ) - modules_user_config = os.path.join(user_config, 'python.d') - modules_stock_config = os.path.join(stock_config, 'python.d') - - modules = os.path.abspath( - os.getenv( - ENV_NETDATA_PLUGINS_DIR, - os.path.dirname(__file__), - ) + '/../python.d' - ) - pythond_packages = os.path.join(modules, 'python_modules') - - return collections.namedtuple( - 'Dirs', - [ - 'user_config', - 'stock_config', - 'modules_user_config', - 'modules_stock_config', - 'modules', - 'pythond_packages', - ] - )( - user_config, - stock_config, - modules_user_config, - modules_stock_config, - modules, - pythond_packages, - ) - 
DIRS = dirs()

sys.path.append(DIRS.pythond_packages)


from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config

try:
    from collections import OrderedDict
except ImportError:
    # very old interpreters don't ship collections.OrderedDict
    from third_party.ordereddict import OrderedDict


# sentinel pushed through the task/result queues to signal "no more work"
END_TASK_MARKER = None

IS_ATTY = sys.stdout.isatty()

PLUGIN_CONF_FILE = 'python.d.conf'

MODULE_SUFFIX = '.chart.py'

OBSOLETED_MODULES = (
    'apache_cache',  # replaced by web_log
    'cpuidle',  # rewritten in C
    'cpufreq',  # rewritten in C
    'gunicorn_log',  # replaced by web_log
    'linux_power_supply',  # rewritten in C
    'nginx_log',  # replaced by web_log
    'mdstat',  # rewritten in C
    'sslcheck',  # memory leak bug https://github.com/netdata/netdata/issues/5624
)


AVAILABLE_MODULES = [
    m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
    if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
]

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}

JOB_BASE_CONF = {
    # FIX: os.getenv returns a *string* when the variable is set; without int()
    # the value propagates into 'autodetection_retry' (see below) and breaks
    # arithmetic such as `self.runs % job.autodetection_retry()` on Python 3.
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}


def heartbeat():
    """Emit a newline keep-alive on stdout, but only when piped (not a tty)."""
    if IS_ATTY:
        return
    safe_print('\n')


class HeartBeat(threading.Thread):
    """Daemon thread that calls heartbeat() every `every` seconds."""

    def __init__(self, every):
        threading.Thread.__init__(self)
        self.daemon = True
        self.every = every

    def run(self):
        while True:
            time.sleep(self.every)
            heartbeat()


def load_module(name):
    """Load `<name>.chart.py` from the modules dir and return the module object.

    On Python >= 3.2 SourceFileLoader(...).load_module() is used; on older
    interpreters imp.load_source already returns a module object directly.
    """
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader(name, abs_path)
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()


def multi_path_find(name, paths):
    """Return the first existing `paths[i]/name` file path, or '' if none."""
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return ''


# work item sent to the checker process
Task = collections.namedtuple(
    'Task',
    [
        'module_name',
        'explicitly_enabled',
    ],
)

# checker answer: module name plus the job configs that passed check()
Result = collections.namedtuple(
    'Result',
    [
        'module_name',
        'jobs_configs',
    ],
)


class ModuleChecker(multiprocessing.Process):
    """Separate process that loads/configures/checks modules off the main process.

    Consumes Task items from `task_queue` and produces Result items on
    `result_queue`; END_TASK_MARKER terminates the loop in both directions.
    """

    def __init__(
            self,
            task_queue,
            result_queue,
    ):
        multiprocessing.Process.__init__(self)
        self.log = PythonDLogger()
        self.log.job_name = 'checker'
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.log.info('starting...')
        HeartBeat(1).start()
        while self.run_once():
            pass
        self.log.info('terminating...')

    def run_once(self):
        """Process one task; return False when the end marker was received."""
        task = self.task_queue.get()

        if task is END_TASK_MARKER:
            # TODO: find better solution, understand why heartbeat thread doesn't work
            heartbeat()
            self.task_queue.task_done()
            self.result_queue.put(END_TASK_MARKER)
            return False

        result = self.do_task(task)
        if result:
            self.result_queue.put(result)
        self.task_queue.task_done()

        return True

    def do_task(self, task):
        """Load a module's source and config, run its jobs' check(), and
        return a Result with the configs that succeeded (or None)."""
        self.log.info("{0} : checking".format(task.module_name))

        # LOAD SOURCE
        module = Module(task.module_name)
        try:
            module.load_source()
        except Exception as error:
            self.log.warning("{0} : error on loading source : {1}, skipping module".format(
                task.module_name,
                error,
            ))
            return None
        else:
            self.log.info("{0} : source successfully loaded".format(task.module_name))

        if module.is_disabled_by_default() and not task.explicitly_enabled:
            self.log.info("{0} : disabled by default".format(task.module_name))
            return None

        # LOAD CONFIG
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        conf_abs_path = multi_path_find(
            name='{0}.conf'.format(task.module_name),
            paths=paths,
        )

        if conf_abs_path:
            self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
            try:
                module.load_config(conf_abs_path)
            except Exception as error:
                self.log.warning("{0} : error on loading config : {1}, skipping module".format(
                    task.module_name, error))
                return None
        else:
            self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
                task.module_name, paths))

        # CHECK JOBS
        jobs = module.create_jobs()
        self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))

        successful_jobs_configs = list()
        for job in jobs:
            if job.autodetection_retry() > 0:
                # retried later in the main process, not checked here
                successful_jobs_configs.append(job.config)
                self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                # FIX: stray ')' removed from the original log message
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            if not ok:
                self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
                continue

            self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))

            # successfully checked jobs are rechecked in main before running
            job.config['autodetection_retry'] = job.config['update_every']
            successful_jobs_configs.append(job.config)

        if not successful_jobs_configs:
            self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
            return None

        return Result(module.source.__name__, successful_jobs_configs)


class JobConf(OrderedDict):
    """Ordered job configuration assembled from defaults, module attributes
    and the module's config file."""

    def __init__(self, *args):
        OrderedDict.__init__(self, *args)

    def set_defaults_from_module(self, module):
        # module-level attributes (e.g. update_every) override base defaults
        for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
            self[k] = getattr(module, k)

    def set_defaults_from_config(self, module_config):
        # top-level keys from the config file override module attributes
        for k in [k for k in JOB_BASE_CONF if k in module_config]:
            self[k] = module_config[k]

    def set_job_name(self, name):
        self['job_name'] = re.sub(r'\s+', '_', name)

    def set_override_name(self, name):
        self['override_name'] = re.sub(r'\s+', '_', name)

    def as_dict(self):
        return copy.deepcopy(OrderedDict(self))


class Job:
    """Thin wrapper binding a module's Service class to one job config."""

    def __init__(
            self,
            service,
            module_name,
            config,
    ):
        self.service = service
        self.module_name = module_name
        self.config = config
        self.name = config['job_name']
        self.override_name = config['override_name']
        self.wrapped = None  # the Service instance, created lazily by init()

    def init(self):
        self.wrapped = self.service(configuration=self.config.as_dict())

    def check(self):
        return self.wrapped.check()

    def post_check(self, min_update_every):
        # never update faster than the global minimum
        if self.wrapped.update_every < min_update_every:
            self.wrapped.update_every = min_update_every

    def create(self):
        return self.wrapped.create()

    def autodetection_retry(self):
        return self.config['autodetection_retry']

    def run(self):
        self.wrapped.run()


class Module:
    """A python.d collector module: its source, its config, its jobs."""

    def __init__(self, name):
        self.name = name
        self.source = None  # module object, set by load_source()
        self.config = dict()

    def is_disabled_by_default(self):
        return bool(getattr(self.source, 'disabled_by_default', False))

    def load_source(self):
        self.source = load_module(self.name)

    def load_config(self, abs_path):
        self.config = load_config(abs_path) or dict()

    def gather_jobs_configs(self):
        """Build one JobConf per job section; with no sections, one default job."""
        job_names = [v for v in self.config if isinstance(self.config[v], dict)]

        if len(job_names) == 0:
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.update(self.config)
            job_conf.set_job_name(self.name)
            job_conf.set_override_name(job_conf.pop('name'))
            return [job_conf]

        configs = list()
        for job_name in job_names:
            raw_job_conf = self.config[job_name]
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.set_defaults_from_config(self.config)
            job_conf.update(raw_job_conf)
            job_conf.set_job_name(job_name)
            job_conf.set_override_name(job_conf.pop('name'))
            configs.append(job_conf)

        return configs

    def create_jobs(self, jobs_conf=None):
        return [Job(self.source.Service, self.name, conf)
                for conf in jobs_conf or self.gather_jobs_configs()]


class JobRunner(threading.Thread):
    """Daemon thread running one checked job's collection loop."""

    def __init__(self, job):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wrapped = job

    def run(self):
        self.wrapped.run()


class PluginConf(dict):
    """python.d.conf contents; answers whether a module should run."""

    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        # with explicit=True, an absent entry counts as "not explicitly enabled"
        if explicit:
            return False
        return self['default_run']


class Plugin:
    """Main orchestrator: loads config, checks modules in a subprocess,
    starts job threads and keeps the heartbeat/GC loop running."""

    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        self.auto_detection_jobs = list()
        self.tasks = list()
        self.results = list()
        # module_name -> list of (override_name or name) already running
        self.checked_jobs = collections.defaultdict(list)
        self.runs = 0

    @staticmethod
    def shutdown():
        safe_print('DISABLE')
        # FIX: sys.exit instead of the site-provided exit() builtin
        sys.exit(0)

    def run(self):
        jobs = self.create_jobs()
        if not jobs:
            return

        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()

        self.serve()

    def enqueue_tasks(self):
        for task in self.tasks:
            self.task_queue.put(task)
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        """Load python.d.conf into self.config; return False only on parse error."""
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]

        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
        if not abs_path:
            self.log.warning('config was not found, using defaults')
            return True

        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False

        self.log.info('config successfully loaded')
        self.config.update(config)
        return True

    def setup(self):
        """Run the checker process over all enabled modules; return True when
        at least one module produced runnable job configs."""
        self.log.info('starting setup')
        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False

        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))

        if not self.tasks:
            self.log.info('no modules to run')
            return False

        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()

        # TODO: timeouts?
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()
        self.log.info('stopping checker process')
        worker.join()

        if not self.results:
            self.log.info('no modules to run')
            return False

        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )

        return True

    def create_jobs(self):
        """Re-load sources in this process and build Job objects from the
        configs the checker approved."""
        jobs = list()
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue

            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)

        return jobs

    def prepare_jobs(self, jobs):
        """init()/check() every job in-process; return the ones ready to run.

        Jobs whose check fails but have autodetection_retry > 0 are queued for
        periodic rechecking in serve().
        """
        prepared = list()

        for job in jobs:
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue

            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))

            job.post_check(int(self.min_update_every))

            # NOTE(review): a job whose create() fails is still started below —
            # looks intentional (charts may appear later); confirm before changing.
            if not job.create():
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))

            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)

        return prepared

    def serve(self):
        """Main loop: heartbeat ticks, periodic GC and autodetection rechecks.

        Returns (ending the plugin) when only main + heartbeat threads remain
        and no autodetection jobs are pending.
        """
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']

        while True:
            self.runs += 1

            # threads: main + heartbeat
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return

            time.sleep(1)

            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))

            # keep only the jobs that still need retrying
            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        """Recheck one autodetection job; return True to stop retrying it."""
        stop_retrying = True
        retry_later = False

        if self.runs % job.autodetection_retry() != 0:
            return retry_later

        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying

        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying

        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later
        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))

        if not job.create():
            return stop_retrying

        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        JobRunner(job).start()

        return stop_retrying


def parse_cmd():
    """Parse argv: [update_every] [debug] [trace] [module ...]."""
    opts = sys.argv[:][1:]
    debug = False
    trace = False
    update_every = 1
    modules_to_run = list()

    v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
    if v:
        # FIX: convert to int — the original kept the digit *string*, which
        # later leaks into numeric comparisons via min_update_every
        update_every = int(v)
        opts.remove(v)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if opts:
        modules_to_run = list(opts)

    return collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'modules_to_run',
        ],
    )(
        update_every,
        debug,
        trace,
        modules_to_run,
    )


def main():
    """Entry point: parse args, validate module names, run the plugin."""
    cmd = parse_cmd()
    logger = PythonDLogger()

    if cmd.debug:
        logger.logger.severity = 'DEBUG'
    if cmd.trace:
        logger.log_traceback = True

    logger.info('using python v{0}'.format(PY_VERSION[0]))

    unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown_modules:
        logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
        safe_print('DISABLE')
        return

    plugin = Plugin(
        cmd.update_every,
        cmd.modules_to_run or AVAILABLE_MODULES,
    )

    HeartBeat(1).start()

    if not plugin.setup():
        safe_print('DISABLE')
        return

    plugin.run()
    logger.info('exiting from main...')
    plugin.shutdown()


if __name__ == '__main__':
    main()