#!/usr/bin/env bash
'''':;
pybinary=$(which python3 || which python || which python2)
filtered=()
for arg in "$@"
do
    case $arg in
        -p*)
            pybinary=${arg:2}
            shift 1
            ;;
        *)
            filtered+=("$arg")
            ;;
    esac
done
if [ "$pybinary" = "" ]
then
    echo 1>&2 "python.d ERROR: python is not installed on this system"
    echo "EXIT"
    exit 1
fi
exec "$pybinary" "$0" "${filtered[@]}" # '''

# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later

import collections
import copy
import gc
import json
import os
import pprint
import re
import sys
import threading
import time
import types

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

PY_VERSION = sys.version_info[:2]  # (major=3, minor=7, micro=3, releaselevel='final', serial=0)

if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader

ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_USER_PLUGINS_DIRS = 'NETDATA_USER_PLUGINS_DIRS'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'


def add_pythond_packages():
    pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    pythond = os.path.abspath(pluginsd + '/../python.d')
    packages = os.path.join(pythond, 'python_modules')
    sys.path.append(packages)


add_pythond_packages()

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict


def dirs():
    var_lib = os.getenv(
        ENV_NETDATA_LIB_DIR,
        '@varlibdir_POST@',
    )
    plugin_user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    plugin_stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    pluginsd = os.getenv(
        ENV_NETDATA_PLUGINS_DIR,
        os.path.dirname(__file__),
    )
    locks = os.getenv(
        ENV_NETDATA_LOCK_DIR,
        os.path.join('@varlibdir_POST@', 'lock'),
    )
    modules_user_config = os.path.join(plugin_user_config, 'python.d')
    modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
    modules = os.path.abspath(pluginsd + '/../python.d')
    user_modules = [os.path.join(p, 'python.d') for p in
                    os.getenv(ENV_NETDATA_USER_PLUGINS_DIRS, "").split(" ") if p]

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'plugin_user_config',
            'plugin_stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'user_modules',
            'var_lib',
            'locks',
        ]
    )
    return Dirs(
        plugin_user_config,
        plugin_stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        user_modules,
        var_lib,
        locks,
    )


DIRS = dirs()

IS_ATTY = sys.stdout.isatty() or sys.stderr.isatty()

MODULE_SUFFIX = '.chart.py'


def find_available_modules(*directories):
    AvailableModule = collections.namedtuple(
        'AvailableModule',
        [
            'filepath',
            'name',
        ]
    )
    available = list()
    for d in directories:
        try:
            if not os.path.isdir(d):
                continue
            files = sorted(os.listdir(d))
        except OSError:
            continue
        modules = [m for m in files if m.endswith(MODULE_SUFFIX)]
        available.extend([AvailableModule(os.path.join(d, m), m[:-len(MODULE_SUFFIX)]) for m in modules])

    return available


def available_modules():
    obsolete = (
        'apache_cache',  # replaced by web_log
        'cpuidle',  # rewritten in C
        'cpufreq',  # rewritten in C
        'gunicorn_log',  # replaced by web_log
        'linux_power_supply',  # rewritten in C
        'nginx_log',  # replaced by web_log
        'mdstat',  # rewritten in C
        'sslcheck',  # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
        'unbound',  # rewritten in Go
    )

    stock = [m for m in find_available_modules(DIRS.modules) if m.name not in obsolete]
    user = find_available_modules(*DIRS.user_modules)

    available, seen = list(), set()
    for m in user + stock:
        if m.name in seen:
            continue
        seen.add(m.name)
        available.append(m)

    return available


AVAILABLE_MODULES = available_modules()

JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}


def multi_path_find(name, *paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return str()


def load_module(name, filepath):
    module = SourceFileLoader('pythond_' + name, filepath)
    if isinstance(module, types.ModuleType):
        return module

    return module.load_module()


class ModuleConfig:
    def __init__(self, name, config=None):
        self.name = name
        self.config = config or OrderedDict()
        self.is_stock = False

    def load(self, abs_path):
        if not IS_ATTY:
            self.is_stock = abs_path.startswith(DIRS.modules_stock_config)
        self.config.update(load_config(abs_path) or dict())

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.config[k]) for k in keys if k in self.config)

    def create_job(self, job_name, job_config=None):
        job_config = job_config or dict()

        config = OrderedDict()
        config.update(job_config)
        config['job_name'] = job_name
        config['__is_stock'] = self.is_stock
        for k, v in self.defaults().items():
            config.setdefault(k, v)

        return config

    def job_names(self):
        return [v for v in self.config if isinstance(self.config.get(v), dict)]

    def single_job(self):
        return [self.create_job(self.name, self.config)]

    def multi_job(self):
        return [self.create_job(n, self.config[n]) for n in self.job_names()]

    def create_jobs(self):
        return self.multi_job() or self.single_job()


class JobsConfigsBuilder:
    def __init__(self, config_dirs):
        self.config_dirs = config_dirs
        self.log = PythonDLogger()
        self.job_defaults = None
        self.module_defaults = None
        self.min_update_every = None

    def load_module_config(self, module_name):
        name = '{0}.conf'.format(module_name)
        self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
        config = ModuleConfig(module_name)

        abs_path = multi_path_find(name, *self.config_dirs)
        if not abs_path:
            self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
            return config

        self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
        try:
            config.load(abs_path)
        except Exception as error:
            self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
            return None

        self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
        return config

    @staticmethod
    def apply_defaults(jobs, defaults):
        if defaults is None:
            return
        for k, v in defaults.items():
            for job in jobs:
                job.setdefault(k, v)

    def set_min_update_every(self, jobs, min_update_every):
        if min_update_every is None:
            return
        for job in jobs:
            if 'update_every' in job and job['update_every'] < min_update_every:
                job['update_every'] = min_update_every

    def build(self, module_name):
        config = self.load_module_config(module_name)
        if config is None:
            return None
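
        # Precedence is highest-to-lowest because apply_defaults() fills keys
        # with setdefault(): per-job values from the module's .conf file, then
        # module-level values from that file, then defaults declared in the
        # module source (module_defaults), then JOB_BASE_CONF; finally
        # update_every is floored to min_update_every.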
        configs = config.create_jobs()
        if not config.is_stock:
            self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))

        self.apply_defaults(configs, self.module_defaults)
        self.apply_defaults(configs, self.job_defaults)
        self.set_min_update_every(configs, self.min_update_every)

        return configs


JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'


class Job(threading.Thread):
    inf = -1

    def __init__(self, service, module_name, config):
        threading.Thread.__init__(self)
        self.daemon = True
        self.service = service
        self.module_name = module_name
        self.config = config
        self.real_name = config['job_name']
        self.actual_name = config['override_name'] or self.real_name
        self.autodetection_retry = config['autodetection_retry']
        self.checks = self.inf
        self.job = None
        self.is_stock = config.get('__is_stock', False)
        self.status = JOB_STATUS_INIT

    def is_inited(self):
        return self.job is not None

    def init(self):
        self.job = self.service(configuration=copy.deepcopy(self.config))

    def full_name(self):
        return self.job.name

    def check(self):
        if self.is_stock:
            self.job.logger.mute()

        ok = self.job.check()

        self.job.logger.unmute()
        # decrement remaining rechecks only when they are finite and the check failed
        self.checks -= self.checks != self.inf and not ok

        return ok

    def create(self):
        self.job.create()

    def need_to_recheck(self):
        return self.autodetection_retry != 0 and self.checks != 0

    def run(self):
        self.job.run()


class ModuleSrc:
    def __init__(self, m):
        self.name = m.name
        self.filepath = m.filepath
        self.src = None

    def load(self):
        self.src = load_module(self.name, self.filepath)

    def get(self, key):
        return getattr(self.src, key, None)

    def service(self):
        return self.get('Service')

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.get(k)) for k in keys if self.get(k) is not None)

    def is_disabled_by_default(self):
        return bool(self.get('disabled_by_default'))


class JobsStatuses:
    def __init__(self):
        self.items = OrderedDict()

    def dump(self):
        return json.dumps(self.items, indent=2)

    def get(self, module_name, job_name):
        if module_name not in self.items:
            return None
        return self.items[module_name].get(job_name)

    def has(self, module_name, job_name):
        return self.get(module_name, job_name) is not None

    def from_file(self, path):
        with open(path) as f:
            data = json.load(f)
        return self.from_json(data)

    @staticmethod
    def from_json(items):
        if not isinstance(items, dict):
            raise Exception('items obj has wrong type : {0}'.format(type(items)))
        if not items:
            return JobsStatuses()

        v = OrderedDict()
        for mod_name in sorted(items):
            if not items[mod_name]:
                continue
            v[mod_name] = OrderedDict()
            for job_name in sorted(items[mod_name]):
                v[mod_name][job_name] = items[mod_name][job_name]

        rv = JobsStatuses()
        rv.items = v
        return rv

    @staticmethod
    def from_jobs(jobs):
        v = OrderedDict()
        for job in jobs:
            status = job.status
            if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
                continue
            if job.module_name not in v:
                v[job.module_name] = OrderedDict()
            v[job.module_name][job.real_name] = status

        rv = JobsStatuses()
        rv.items = v
        return rv


class StdoutSaver:
    @staticmethod
    def save(dump):
        print(dump)


class CachedFileSaver:
    def __init__(self, path):
        self.last_save_success = False
        self.last_saved_dump = str()
        self.path = path

    def save(self, dump):
        if self.last_save_success and self.last_saved_dump == dump:
            return
        try:
            with open(self.path, 'w') as out:
                out.write(dump)
        except Exception:
            self.last_save_success = False
            raise
        self.last_saved_dump = dump
        self.last_save_success = True
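

# Illustrative shape of the dump produced by JobsStatuses.dump() and persisted
# by CachedFileSaver (module and job names below are made-up examples):
#
#   {
#     "example": {
#       "example": "active"
#     },
#     "web_log": {
#       "nginx": "recovering"
#     }
#   }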


class PluginConfig(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_explicitly_enabled(self, module_name):
        return self._is_module_enabled(module_name, True)

    def is_module_enabled(self, module_name):
        return self._is_module_enabled(module_name, False)

    def _is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']


class FileLockRegistry:
    def __init__(self, path):
        self.path = path
        self.locks = dict()

    @staticmethod
    def rename(name):
        # go version name is 'docker'
        if name.startswith("dockerd"):
            name = "docker" + name[7:]
        return name

    def register(self, name):
        name = self.rename(name)
        if name in self.locks:
            return
        file = os.path.join(self.path, '{0}.collector.lock'.format(name))
        lock = filelock.FileLock(file)
        lock.acquire(timeout=0)
        self.locks[name] = lock

    def unregister(self, name):
        name = self.rename(name)
        if name not in self.locks:
            return
        lock = self.locks[name]
        lock.release()
        del self.locks[name]


class DummyRegistry:
    def register(self, name):
        pass

    def unregister(self, name):
        pass


class Plugin:
    config_name = 'python.d.conf'
    jobs_status_dump_name = 'pythond-jobs-statuses.json'

    def __init__(self, modules_to_run, min_update_every, registry):
        self.modules_to_run = modules_to_run
        self.min_update_every = min_update_every
        self.config = PluginConfig(PLUGIN_BASE_CONF)
        self.log = PythonDLogger()
        self.registry = registry
        # module name -> set of actual (override) job names already being served
        self.started_jobs = collections.defaultdict(set)
        self.jobs = list()
        self.saver = None
        self.runs = 0

    def load_config_file(self, filepath, expected):
        self.log.debug("looking for '{0}'".format(filepath))
        if not os.path.isfile(filepath):
            log = self.log.info if not expected else self.log.error
            log("'{0}' was not found".format(filepath))
            return dict()

        try:
            config = load_config(filepath)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
            return dict()

        self.log.debug("'{0}' is loaded".format(filepath))
        return config

    def load_config(self):
        user_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
            expected=False,
        )
        stock_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
            expected=True,
        )
        self.config.update(stock_config)
        self.config.update(user_config)

    def load_job_statuses(self):
        self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
        abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
        if not abs_path:
            self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
            return None

        self.log.debug("loading '{0}'".format(abs_path))
        try:
            statuses = JobsStatuses().from_file(abs_path)
        except Exception as error:
            self.log.error("'{0}' invalid JSON format: {1}".format(
                abs_path, ' '.join([v.strip() for v in str(error).split('\n')])))
            return None

        self.log.debug("'{0}' is loaded".format(abs_path))
        return statuses

    def create_jobs(self, job_statuses=None):
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        builder = JobsConfigsBuilder(paths)
        builder.job_defaults = JOB_BASE_CONF
        builder.min_update_every = self.min_update_every

        jobs = list()
        for m in self.modules_to_run:
            if not self.config.is_module_enabled(m.name):
                self.log.info("[{0}] is disabled in the configuration file, skipping it".format(m.name))
                continue

            src = ModuleSrc(m)
            try:
                src.load()
            except Exception as error:
                self.log.warning("[{0}] error on loading source : {1}, skipping it".format(m.name, repr(error)))
                continue
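
            # A loadable module must expose a callable Service object; its
            # module-level defaults (update_every, priority, ...) seed the job
            # configs built below.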
self.log.debug("[{0}] loaded module source : '{1}'".format(m.name, m.filepath)) if not (src.service() and callable(src.service())): self.log.warning("[{0}] has no callable Service object, skipping it".format(m.name)) continue if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(m.name): self.log.info("[{0}] is disabled by default, skipping it".format(m.name)) continue builder.module_defaults = src.defaults() configs = builder.build(m.name) if not configs: self.log.info("[{0}] has no job configs, skipping it".format(m.name)) continue for config in configs: config['job_name'] = re.sub(r'\s+', '_', config['job_name']) config['override_name'] = re.sub(r'\s+', '_', config.pop('name')) job = Job(src.service(), m.name, config) was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name) if was_previously_active and job.autodetection_retry == 0: self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format( job.module_name, job.real_name)) job.checks = 11 job.autodetection_retry = 30 jobs.append(job) return jobs def setup(self): self.load_config() if not self.config['enabled']: self.log.info('disabled in the configuration file') return False statuses = self.load_job_statuses() self.jobs = self.create_jobs(statuses) if not self.jobs: self.log.info('no jobs to run') return False if not IS_ATTY: abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name) self.saver = CachedFileSaver(abs_path) return True def start_jobs(self, *jobs): for job in jobs: if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING): continue if job.actual_name in self.started_jobs[job.module_name]: self.log.info('{0}[{1}] : already served by another job, skipping it'.format( job.module_name, job.real_name)) job.status = JOB_STATUS_DROPPED continue if not job.is_inited(): try: job.init() except Exception as error: self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue try: ok = job.check() except Exception as error: if not job.is_stock: self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue if not ok: if not job.is_stock: self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name)) job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED continue self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name)) try: self.registry.register(job.full_name()) except filelock.Timeout as error: self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format( job.module_name, job.real_name, error)) job.status = JOB_STATUS_DROPPED continue except Exception as error: self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format( job.module_name, job.real_name, error)) job.status = JOB_STATUS_DROPPED continue try: job.create() except Exception as error: self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format( job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED try: self.registry.unregister(job.full_name()) except Exception as error: self.log.warning('{0}[{1}] : deregistration failed: {2}'.format( job.module_name, job.real_name, error)) continue self.started_jobs[job.module_name] = job.actual_name job.status = JOB_STATUS_ACTIVE job.start() 

    @staticmethod
    def keep_alive():
        if not IS_ATTY:
            safe_print('\n')

    def garbage_collection(self):
        if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
            v = gc.collect()
            self.log.debug('GC collection run result: {0}'.format(v))

    def restart_recovering_jobs(self):
        for job in self.jobs:
            if job.status != JOB_STATUS_RECOVERING:
                continue
            if self.runs % job.autodetection_retry != 0:
                continue
            self.start_jobs(job)

    def cleanup_jobs(self):
        self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]

    def have_alive_jobs(self):
        return next(
            (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
            False,
        )

    def save_job_statuses(self):
        if self.saver is None:
            return
        if self.runs % 10 != 0:
            return
        dump = JobsStatuses().from_jobs(self.jobs).dump()
        try:
            self.saver.save(dump)
        except Exception as error:
            self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))

    def serve_once(self):
        if not self.have_alive_jobs():
            self.log.info('no jobs to serve')
            return False

        time.sleep(1)
        self.runs += 1

        self.keep_alive()
        self.garbage_collection()
        self.cleanup_jobs()
        self.restart_recovering_jobs()
        self.save_job_statuses()

        return True

    def serve(self):
        while self.serve_once():
            pass

    def run(self):
        self.start_jobs(*self.jobs)
        self.serve()


def parse_command_line():
    opts = sys.argv[1:]
    debug = False
    trace = False
    nolock = False
    update_every = 1
    modules_to_run = list()

    def find_first_positive_int(values):
        return next((v for v in values if v.isdigit() and int(v) >= 1), None)

    u = find_first_positive_int(opts)
    if u is not None:
        update_every = int(u)
        opts.remove(u)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if 'nolock' in opts:
        nolock = True
        opts.remove('nolock')
    if opts:
        modules_to_run = list(opts)

    cmd = collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'nolock',
            'modules_to_run',
        ])
    return cmd(
        update_every,
        debug,
        trace,
        nolock,
        modules_to_run,
    )


def guess_module(modules, *names):
    def guess(n):
        found = None
        for i, _ in enumerate(n):
            cur = [x for x in modules if x.startswith(n[:i + 1])]
            if not cur:
                return found
            found = cur
        return found

    guessed = list()
    for name in names:
        name = name.lower()
        m = guess(name)
        if m:
            guessed.extend(m)
    return sorted(set(guessed))


def disable():
    if not IS_ATTY:
        safe_print('DISABLE')
    exit(0)


def get_modules_to_run(cmd):
    if not cmd.modules_to_run:
        return AVAILABLE_MODULES

    modules_to_run, seen = list(), set()
    for m in AVAILABLE_MODULES:
        if m.name not in cmd.modules_to_run or m.name in seen:
            continue
        seen.add(m.name)
        modules_to_run.append(m)

    return modules_to_run


def main():
    cmd = parse_command_line()
    log = PythonDLogger()

    level = os.getenv('NETDATA_LOG_LEVEL') or str()
    level = level.lower()
    if level == 'debug':
        log.logger.severity = 'DEBUG'
    elif level == 'info':
        log.logger.severity = 'INFO'
    elif level == 'warn' or level == 'warning' or level == 'notice':
        log.logger.severity = 'WARNING'
    elif level == 'err' or level == 'error':
        log.logger.severity = 'ERROR'
    elif level == 'emergency' or level == 'alert' or level == 'critical':
        log.logger.severity = 'DISABLE'

    if cmd.debug:
        log.logger.severity = 'DEBUG'
    if cmd.trace:
        log.log_traceback = True

    log.info('using python v{0}'.format(PY_VERSION[0]))

    if DIRS.locks and not cmd.nolock:
        registry = FileLockRegistry(DIRS.locks)
    else:
        registry = DummyRegistry()

    unique_avail_module_names = set(m.name for m in AVAILABLE_MODULES)
    unknown = set(cmd.modules_to_run) - unique_avail_module_names
    if unknown:
        log.error('unknown modules : {0}'.format(sorted(list(unknown))))
        guessed = guess_module(unique_avail_module_names, *cmd.modules_to_run)
        if guessed:
            log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
        return

    p = Plugin(
        get_modules_to_run(cmd),
        cmd.update_every,
        registry,
    )

    # cheap attempt to reduce chance of python.d job running before go.d
    # TODO: better implementation needed
    if not IS_ATTY:
        time.sleep(1.5)

    try:
        if not p.setup():
            return
        p.run()
    except KeyboardInterrupt:
        pass
    log.info('exiting from main...')


if __name__ == "__main__":
    main()
    disable()
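
# Example invocations (hypothetical; in normal operation the netdata daemon
# starts the plugin and passes the update frequency as the first argument):
#
#   ./python.d.plugin 5 debug trace       # run every 5 seconds, debug logging with tracebacks
#   ./python.d.plugin nolock nginx mysql  # run only the nginx and mysql modules, skip the lock registry
#   ./python.d.plugin -p/usr/bin/python3.8 example  # pick a python binary (handled by the bash header)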