diff options
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin.in')
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin.in | 852 |
1 files changed, 852 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in new file mode 100644 index 0000000..9d575d8 --- /dev/null +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -0,0 +1,852 @@ +#!/usr/bin/env bash +'''':; +pybinary=$(which python || which python3 || which python2) +filtered=() +for arg in "$@" +do + case $arg in + -p*) pybinary=${arg:2} + shift 1 ;; + *) filtered+=("$arg") ;; + esac +done +if [ "$pybinary" = "" ] +then + echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM" + exit 1 +fi +exec "$pybinary" "$0" "${filtered[@]}" # ''' + +# -*- coding: utf-8 -*- +# Description: +# Author: Pawel Krupa (paulfantom) +# Author: Ilya Mashchenko (l2isbad) +# SPDX-License-Identifier: GPL-3.0-or-later + +import collections +import copy +import gc +import json +import os +import pprint +import re +import sys +import time +import threading +import types + +try: + from queue import Queue +except ImportError: + from Queue import Queue + +PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0) + +if PY_VERSION > (3, 1): + from importlib.machinery import SourceFileLoader +else: + from imp import load_source as SourceFileLoader + +ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR' +ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' +ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' +ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR' +ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' +ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR' + + +def add_pythond_packages(): + pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__)) + pythond = os.path.abspath(pluginsd + '/../python.d') + packages = os.path.join(pythond, 'python_modules') + sys.path.append(packages) + + +add_pythond_packages() + +from bases.collection import safe_print +from bases.loggers import PythonDLogger +from bases.loaders import load_config +from third_party import filelock + +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict + + +def dirs(): + var_lib = os.getenv( + ENV_NETDATA_LIB_DIR, + '@varlibdir_POST@', + ) + plugin_user_config = os.getenv( + ENV_NETDATA_USER_CONFIG_DIR, + '@configdir_POST@', + ) + plugin_stock_config = os.getenv( + ENV_NETDATA_STOCK_CONFIG_DIR, + '@libconfigdir_POST@', + ) + pluginsd = os.getenv( + ENV_NETDATA_PLUGINS_DIR, + os.path.dirname(__file__), + ) + locks = os.getenv( + ENV_NETDATA_LOCK_DIR, + os.path.join('@varlibdir_POST@', 'lock') + ) + modules_user_config = os.path.join(plugin_user_config, 'python.d') + modules_stock_config = os.path.join(plugin_stock_config, 'python.d') + modules = os.path.abspath(pluginsd + '/../python.d') + + Dirs = collections.namedtuple( + 'Dirs', + [ + 'plugin_user_config', + 'plugin_stock_config', + 'modules_user_config', + 'modules_stock_config', + 'modules', + 'var_lib', + 'locks', + ] + ) + return Dirs( + plugin_user_config, + plugin_stock_config, + modules_user_config, + modules_stock_config, + modules, + var_lib, + locks, + ) + + +DIRS = dirs() + +IS_ATTY = sys.stdout.isatty() + +MODULE_SUFFIX = '.chart.py' + + +def available_modules(): + obsolete = ( + 'apache_cache', # replaced by web_log + 'cpuidle', # rewritten in C + 'cpufreq', # rewritten in C + 'gunicorn_log', # replaced by web_log + 'linux_power_supply', # rewritten in C + 'nginx_log', # replaced by web_log + 'mdstat', # rewritten in C + 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624 + 'unbound', # rewritten in Go + ) + + files = sorted(os.listdir(DIRS.modules)) + modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)] + avail = [m for m in modules if m not in obsolete] + return tuple(avail) + + +AVAILABLE_MODULES = available_modules() + +JOB_BASE_CONF = { + 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)), + 'priority': 60000, + 'autodetection_retry': 0, + 'chart_cleanup': 10, + 'penalty': True, + 'name': str(), +} + +PLUGIN_BASE_CONF = { + 'enabled': True, + 'default_run': True, + 'gc_run': True, + 'gc_interval': 300, +} + + +def multi_path_find(name, *paths): + for path in paths: + abs_name = os.path.join(path, name) + if os.path.isfile(abs_name): + return abs_name + return str() + + +def load_module(name): + abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX)) + module = SourceFileLoader('pythond_' + name, abs_path) + if isinstance(module, types.ModuleType): + return module + return module.load_module() + + +class ModuleConfig: + def __init__(self, name, config=None): + self.name = name + self.config = config or OrderedDict() + + def load(self, abs_path): + self.config.update(load_config(abs_path) or dict()) + + def defaults(self): + keys = ( + 'update_every', + 'priority', + 'autodetection_retry', + 'chart_cleanup', + 'penalty', + ) + return dict((k, self.config[k]) for k in keys if k in self.config) + + def create_job(self, job_name, job_config=None): + job_config = job_config or dict() + + config = OrderedDict() + config.update(job_config) + config['job_name'] = job_name + for k, v in self.defaults().items(): + config.setdefault(k, v) + + return config + + def job_names(self): + return [v for v in self.config if isinstance(self.config.get(v), dict)] + + def single_job(self): + return [self.create_job(self.name, self.config)] + + def multi_job(self): + return [self.create_job(n, self.config[n]) for n in self.job_names()] + + def create_jobs(self): + return self.multi_job() or self.single_job() + + +class JobsConfigsBuilder: + def __init__(self, config_dirs): + self.config_dirs = config_dirs + self.log = PythonDLogger() + self.job_defaults = None + self.module_defaults = None + self.min_update_every = None + + def load_module_config(self, module_name): + name = '{0}.conf'.format(module_name) + self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs)) + config = ModuleConfig(module_name) + + abs_path = multi_path_find(name, *self.config_dirs) + if not abs_path: + self.log.warning("[{0}] '{1}' was not found".format(module_name, name)) + return config + + self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path)) + try: + config.load(abs_path) + except Exception as error: + self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error))) + return None + + self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path)) + return config + + @staticmethod + def apply_defaults(jobs, defaults): + if defaults is None: + return + for k, v in defaults.items(): + for job in jobs: + job.setdefault(k, v) + + def set_min_update_every(self, jobs, min_update_every): + if min_update_every is None: + return + for job in jobs: + if 'update_every' in job and job['update_every'] < self.min_update_every: + job['update_every'] = self.min_update_every + + def build(self, module_name): + config = self.load_module_config(module_name) + if config is None: + return None + + configs = config.create_jobs() + self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs))) + + self.apply_defaults(configs, self.module_defaults) + self.apply_defaults(configs, self.job_defaults) + self.set_min_update_every(configs, self.min_update_every) + + return configs + + +JOB_STATUS_ACTIVE = 'active' +JOB_STATUS_RECOVERING = 'recovering' +JOB_STATUS_DROPPED = 'dropped' +JOB_STATUS_INIT = 'initial' + + +class Job(threading.Thread): + inf = -1 + + def __init__(self, service, module_name, config): + threading.Thread.__init__(self) + self.daemon = True + self.service = service + self.module_name = module_name + self.config = config + self.real_name = config['job_name'] + self.actual_name = config['override_name'] or self.real_name + self.autodetection_retry = config['autodetection_retry'] + self.checks = self.inf + self.job = None + self.status = JOB_STATUS_INIT + + def is_inited(self): + return self.job is not None + + def init(self): + self.job = self.service(configuration=copy.deepcopy(self.config)) + + def full_name(self): + return self.job.name + + def check(self): + ok = self.job.check() + self.checks -= self.checks != self.inf and not ok + return ok + + def create(self): + self.job.create() + + def need_to_recheck(self): + return self.autodetection_retry != 0 and self.checks != 0 + + def run(self): + self.job.run() + + +class ModuleSrc: + def __init__(self, name): + self.name = name + self.src = None + + def load(self): + self.src = load_module(self.name) + + def get(self, key): + return getattr(self.src, key, None) + + def service(self): + return self.get('Service') + + def defaults(self): + keys = ( + 'update_every', + 'priority', + 'autodetection_retry', + 'chart_cleanup', + 'penalty', + ) + return dict((k, self.get(k)) for k in keys if self.get(k) is not None) + + def is_disabled_by_default(self): + return bool(self.get('disabled_by_default')) + + +class JobsStatuses: + def __init__(self): + self.items = OrderedDict() + + def dump(self): + return json.dumps(self.items, indent=2) + + def get(self, module_name, job_name): + if module_name not in self.items: + return None + return self.items[module_name].get(job_name) + + def has(self, module_name, job_name): + return self.get(module_name, job_name) is not None + + def from_file(self, path): + with open(path) as f: + data = json.load(f) + return self.from_json(data) + + @staticmethod + def from_json(items): + if not isinstance(items, dict): + raise Exception('items obj has wrong type : {0}'.format(type(items))) + if not items: + return JobsStatuses() + + v = OrderedDict() + for mod_name in sorted(items): + if not items[mod_name]: + continue + v[mod_name] = OrderedDict() + for job_name in sorted(items[mod_name]): + v[mod_name][job_name] = items[mod_name][job_name] + + rv = JobsStatuses() + rv.items = v + return rv + + @staticmethod + def from_jobs(jobs): + v = OrderedDict() + for job in jobs: + status = job.status + if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING): + continue + if job.module_name not in v: + v[job.module_name] = OrderedDict() + v[job.module_name][job.real_name] = status + + rv = JobsStatuses() + rv.items = v + return rv + + +class StdoutSaver: + @staticmethod + def save(dump): + print(dump) + + +class CachedFileSaver: + def __init__(self, path): + self.last_save_success = False + self.last_saved_dump = str() + self.path = path + + def save(self, dump): + if self.last_save_success and self.last_saved_dump == dump: + return + try: + with open(self.path, 'w') as out: + out.write(dump) + except Exception: + self.last_save_success = False + raise + self.last_saved_dump = dump + self.last_save_success = True + + +class PluginConfig(dict): + def __init__(self, *args): + dict.__init__(self, *args) + + def is_module_explicitly_enabled(self, module_name): + return self._is_module_enabled(module_name, True) + + def is_module_enabled(self, module_name): + return self._is_module_enabled(module_name, False) + + def _is_module_enabled(self, module_name, explicit): + if module_name in self: + return self[module_name] + if explicit: + return False + return self['default_run'] + + +class FileLockRegistry: + def __init__(self, path): + self.path = path + self.locks = dict() + + def register(self, name): + if name in self.locks: + return + file = os.path.join(self.path, '{0}.collector.lock'.format(name)) + lock = filelock.FileLock(file) + lock.acquire(timeout=0) + self.locks[name] = lock + + def unregister(self, name): + if name not in self.locks: + return + lock = self.locks[name] + lock.release() + del self.locks[name] + + +class DummyRegistry: + def register(self, name): + pass + + def unregister(self, name): + pass + + +class Plugin: + config_name = 'python.d.conf' + jobs_status_dump_name = 'pythond-jobs-statuses.json' + + def __init__(self, modules_to_run, min_update_every, registry): + self.modules_to_run = modules_to_run + self.min_update_every = min_update_every + self.config = PluginConfig(PLUGIN_BASE_CONF) + self.log = PythonDLogger() + self.registry = registry + self.started_jobs = collections.defaultdict(dict) + self.jobs = list() + self.saver = None + self.runs = 0 + + def load_config(self): + paths = [ + DIRS.plugin_user_config, + DIRS.plugin_stock_config, + ] + self.log.debug("looking for '{0}' in {1}".format(self.config_name, paths)) + abs_path = multi_path_find(self.config_name, *paths) + if not abs_path: + self.log.warning("'{0}' was not found, using defaults".format(self.config_name)) + return True + + self.log.debug("loading '{0}'".format(abs_path)) + try: + config = load_config(abs_path) + except Exception as error: + self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error))) + return False + + self.log.debug("'{0}' is loaded".format(abs_path)) + self.config.update(config) + return True + + def load_job_statuses(self): + self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib)) + abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib) + if not abs_path: + self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name)) + return + + self.log.debug("loading '{0}'".format(abs_path)) + try: + statuses = JobsStatuses().from_file(abs_path) + except Exception as error: + self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error))) + return None + self.log.debug("'{0}' is loaded".format(abs_path)) + return statuses + + def create_jobs(self, job_statuses=None): + paths = [ + DIRS.modules_user_config, + DIRS.modules_stock_config, + ] + + builder = JobsConfigsBuilder(paths) + builder.job_defaults = JOB_BASE_CONF + builder.min_update_every = self.min_update_every + + jobs = list() + for mod_name in self.modules_to_run: + if not self.config.is_module_enabled(mod_name): + self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name)) + continue + + src = ModuleSrc(mod_name) + try: + src.load() + except Exception as error: + self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error))) + continue + + if not (src.service() and callable(src.service())): + self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name)) + continue + + if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name): + self.log.info("[{0}] is disabled by default, skipping it".format(mod_name)) + continue + + builder.module_defaults = src.defaults() + configs = builder.build(mod_name) + if not configs: + self.log.info("[{0}] has no job configs, skipping it".format(mod_name)) + continue + + for config in configs: + config['job_name'] = re.sub(r'\s+', '_', config['job_name']) + config['override_name'] = re.sub(r'\s+', '_', config.pop('name')) + + job = Job(src.service(), mod_name, config) + + was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name) + if was_previously_active and job.autodetection_retry == 0: + self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format( + job.module_name, job.real_name)) + job.checks = 11 + job.autodetection_retry = 30 + + jobs.append(job) + + return jobs + + def setup(self): + if not self.load_config(): + return False + + if not self.config['enabled']: + self.log.info('disabled in the configuration file') + return False + + statuses = self.load_job_statuses() + + self.jobs = self.create_jobs(statuses) + if not self.jobs: + self.log.info('no jobs to run') + return False + + if not IS_ATTY: + abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name) + self.saver = CachedFileSaver(abs_path) + return True + + def start_jobs(self, *jobs): + for job in jobs: + if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING): + continue + + if job.actual_name in self.started_jobs[job.module_name]: + self.log.info('{0}[{1}] : already served by another job, skipping it'.format( + job.module_name, job.real_name)) + job.status = JOB_STATUS_DROPPED + continue + + if not job.is_inited(): + try: + job.init() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( + job.module_name, job.real_name, repr(error))) + job.status = JOB_STATUS_DROPPED + continue + + try: + ok = job.check() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( + job.module_name, job.real_name, repr(error))) + job.status = JOB_STATUS_DROPPED + continue + if not ok: + self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name)) + job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED + continue + self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name)) + + try: + self.registry.register(job.full_name()) + except filelock.Timeout as error: + self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format( + job.module_name, job.real_name, error)) + job.status = JOB_STATUS_DROPPED + continue + except Exception as error: + self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format( + job.module_name, job.real_name, error)) + job.status = JOB_STATUS_DROPPED + continue + + try: + job.create() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format( + job.module_name, job.real_name, repr(error))) + job.status = JOB_STATUS_DROPPED + try: + self.registry.unregister(job.full_name()) + except Exception as error: + self.log.warning('{0}[{1}] : deregistration failed: {2}'.format( + job.module_name, job.real_name, error)) + continue + + self.started_jobs[job.module_name] = job.actual_name + job.status = JOB_STATUS_ACTIVE + job.start() + + @staticmethod + def keep_alive(): + if not IS_ATTY: + safe_print('\n') + + def garbage_collection(self): + if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0: + v = gc.collect() + self.log.debug('GC collection run result: {0}'.format(v)) + + def restart_recovering_jobs(self): + for job in self.jobs: + if job.status != JOB_STATUS_RECOVERING: + continue + if self.runs % job.autodetection_retry != 0: + continue + self.start_jobs(job) + + def cleanup_jobs(self): + self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED] + + def have_alive_jobs(self): + return next( + (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)), + False, + ) + + def save_job_statuses(self): + if self.saver is None: + return + if self.runs % 10 != 0: + return + dump = JobsStatuses().from_jobs(self.jobs).dump() + try: + self.saver.save(dump) + except Exception as error: + self.log.error("error on saving jobs statuses dump : {0}".format(repr(error))) + + def serve_once(self): + if not self.have_alive_jobs(): + self.log.info('no jobs to serve') + return False + + time.sleep(1) + self.runs += 1 + + self.keep_alive() + self.garbage_collection() + self.cleanup_jobs() + self.restart_recovering_jobs() + self.save_job_statuses() + return True + + def serve(self): + while self.serve_once(): + pass + + def run(self): + self.start_jobs(*self.jobs) + self.serve() + + +def parse_command_line(): + opts = sys.argv[:][1:] + + debug = False + trace = False + nolock = False + update_every = 1 + modules_to_run = list() + + def find_first_positive_int(values): + return next((v for v in values if v.isdigit() and int(v) >= 1), None) + + u = find_first_positive_int(opts) + if u is not None: + update_every = int(u) + opts.remove(u) + if 'debug' in opts: + debug = True + opts.remove('debug') + if 'trace' in opts: + trace = True + opts.remove('trace') + if 'nolock' in opts: + nolock = True + opts.remove('nolock') + if opts: + modules_to_run = list(opts) + + cmd = collections.namedtuple( + 'CMD', + [ + 'update_every', + 'debug', + 'trace', + 'nolock', + 'modules_to_run', + ]) + return cmd( + update_every, + debug, + trace, + nolock, + modules_to_run, + ) + + +def guess_module(modules, *names): + def guess(n): + found = None + for i, _ in enumerate(n): + cur = [x for x in modules if x.startswith(name[:i + 1])] + if not cur: + return found + found = cur + return found + + guessed = list() + for name in names: + name = name.lower() + m = guess(name) + if m: + guessed.extend(m) + return sorted(set(guessed)) + + +def disable(): + if not IS_ATTY: + safe_print('DISABLE') + exit(0) + + +def main(): + cmd = parse_command_line() + log = PythonDLogger() + + if cmd.debug: + log.logger.severity = 'DEBUG' + if cmd.trace: + log.log_traceback = True + + log.info('using python v{0}'.format(PY_VERSION[0])) + + unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES) + if unknown: + log.error('unknown modules : {0}'.format(sorted(list(unknown)))) + guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run) + if guessed: + log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1))) + return + + if DIRS.locks and not cmd.nolock: + registry = FileLockRegistry(DIRS.locks) + else: + registry = DummyRegistry() + + p = Plugin( + cmd.modules_to_run or AVAILABLE_MODULES, + cmd.update_every, + registry, + ) + + try: + if not p.setup(): + return + p.run() + except KeyboardInterrupt: + pass + log.info('exiting from main...') + + +if __name__ == "__main__": + main() + disable() |