From 8f5d8f3de6cae180af37917ef978a4affc2cd464 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 13 Sep 2019 07:05:16 +0200 Subject: Adding upstream version 1.17.1. Signed-off-by: Daniel Baumann --- collectors/python.d.plugin/python.d.plugin.in | 1018 +++++++++++++------------ 1 file changed, 527 insertions(+), 491 deletions(-) (limited to 'collectors/python.d.plugin/python.d.plugin.in') diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in index f1429b6fa..5b8b50a67 100644 --- a/collectors/python.d.plugin/python.d.plugin.in +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -1,8 +1,5 @@ #!/usr/bin/env bash '''':; -if [[ "$OSTYPE" == "darwin"* ]]; then - export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES -fi exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' @@ -12,19 +9,25 @@ echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' # Author: Ilya Mashchenko (l2isbad) # SPDX-License-Identifier: GPL-3.0-or-later - import collections import copy import gc -import multiprocessing +import json import os +import pprint import re import sys import time import threading import types -PY_VERSION = sys.version_info[:2] +try: + from queue import Queue +except ImportError: + from Queue import Queue + +PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0) + if PY_VERSION > (3, 1): from importlib.machinery import SourceFileLoader @@ -35,98 +38,101 @@ else: ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR' ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' +ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR' ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' +def add_pythond_packages(): + pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__)) + pythond = os.path.abspath(pluginsd + '/../python.d') + packages = os.path.join(pythond, 'python_modules') + sys.path.append(packages) + + +add_pythond_packages() + + +from bases.collection import safe_print +from bases.loggers import PythonDLogger +from bases.loaders import load_config + +try: + from collections import OrderedDict +except ImportError: + from third_party.ordereddict import OrderedDict + + def dirs(): - user_config = os.getenv( + var_lib = os.getenv( + ENV_NETDATA_LIB_DIR, + '@varlibdir_POST@', + ) + plugin_user_config = os.getenv( ENV_NETDATA_USER_CONFIG_DIR, '@configdir_POST@', ) - stock_config = os.getenv( + plugin_stock_config = os.getenv( ENV_NETDATA_STOCK_CONFIG_DIR, '@libconfigdir_POST@', ) - modules_user_config = os.path.join(user_config, 'python.d') - modules_stock_config = os.path.join(stock_config, 'python.d') - - modules = os.path.abspath( - os.getenv( - ENV_NETDATA_PLUGINS_DIR, - os.path.dirname(__file__), - ) + '/../python.d' + pluginsd = os.getenv( + ENV_NETDATA_PLUGINS_DIR, + os.path.dirname(__file__), ) - pythond_packages = os.path.join(modules, 'python_modules') + modules_user_config = os.path.join(plugin_user_config, 'python.d') + modules_stock_config = os.path.join(plugin_stock_config, 'python.d') + modules = os.path.abspath(pluginsd + '/../python.d') - return collections.namedtuple( + Dirs = collections.namedtuple( 'Dirs', [ - 'user_config', - 'stock_config', + 'plugin_user_config', + 'plugin_stock_config', 'modules_user_config', 'modules_stock_config', 'modules', - 'pythond_packages', + 'var_lib', ] - )( - user_config, - stock_config, + ) + return Dirs( + plugin_user_config, + plugin_stock_config, modules_user_config, modules_stock_config, modules, - pythond_packages, + var_lib, ) DIRS = dirs() -sys.path.append(DIRS.pythond_packages) - - -from bases.collection import safe_print -from bases.loggers import PythonDLogger -from bases.loaders import load_config - -try: - from collections import OrderedDict -except ImportError: - from third_party.ordereddict import OrderedDict - - -END_TASK_MARKER = None - IS_ATTY = sys.stdout.isatty() -PLUGIN_CONF_FILE = 'python.d.conf' - MODULE_SUFFIX = '.chart.py' -OBSOLETED_MODULES = ( - 'apache_cache', # replaced by web_log - 'cpuidle', # rewritten in C - 'cpufreq', # rewritten in C - 'gunicorn_log', # replaced by web_log - 'linux_power_supply', # rewritten in C - 'nginx_log', # replaced by web_log - 'mdstat', # rewritten in C - 'sslcheck', # memory leak bug https://github.com/netdata/netdata/issues/5624 -) +def available_modules(): + obsolete = ( + 'apache_cache', # replaced by web_log + 'cpuidle', # rewritten in C + 'cpufreq', # rewritten in C + 'gunicorn_log', # replaced by web_log + 'linux_power_supply', # rewritten in C + 'nginx_log', # replaced by web_log + 'mdstat', # rewritten in C + 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624 + ) -AVAILABLE_MODULES = [ - m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules)) - if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES -] + files = sorted(os.listdir(DIRS.modules)) + modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)] + avail = [m for m in modules if m not in obsolete] + return tuple(avail) -PLUGIN_BASE_CONF = { - 'enabled': True, - 'default_run': True, - 'gc_run': True, - 'gc_interval': 300, -} + +AVAILABLE_MODULES = available_modules() JOB_BASE_CONF = { - 'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1), + 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)), 'priority': 60000, 'autodetection_retry': 0, 'chart_cleanup': 10, @@ -134,23 +140,20 @@ JOB_BASE_CONF = { 'name': str(), } +PLUGIN_BASE_CONF = { + 'enabled': True, + 'default_run': True, + 'gc_run': True, + 'gc_interval': 300, +} -def heartbeat(): - if IS_ATTY: - return - safe_print('\n') - - -class HeartBeat(threading.Thread): - def __init__(self, every): - threading.Thread.__init__(self) - self.daemon = True - self.every = every - def run(self): - while True: - time.sleep(self.every) - heartbeat() +def multi_path_find(name, *paths): + for path in paths: + abs_name = os.path.join(path, name) + if os.path.isfile(abs_name): + return abs_name + return str() def load_module(name): @@ -161,265 +164,268 @@ def load_module(name): return module.load_module() -def multi_path_find(name, paths): - for path in paths: - abs_name = os.path.join(path, name) - if os.path.isfile(abs_name): - return abs_name - return '' - - -Task = collections.namedtuple( - 'Task', - [ - 'module_name', - 'explicitly_enabled', - ], -) - -Result = collections.namedtuple( - 'Result', - [ - 'module_name', - 'jobs_configs', - ], -) - - -class ModuleChecker(multiprocessing.Process): - def __init__( - self, - task_queue, - result_queue, - ): - multiprocessing.Process.__init__(self) - self.log = PythonDLogger() - self.log.job_name = 'checker' - self.task_queue = task_queue - self.result_queue = result_queue - - def run(self): - self.log.info('starting...') - HeartBeat(1).start() - while self.run_once(): - pass - self.log.info('terminating...') +class ModuleConfig: + def __init__(self, name, config=None): + self.name = name + self.config = config or OrderedDict() - def run_once(self): - task = self.task_queue.get() + def load(self, abs_path): + self.config.update(load_config(abs_path) or dict()) - if task is END_TASK_MARKER: - # TODO: find better solution, understand why heartbeat thread doesn't work - heartbeat() - self.task_queue.task_done() - self.result_queue.put(END_TASK_MARKER) - return False + def defaults(self): + keys = ( + 'update_every', + 'priority', + 'autodetection_retry', + 'chart_cleanup', + 'penalty', + ) + return dict((k, self.config[k]) for k in keys if k in self.config) - result = self.do_task(task) - if result: - self.result_queue.put(result) - self.task_queue.task_done() + def create_job(self, job_name, job_config=None): + job_config = job_config or dict() - return True + config = OrderedDict() + config.update(job_config) + config['job_name'] = job_name + for k, v in self.defaults().items(): + config.setdefault(k, v) - def do_task(self, task): - self.log.info("{0} : checking".format(task.module_name)) + return config - # LOAD SOURCE - module = Module(task.module_name) - try: - module.load_source() - except Exception as error: - self.log.warning("{0} : error on loading source : {1}, skipping module".format( - task.module_name, - error, - )) - return None - else: - self.log.info("{0} : source successfully loaded".format(task.module_name)) + def job_names(self): + return [v for v in self.config if isinstance(self.config.get(v), dict)] - if module.is_disabled_by_default() and not task.explicitly_enabled: - self.log.info("{0} : disabled by default".format(task.module_name)) - return None + def single_job(self): + return [self.create_job(self.name)] - # LOAD CONFIG - paths = [ - DIRS.modules_user_config, - DIRS.modules_stock_config, - ] + def multi_job(self): + return [self.create_job(n, self.config[n]) for n in self.job_names()] - conf_abs_path = multi_path_find( - name='{0}.conf'.format(task.module_name), - paths=paths, - ) + def create_jobs(self): + return self.multi_job() or self.single_job() - if conf_abs_path: - self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path)) - try: - module.load_config(conf_abs_path) - except Exception as error: - self.log.warning("{0} : error on loading config : {1}, skipping module".format( - task.module_name, error)) - return None - else: - self.log.info("{0} : config was not found in '{1}', using default 1 job config".format( - task.module_name, paths)) - - # CHECK JOBS - jobs = module.create_jobs() - self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs))) - - successful_jobs_configs = list() - for job in jobs: - if job.autodetection_retry() > 0: - successful_jobs_configs.append(job.config) - self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name)) - continue - try: - job.init() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format( - task.module_name, job.name, error)) - continue +class JobsConfigsBuilder: + def __init__(self, config_dirs): + self.config_dirs = config_dirs + self.log = PythonDLogger() + self.job_defaults = None + self.module_defaults = None + self.min_update_every = None - try: - ok = job.check() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( - task.module_name, job.name, error)) - continue + def load_module_config(self, module_name): + name = '{0}.conf'.format(module_name) + self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs)) + config = ModuleConfig(module_name) - if not ok: - self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name)) - continue + abs_path = multi_path_find(name, *self.config_dirs) + if not abs_path: + self.log.warning("[{0}] '{1}' was not found".format(module_name, name)) + return config - self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name)) + self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path)) + try: + config.load(abs_path) + except Exception as error: + self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error))) + return None - job.config['autodetection_retry'] = job.config['update_every'] - successful_jobs_configs.append(job.config) + self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path)) + return config - if not successful_jobs_configs: - self.log.info("{0} : all jobs failed, skipping module".format(task.module_name)) - return None + @staticmethod + def apply_defaults(jobs, defaults): + if defaults is None: + return + for k, v in defaults.items(): + for job in jobs: + job.setdefault(k, v) - return Result(module.source.__name__, successful_jobs_configs) + def set_min_update_every(self, jobs, min_update_every): + if min_update_every is None: + return + for job in jobs: + if 'update_every' in job and job['update_every'] < self.min_update_every: + job['update_every'] = self.min_update_every + def build(self, module_name): + config = self.load_module_config(module_name) + if config is None: + return None -class JobConf(OrderedDict): - def __init__(self, *args): - OrderedDict.__init__(self, *args) + configs = config.create_jobs() + self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs))) - def set_defaults_from_module(self, module): - for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]: - self[k] = getattr(module, k) + self.apply_defaults(configs, self.module_defaults) + self.apply_defaults(configs, self.job_defaults) + self.set_min_update_every(configs, self.min_update_every) - def set_defaults_from_config(self, module_config): - for k in [k for k in JOB_BASE_CONF if k in module_config]: - self[k] = module_config[k] + return configs - def set_job_name(self, name): - self['job_name'] = re.sub(r'\s+', '_', name) - def set_override_name(self, name): - self['override_name'] = re.sub(r'\s+', '_', name) +JOB_STATUS_ACTIVE = 'active' +JOB_STATUS_RECOVERING = 'recovering' +JOB_STATUS_DROPPED = 'dropped' +JOB_STATUS_INIT = 'initial' - def as_dict(self): - return copy.deepcopy(OrderedDict(self)) +class Job(threading.Thread): + inf = -1 -class Job: - def __init__( - self, - service, - module_name, - config, - ): + def __init__(self, service, module_name, config): + threading.Thread.__init__(self) + self.daemon = True self.service = service - self.config = config self.module_name = module_name - self.name = config['job_name'] - self.override_name = config['override_name'] - self.wrapped = None + self.config = config + self.real_name = config['job_name'] + self.actual_name = config['override_name'] or self.real_name + self.autodetection_retry = config['autodetection_retry'] + self.checks = self.inf + self.job = None + self.status = JOB_STATUS_INIT + + def is_inited(self): + return self.job is not None def init(self): - self.wrapped = self.service(configuration=self.config.as_dict()) + self.job = self.service(configuration=copy.deepcopy(self.config)) def check(self): - return self.wrapped.check() - - def post_check(self, min_update_every): - if self.wrapped.update_every < min_update_every: - self.wrapped.update_every = min_update_every + ok = self.job.check() + self.checks -= self.checks != self.inf and not ok + return ok def create(self): - return self.wrapped.create() + self.job.create() - def autodetection_retry(self): - return self.config['autodetection_retry'] + def need_to_recheck(self): + return self.autodetection_retry != 0 and self.checks != 0 def run(self): - self.wrapped.run() + self.job.run() -class Module: +class ModuleSrc: def __init__(self, name): self.name = name - self.source = None - self.config = dict() + self.src = None + + def load(self): + self.src = load_module(self.name) + + def get(self, key): + return getattr(self.src, key, None) + + def service(self): + return self.get('Service') + + def defaults(self): + keys = ( + 'update_every', + 'priority', + 'autodetection_retry', + 'chart_cleanup', + 'penalty', + ) + return dict((k, self.get(k)) for k in keys if self.get(k) is not None) def is_disabled_by_default(self): - return bool(getattr(self.source, 'disabled_by_default', False)) - - def load_source(self): - self.source = load_module(self.name) - - def load_config(self, abs_path): - self.config = load_config(abs_path) or dict() - - def gather_jobs_configs(self): - job_names = [v for v in self.config if isinstance(self.config[v], dict)] - - if len(job_names) == 0: - job_conf = JobConf(JOB_BASE_CONF) - job_conf.set_defaults_from_module(self.source) - job_conf.update(self.config) - job_conf.set_job_name(self.name) - job_conf.set_override_name(job_conf.pop('name')) - return [job_conf] - - configs = list() - for job_name in job_names: - raw_job_conf = self.config[job_name] - job_conf = JobConf(JOB_BASE_CONF) - job_conf.set_defaults_from_module(self.source) - job_conf.set_defaults_from_config(self.config) - job_conf.update(raw_job_conf) - job_conf.set_job_name(job_name) - job_conf.set_override_name(job_conf.pop('name')) - configs.append(job_conf) + return bool(self.get('disabled_by_default')) - return configs - def create_jobs(self, jobs_conf=None): - return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()] +class JobsStatuses: + def __init__(self): + self.items = OrderedDict() + def dump(self): + return json.dumps(self.items, indent=2) -class JobRunner(threading.Thread): - def __init__(self, job): - threading.Thread.__init__(self) - self.daemon = True - self.wrapped = job + def get(self, module_name, job_name): + if module_name not in self.items: + return None + return self.items[module_name].get(job_name) - def run(self): - self.wrapped.run() + def has(self, module_name, job_name): + return self.get(module_name, job_name) is not None + def from_file(self, path): + with open(path) as f: + data = json.load(f) + return self.from_json(data) -class PluginConf(dict): + @staticmethod + def from_json(items): + if not isinstance(items, dict): + raise Exception('items obj has wrong type : {0}'.format(type(items))) + if not items: + return JobsStatuses() + + v = OrderedDict() + for mod_name in sorted(items): + if not items[mod_name]: + continue + v[mod_name] = OrderedDict() + for job_name in sorted(items[mod_name]): + v[mod_name][job_name] = items[mod_name][job_name] + + rv = JobsStatuses() + rv.items = v + return rv + + @staticmethod + def from_jobs(jobs): + v = OrderedDict() + for job in jobs: + status = job.status + if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING): + continue + if job.module_name not in v: + v[job.module_name] = OrderedDict() + v[job.module_name][job.real_name] = status + + rv = JobsStatuses() + rv.items = v + return rv + + +class StdoutSaver: + @staticmethod + def save(dump): + print(dump) + + +class CachedFileSaver: + def __init__(self, path): + self.last_save_success = False + self.last_saved_dump = str() + self.path = path + + def save(self, dump): + if self.last_save_success and self.last_saved_dump == dump: + return + try: + with open(self.path, 'w') as out: + out.write(dump) + except Exception: + self.last_save_success = False + raise + self.last_saved_dump = dump + self.last_save_success = True + + +class PluginConfig(dict): def __init__(self, *args): dict.__init__(self, *args) - def is_module_enabled(self, module_name, explicit): + def is_module_explicitly_enabled(self, module_name): + return self._is_module_enabled(module_name, True) + + def is_module_enabled(self, module_name): + return self._is_module_enabled(module_name, False) + + def _is_module_enabled(self, module_name, explicit): if module_name in self: return self[module_name] if explicit: @@ -428,249 +434,253 @@ class PluginConf(dict): class Plugin: - def __init__( - self, - min_update_every=1, - modules_to_run=tuple(AVAILABLE_MODULES), - ): - self.log = PythonDLogger() - self.config = PluginConf(PLUGIN_BASE_CONF) - self.task_queue = multiprocessing.JoinableQueue() - self.result_queue = multiprocessing.JoinableQueue() - self.min_update_every = min_update_every + config_name = 'python.d.conf' + jobs_status_dump_name = 'pythond-jobs-statuses.json' + + def __init__(self, modules_to_run, min_update_every): self.modules_to_run = modules_to_run - self.auto_detection_jobs = list() - self.tasks = list() - self.results = list() - self.checked_jobs = collections.defaultdict(list) + self.min_update_every = min_update_every + self.config = PluginConfig(PLUGIN_BASE_CONF) + self.log = PythonDLogger() + self.started_jobs = collections.defaultdict(dict) + self.jobs = list() + self.saver = None self.runs = 0 - @staticmethod - def shutdown(): - safe_print('DISABLE') - exit(0) - - def run(self): - jobs = self.create_jobs() - if not jobs: - return - - for job in self.prepare_jobs(jobs): - self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name)) - JobRunner(job).start() - - self.serve() - - def enqueue_tasks(self): - for task in self.tasks: - self.task_queue.put(task) - self.task_queue.put(END_TASK_MARKER) - - def dequeue_results(self): - while True: - result = self.result_queue.get() - self.result_queue.task_done() - if result is END_TASK_MARKER: - break - self.results.append(result) - def load_config(self): paths = [ - DIRS.user_config, - DIRS.stock_config, + DIRS.plugin_user_config, + DIRS.plugin_stock_config, ] - - self.log.info("checking for config in {0}".format(paths)) - abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths) + self.log.debug("looking for '{0}' in {1}".format(self.config_name, paths)) + abs_path = multi_path_find(self.config_name, *paths) if not abs_path: - self.log.warning('config was not found, using defaults') + self.log.warning("'{0}' was not found, using defaults".format(self.config_name)) return True - self.log.info("config found, loading config '{0}'".format(abs_path)) + self.log.debug("loading '{0}'".format(abs_path)) try: - config = load_config(abs_path) or dict() + config = load_config(abs_path) except Exception as error: - self.log.error('error on loading config : {0}'.format(error)) + self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error))) return False - self.log.info('config successfully loaded') + self.log.debug("'{0}' is loaded".format(abs_path)) self.config.update(config) return True - def setup(self): - self.log.info('starting setup') - if not self.load_config(): - return False - - if not self.config['enabled']: - self.log.info('disabled in configuration file') - return False - - for mod in self.modules_to_run: - if self.config.is_module_enabled(mod, False): - task = Task(mod, self.config.is_module_enabled(mod, True)) - self.tasks.append(task) - else: - self.log.info("{0} : disabled in configuration file".format(mod)) - - if not self.tasks: - self.log.info('no modules to run') - return False + def load_job_statuses(self): + self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib)) + abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib) + if not abs_path: + self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name)) + return - worker = ModuleChecker(self.task_queue, self.result_queue) - self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks))) - worker.start() - - # TODO: timeouts? - self.enqueue_tasks() - self.task_queue.join() - self.dequeue_results() - self.result_queue.join() - self.task_queue.close() - self.result_queue.close() - self.log.info('stopping checker process') - worker.join() - - if not self.results: - self.log.info('no modules to run') - return False + self.log.debug("loading '{0}'".format(abs_path)) + try: + statuses = JobsStatuses().from_file(abs_path) + except Exception as error: + self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error))) + return None + self.log.debug("'{0}' is loaded".format(abs_path)) + return statuses - self.log.info("setup complete, {0} active module(s) : '{1}'".format( - len(self.results), - [v.module_name for v in self.results]) - ) + def create_jobs(self, job_statuses=None): + paths = [ + DIRS.modules_user_config, + DIRS.modules_stock_config, + ] - return True + builder = JobsConfigsBuilder(paths) + builder.job_defaults = JOB_BASE_CONF + builder.min_update_every = self.min_update_every - def create_jobs(self): jobs = list() - for result in self.results: - module = Module(result.module_name) + for mod_name in self.modules_to_run: + if not self.config.is_module_enabled(mod_name): + self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name)) + continue + + src = ModuleSrc(mod_name) try: - module.load_source() + src.load() except Exception as error: - self.log.warning("{0} : error on loading module source : {1}, skipping module".format( - result.module_name, error)) + self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error))) continue - module_jobs = module.create_jobs(result.jobs_configs) - self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs))) - jobs.extend(module_jobs) - - return jobs - - def prepare_jobs(self, jobs): - prepared = list() + if not (src.service() and callable(src.service())): + self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name)) + continue - for job in jobs: - check_name = job.override_name or job.name - if check_name in self.checked_jobs[job.module_name]: - self.log.info('{0}[{1}] : already served by another job, skipping the job'.format( - job.module_name, job.name)) + if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name): + self.log.info("[{0}] is disabled by default, skipping it".format(mod_name)) continue - try: - job.init() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( - job.module_name, job.name, error)) + builder.module_defaults = src.defaults() + configs = builder.build(mod_name) + if not configs: + self.log.info("[{0}] has no job configs, skipping it".format(mod_name)) continue - self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name)) + for config in configs: + config['job_name'] = re.sub(r'\s+', '_', config['job_name']) + config['override_name'] = re.sub(r'\s+', '_', config.pop('name')) - try: - ok = job.check() - except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( - job.module_name, job.name, error)) - continue + job = Job(src.service(), mod_name, config) - if not ok: - self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name)) - if job.autodetection_retry() > 0: - self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format( - job.module_name, job.name, job.autodetection_retry())) - self.auto_detection_jobs.append(job) - continue + was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name) + if was_previously_active and job.autodetection_retry == 0: + self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format( + job.module_name, job.real_name)) + job.checks = 11 + job.autodetection_retry = 30 - self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name)) + jobs.append(job) - job.post_check(int(self.min_update_every)) + return jobs - if not job.create(): - self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name)) + def setup(self): + if not self.load_config(): + return False - self.checked_jobs[job.module_name].append(check_name) - prepared.append(job) + if not self.config['enabled']: + self.log.info('disabled in the configuration file') + return False - return prepared + statuses = self.load_job_statuses() - def serve(self): - gc_run = self.config['gc_run'] - gc_interval = self.config['gc_interval'] + self.jobs = self.create_jobs(statuses) + if not self.jobs: + self.log.info('no jobs to run') + return False + + if not IS_ATTY: + abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name) + self.saver = CachedFileSaver(abs_path) + return True - while True: - self.runs += 1 + def start_jobs(self, *jobs): + for job in jobs: + if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING): + continue - # threads: main + heartbeat - if threading.active_count() <= 2 and not self.auto_detection_jobs: - return + if job.actual_name in self.started_jobs[job.module_name]: + self.log.info('{0}[{1}] : already served by another job, skipping it'.format( + job.module_name, job.real_name)) + job.status = JOB_STATUS_DROPPED + continue - time.sleep(1) + if not job.is_inited(): + try: + job.init() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job", + job.module_name, job.real_name, repr(error)) + job.status = JOB_STATUS_DROPPED + continue - if gc_run and self.runs % gc_interval == 0: - v = gc.collect() - self.log.debug('GC collection run result: {0}'.format(v)) + try: + ok = job.check() + except Exception as error: + self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job", + job.module_name, job.real_name, repr(error)) + job.status = JOB_STATUS_DROPPED + continue + if not ok: + self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name)) + job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED + continue + self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name)) + + try: + job.create() + except Exception as error: + self.log.error("{0}[{1}] : unhandled exception on create : {2}, skipping the job", + job.module_name, job.real_name, repr(error)) + job.status = JOB_STATUS_DROPPED + continue - self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)] + self.started_jobs[job.module_name] = job.actual_name + job.status = JOB_STATUS_ACTIVE + job.start() - def retry_job(self, job): - stop_retrying = True - retry_later = False + @staticmethod + def keep_alive(): + if not IS_ATTY: + safe_print('\n') + + def garbage_collection(self): + if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0: + v = gc.collect() + self.log.debug('GC collection run result: {0}'.format(v)) + + def restart_recovering_jobs(self): + for job in self.jobs: + if job.status != JOB_STATUS_RECOVERING: + continue + if self.runs % job.autodetection_retry != 0: + continue + self.start_jobs(job) - if self.runs % job.autodetection_retry() != 0: - return retry_later + def cleanup_jobs(self): + self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED] - check_name = job.override_name or job.name - if check_name in self.checked_jobs[job.module_name]: - self.log.info("{0}[{1}]: already served by another job, give up on retrying".format( - job.module_name, job.name)) - return stop_retrying + def have_alive_jobs(self): + return next( + (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)), + False, + ) + def save_job_statuses(self): + if self.saver is None: + return + if self.runs % 10 != 0: + return + dump = JobsStatuses().from_jobs(self.jobs).dump() try: - ok = job.check() + self.saver.save(dump) except Exception as error: - self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format( - job.module_name, job.name, error)) - return stop_retrying + self.log.error("error on saving jobs statuses dump : {0}".format(repr(error))) - if not ok: - self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format( - job.module_name, job.name, job.autodetection_retry())) - return retry_later - self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name)) + def serve_once(self): + if not self.have_alive_jobs(): + self.log.info('no jobs to serve') + return False + + time.sleep(1) + self.runs += 1 - if not job.create(): - return stop_retrying + self.keep_alive() + self.garbage_collection() + self.cleanup_jobs() + self.restart_recovering_jobs() + self.save_job_statuses() + return True - job.post_check(int(self.min_update_every)) - self.checked_jobs[job.module_name].append(check_name) - JobRunner(job).start() + def serve(self): + while self.serve_once(): + pass - return stop_retrying + def run(self): + self.start_jobs(*self.jobs) + self.serve() -def parse_cmd(): +def parse_command_line(): opts = sys.argv[:][1:] + debug = False trace = False update_every = 1 modules_to_run = list() - v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None) - if v: - update_every = v - opts.remove(v) + def find_first_positive_int(values): + return next((v for v in values if v.isdigit() and int(v) >= 1), None) + + u = find_first_positive_int(opts) + if u is not None: + update_every = int(u) + opts.remove(u) if 'debug' in opts: debug = True opts.remove('debug') @@ -680,54 +690,80 @@ def parse_cmd(): if opts: modules_to_run = list(opts) - return collections.namedtuple( + cmd = collections.namedtuple( 'CMD', [ 'update_every', 'debug', 'trace', 'modules_to_run', - ], - )( + ]) + return cmd( update_every, debug, trace, - modules_to_run, + modules_to_run ) +def guess_module(modules, *names): + def guess(n): + found = None + for i, _ in enumerate(n): + cur = [x for x in modules if x.startswith(name[:i + 1])] + if not cur: + return found + found = cur + return found + + guessed = list() + for name in names: + name = name.lower() + m = guess(name) + if m: + guessed.extend(m) + return sorted(set(guessed)) + + +def disable(): + if not IS_ATTY: + safe_print('DISABLE') + exit(0) + + def main(): - cmd = parse_cmd() - logger = PythonDLogger() + cmd = parse_command_line() + log = PythonDLogger() if cmd.debug: - logger.logger.severity = 'DEBUG' + log.logger.severity = 'DEBUG' if cmd.trace: - logger.log_traceback = True + log.log_traceback = True - logger.info('using python v{0}'.format(PY_VERSION[0])) + log.info('using python v{0}'.format(PY_VERSION[0])) - unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES) - if unknown_modules: - logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules)))) - safe_print('DISABLE') + unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES) + if unknown: + log.error('unknown modules : {0}'.format(sorted(list(unknown)))) + guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run) + if guessed: + log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1))) return - plugin = Plugin( - cmd.update_every, + p = Plugin( cmd.modules_to_run or AVAILABLE_MODULES, + cmd.update_every, ) - HeartBeat(1).start() - - if not plugin.setup(): - safe_print('DISABLE') - return - - plugin.run() - logger.info('exiting from main...') - plugin.shutdown() + try: + if not p.setup(): + return + p.run() + except KeyboardInterrupt: + pass + log.info('exiting from main...') -if __name__ == '__main__': +if __name__ == "__main__": main() + disable() -- cgit v1.2.3