Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin')
-rw-r--r--  collectors/python.d.plugin/python.d.plugin | 733 ----------
1 file changed, 0 insertions(+), 733 deletions(-)
diff --git a/collectors/python.d.plugin/python.d.plugin b/collectors/python.d.plugin/python.d.plugin
deleted file mode 100644
index 468df13a4..000000000
--- a/collectors/python.d.plugin/python.d.plugin
+++ /dev/null
@@ -1,733 +0,0 @@
-#!/usr/bin/env bash
-'''':;
-if [[ "$OSTYPE" == "darwin"* ]]; then
- export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
-fi
-exec "$(command -v python || command -v python3 || command -v python2 ||
-echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
-
-# -*- coding: utf-8 -*-
-# Description:
-# Author: Pawel Krupa (paulfantom)
-# Author: Ilya Mashchenko (l2isbad)
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-
-import collections
-import copy
-import gc
-import multiprocessing
-import os
-import re
-import sys
-import time
-import threading
-import types
-
-PY_VERSION = sys.version_info[:2]
-
-if PY_VERSION > (3, 1):
- from importlib.machinery import SourceFileLoader
-else:
- from imp import load_source as SourceFileLoader
-
-
-ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
-ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
-ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
-ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
-
-
-def dirs():
- user_config = os.getenv(
- ENV_NETDATA_USER_CONFIG_DIR,
- '/etc/netdata',
- )
- stock_config = os.getenv(
- ENV_NETDATA_STOCK_CONFIG_DIR,
- '/usr/lib/netdata/conf.d',
- )
- modules_user_config = os.path.join(user_config, 'python.d')
- modules_stock_config = os.path.join(stock_config, 'python.d')
-
- modules = os.path.abspath(
- os.getenv(
- ENV_NETDATA_PLUGINS_DIR,
- os.path.dirname(__file__),
- ) + '/../python.d'
- )
- pythond_packages = os.path.join(modules, 'python_modules')
-
- return collections.namedtuple(
- 'Dirs',
- [
- 'user_config',
- 'stock_config',
- 'modules_user_config',
- 'modules_stock_config',
- 'modules',
- 'pythond_packages',
- ]
- )(
- user_config,
- stock_config,
- modules_user_config,
- modules_stock_config,
- modules,
- pythond_packages,
- )
-
-
-DIRS = dirs()
-
-sys.path.append(DIRS.pythond_packages)
-
-
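-# bases.* (and the third_party fallback below) live in DIRS.pythond_packages,
-# so sys.path must be extended before these imports can resolve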
-from bases.collection import safe_print
-from bases.loggers import PythonDLogger
-from bases.loaders import load_config
-
-try:
- from collections import OrderedDict
-except ImportError:
- from third_party.ordereddict import OrderedDict
-
-
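-# sentinel pushed onto the queues to signal that no more work follows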
-END_TASK_MARKER = None
-
-IS_ATTY = sys.stdout.isatty()
-
-PLUGIN_CONF_FILE = 'python.d.conf'
-
-MODULE_SUFFIX = '.chart.py'
-
-OBSOLETED_MODULES = (
- 'apache_cache', # replaced by web_log
- 'cpuidle', # rewritten in C
- 'cpufreq', # rewritten in C
- 'gunicorn_log', # replaced by web_log
- 'linux_power_supply', # rewritten in C
- 'nginx_log', # replaced by web_log
- 'mdstat', # rewritten in C
- 'sslcheck', # memory leak bug https://github.com/netdata/netdata/issues/5624
-)
-
-
-AVAILABLE_MODULES = [
- m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
- if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
-]
-
-PLUGIN_BASE_CONF = {
- 'enabled': True,
- 'default_run': True,
- 'gc_run': True,
- 'gc_interval': 300,
-}
-
-JOB_BASE_CONF = {
- 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),  # env values arrive as strings
- 'priority': 60000,
- 'autodetection_retry': 0,
- 'chart_cleanup': 10,
- 'penalty': True,
- 'name': str(),
-}
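-
-# a module config file overrides these defaults at the top level and defines
-# jobs as nested mappings; an illustrative (hypothetical) example.conf:
-#
-#   update_every: 5           # module-level override, applies to every job
-#   local:                    # job 'local'
-#     url: 'http://127.0.0.1'
-#   remote:                   # job 'remote'
-#     url: 'http://10.0.0.1'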
-
-
-def heartbeat():
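- # an empty line on stdout acts as a keep-alive for the netdata daemon;
- # suppressed on a tty so interactive runs stay readable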
- if IS_ATTY:
- return
- safe_print('\n')
-
-
-class HeartBeat(threading.Thread):
- def __init__(self, every):
- threading.Thread.__init__(self)
- self.daemon = True
- self.every = every
-
- def run(self):
- while True:
- time.sleep(self.every)
- heartbeat()
-
-
-def load_module(name):
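- # py2's imp.load_source returns the module itself, while py3's
- # SourceFileLoader returns a loader object that still needs load_module()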
- abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
- module = SourceFileLoader(name, abs_path)
- if isinstance(module, types.ModuleType):
- return module
- return module.load_module()
-
-
-def multi_path_find(name, paths):
- for path in paths:
- abs_name = os.path.join(path, name)
- if os.path.isfile(abs_name):
- return abs_name
- return ''
-
-
-Task = collections.namedtuple(
- 'Task',
- [
- 'module_name',
- 'explicitly_enabled',
- ],
-)
-
-Result = collections.namedtuple(
- 'Result',
- [
- 'module_name',
- 'jobs_configs',
- ],
-)
-
-
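-# module checking runs in a separate process, presumably so that a
-# misbehaving check (hung connection, crash) cannot bring down the main
-# plugin process with it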
-class ModuleChecker(multiprocessing.Process):
- def __init__(
- self,
- task_queue,
- result_queue,
- ):
- multiprocessing.Process.__init__(self)
- self.log = PythonDLogger()
- self.log.job_name = 'checker'
- self.task_queue = task_queue
- self.result_queue = result_queue
-
- def run(self):
- self.log.info('starting...')
- HeartBeat(1).start()
- while self.run_once():
- pass
- self.log.info('terminating...')
-
- def run_once(self):
- task = self.task_queue.get()
-
- if task is END_TASK_MARKER:
- # TODO: find better solution, understand why heartbeat thread doesn't work
- heartbeat()
- self.task_queue.task_done()
- self.result_queue.put(END_TASK_MARKER)
- return False
-
- result = self.do_task(task)
- if result:
- self.result_queue.put(result)
- self.task_queue.task_done()
-
- return True
-
- def do_task(self, task):
- self.log.info("{0} : checking".format(task.module_name))
-
- # LOAD SOURCE
- module = Module(task.module_name)
- try:
- module.load_source()
- except Exception as error:
- self.log.warning("{0} : error on loading source : {1}, skipping module".format(
- task.module_name,
- error,
- ))
- return None
- else:
- self.log.info("{0} : source successfully loaded".format(task.module_name))
-
- if module.is_disabled_by_default() and not task.explicitly_enabled:
- self.log.info("{0} : disabled by default".format(task.module_name))
- return None
-
- # LOAD CONFIG
- paths = [
- DIRS.modules_user_config,
- DIRS.modules_stock_config,
- ]
-
- conf_abs_path = multi_path_find(
- name='{0}.conf'.format(task.module_name),
- paths=paths,
- )
-
- if conf_abs_path:
- self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
- try:
- module.load_config(conf_abs_path)
- except Exception as error:
- self.log.warning("{0} : error on loading config : {1}, skipping module".format(
- task.module_name, error))
- return None
- else:
- self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
- task.module_name, paths))
-
- # CHECK JOBS
- jobs = module.create_jobs()
- self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))
-
- successful_jobs_configs = list()
- for job in jobs:
- if job.autodetection_retry() > 0:
- successful_jobs_configs.append(job.config)
- self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
- continue
-
- try:
- job.init()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
- task.module_name, job.name, error))
- continue
-
- try:
- ok = job.check()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
- task.module_name, job.name, error))
- continue
-
- if not ok:
- self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
- continue
-
- self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))
-
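- # the main process re-checks every job; tagging it with a retry
- # interval means a job that passed here but fails there will be
- # retried rather than dropped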
- job.config['autodetection_retry'] = job.config['update_every']
- successful_jobs_configs.append(job.config)
-
- if not successful_jobs_configs:
- self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
- return None
-
- return Result(module.source.__name__, successful_jobs_configs)
-
-
-class JobConf(OrderedDict):
- def __init__(self, *args):
- OrderedDict.__init__(self, *args)
-
- def set_defaults_from_module(self, module):
- for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
- self[k] = getattr(module, k)
-
- def set_defaults_from_config(self, module_config):
- for k in [k for k in JOB_BASE_CONF if k in module_config]:
- self[k] = module_config[k]
-
- def set_job_name(self, name):
- self['job_name'] = re.sub(r'\s+', '_', name)
-
- def set_override_name(self, name):
- self['override_name'] = re.sub(r'\s+', '_', name)
-
- def as_dict(self):
- return copy.deepcopy(OrderedDict(self))
-
-
-class Job:
- def __init__(
- self,
- service,
- module_name,
- config,
- ):
- self.service = service
- self.config = config
- self.module_name = module_name
- self.name = config['job_name']
- self.override_name = config['override_name']
- self.wrapped = None
-
- def init(self):
- self.wrapped = self.service(configuration=self.config.as_dict())
-
- def check(self):
- return self.wrapped.check()
-
- def post_check(self, min_update_every):
- if self.wrapped.update_every < min_update_every:
- self.wrapped.update_every = min_update_every
-
- def create(self):
- return self.wrapped.create()
-
- def autodetection_retry(self):
- return self.config['autodetection_retry']
-
- def run(self):
- self.wrapped.run()
-
-
-class Module:
- def __init__(self, name):
- self.name = name
- self.source = None
- self.config = dict()
-
- def is_disabled_by_default(self):
- return bool(getattr(self.source, 'disabled_by_default', False))
-
- def load_source(self):
- self.source = load_module(self.name)
-
- def load_config(self, abs_path):
- self.config = load_config(abs_path) or dict()
-
- def gather_jobs_configs(self):
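- # per-job precedence, lowest to highest: JOB_BASE_CONF defaults, module
- # attributes, module-level config keys, then the job's own section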
- job_names = [v for v in self.config if isinstance(self.config[v], dict)]
-
- if len(job_names) == 0:
- job_conf = JobConf(JOB_BASE_CONF)
- job_conf.set_defaults_from_module(self.source)
- job_conf.update(self.config)
- job_conf.set_job_name(self.name)
- job_conf.set_override_name(job_conf.pop('name'))
- return [job_conf]
-
- configs = list()
- for job_name in job_names:
- raw_job_conf = self.config[job_name]
- job_conf = JobConf(JOB_BASE_CONF)
- job_conf.set_defaults_from_module(self.source)
- job_conf.set_defaults_from_config(self.config)
- job_conf.update(raw_job_conf)
- job_conf.set_job_name(job_name)
- job_conf.set_override_name(job_conf.pop('name'))
- configs.append(job_conf)
-
- return configs
-
- def create_jobs(self, jobs_conf=None):
- return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()]
-
-
-class JobRunner(threading.Thread):
- def __init__(self, job):
- threading.Thread.__init__(self)
- self.daemon = True
- self.wrapped = job
-
- def run(self):
- self.wrapped.run()
-
-
-class PluginConf(dict):
- def __init__(self, *args):
- dict.__init__(self, *args)
-
- def is_module_enabled(self, module_name, explicit):
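- # with explicit=True only a module named in the config counts as
- # enabled; otherwise unlisted modules fall back to 'default_run'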
- if module_name in self:
- return self[module_name]
- if explicit:
- return False
- return self['default_run']
-
-
-class Plugin:
- def __init__(
- self,
- min_update_every=1,
- modules_to_run=tuple(AVAILABLE_MODULES),
- ):
- self.log = PythonDLogger()
- self.config = PluginConf(PLUGIN_BASE_CONF)
- self.task_queue = multiprocessing.JoinableQueue()
- self.result_queue = multiprocessing.JoinableQueue()
- self.min_update_every = min_update_every
- self.modules_to_run = modules_to_run
- self.auto_detection_jobs = list()
- self.tasks = list()
- self.results = list()
- self.checked_jobs = collections.defaultdict(list)
- self.runs = 0
-
- @staticmethod
- def shutdown():
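- # printing 'DISABLE' asks the netdata daemon not to restart the plugin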
- safe_print('DISABLE')
- exit(0)
-
- def run(self):
- jobs = self.create_jobs()
- if not jobs:
- return
-
- for job in self.prepare_jobs(jobs):
- self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
- JobRunner(job).start()
-
- self.serve()
-
- def enqueue_tasks(self):
- for task in self.tasks:
- self.task_queue.put(task)
- self.task_queue.put(END_TASK_MARKER)
-
- def dequeue_results(self):
- while True:
- result = self.result_queue.get()
- self.result_queue.task_done()
- if result is END_TASK_MARKER:
- break
- self.results.append(result)
-
- def load_config(self):
- paths = [
- DIRS.user_config,
- DIRS.stock_config,
- ]
-
- self.log.info("checking for config in {0}".format(paths))
- abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
- if not abs_path:
- self.log.warning('config was not found, using defaults')
- return True
-
- self.log.info("config found, loading config '{0}'".format(abs_path))
- try:
- config = load_config(abs_path) or dict()
- except Exception as error:
- self.log.error('error on loading config : {0}'.format(error))
- return False
-
- self.log.info('config successfully loaded')
- self.config.update(config)
- return True
-
- def setup(self):
- self.log.info('starting setup')
- if not self.load_config():
- return False
-
- if not self.config['enabled']:
- self.log.info('disabled in configuration file')
- return False
-
- for mod in self.modules_to_run:
- if self.config.is_module_enabled(mod, False):
- task = Task(mod, self.config.is_module_enabled(mod, True))
- self.tasks.append(task)
- else:
- self.log.info("{0} : disabled in configuration file".format(mod))
-
- if not self.tasks:
- self.log.info('no modules to run')
- return False
-
- worker = ModuleChecker(self.task_queue, self.result_queue)
- self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
- worker.start()
-
- # TODO: timeouts?
- self.enqueue_tasks()
- self.task_queue.join()
- self.dequeue_results()
- self.result_queue.join()
- self.task_queue.close()
- self.result_queue.close()
- self.log.info('stopping checker process')
- worker.join()
-
- if not self.results:
- self.log.info('no modules to run')
- return False
-
- self.log.info("setup complete, {0} active module(s) : '{1}'".format(
- len(self.results),
- [v.module_name for v in self.results])
- )
-
- return True
-
- def create_jobs(self):
- jobs = list()
- for result in self.results:
- module = Module(result.module_name)
- try:
- module.load_source()
- except Exception as error:
- self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
- result.module_name, error))
- continue
-
- module_jobs = module.create_jobs(result.jobs_configs)
- self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
- jobs.extend(module_jobs)
-
- return jobs
-
- def prepare_jobs(self, jobs):
- prepared = list()
-
- for job in jobs:
- check_name = job.override_name or job.name
- if check_name in self.checked_jobs[job.module_name]:
- self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
- job.module_name, job.name))
- continue
-
- try:
- job.init()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
- job.module_name, job.name, error))
- continue
-
- self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))
-
- try:
- ok = job.check()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
- job.module_name, job.name, error))
- continue
-
- if not ok:
- self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
- if job.autodetection_retry() > 0:
- self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
- job.module_name, job.name, job.autodetection_retry()))
- self.auto_detection_jobs.append(job)
- continue
-
- self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))
-
- job.post_check(int(self.min_update_every))
-
- if not job.create():
- self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))
-
- self.checked_jobs[job.module_name].append(check_name)
- prepared.append(job)
-
- return prepared
-
- def serve(self):
- gc_run = self.config['gc_run']
- gc_interval = self.config['gc_interval']
-
- while True:
- self.runs += 1
-
- # only the main and heartbeat threads are left: every job thread has
- # exited and nothing is pending autodetection, so stop serving
- if threading.active_count() <= 2 and not self.auto_detection_jobs:
- return
-
- time.sleep(1)
-
- if gc_run and self.runs % gc_interval == 0:
- v = gc.collect()
- self.log.debug('GC collection run result: {0}'.format(v))
-
- self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]
-
- def retry_job(self, job):
- stop_retrying = True
- retry_later = False
-
- if self.runs % job.autodetection_retry() != 0:
- return retry_later
-
- check_name = job.override_name or job.name
- if check_name in self.checked_jobs[job.module_name]:
- self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
- job.module_name, job.name))
- return stop_retrying
-
- try:
- ok = job.check()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
- job.module_name, job.name, error))
- return stop_retrying
-
- if not ok:
- self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
- job.module_name, job.name, job.autodetection_retry()))
- return retry_later
- self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))
-
- if not job.create():
- return stop_retrying
-
- job.post_check(int(self.min_update_every))
- self.checked_jobs[job.module_name].append(check_name)
- JobRunner(job).start()
-
- return stop_retrying
-
-
-def parse_cmd():
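- # positional arguments: [update_every] [debug] [trace] [module ...],
- # e.g. `python.d.plugin 5 debug nginx mysql`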
- opts = sys.argv[1:]
- debug = False
- trace = False
- update_every = 1
- modules_to_run = list()
-
- v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
- if v:
- update_every = int(v)
- opts.remove(v)
- if 'debug' in opts:
- debug = True
- opts.remove('debug')
- if 'trace' in opts:
- trace = True
- opts.remove('trace')
- if opts:
- modules_to_run = list(opts)
-
- return collections.namedtuple(
- 'CMD',
- [
- 'update_every',
- 'debug',
- 'trace',
- 'modules_to_run',
- ],
- )(
- update_every,
- debug,
- trace,
- modules_to_run,
- )
-
-
-def main():
- cmd = parse_cmd()
- logger = PythonDLogger()
-
- if cmd.debug:
- logger.logger.severity = 'DEBUG'
- if cmd.trace:
- logger.log_traceback = True
-
- logger.info('using python v{0}'.format(PY_VERSION[0]))
-
- unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
- if unknown_modules:
- logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
- safe_print('DISABLE')
- return
-
- plugin = Plugin(
- cmd.update_every,
- cmd.modules_to_run or AVAILABLE_MODULES,
- )
-
- HeartBeat(1).start()
-
- if not plugin.setup():
- safe_print('DISABLE')
- return
-
- plugin.run()
- logger.info('exiting from main...')
- plugin.shutdown()
-
-
-if __name__ == '__main__':
- main()