summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python.d.plugin.in
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2019-09-13 05:05:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2019-09-13 05:05:16 +0000
commit8f5d8f3de6cae180af37917ef978a4affc2cd464 (patch)
tree4bfe1abc6d19c2dd635d1b83cc0e73d0aa6904ac /collectors/python.d.plugin/python.d.plugin.in
parentAdding upstream version 1.17.0. (diff)
downloadnetdata-8f5d8f3de6cae180af37917ef978a4affc2cd464.tar.xz
netdata-8f5d8f3de6cae180af37917ef978a4affc2cd464.zip
Adding upstream version 1.17.1.upstream/1.17.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin.in')
-rw-r--r--collectors/python.d.plugin/python.d.plugin.in1018
1 files changed, 527 insertions, 491 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in
index f1429b6fa..5b8b50a67 100644
--- a/collectors/python.d.plugin/python.d.plugin.in
+++ b/collectors/python.d.plugin/python.d.plugin.in
@@ -1,8 +1,5 @@
#!/usr/bin/env bash
'''':;
-if [[ "$OSTYPE" == "darwin"* ]]; then
- export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
-fi
exec "$(command -v python || command -v python3 || command -v python2 ||
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
@@ -12,19 +9,25 @@ echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
-
import collections
import copy
import gc
-import multiprocessing
+import json
import os
+import pprint
import re
import sys
import time
import threading
import types
-PY_VERSION = sys.version_info[:2]
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0)
+
if PY_VERSION > (3, 1):
from importlib.machinery import SourceFileLoader
@@ -35,98 +38,101 @@ else:
ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
+ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
+def add_pythond_packages():
+ pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
+ pythond = os.path.abspath(pluginsd + '/../python.d')
+ packages = os.path.join(pythond, 'python_modules')
+ sys.path.append(packages)
+
+
+add_pythond_packages()
+
+
+from bases.collection import safe_print
+from bases.loggers import PythonDLogger
+from bases.loaders import load_config
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
+
+
def dirs():
- user_config = os.getenv(
+ var_lib = os.getenv(
+ ENV_NETDATA_LIB_DIR,
+ '@varlibdir_POST@',
+ )
+ plugin_user_config = os.getenv(
ENV_NETDATA_USER_CONFIG_DIR,
'@configdir_POST@',
)
- stock_config = os.getenv(
+ plugin_stock_config = os.getenv(
ENV_NETDATA_STOCK_CONFIG_DIR,
'@libconfigdir_POST@',
)
- modules_user_config = os.path.join(user_config, 'python.d')
- modules_stock_config = os.path.join(stock_config, 'python.d')
-
- modules = os.path.abspath(
- os.getenv(
- ENV_NETDATA_PLUGINS_DIR,
- os.path.dirname(__file__),
- ) + '/../python.d'
+ pluginsd = os.getenv(
+ ENV_NETDATA_PLUGINS_DIR,
+ os.path.dirname(__file__),
)
- pythond_packages = os.path.join(modules, 'python_modules')
+ modules_user_config = os.path.join(plugin_user_config, 'python.d')
+ modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
+ modules = os.path.abspath(pluginsd + '/../python.d')
- return collections.namedtuple(
+ Dirs = collections.namedtuple(
'Dirs',
[
- 'user_config',
- 'stock_config',
+ 'plugin_user_config',
+ 'plugin_stock_config',
'modules_user_config',
'modules_stock_config',
'modules',
- 'pythond_packages',
+ 'var_lib',
]
- )(
- user_config,
- stock_config,
+ )
+ return Dirs(
+ plugin_user_config,
+ plugin_stock_config,
modules_user_config,
modules_stock_config,
modules,
- pythond_packages,
+ var_lib,
)
DIRS = dirs()
-sys.path.append(DIRS.pythond_packages)
-
-
-from bases.collection import safe_print
-from bases.loggers import PythonDLogger
-from bases.loaders import load_config
-
-try:
- from collections import OrderedDict
-except ImportError:
- from third_party.ordereddict import OrderedDict
-
-
-END_TASK_MARKER = None
-
IS_ATTY = sys.stdout.isatty()
-PLUGIN_CONF_FILE = 'python.d.conf'
-
MODULE_SUFFIX = '.chart.py'
-OBSOLETED_MODULES = (
- 'apache_cache', # replaced by web_log
- 'cpuidle', # rewritten in C
- 'cpufreq', # rewritten in C
- 'gunicorn_log', # replaced by web_log
- 'linux_power_supply', # rewritten in C
- 'nginx_log', # replaced by web_log
- 'mdstat', # rewritten in C
- 'sslcheck', # memory leak bug https://github.com/netdata/netdata/issues/5624
-)
+def available_modules():
+ obsolete = (
+ 'apache_cache', # replaced by web_log
+ 'cpuidle', # rewritten in C
+ 'cpufreq', # rewritten in C
+ 'gunicorn_log', # replaced by web_log
+ 'linux_power_supply', # rewritten in C
+ 'nginx_log', # replaced by web_log
+ 'mdstat', # rewritten in C
+ 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
+ )
-AVAILABLE_MODULES = [
- m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
- if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
-]
+ files = sorted(os.listdir(DIRS.modules))
+ modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)]
+ avail = [m for m in modules if m not in obsolete]
+ return tuple(avail)
-PLUGIN_BASE_CONF = {
- 'enabled': True,
- 'default_run': True,
- 'gc_run': True,
- 'gc_interval': 300,
-}
+
+AVAILABLE_MODULES = available_modules()
JOB_BASE_CONF = {
- 'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1),
+ 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
'priority': 60000,
'autodetection_retry': 0,
'chart_cleanup': 10,
@@ -134,23 +140,20 @@ JOB_BASE_CONF = {
'name': str(),
}
+PLUGIN_BASE_CONF = {
+ 'enabled': True,
+ 'default_run': True,
+ 'gc_run': True,
+ 'gc_interval': 300,
+}
-def heartbeat():
- if IS_ATTY:
- return
- safe_print('\n')
-
-
-class HeartBeat(threading.Thread):
- def __init__(self, every):
- threading.Thread.__init__(self)
- self.daemon = True
- self.every = every
- def run(self):
- while True:
- time.sleep(self.every)
- heartbeat()
+def multi_path_find(name, *paths):
+ for path in paths:
+ abs_name = os.path.join(path, name)
+ if os.path.isfile(abs_name):
+ return abs_name
+ return str()
def load_module(name):
@@ -161,265 +164,268 @@ def load_module(name):
return module.load_module()
-def multi_path_find(name, paths):
- for path in paths:
- abs_name = os.path.join(path, name)
- if os.path.isfile(abs_name):
- return abs_name
- return ''
-
-
-Task = collections.namedtuple(
- 'Task',
- [
- 'module_name',
- 'explicitly_enabled',
- ],
-)
-
-Result = collections.namedtuple(
- 'Result',
- [
- 'module_name',
- 'jobs_configs',
- ],
-)
-
-
-class ModuleChecker(multiprocessing.Process):
- def __init__(
- self,
- task_queue,
- result_queue,
- ):
- multiprocessing.Process.__init__(self)
- self.log = PythonDLogger()
- self.log.job_name = 'checker'
- self.task_queue = task_queue
- self.result_queue = result_queue
-
- def run(self):
- self.log.info('starting...')
- HeartBeat(1).start()
- while self.run_once():
- pass
- self.log.info('terminating...')
+class ModuleConfig:
+ def __init__(self, name, config=None):
+ self.name = name
+ self.config = config or OrderedDict()
- def run_once(self):
- task = self.task_queue.get()
+ def load(self, abs_path):
+ self.config.update(load_config(abs_path) or dict())
- if task is END_TASK_MARKER:
- # TODO: find better solution, understand why heartbeat thread doesn't work
- heartbeat()
- self.task_queue.task_done()
- self.result_queue.put(END_TASK_MARKER)
- return False
+ def defaults(self):
+ keys = (
+ 'update_every',
+ 'priority',
+ 'autodetection_retry',
+ 'chart_cleanup',
+ 'penalty',
+ )
+ return dict((k, self.config[k]) for k in keys if k in self.config)
- result = self.do_task(task)
- if result:
- self.result_queue.put(result)
- self.task_queue.task_done()
+ def create_job(self, job_name, job_config=None):
+ job_config = job_config or dict()
- return True
+ config = OrderedDict()
+ config.update(job_config)
+ config['job_name'] = job_name
+ for k, v in self.defaults().items():
+ config.setdefault(k, v)
- def do_task(self, task):
- self.log.info("{0} : checking".format(task.module_name))
+ return config
- # LOAD SOURCE
- module = Module(task.module_name)
- try:
- module.load_source()
- except Exception as error:
- self.log.warning("{0} : error on loading source : {1}, skipping module".format(
- task.module_name,
- error,
- ))
- return None
- else:
- self.log.info("{0} : source successfully loaded".format(task.module_name))
+ def job_names(self):
+ return [v for v in self.config if isinstance(self.config.get(v), dict)]
- if module.is_disabled_by_default() and not task.explicitly_enabled:
- self.log.info("{0} : disabled by default".format(task.module_name))
- return None
+ def single_job(self):
+ return [self.create_job(self.name)]
- # LOAD CONFIG
- paths = [
- DIRS.modules_user_config,
- DIRS.modules_stock_config,
- ]
+ def multi_job(self):
+ return [self.create_job(n, self.config[n]) for n in self.job_names()]
- conf_abs_path = multi_path_find(
- name='{0}.conf'.format(task.module_name),
- paths=paths,
- )
+ def create_jobs(self):
+ return self.multi_job() or self.single_job()
- if conf_abs_path:
- self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
- try:
- module.load_config(conf_abs_path)
- except Exception as error:
- self.log.warning("{0} : error on loading config : {1}, skipping module".format(
- task.module_name, error))
- return None
- else:
- self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
- task.module_name, paths))
-
- # CHECK JOBS
- jobs = module.create_jobs()
- self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))
-
- successful_jobs_configs = list()
- for job in jobs:
- if job.autodetection_retry() > 0:
- successful_jobs_configs.append(job.config)
- self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
- continue
- try:
- job.init()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
- task.module_name, job.name, error))
- continue
+class JobsConfigsBuilder:
+ def __init__(self, config_dirs):
+ self.config_dirs = config_dirs
+ self.log = PythonDLogger()
+ self.job_defaults = None
+ self.module_defaults = None
+ self.min_update_every = None
- try:
- ok = job.check()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
- task.module_name, job.name, error))
- continue
+ def load_module_config(self, module_name):
+ name = '{0}.conf'.format(module_name)
+ self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
+ config = ModuleConfig(module_name)
- if not ok:
- self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
- continue
+ abs_path = multi_path_find(name, *self.config_dirs)
+ if not abs_path:
+ self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
+ return config
- self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))
+ self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
+ try:
+ config.load(abs_path)
+ except Exception as error:
+ self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
+ return None
- job.config['autodetection_retry'] = job.config['update_every']
- successful_jobs_configs.append(job.config)
+ self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
+ return config
- if not successful_jobs_configs:
- self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
- return None
+ @staticmethod
+ def apply_defaults(jobs, defaults):
+ if defaults is None:
+ return
+ for k, v in defaults.items():
+ for job in jobs:
+ job.setdefault(k, v)
- return Result(module.source.__name__, successful_jobs_configs)
+ def set_min_update_every(self, jobs, min_update_every):
+ if min_update_every is None:
+ return
+ for job in jobs:
+ if 'update_every' in job and job['update_every'] < self.min_update_every:
+ job['update_every'] = self.min_update_every
+ def build(self, module_name):
+ config = self.load_module_config(module_name)
+ if config is None:
+ return None
-class JobConf(OrderedDict):
- def __init__(self, *args):
- OrderedDict.__init__(self, *args)
+ configs = config.create_jobs()
+ self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))
- def set_defaults_from_module(self, module):
- for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
- self[k] = getattr(module, k)
+ self.apply_defaults(configs, self.module_defaults)
+ self.apply_defaults(configs, self.job_defaults)
+ self.set_min_update_every(configs, self.min_update_every)
- def set_defaults_from_config(self, module_config):
- for k in [k for k in JOB_BASE_CONF if k in module_config]:
- self[k] = module_config[k]
+ return configs
- def set_job_name(self, name):
- self['job_name'] = re.sub(r'\s+', '_', name)
- def set_override_name(self, name):
- self['override_name'] = re.sub(r'\s+', '_', name)
+JOB_STATUS_ACTIVE = 'active'
+JOB_STATUS_RECOVERING = 'recovering'
+JOB_STATUS_DROPPED = 'dropped'
+JOB_STATUS_INIT = 'initial'
- def as_dict(self):
- return copy.deepcopy(OrderedDict(self))
+class Job(threading.Thread):
+ inf = -1
-class Job:
- def __init__(
- self,
- service,
- module_name,
- config,
- ):
+ def __init__(self, service, module_name, config):
+ threading.Thread.__init__(self)
+ self.daemon = True
self.service = service
- self.config = config
self.module_name = module_name
- self.name = config['job_name']
- self.override_name = config['override_name']
- self.wrapped = None
+ self.config = config
+ self.real_name = config['job_name']
+ self.actual_name = config['override_name'] or self.real_name
+ self.autodetection_retry = config['autodetection_retry']
+ self.checks = self.inf
+ self.job = None
+ self.status = JOB_STATUS_INIT
+
+ def is_inited(self):
+ return self.job is not None
def init(self):
- self.wrapped = self.service(configuration=self.config.as_dict())
+ self.job = self.service(configuration=copy.deepcopy(self.config))
def check(self):
- return self.wrapped.check()
-
- def post_check(self, min_update_every):
- if self.wrapped.update_every < min_update_every:
- self.wrapped.update_every = min_update_every
+ ok = self.job.check()
+ self.checks -= self.checks != self.inf and not ok
+ return ok
def create(self):
- return self.wrapped.create()
+ self.job.create()
- def autodetection_retry(self):
- return self.config['autodetection_retry']
+ def need_to_recheck(self):
+ return self.autodetection_retry != 0 and self.checks != 0
def run(self):
- self.wrapped.run()
+ self.job.run()
-class Module:
+class ModuleSrc:
def __init__(self, name):
self.name = name
- self.source = None
- self.config = dict()
+ self.src = None
+
+ def load(self):
+ self.src = load_module(self.name)
+
+ def get(self, key):
+ return getattr(self.src, key, None)
+
+ def service(self):
+ return self.get('Service')
+
+ def defaults(self):
+ keys = (
+ 'update_every',
+ 'priority',
+ 'autodetection_retry',
+ 'chart_cleanup',
+ 'penalty',
+ )
+ return dict((k, self.get(k)) for k in keys if self.get(k) is not None)
def is_disabled_by_default(self):
- return bool(getattr(self.source, 'disabled_by_default', False))
-
- def load_source(self):
- self.source = load_module(self.name)
-
- def load_config(self, abs_path):
- self.config = load_config(abs_path) or dict()
-
- def gather_jobs_configs(self):
- job_names = [v for v in self.config if isinstance(self.config[v], dict)]
-
- if len(job_names) == 0:
- job_conf = JobConf(JOB_BASE_CONF)
- job_conf.set_defaults_from_module(self.source)
- job_conf.update(self.config)
- job_conf.set_job_name(self.name)
- job_conf.set_override_name(job_conf.pop('name'))
- return [job_conf]
-
- configs = list()
- for job_name in job_names:
- raw_job_conf = self.config[job_name]
- job_conf = JobConf(JOB_BASE_CONF)
- job_conf.set_defaults_from_module(self.source)
- job_conf.set_defaults_from_config(self.config)
- job_conf.update(raw_job_conf)
- job_conf.set_job_name(job_name)
- job_conf.set_override_name(job_conf.pop('name'))
- configs.append(job_conf)
+ return bool(self.get('disabled_by_default'))
- return configs
- def create_jobs(self, jobs_conf=None):
- return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()]
+class JobsStatuses:
+ def __init__(self):
+ self.items = OrderedDict()
+ def dump(self):
+ return json.dumps(self.items, indent=2)
-class JobRunner(threading.Thread):
- def __init__(self, job):
- threading.Thread.__init__(self)
- self.daemon = True
- self.wrapped = job
+ def get(self, module_name, job_name):
+ if module_name not in self.items:
+ return None
+ return self.items[module_name].get(job_name)
- def run(self):
- self.wrapped.run()
+ def has(self, module_name, job_name):
+ return self.get(module_name, job_name) is not None
+ def from_file(self, path):
+ with open(path) as f:
+ data = json.load(f)
+ return self.from_json(data)
-class PluginConf(dict):
+ @staticmethod
+ def from_json(items):
+ if not isinstance(items, dict):
+ raise Exception('items obj has wrong type : {0}'.format(type(items)))
+ if not items:
+ return JobsStatuses()
+
+ v = OrderedDict()
+ for mod_name in sorted(items):
+ if not items[mod_name]:
+ continue
+ v[mod_name] = OrderedDict()
+ for job_name in sorted(items[mod_name]):
+ v[mod_name][job_name] = items[mod_name][job_name]
+
+ rv = JobsStatuses()
+ rv.items = v
+ return rv
+
+ @staticmethod
+ def from_jobs(jobs):
+ v = OrderedDict()
+ for job in jobs:
+ status = job.status
+ if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
+ continue
+ if job.module_name not in v:
+ v[job.module_name] = OrderedDict()
+ v[job.module_name][job.real_name] = status
+
+ rv = JobsStatuses()
+ rv.items = v
+ return rv
+
+
+class StdoutSaver:
+ @staticmethod
+ def save(dump):
+ print(dump)
+
+
+class CachedFileSaver:
+ def __init__(self, path):
+ self.last_save_success = False
+ self.last_saved_dump = str()
+ self.path = path
+
+ def save(self, dump):
+ if self.last_save_success and self.last_saved_dump == dump:
+ return
+ try:
+ with open(self.path, 'w') as out:
+ out.write(dump)
+ except Exception:
+ self.last_save_success = False
+ raise
+ self.last_saved_dump = dump
+ self.last_save_success = True
+
+
+class PluginConfig(dict):
def __init__(self, *args):
dict.__init__(self, *args)
- def is_module_enabled(self, module_name, explicit):
+ def is_module_explicitly_enabled(self, module_name):
+ return self._is_module_enabled(module_name, True)
+
+ def is_module_enabled(self, module_name):
+ return self._is_module_enabled(module_name, False)
+
+ def _is_module_enabled(self, module_name, explicit):
if module_name in self:
return self[module_name]
if explicit:
@@ -428,249 +434,253 @@ class PluginConf(dict):
class Plugin:
- def __init__(
- self,
- min_update_every=1,
- modules_to_run=tuple(AVAILABLE_MODULES),
- ):
- self.log = PythonDLogger()
- self.config = PluginConf(PLUGIN_BASE_CONF)
- self.task_queue = multiprocessing.JoinableQueue()
- self.result_queue = multiprocessing.JoinableQueue()
- self.min_update_every = min_update_every
+ config_name = 'python.d.conf'
+ jobs_status_dump_name = 'pythond-jobs-statuses.json'
+
+ def __init__(self, modules_to_run, min_update_every):
self.modules_to_run = modules_to_run
- self.auto_detection_jobs = list()
- self.tasks = list()
- self.results = list()
- self.checked_jobs = collections.defaultdict(list)
+ self.min_update_every = min_update_every
+ self.config = PluginConfig(PLUGIN_BASE_CONF)
+ self.log = PythonDLogger()
+ self.started_jobs = collections.defaultdict(dict)
+ self.jobs = list()
+ self.saver = None
self.runs = 0
- @staticmethod
- def shutdown():
- safe_print('DISABLE')
- exit(0)
-
- def run(self):
- jobs = self.create_jobs()
- if not jobs:
- return
-
- for job in self.prepare_jobs(jobs):
- self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
- JobRunner(job).start()
-
- self.serve()
-
- def enqueue_tasks(self):
- for task in self.tasks:
- self.task_queue.put(task)
- self.task_queue.put(END_TASK_MARKER)
-
- def dequeue_results(self):
- while True:
- result = self.result_queue.get()
- self.result_queue.task_done()
- if result is END_TASK_MARKER:
- break
- self.results.append(result)
-
def load_config(self):
paths = [
- DIRS.user_config,
- DIRS.stock_config,
+ DIRS.plugin_user_config,
+ DIRS.plugin_stock_config,
]
-
- self.log.info("checking for config in {0}".format(paths))
- abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
+ self.log.debug("looking for '{0}' in {1}".format(self.config_name, paths))
+ abs_path = multi_path_find(self.config_name, *paths)
if not abs_path:
- self.log.warning('config was not found, using defaults')
+ self.log.warning("'{0}' was not found, using defaults".format(self.config_name))
return True
- self.log.info("config found, loading config '{0}'".format(abs_path))
+ self.log.debug("loading '{0}'".format(abs_path))
try:
- config = load_config(abs_path) or dict()
+ config = load_config(abs_path)
except Exception as error:
- self.log.error('error on loading config : {0}'.format(error))
+ self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error)))
return False
- self.log.info('config successfully loaded')
+ self.log.debug("'{0}' is loaded".format(abs_path))
self.config.update(config)
return True
- def setup(self):
- self.log.info('starting setup')
- if not self.load_config():
- return False
-
- if not self.config['enabled']:
- self.log.info('disabled in configuration file')
- return False
-
- for mod in self.modules_to_run:
- if self.config.is_module_enabled(mod, False):
- task = Task(mod, self.config.is_module_enabled(mod, True))
- self.tasks.append(task)
- else:
- self.log.info("{0} : disabled in configuration file".format(mod))
-
- if not self.tasks:
- self.log.info('no modules to run')
- return False
+ def load_job_statuses(self):
+ self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
+ abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
+ if not abs_path:
+ self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
+ return
- worker = ModuleChecker(self.task_queue, self.result_queue)
- self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
- worker.start()
-
- # TODO: timeouts?
- self.enqueue_tasks()
- self.task_queue.join()
- self.dequeue_results()
- self.result_queue.join()
- self.task_queue.close()
- self.result_queue.close()
- self.log.info('stopping checker process')
- worker.join()
-
- if not self.results:
- self.log.info('no modules to run')
- return False
+ self.log.debug("loading '{0}'".format(abs_path))
+ try:
+ statuses = JobsStatuses().from_file(abs_path)
+ except Exception as error:
+ self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error)))
+ return None
+ self.log.debug("'{0}' is loaded".format(abs_path))
+ return statuses
- self.log.info("setup complete, {0} active module(s) : '{1}'".format(
- len(self.results),
- [v.module_name for v in self.results])
- )
+ def create_jobs(self, job_statuses=None):
+ paths = [
+ DIRS.modules_user_config,
+ DIRS.modules_stock_config,
+ ]
- return True
+ builder = JobsConfigsBuilder(paths)
+ builder.job_defaults = JOB_BASE_CONF
+ builder.min_update_every = self.min_update_every
- def create_jobs(self):
jobs = list()
- for result in self.results:
- module = Module(result.module_name)
+ for mod_name in self.modules_to_run:
+ if not self.config.is_module_enabled(mod_name):
+ self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name))
+ continue
+
+ src = ModuleSrc(mod_name)
try:
- module.load_source()
+ src.load()
except Exception as error:
- self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
- result.module_name, error))
+ self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error)))
continue
- module_jobs = module.create_jobs(result.jobs_configs)
- self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
- jobs.extend(module_jobs)
-
- return jobs
-
- def prepare_jobs(self, jobs):
- prepared = list()
+ if not (src.service() and callable(src.service())):
+ self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name))
+ continue
- for job in jobs:
- check_name = job.override_name or job.name
- if check_name in self.checked_jobs[job.module_name]:
- self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
- job.module_name, job.name))
+ if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name):
+ self.log.info("[{0}] is disabled by default, skipping it".format(mod_name))
continue
- try:
- job.init()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
- job.module_name, job.name, error))
+ builder.module_defaults = src.defaults()
+ configs = builder.build(mod_name)
+ if not configs:
+ self.log.info("[{0}] has no job configs, skipping it".format(mod_name))
continue
- self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))
+ for config in configs:
+ config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
+ config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))
- try:
- ok = job.check()
- except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
- job.module_name, job.name, error))
- continue
+ job = Job(src.service(), mod_name, config)
- if not ok:
- self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
- if job.autodetection_retry() > 0:
- self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
- job.module_name, job.name, job.autodetection_retry()))
- self.auto_detection_jobs.append(job)
- continue
+ was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
+ if was_previously_active and job.autodetection_retry == 0:
+ self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
+ job.module_name, job.real_name))
+ job.checks = 11
+ job.autodetection_retry = 30
- self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))
+ jobs.append(job)
- job.post_check(int(self.min_update_every))
+ return jobs
- if not job.create():
- self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))
+ def setup(self):
+ if not self.load_config():
+ return False
- self.checked_jobs[job.module_name].append(check_name)
- prepared.append(job)
+ if not self.config['enabled']:
+ self.log.info('disabled in the configuration file')
+ return False
- return prepared
+ statuses = self.load_job_statuses()
- def serve(self):
- gc_run = self.config['gc_run']
- gc_interval = self.config['gc_interval']
+ self.jobs = self.create_jobs(statuses)
+ if not self.jobs:
+ self.log.info('no jobs to run')
+ return False
+
+ if not IS_ATTY:
+ abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
+ self.saver = CachedFileSaver(abs_path)
+ return True
- while True:
- self.runs += 1
+ def start_jobs(self, *jobs):
+ for job in jobs:
+ if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
+ continue
- # threads: main + heartbeat
- if threading.active_count() <= 2 and not self.auto_detection_jobs:
- return
+ if job.actual_name in self.started_jobs[job.module_name]:
+ self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
+ job.module_name, job.real_name))
+ job.status = JOB_STATUS_DROPPED
+ continue
- time.sleep(1)
+ if not job.is_inited():
+ try:
+ job.init()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job",
+ job.module_name, job.real_name, repr(error))
+ job.status = JOB_STATUS_DROPPED
+ continue
- if gc_run and self.runs % gc_interval == 0:
- v = gc.collect()
- self.log.debug('GC collection run result: {0}'.format(v))
+ try:
+ ok = job.check()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job",
+ job.module_name, job.real_name, repr(error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+ if not ok:
+ self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
+ job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
+ continue
+ self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
+
+ try:
+ job.create()
+ except Exception as error:
+ self.log.error("{0}[{1}] : unhandled exception on create : {2}, skipping the job",
+ job.module_name, job.real_name, repr(error))
+ job.status = JOB_STATUS_DROPPED
+ continue
- self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]
+ self.started_jobs[job.module_name] = job.actual_name
+ job.status = JOB_STATUS_ACTIVE
+ job.start()
- def retry_job(self, job):
- stop_retrying = True
- retry_later = False
+ @staticmethod
+ def keep_alive():
+ if not IS_ATTY:
+ safe_print('\n')
+
+ def garbage_collection(self):
+ if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
+ v = gc.collect()
+ self.log.debug('GC collection run result: {0}'.format(v))
+
+ def restart_recovering_jobs(self):
+ for job in self.jobs:
+ if job.status != JOB_STATUS_RECOVERING:
+ continue
+ if self.runs % job.autodetection_retry != 0:
+ continue
+ self.start_jobs(job)
- if self.runs % job.autodetection_retry() != 0:
- return retry_later
+ def cleanup_jobs(self):
+ self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]
- check_name = job.override_name or job.name
- if check_name in self.checked_jobs[job.module_name]:
- self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
- job.module_name, job.name))
- return stop_retrying
+ def have_alive_jobs(self):
+ return next(
+ (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
+ False,
+ )
+ def save_job_statuses(self):
+ if self.saver is None:
+ return
+ if self.runs % 10 != 0:
+ return
+ dump = JobsStatuses().from_jobs(self.jobs).dump()
try:
- ok = job.check()
+ self.saver.save(dump)
except Exception as error:
- self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
- job.module_name, job.name, error))
- return stop_retrying
+ self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))
- if not ok:
- self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
- job.module_name, job.name, job.autodetection_retry()))
- return retry_later
- self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))
+ def serve_once(self):
+ if not self.have_alive_jobs():
+ self.log.info('no jobs to serve')
+ return False
+
+ time.sleep(1)
+ self.runs += 1
- if not job.create():
- return stop_retrying
+ self.keep_alive()
+ self.garbage_collection()
+ self.cleanup_jobs()
+ self.restart_recovering_jobs()
+ self.save_job_statuses()
+ return True
- job.post_check(int(self.min_update_every))
- self.checked_jobs[job.module_name].append(check_name)
- JobRunner(job).start()
+ def serve(self):
+ while self.serve_once():
+ pass
- return stop_retrying
+ def run(self):
+ self.start_jobs(*self.jobs)
+ self.serve()
-def parse_cmd():
+def parse_command_line():
opts = sys.argv[:][1:]
+
debug = False
trace = False
update_every = 1
modules_to_run = list()
- v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
- if v:
- update_every = v
- opts.remove(v)
+ def find_first_positive_int(values):
+ return next((v for v in values if v.isdigit() and int(v) >= 1), None)
+
+ u = find_first_positive_int(opts)
+ if u is not None:
+ update_every = int(u)
+ opts.remove(u)
if 'debug' in opts:
debug = True
opts.remove('debug')
@@ -680,54 +690,80 @@ def parse_cmd():
if opts:
modules_to_run = list(opts)
- return collections.namedtuple(
+ cmd = collections.namedtuple(
'CMD',
[
'update_every',
'debug',
'trace',
'modules_to_run',
- ],
- )(
+ ])
+ return cmd(
update_every,
debug,
trace,
- modules_to_run,
+ modules_to_run
)
+def guess_module(modules, *names):
+ def guess(n):
+ found = None
+ for i, _ in enumerate(n):
+ cur = [x for x in modules if x.startswith(name[:i + 1])]
+ if not cur:
+ return found
+ found = cur
+ return found
+
+ guessed = list()
+ for name in names:
+ name = name.lower()
+ m = guess(name)
+ if m:
+ guessed.extend(m)
+ return sorted(set(guessed))
+
+
+def disable():
+ if not IS_ATTY:
+ safe_print('DISABLE')
+ exit(0)
+
+
def main():
- cmd = parse_cmd()
- logger = PythonDLogger()
+ cmd = parse_command_line()
+ log = PythonDLogger()
if cmd.debug:
- logger.logger.severity = 'DEBUG'
+ log.logger.severity = 'DEBUG'
if cmd.trace:
- logger.log_traceback = True
+ log.log_traceback = True
- logger.info('using python v{0}'.format(PY_VERSION[0]))
+ log.info('using python v{0}'.format(PY_VERSION[0]))
- unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
- if unknown_modules:
- logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
- safe_print('DISABLE')
+ unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
+ if unknown:
+ log.error('unknown modules : {0}'.format(sorted(list(unknown))))
+ guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run)
+ if guessed:
+ log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
return
- plugin = Plugin(
- cmd.update_every,
+ p = Plugin(
cmd.modules_to_run or AVAILABLE_MODULES,
+ cmd.update_every,
)
- HeartBeat(1).start()
-
- if not plugin.setup():
- safe_print('DISABLE')
- return
-
- plugin.run()
- logger.info('exiting from main...')
- plugin.shutdown()
+ try:
+ if not p.setup():
+ return
+ p.run()
+ except KeyboardInterrupt:
+ pass
+ log.info('exiting from main...')
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
+ disable()