diff options
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin')
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin | 784 |
1 files changed, 0 insertions, 784 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin b/collectors/python.d.plugin/python.d.plugin deleted file mode 100644 index 7b27acdd5..000000000 --- a/collectors/python.d.plugin/python.d.plugin +++ /dev/null @@ -1,784 +0,0 @@ -#!/usr/bin/env bash -'''':; -pybinary=$(which python || which python3 || which python2) -filtered=() -for arg in "$@" -do - case $arg in - -p*) pybinary=${arg:2} - shift 1 ;; - *) filtered+=("$arg") ;; - esac -done -if [ "$pybinary" = "" ] -then - echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM" - exit 1 -fi -exec "$pybinary" "$0" "${filtered[@]}" # ''' - -# -*- coding: utf-8 -*- -# Description: -# Author: Pawel Krupa (paulfantom) -# Author: Ilya Mashchenko (l2isbad) -# SPDX-License-Identifier: GPL-3.0-or-later - -import collections -import copy -import gc -import json -import os -import pprint -import re -import sys -import time -import threading -import types - -try: - from queue import Queue -except ImportError: - from Queue import Queue - -PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0) - - -if PY_VERSION > (3, 1): - from importlib.machinery import SourceFileLoader -else: - from imp import load_source as SourceFileLoader - - -ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR' -ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' -ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' -ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR' -ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' - - -def add_pythond_packages(): - pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__)) - pythond = os.path.abspath(pluginsd + '/../python.d') - packages = os.path.join(pythond, 'python_modules') - sys.path.append(packages) - - -add_pythond_packages() - - -from bases.collection import safe_print -from bases.loggers import PythonDLogger -from bases.loaders import load_config - -try: - from collections import OrderedDict -except ImportError: - from third_party.ordereddict import OrderedDict - - -def dirs(): - var_lib = os.getenv( - ENV_NETDATA_LIB_DIR, - '/var/lib/netdata', - ) - plugin_user_config = os.getenv( - ENV_NETDATA_USER_CONFIG_DIR, - '/etc/netdata', - ) - plugin_stock_config = os.getenv( - ENV_NETDATA_STOCK_CONFIG_DIR, - '/usr/lib/netdata/conf.d', - ) - pluginsd = os.getenv( - ENV_NETDATA_PLUGINS_DIR, - os.path.dirname(__file__), - ) - modules_user_config = os.path.join(plugin_user_config, 'python.d') - modules_stock_config = os.path.join(plugin_stock_config, 'python.d') - modules = os.path.abspath(pluginsd + '/../python.d') - - Dirs = collections.namedtuple( - 'Dirs', - [ - 'plugin_user_config', - 'plugin_stock_config', - 'modules_user_config', - 'modules_stock_config', - 'modules', - 'var_lib', - ] - ) - return Dirs( - plugin_user_config, - plugin_stock_config, - modules_user_config, - modules_stock_config, - modules, - var_lib, - ) - - -DIRS = dirs() - -IS_ATTY = sys.stdout.isatty() - -MODULE_SUFFIX = '.chart.py' - - -def available_modules(): - obsolete = ( - 'apache_cache', # replaced by web_log - 'cpuidle', # rewritten in C - 'cpufreq', # rewritten in C - 'gunicorn_log', # replaced by web_log - 'linux_power_supply', # rewritten in C - 'nginx_log', # replaced by web_log - 'mdstat', # rewritten in C - 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624 - 'unbound', # rewritten in Go - ) - - files = sorted(os.listdir(DIRS.modules)) - modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)] - avail = [m for m in modules if m not in obsolete] - return tuple(avail) - - -AVAILABLE_MODULES = available_modules() - -JOB_BASE_CONF = { - 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)), - 'priority': 60000, - 'autodetection_retry': 0, - 'chart_cleanup': 10, - 'penalty': True, - 'name': str(), -} - -PLUGIN_BASE_CONF = { - 'enabled': True, - 'default_run': True, - 'gc_run': True, - 'gc_interval': 300, -} - - -def multi_path_find(name, *paths): - for path in paths: - abs_name = os.path.join(path, name) - if os.path.isfile(abs_name): - return abs_name - return str() - - -def load_module(name): - abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX)) - module = SourceFileLoader(name, abs_path) - if isinstance(module, types.ModuleType): - return module - return module.load_module() - - -class ModuleConfig: - def __init__(self, name, config=None): - self.name = name - self.config = config or OrderedDict() - - def load(self, abs_path): - self.config.update(load_config(abs_path) or dict()) - - def defaults(self): - keys = ( - 'update_every', - 'priority', - 'autodetection_retry', - 'chart_cleanup', - 'penalty', - ) - return dict((k, self.config[k]) for k in keys if k in self.config) - - def create_job(self, job_name, job_config=None): - job_config = job_config or dict() - - config = OrderedDict() - config.update(job_config) - config['job_name'] = job_name - for k, v in self.defaults().items(): - config.setdefault(k, v) - - return config - - def job_names(self): - return [v for v in self.config if isinstance(self.config.get(v), dict)] - - def single_job(self): - return [self.create_job(self.name, self.config)] - - def multi_job(self): - return [self.create_job(n, self.config[n]) for n in self.job_names()] - - def create_jobs(self): - return self.multi_job() or self.single_job() - - -class JobsConfigsBuilder: - def __init__(self, config_dirs): - self.config_dirs = config_dirs - self.log = PythonDLogger() - self.job_defaults = None - self.module_defaults = None - self.min_update_every = None - - def load_module_config(self, module_name): - name = '{0}.conf'.format(module_name) - self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs)) - config = ModuleConfig(module_name) - - abs_path = multi_path_find(name, *self.config_dirs) - if not abs_path: - self.log.warning("[{0}] '{1}' was not found".format(module_name, name)) - return config - - self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path)) - try: - config.load(abs_path) - except Exception as error: - self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error))) - return None - - self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path)) - return config - - @staticmethod - def apply_defaults(jobs, defaults): - if defaults is None: - return - for k, v in defaults.items(): - for job in jobs: - job.setdefault(k, v) - - def set_min_update_every(self, jobs, min_update_every): - if min_update_every is None: - return - for job in jobs: - if 'update_every' in job and job['update_every'] < self.min_update_every: - job['update_every'] = self.min_update_every - - def build(self, module_name): - config = self.load_module_config(module_name) - if config is None: - return None - - configs = config.create_jobs() - self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs))) - - self.apply_defaults(configs, self.module_defaults) - self.apply_defaults(configs, self.job_defaults) - self.set_min_update_every(configs, self.min_update_every) - - return configs - - -JOB_STATUS_ACTIVE = 'active' -JOB_STATUS_RECOVERING = 'recovering' -JOB_STATUS_DROPPED = 'dropped' -JOB_STATUS_INIT = 'initial' - - -class Job(threading.Thread): - inf = -1 - - def __init__(self, service, module_name, config): - threading.Thread.__init__(self) - self.daemon = True - self.service = service - self.module_name = module_name - self.config = config - self.real_name = config['job_name'] - self.actual_name = config['override_name'] or self.real_name - self.autodetection_retry = config['autodetection_retry'] - self.checks = self.inf - self.job = None - self.status = JOB_STATUS_INIT - - def is_inited(self): - return self.job is not None - - def init(self): - self.job = self.service(configuration=copy.deepcopy(self.config)) - - def check(self): - ok = self.job.check() - self.checks -= self.checks != self.inf and not ok - return ok - - def create(self): - self.job.create() - - def need_to_recheck(self): - return self.autodetection_retry != 0 and self.checks != 0 - - def run(self): - self.job.run() - - -class ModuleSrc: - def __init__(self, name): - self.name = name - self.src = None - - def load(self): - self.src = load_module(self.name) - - def get(self, key): - return getattr(self.src, key, None) - - def service(self): - return self.get('Service') - - def defaults(self): - keys = ( - 'update_every', - 'priority', - 'autodetection_retry', - 'chart_cleanup', - 'penalty', - ) - return dict((k, self.get(k)) for k in keys if self.get(k) is not None) - - def is_disabled_by_default(self): - return bool(self.get('disabled_by_default')) - - -class JobsStatuses: - def __init__(self): - self.items = OrderedDict() - - def dump(self): - return json.dumps(self.items, indent=2) - - def get(self, module_name, job_name): - if module_name not in self.items: - return None - return self.items[module_name].get(job_name) - - def has(self, module_name, job_name): - return self.get(module_name, job_name) is not None - - def from_file(self, path): - with open(path) as f: - data = json.load(f) - return self.from_json(data) - - @staticmethod - def from_json(items): - if not isinstance(items, dict): - raise Exception('items obj has wrong type : {0}'.format(type(items))) - if not items: - return JobsStatuses() - - v = OrderedDict() - for mod_name in sorted(items): - if not items[mod_name]: - continue - v[mod_name] = OrderedDict() - for job_name in sorted(items[mod_name]): - v[mod_name][job_name] = items[mod_name][job_name] - - rv = JobsStatuses() - rv.items = v - return rv - - @staticmethod - def from_jobs(jobs): - v = OrderedDict() - for job in jobs: - status = job.status - if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING): - continue - if job.module_name not in v: - v[job.module_name] = OrderedDict() - v[job.module_name][job.real_name] = status - - rv = JobsStatuses() - rv.items = v - return rv - - -class StdoutSaver: - @staticmethod - def save(dump): - print(dump) - - -class CachedFileSaver: - def __init__(self, path): - self.last_save_success = False - self.last_saved_dump = str() - self.path = path - - def save(self, dump): - if self.last_save_success and self.last_saved_dump == dump: - return - try: - with open(self.path, 'w') as out: - out.write(dump) - except Exception: - self.last_save_success = False - raise - self.last_saved_dump = dump - self.last_save_success = True - - -class PluginConfig(dict): - def __init__(self, *args): - dict.__init__(self, *args) - - def is_module_explicitly_enabled(self, module_name): - return self._is_module_enabled(module_name, True) - - def is_module_enabled(self, module_name): - return self._is_module_enabled(module_name, False) - - def _is_module_enabled(self, module_name, explicit): - if module_name in self: - return self[module_name] - if explicit: - return False - return self['default_run'] - - -class Plugin: - config_name = 'python.d.conf' - jobs_status_dump_name = 'pythond-jobs-statuses.json' - - def __init__(self, modules_to_run, min_update_every): - self.modules_to_run = modules_to_run - self.min_update_every = min_update_every - self.config = PluginConfig(PLUGIN_BASE_CONF) - self.log = PythonDLogger() - self.started_jobs = collections.defaultdict(dict) - self.jobs = list() - self.saver = None - self.runs = 0 - - def load_config(self): - paths = [ - DIRS.plugin_user_config, - DIRS.plugin_stock_config, - ] - self.log.debug("looking for '{0}' in {1}".format(self.config_name, paths)) - abs_path = multi_path_find(self.config_name, *paths) - if not abs_path: - self.log.warning("'{0}' was not found, using defaults".format(self.config_name)) - return True - - self.log.debug("loading '{0}'".format(abs_path)) - try: - config = load_config(abs_path) - except Exception as error: - self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error))) - return False - - self.log.debug("'{0}' is loaded".format(abs_path)) - self.config.update(config) - return True - - def load_job_statuses(self): - self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib)) - abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib) - if not abs_path: - self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name)) - return - - self.log.debug("loading '{0}'".format(abs_path)) - try: - statuses = JobsStatuses().from_file(abs_path) - except Exception as error: - self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error))) - return None - self.log.debug("'{0}' is loaded".format(abs_path)) - return statuses - - def create_jobs(self, job_statuses=None): - paths = [ - DIRS.modules_user_config, - DIRS.modules_stock_config, - ] - - builder = JobsConfigsBuilder(paths) - builder.job_defaults = JOB_BASE_CONF - builder.min_update_every = self.min_update_every - - jobs = list() - for mod_name in self.modules_to_run: - if not self.config.is_module_enabled(mod_name): - self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name)) - continue - - src = ModuleSrc(mod_name) - try: - src.load() - except Exception as error: - self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error))) - continue - - if not (src.service() and callable(src.service())): - self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name)) - continue - - if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name): - self.log.info("[{0}] is disabled by default, skipping it".format(mod_name)) - continue - - builder.module_defaults = src.defaults() - configs = builder.build(mod_name) - if not configs: - self.log.info("[{0}] has no job configs, skipping it".format(mod_name)) - continue - - for config in configs: - config['job_name'] = re.sub(r'\s+', '_', config['job_name']) - config['override_name'] = re.sub(r'\s+', '_', config.pop('name')) - - job = Job(src.service(), mod_name, config) - - was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name) - if was_previously_active and job.autodetection_retry == 0: - self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format( - job.module_name, job.real_name)) - job.checks = 11 - job.autodetection_retry = 30 - - jobs.append(job) - - return jobs - - def setup(self): - if not self.load_config(): - return False - - if not self.config['enabled']: - self.log.info('disabled in the configuration file') - return False - - statuses = self.load_job_statuses() - - self.jobs = self.create_jobs(statuses) - if not self.jobs: - self.log.info('no jobs to run') - return False - - if not IS_ATTY: - abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name) - self.saver = CachedFileSaver(abs_path) - return True - - def start_jobs(self, *jobs): - for job in jobs: - if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING): - continue - - if job.actual_name in self.started_jobs[job.module_name]: - self.log.info('{0}[{1}] : already served by another job, skipping it'.format( - job.module_name, job.real_name)) - job.status = JOB_STATUS_DROPPED - continue - - if not job.is_inited(): - try: - job.init() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( - job.module_name, job.real_name, repr(error))) - job.status = JOB_STATUS_DROPPED - continue - - try: - ok = job.check() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( - job.module_name, job.real_name, repr(error))) - job.status = JOB_STATUS_DROPPED - continue - if not ok: - self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name)) - job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED - continue - self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name)) - - try: - job.create() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format( - job.module_name, job.real_name, repr(error))) - job.status = JOB_STATUS_DROPPED - continue - - self.started_jobs[job.module_name] = job.actual_name - job.status = JOB_STATUS_ACTIVE - job.start() - - @staticmethod - def keep_alive(): - if not IS_ATTY: - safe_print('\n') - - def garbage_collection(self): - if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0: - v = gc.collect() - self.log.debug('GC collection run result: {0}'.format(v)) - - def restart_recovering_jobs(self): - for job in self.jobs: - if job.status != JOB_STATUS_RECOVERING: - continue - if self.runs % job.autodetection_retry != 0: - continue - self.start_jobs(job) - - def cleanup_jobs(self): - self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED] - - def have_alive_jobs(self): - return next( - (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)), - False, - ) - - def save_job_statuses(self): - if self.saver is None: - return - if self.runs % 10 != 0: - return - dump = JobsStatuses().from_jobs(self.jobs).dump() - try: - self.saver.save(dump) - except Exception as error: - self.log.error("error on saving jobs statuses dump : {0}".format(repr(error))) - - def serve_once(self): - if not self.have_alive_jobs(): - self.log.info('no jobs to serve') - return False - - time.sleep(1) - self.runs += 1 - - self.keep_alive() - self.garbage_collection() - self.cleanup_jobs() - self.restart_recovering_jobs() - self.save_job_statuses() - return True - - def serve(self): - while self.serve_once(): - pass - - def run(self): - self.start_jobs(*self.jobs) - self.serve() - - -def parse_command_line(): - opts = sys.argv[:][1:] - - debug = False - trace = False - update_every = 1 - modules_to_run = list() - - def find_first_positive_int(values): - return next((v for v in values if v.isdigit() and int(v) >= 1), None) - - u = find_first_positive_int(opts) - if u is not None: - update_every = int(u) - opts.remove(u) - if 'debug' in opts: - debug = True - opts.remove('debug') - if 'trace' in opts: - trace = True - opts.remove('trace') - if opts: - modules_to_run = list(opts) - - cmd = collections.namedtuple( - 'CMD', - [ - 'update_every', - 'debug', - 'trace', - 'modules_to_run', - ]) - return cmd( - update_every, - debug, - trace, - modules_to_run - ) - - -def guess_module(modules, *names): - def guess(n): - found = None - for i, _ in enumerate(n): - cur = [x for x in modules if x.startswith(name[:i + 1])] - if not cur: - return found - found = cur - return found - - guessed = list() - for name in names: - name = name.lower() - m = guess(name) - if m: - guessed.extend(m) - return sorted(set(guessed)) - - -def disable(): - if not IS_ATTY: - safe_print('DISABLE') - exit(0) - - -def main(): - cmd = parse_command_line() - log = PythonDLogger() - - if cmd.debug: - log.logger.severity = 'DEBUG' - if cmd.trace: - log.log_traceback = True - - log.info('using python v{0}'.format(PY_VERSION[0])) - - unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES) - if unknown: - log.error('unknown modules : {0}'.format(sorted(list(unknown)))) - guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run) - if guessed: - log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1))) - return - - p = Plugin( - cmd.modules_to_run or AVAILABLE_MODULES, - cmd.update_every, - ) - - try: - if not p.setup(): - return - p.run() - except KeyboardInterrupt: - pass - log.info('exiting from main...') - - -if __name__ == "__main__": - main() - disable() |