summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python.d.plugin.in
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
commitbe1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /collectors/python.d.plugin/python.d.plugin.in
parentInitial commit. (diff)
downloadnetdata-upstream.tar.xz
netdata-upstream.zip
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--collectors/python.d.plugin/python.d.plugin.in946
1 files changed, 946 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in
new file mode 100644
index 00000000..86fea209
--- /dev/null
+++ b/collectors/python.d.plugin/python.d.plugin.in
@@ -0,0 +1,946 @@
+#!/usr/bin/env bash
+'''':;
+pybinary=$(which python3 || which python || which python2)
+filtered=()
+for arg in "$@"
+do
+ case $arg in
+ -p*) pybinary=${arg:2}
+ shift 1 ;;
+ *) filtered+=("$arg") ;;
+ esac
+done
+if [ "$pybinary" = "" ]
+then
+ echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM"
+ exit 1
+fi
+exec "$pybinary" "$0" "${filtered[@]}" # '''
+
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import collections
+import copy
+import gc
+import json
+import os
+import pprint
+import re
+import sys
+import threading
+import time
+import types
+
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0)
+
+if PY_VERSION > (3, 1):
+ from importlib.machinery import SourceFileLoader
+else:
+ from imp import load_source as SourceFileLoader
+
+ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
+ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
+ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
+ENV_NETDATA_USER_PLUGINS_DIRS = 'NETDATA_USER_PLUGINS_DIRS'
+ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
+ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
+ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'
+
+
+def add_pythond_packages():
+ pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
+ pythond = os.path.abspath(pluginsd + '/../python.d')
+ packages = os.path.join(pythond, 'python_modules')
+ sys.path.append(packages)
+
+
+add_pythond_packages()
+
+from bases.collection import safe_print
+from bases.loggers import PythonDLogger
+from bases.loaders import load_config
+from third_party import filelock
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
+
+
+def dirs():
+ var_lib = os.getenv(
+ ENV_NETDATA_LIB_DIR,
+ '@varlibdir_POST@',
+ )
+ plugin_user_config = os.getenv(
+ ENV_NETDATA_USER_CONFIG_DIR,
+ '@configdir_POST@',
+ )
+ plugin_stock_config = os.getenv(
+ ENV_NETDATA_STOCK_CONFIG_DIR,
+ '@libconfigdir_POST@',
+ )
+ pluginsd = os.getenv(
+ ENV_NETDATA_PLUGINS_DIR,
+ os.path.dirname(__file__),
+ )
+ locks = os.getenv(
+ ENV_NETDATA_LOCK_DIR,
+ os.path.join('@varlibdir_POST@', 'lock')
+ )
+ modules_user_config = os.path.join(plugin_user_config, 'python.d')
+ modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
+ modules = os.path.abspath(pluginsd + '/../python.d')
+ user_modules = [os.path.join(p, 'python.d') for p in
+ os.getenv(ENV_NETDATA_USER_PLUGINS_DIRS, "").split(" ") if
+ p]
+
+ Dirs = collections.namedtuple(
+ 'Dirs',
+ [
+ 'plugin_user_config',
+ 'plugin_stock_config',
+ 'modules_user_config',
+ 'modules_stock_config',
+ 'modules',
+ 'user_modules',
+ 'var_lib',
+ 'locks',
+ ]
+ )
+ return Dirs(
+ plugin_user_config,
+ plugin_stock_config,
+ modules_user_config,
+ modules_stock_config,
+ modules,
+ user_modules,
+ var_lib,
+ locks,
+ )
+
+
+DIRS = dirs()
+
+IS_ATTY = sys.stdout.isatty() or sys.stderr.isatty()
+
+MODULE_SUFFIX = '.chart.py'
+
+
+def find_available_modules(*directories):
+ AvailableModule = collections.namedtuple(
+ 'AvailableModule',
+ [
+ 'filepath',
+ 'name',
+ ]
+ )
+ available = list()
+ for d in directories:
+ try:
+ if not os.path.isdir(d):
+ continue
+ files = sorted(os.listdir(d))
+ except OSError:
+ continue
+ modules = [m for m in files if m.endswith(MODULE_SUFFIX)]
+ available.extend([AvailableModule(os.path.join(d, m), m[:-len(MODULE_SUFFIX)]) for m in modules])
+
+ return available
+
+
+def available_modules():
+ obsolete = (
+ 'apache_cache', # replaced by web_log
+ 'cpuidle', # rewritten in C
+ 'cpufreq', # rewritten in C
+ 'gunicorn_log', # replaced by web_log
+ 'linux_power_supply', # rewritten in C
+ 'nginx_log', # replaced by web_log
+ 'mdstat', # rewritten in C
+ 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
+ 'unbound', # rewritten in Go
+ )
+
+ stock = [m for m in find_available_modules(DIRS.modules) if m.name not in obsolete]
+ user = find_available_modules(*DIRS.user_modules)
+
+ available, seen = list(), set()
+ for m in user + stock:
+ if m.name in seen:
+ continue
+ seen.add(m.name)
+ available.append(m)
+
+ return available
+
+
+AVAILABLE_MODULES = available_modules()
+
+JOB_BASE_CONF = {
+ 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
+ 'priority': 60000,
+ 'autodetection_retry': 0,
+ 'chart_cleanup': 10,
+ 'penalty': True,
+ 'name': str(),
+}
+
+PLUGIN_BASE_CONF = {
+ 'enabled': True,
+ 'default_run': True,
+ 'gc_run': True,
+ 'gc_interval': 300,
+}
+
+
+def multi_path_find(name, *paths):
+ for path in paths:
+ abs_name = os.path.join(path, name)
+ if os.path.isfile(abs_name):
+ return abs_name
+ return str()
+
+
+def load_module(name, filepath):
+ module = SourceFileLoader('pythond_' + name, filepath)
+ if isinstance(module, types.ModuleType):
+ return module
+ return module.load_module()
+
+
+class ModuleConfig:
+ def __init__(self, name, config=None):
+ self.name = name
+ self.config = config or OrderedDict()
+ self.is_stock = False
+
+ def load(self, abs_path):
+ if not IS_ATTY:
+ self.is_stock = abs_path.startswith(DIRS.modules_stock_config)
+ self.config.update(load_config(abs_path) or dict())
+
+ def defaults(self):
+ keys = (
+ 'update_every',
+ 'priority',
+ 'autodetection_retry',
+ 'chart_cleanup',
+ 'penalty',
+ )
+ return dict((k, self.config[k]) for k in keys if k in self.config)
+
+ def create_job(self, job_name, job_config=None):
+ job_config = job_config or dict()
+
+ config = OrderedDict()
+ config.update(job_config)
+ config['job_name'] = job_name
+ config['__is_stock'] = self.is_stock
+ for k, v in self.defaults().items():
+ config.setdefault(k, v)
+
+ return config
+
+ def job_names(self):
+ return [v for v in self.config if isinstance(self.config.get(v), dict)]
+
+ def single_job(self):
+ return [self.create_job(self.name, self.config)]
+
+ def multi_job(self):
+ return [self.create_job(n, self.config[n]) for n in self.job_names()]
+
+ def create_jobs(self):
+ return self.multi_job() or self.single_job()
+
+
+class JobsConfigsBuilder:
+ def __init__(self, config_dirs):
+ self.config_dirs = config_dirs
+ self.log = PythonDLogger()
+ self.job_defaults = None
+ self.module_defaults = None
+ self.min_update_every = None
+
+ def load_module_config(self, module_name):
+ name = '{0}.conf'.format(module_name)
+ self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
+ config = ModuleConfig(module_name)
+
+ abs_path = multi_path_find(name, *self.config_dirs)
+ if not abs_path:
+ self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
+ return config
+
+ self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
+ try:
+ config.load(abs_path)
+ except Exception as error:
+ self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
+ return None
+
+ self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
+ return config
+
+ @staticmethod
+ def apply_defaults(jobs, defaults):
+ if defaults is None:
+ return
+ for k, v in defaults.items():
+ for job in jobs:
+ job.setdefault(k, v)
+
+ def set_min_update_every(self, jobs, min_update_every):
+ if min_update_every is None:
+ return
+ for job in jobs:
+ if 'update_every' in job and job['update_every'] < self.min_update_every:
+ job['update_every'] = self.min_update_every
+
+ def build(self, module_name):
+ config = self.load_module_config(module_name)
+ if config is None:
+ return None
+
+ configs = config.create_jobs()
+ if not config.is_stock:
+ self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))
+
+ self.apply_defaults(configs, self.module_defaults)
+ self.apply_defaults(configs, self.job_defaults)
+ self.set_min_update_every(configs, self.min_update_every)
+
+ return configs
+
+
+JOB_STATUS_ACTIVE = 'active'
+JOB_STATUS_RECOVERING = 'recovering'
+JOB_STATUS_DROPPED = 'dropped'
+JOB_STATUS_INIT = 'initial'
+
+
+class Job(threading.Thread):
+ inf = -1
+
+ def __init__(self, service, module_name, config):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.service = service
+ self.module_name = module_name
+ self.config = config
+ self.real_name = config['job_name']
+ self.actual_name = config['override_name'] or self.real_name
+ self.autodetection_retry = config['autodetection_retry']
+ self.checks = self.inf
+ self.job = None
+ self.is_stock = config.get('__is_stock', False)
+ self.status = JOB_STATUS_INIT
+
+ def is_inited(self):
+ return self.job is not None
+
+ def init(self):
+ self.job = self.service(configuration=copy.deepcopy(self.config))
+
+ def full_name(self):
+ return self.job.name
+
+ def check(self):
+ if self.is_stock:
+ self.job.logger.mute()
+
+ ok = self.job.check()
+
+ self.job.logger.unmute()
+ self.checks -= self.checks != self.inf and not ok
+
+ return ok
+
+ def create(self):
+ self.job.create()
+
+ def need_to_recheck(self):
+ return self.autodetection_retry != 0 and self.checks != 0
+
+ def run(self):
+ self.job.run()
+
+
+class ModuleSrc:
+ def __init__(self, m):
+ self.name = m.name
+ self.filepath = m.filepath
+ self.src = None
+
+ def load(self):
+ self.src = load_module(self.name, self.filepath)
+
+ def get(self, key):
+ return getattr(self.src, key, None)
+
+ def service(self):
+ return self.get('Service')
+
+ def defaults(self):
+ keys = (
+ 'update_every',
+ 'priority',
+ 'autodetection_retry',
+ 'chart_cleanup',
+ 'penalty',
+ )
+ return dict((k, self.get(k)) for k in keys if self.get(k) is not None)
+
+ def is_disabled_by_default(self):
+ return bool(self.get('disabled_by_default'))
+
+
+class JobsStatuses:
+ def __init__(self):
+ self.items = OrderedDict()
+
+ def dump(self):
+ return json.dumps(self.items, indent=2)
+
+ def get(self, module_name, job_name):
+ if module_name not in self.items:
+ return None
+ return self.items[module_name].get(job_name)
+
+ def has(self, module_name, job_name):
+ return self.get(module_name, job_name) is not None
+
+ def from_file(self, path):
+ with open(path) as f:
+ data = json.load(f)
+ return self.from_json(data)
+
+ @staticmethod
+ def from_json(items):
+ if not isinstance(items, dict):
+ raise Exception('items obj has wrong type : {0}'.format(type(items)))
+ if not items:
+ return JobsStatuses()
+
+ v = OrderedDict()
+ for mod_name in sorted(items):
+ if not items[mod_name]:
+ continue
+ v[mod_name] = OrderedDict()
+ for job_name in sorted(items[mod_name]):
+ v[mod_name][job_name] = items[mod_name][job_name]
+
+ rv = JobsStatuses()
+ rv.items = v
+ return rv
+
+ @staticmethod
+ def from_jobs(jobs):
+ v = OrderedDict()
+ for job in jobs:
+ status = job.status
+ if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
+ continue
+ if job.module_name not in v:
+ v[job.module_name] = OrderedDict()
+ v[job.module_name][job.real_name] = status
+
+ rv = JobsStatuses()
+ rv.items = v
+ return rv
+
+
+class StdoutSaver:
+ @staticmethod
+ def save(dump):
+ print(dump)
+
+
+class CachedFileSaver:
+ def __init__(self, path):
+ self.last_save_success = False
+ self.last_saved_dump = str()
+ self.path = path
+
+ def save(self, dump):
+ if self.last_save_success and self.last_saved_dump == dump:
+ return
+ try:
+ with open(self.path, 'w') as out:
+ out.write(dump)
+ except Exception:
+ self.last_save_success = False
+ raise
+ self.last_saved_dump = dump
+ self.last_save_success = True
+
+
+class PluginConfig(dict):
+ def __init__(self, *args):
+ dict.__init__(self, *args)
+
+ def is_module_explicitly_enabled(self, module_name):
+ return self._is_module_enabled(module_name, True)
+
+ def is_module_enabled(self, module_name):
+ return self._is_module_enabled(module_name, False)
+
+ def _is_module_enabled(self, module_name, explicit):
+ if module_name in self:
+ return self[module_name]
+ if explicit:
+ return False
+ return self['default_run']
+
+
+class FileLockRegistry:
+ def __init__(self, path):
+ self.path = path
+ self.locks = dict()
+
+ @staticmethod
+ def rename(name):
+ # go version name is 'docker'
+ if name.startswith("dockerd"):
+ name = "docker" + name[7:]
+ return name
+
+ def register(self, name):
+ name = self.rename(name)
+ if name in self.locks:
+ return
+ file = os.path.join(self.path, '{0}.collector.lock'.format(name))
+ lock = filelock.FileLock(file)
+ lock.acquire(timeout=0)
+ self.locks[name] = lock
+
+ def unregister(self, name):
+ name = self.rename(name)
+ if name not in self.locks:
+ return
+ lock = self.locks[name]
+ lock.release()
+ del self.locks[name]
+
+
+class DummyRegistry:
+ def register(self, name):
+ pass
+
+ def unregister(self, name):
+ pass
+
+
+class Plugin:
+ config_name = 'python.d.conf'
+ jobs_status_dump_name = 'pythond-jobs-statuses.json'
+
+ def __init__(self, modules_to_run, min_update_every, registry):
+ self.modules_to_run = modules_to_run
+ self.min_update_every = min_update_every
+ self.config = PluginConfig(PLUGIN_BASE_CONF)
+ self.log = PythonDLogger()
+ self.registry = registry
+ self.started_jobs = collections.defaultdict(dict)
+ self.jobs = list()
+ self.saver = None
+ self.runs = 0
+
+ def load_config_file(self, filepath, expected):
+ self.log.debug("looking for '{0}'".format(filepath))
+ if not os.path.isfile(filepath):
+ log = self.log.info if not expected else self.log.error
+ log("'{0}' was not found".format(filepath))
+ return dict()
+ try:
+ config = load_config(filepath)
+ except Exception as error:
+ self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
+ return dict()
+ self.log.debug("'{0}' is loaded".format(filepath))
+ return config
+
+ def load_config(self):
+ user_config = self.load_config_file(
+ filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
+ expected=False,
+ )
+ stock_config = self.load_config_file(
+ filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
+ expected=True,
+ )
+ self.config.update(stock_config)
+ self.config.update(user_config)
+
+ def load_job_statuses(self):
+ self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
+ abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
+ if not abs_path:
+ self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
+ return
+
+ self.log.debug("loading '{0}'".format(abs_path))
+ try:
+ statuses = JobsStatuses().from_file(abs_path)
+ except Exception as error:
+ self.log.error("'{0}' invalid JSON format: {1}".format(
+ abs_path, ' '.join([v.strip() for v in str(error).split('\n')])))
+ return None
+ self.log.debug("'{0}' is loaded".format(abs_path))
+ return statuses
+
+ def create_jobs(self, job_statuses=None):
+ paths = [
+ DIRS.modules_user_config,
+ DIRS.modules_stock_config,
+ ]
+
+ builder = JobsConfigsBuilder(paths)
+ builder.job_defaults = JOB_BASE_CONF
+ builder.min_update_every = self.min_update_every
+
+ jobs = list()
+ for m in self.modules_to_run:
+ if not self.config.is_module_enabled(m.name):
+ self.log.info("[{0}] is disabled in the configuration file, skipping it".format(m.name))
+ continue
+
+ src = ModuleSrc(m)
+ try:
+ src.load()
+ except Exception as error:
+ self.log.warning("[{0}] error on loading source : {1}, skipping it".format(m.name, repr(error)))
+ continue
+ self.log.debug("[{0}] loaded module source : '{1}'".format(m.name, m.filepath))
+
+ if not (src.service() and callable(src.service())):
+ self.log.warning("[{0}] has no callable Service object, skipping it".format(m.name))
+ continue
+
+ if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(m.name):
+ self.log.info("[{0}] is disabled by default, skipping it".format(m.name))
+ continue
+
+ builder.module_defaults = src.defaults()
+ configs = builder.build(m.name)
+ if not configs:
+ self.log.info("[{0}] has no job configs, skipping it".format(m.name))
+ continue
+
+ for config in configs:
+ config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
+ config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))
+
+ job = Job(src.service(), m.name, config)
+
+ was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
+ if was_previously_active and job.autodetection_retry == 0:
+ self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
+ job.module_name, job.real_name))
+ job.checks = 11
+ job.autodetection_retry = 30
+
+ jobs.append(job)
+
+ return jobs
+
+ def setup(self):
+ self.load_config()
+
+ if not self.config['enabled']:
+ self.log.info('disabled in the configuration file')
+ return False
+
+ statuses = self.load_job_statuses()
+
+ self.jobs = self.create_jobs(statuses)
+ if not self.jobs:
+ self.log.info('no jobs to run')
+ return False
+
+ if not IS_ATTY:
+ abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
+ self.saver = CachedFileSaver(abs_path)
+ return True
+
+ def start_jobs(self, *jobs):
+ for job in jobs:
+ if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
+ continue
+
+ if job.actual_name in self.started_jobs[job.module_name]:
+ self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
+ job.module_name, job.real_name))
+ job.status = JOB_STATUS_DROPPED
+ continue
+
+ if not job.is_inited():
+ try:
+ job.init()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
+ job.module_name, job.real_name, repr(error)))
+ job.status = JOB_STATUS_DROPPED
+ continue
+
+ try:
+ ok = job.check()
+ except Exception as error:
+ if not job.is_stock:
+ self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
+ job.module_name, job.real_name, repr(error)))
+ job.status = JOB_STATUS_DROPPED
+ continue
+ if not ok:
+ if not job.is_stock:
+ self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
+ job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
+ continue
+ self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
+
+ try:
+ self.registry.register(job.full_name())
+ except filelock.Timeout as error:
+ self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
+ job.module_name, job.real_name, error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+ except Exception as error:
+ self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
+ job.module_name, job.real_name, error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+
+ try:
+ job.create()
+ except Exception as error:
+ self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
+ job.module_name, job.real_name, repr(error)))
+ job.status = JOB_STATUS_DROPPED
+ try:
+ self.registry.unregister(job.full_name())
+ except Exception as error:
+ self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
+ job.module_name, job.real_name, error))
+ continue
+
+ self.started_jobs[job.module_name] = job.actual_name
+ job.status = JOB_STATUS_ACTIVE
+ job.start()
+
+ @staticmethod
+ def keep_alive():
+ if not IS_ATTY:
+ safe_print('\n')
+
+ def garbage_collection(self):
+ if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
+ v = gc.collect()
+ self.log.debug('GC collection run result: {0}'.format(v))
+
+ def restart_recovering_jobs(self):
+ for job in self.jobs:
+ if job.status != JOB_STATUS_RECOVERING:
+ continue
+ if self.runs % job.autodetection_retry != 0:
+ continue
+ self.start_jobs(job)
+
+ def cleanup_jobs(self):
+ self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]
+
+ def have_alive_jobs(self):
+ return next(
+ (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
+ False,
+ )
+
+ def save_job_statuses(self):
+ if self.saver is None:
+ return
+ if self.runs % 10 != 0:
+ return
+ dump = JobsStatuses().from_jobs(self.jobs).dump()
+ try:
+ self.saver.save(dump)
+ except Exception as error:
+ self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))
+
+ def serve_once(self):
+ if not self.have_alive_jobs():
+ self.log.info('no jobs to serve')
+ return False
+
+ time.sleep(1)
+ self.runs += 1
+
+ self.keep_alive()
+ self.garbage_collection()
+ self.cleanup_jobs()
+ self.restart_recovering_jobs()
+ self.save_job_statuses()
+ return True
+
+ def serve(self):
+ while self.serve_once():
+ pass
+
+ def run(self):
+ self.start_jobs(*self.jobs)
+ self.serve()
+
+
+def parse_command_line():
+ opts = sys.argv[:][1:]
+
+ debug = False
+ trace = False
+ nolock = False
+ update_every = 1
+ modules_to_run = list()
+
+ def find_first_positive_int(values):
+ return next((v for v in values if v.isdigit() and int(v) >= 1), None)
+
+ u = find_first_positive_int(opts)
+ if u is not None:
+ update_every = int(u)
+ opts.remove(u)
+ if 'debug' in opts:
+ debug = True
+ opts.remove('debug')
+ if 'trace' in opts:
+ trace = True
+ opts.remove('trace')
+ if 'nolock' in opts:
+ nolock = True
+ opts.remove('nolock')
+ if opts:
+ modules_to_run = list(opts)
+
+ cmd = collections.namedtuple(
+ 'CMD',
+ [
+ 'update_every',
+ 'debug',
+ 'trace',
+ 'nolock',
+ 'modules_to_run',
+ ])
+ return cmd(
+ update_every,
+ debug,
+ trace,
+ nolock,
+ modules_to_run,
+ )
+
+
+def guess_module(modules, *names):
+ def guess(n):
+ found = None
+ for i, _ in enumerate(n):
+ cur = [x for x in modules if x.startswith(name[:i + 1])]
+ if not cur:
+ return found
+ found = cur
+ return found
+
+ guessed = list()
+ for name in names:
+ name = name.lower()
+ m = guess(name)
+ if m:
+ guessed.extend(m)
+ return sorted(set(guessed))
+
+
+def disable():
+ if not IS_ATTY:
+ safe_print('DISABLE')
+ exit(0)
+
+
+def get_modules_to_run(cmd):
+ if not cmd.modules_to_run:
+ return AVAILABLE_MODULES
+
+ modules_to_run, seen = list(), set()
+ for m in AVAILABLE_MODULES:
+ if m.name not in cmd.modules_to_run or m.name in seen:
+ continue
+ seen.add(m.name)
+ modules_to_run.append(m)
+
+ return modules_to_run
+
+
+def main():
+ cmd = parse_command_line()
+ log = PythonDLogger()
+
+ level = os.getenv('NETDATA_LOG_LEVEL') or str()
+ level = level.lower()
+ if level == 'debug':
+ log.logger.severity = 'DEBUG'
+ elif level == 'info':
+ log.logger.severity = 'INFO'
+ elif level == 'warn' or level == 'warning':
+ log.logger.severity = 'WARNING'
+ elif level == 'err' or level == 'error':
+ log.logger.severity = 'ERROR'
+
+ if cmd.debug:
+ log.logger.severity = 'DEBUG'
+ if cmd.trace:
+ log.log_traceback = True
+
+ log.info('using python v{0}'.format(PY_VERSION[0]))
+
+ if DIRS.locks and not cmd.nolock:
+ registry = FileLockRegistry(DIRS.locks)
+ else:
+ registry = DummyRegistry()
+
+ unique_avail_module_names = set([m.name for m in AVAILABLE_MODULES])
+ unknown = set(cmd.modules_to_run) - unique_avail_module_names
+ if unknown:
+ log.error('unknown modules : {0}'.format(sorted(list(unknown))))
+ guessed = guess_module(unique_avail_module_names, *cmd.modules_to_run)
+ if guessed:
+ log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
+ return
+
+ p = Plugin(
+ get_modules_to_run(cmd),
+ cmd.update_every,
+ registry,
+ )
+
+ # cheap attempt to reduce chance of python.d job running before go.d
+ # TODO: better implementation needed
+ if not IS_ATTY:
+ time.sleep(1.5)
+
+ try:
+ if not p.setup():
+ return
+ p.run()
+ except KeyboardInterrupt:
+ pass
+ log.info('exiting from main...')
+
+
+if __name__ == "__main__":
+ main()
+ disable()