summaryrefslogtreecommitdiffstats
path: root/plugins.d/python.d.plugin
diff options
context:
space:
mode:
authorFederico Ceratto <federico.ceratto@gmail.com>2017-12-19 23:39:21 +0000
committerFederico Ceratto <federico.ceratto@gmail.com>2017-12-19 23:39:21 +0000
commit61aedf201c2c4bf0e5aa4db32e74f4d860b88593 (patch)
treebcf4f9a0cd8bc2daf38b2ff9f29bfcc1e5ed8968 /plugins.d/python.d.plugin
parentNew upstream version 1.8.0+dfsg (diff)
downloadnetdata-61aedf201c2c4bf0e5aa4db32e74f4d860b88593.tar.xz
netdata-61aedf201c2c4bf0e5aa4db32e74f4d860b88593.zip
New upstream version 1.9.0+dfsgupstream/1.9.0+dfsg
Diffstat (limited to '')
-rwxr-xr-xplugins.d/python.d.plugin869
1 files changed, 326 insertions, 543 deletions
diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin
index 03c156f4..855080e8 100755
--- a/plugins.d/python.d.plugin
+++ b/plugins.d/python.d.plugin
@@ -1,599 +1,382 @@
#!/usr/bin/env bash
-'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
-# -*- coding: utf-8 -*-
+'''':; exec "$(command -v python || command -v python3 || command -v python2 ||
+echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
-# Description: netdata python modules supervisor
+# -*- coding: utf-8 -*-
+# Description:
# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
import os
import sys
-import time
import threading
+
from re import sub
+from sys import version_info, argv
+from time import sleep
+
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time
+
+PY_VERSION = version_info[:2]
+PLUGIN_CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', os.path.dirname(__file__) + '/../../../../etc/netdata') + '/'
+CHARTS_PY_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) + '/../python.d') + '/'
+CHARTS_PY_CONFIG_DIR = PLUGIN_CONFIG_DIR + 'python.d/'
+PYTHON_MODULES_DIR = CHARTS_PY_DIR + 'python_modules'
+
+sys.path.append(PYTHON_MODULES_DIR)
+
+from bases.loaders import ModuleAndConfigLoader
+from bases.loggers import PythonDLogger
+from bases.collection import setdefault_values, run_and_exit
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
-# -----------------------------------------------------------------------------
-# globals & environment setup
-# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
-MODULE_EXTENSION = ".chart.py"
BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
- 'priority': 90000,
- 'retries': 10}
+ 'retries': 60,
+ 'priority': 60000,
+ 'autodetection_retry': 0,
+ 'chart_cleanup': 10,
+ 'name': str()}
-MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
- os.path.dirname(__file__)) + "/../python.d") + "/"
-CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR',
- os.path.dirname(__file__) + "/../../../../etc/netdata")
+MODULE_EXTENSION = '.chart.py'
+OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log']
-# directories should end with '/'
-if CONFIG_DIR[-1] != "/":
- CONFIG_DIR += "/"
-sys.path.append(MODULES_DIR + "python_modules")
-PROGRAM = os.path.basename(__file__).replace(".plugin", "")
-DEBUG_FLAG = False
-TRACE_FLAG = False
-OVERRIDE_UPDATE_EVERY = False
+def module_ok(m):
+ return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES
-# -----------------------------------------------------------------------------
-# custom, third party and version specific python modules management
-import msg
-try:
- assert sys.version_info >= (3, 1)
- import importlib.machinery
- PY_VERSION = 3
- # change this hack below if we want PY_VERSION to be used in modules
- # import builtins
- # builtins.PY_VERSION = 3
- msg.info('Using python v3')
-except (AssertionError, ImportError):
- try:
- import imp
-
- # change this hack below if we want PY_VERSION to be used in modules
- # import __builtin__
- # __builtin__.PY_VERSION = 2
- PY_VERSION = 2
- msg.info('Using python v2')
- except ImportError:
- msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
-# try:
-# import yaml
-# except ImportError:
-# msg.fatal('Cannot find yaml library')
-try:
- if PY_VERSION == 3:
- import pyyaml3 as yaml
- else:
- import pyyaml2 as yaml
-except ImportError:
- msg.fatal('Cannot find yaml library')
+ALL_MODULES = [m for m in sorted(os.listdir(CHARTS_PY_DIR)) if module_ok(m)]
-try:
- from collections import OrderedDict
- ORDERED = True
- DICT = OrderedDict
- msg.info('YAML output is ordered')
-except ImportError:
- try:
- from ordereddict import OrderedDict
- ORDERED = True
- DICT = OrderedDict
- msg.info('YAML output is ordered')
- except ImportError:
- ORDERED = False
- DICT = dict
- msg.info('YAML output is unordered')
-if ORDERED:
- def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
- class OrderedLoader(Loader):
- pass
-
- def construct_mapping(loader, node):
- loader.flatten_mapping(node)
- return object_pairs_hook(loader.construct_pairs(node))
- OrderedLoader.add_constructor(
- yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
- construct_mapping)
- return yaml.load(stream, OrderedLoader)
-
-
-class PythonCharts(object):
- """
- Main class used to control every python module.
- """
-
- def __init__(self,
- modules=None,
- modules_path='../python.d/',
- modules_configs='../conf.d/',
- modules_disabled=None,
- modules_enabled=None,
- default_run=None):
+
+def parse_cmd():
+ debug = 'debug' in argv[1:]
+ trace = 'trace' in argv[1:]
+ override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
+ modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
+ return debug, trace, override_update_every, modules or ALL_MODULES
+
+
+def multi_job_check(config):
+ return next((True for key in config if isinstance(config[key], dict)), False)
+
+
+class Job(object):
+ def __init__(self, initialized_job, job_id):
"""
- :param modules: list
- :param modules_path: str
- :param modules_configs: str
- :param modules_disabled: list
- :param modules_enabled: list
- :param default_run: bool
+ :param initialized_job: instance of <Class Service>
+ :param job_id: <str>
"""
+ self.job = initialized_job
+ self.id = job_id # key in Modules.jobs()
+ self.module_name = self.job.__module__ # used in Plugin.delete_job()
+ self.recheck_every = self.job.configuration.pop('autodetection_retry')
+ self.checked = False # used in Plugin.check_job()
+ self.created = False # used in Plugin.create_job_charts()
+ if OVERRIDE_UPDATE_EVERY:
+ self.job.update_every = int(OVERRIDE_UPDATE_EVERY)
- if modules is None:
- modules = []
- if modules_disabled is None:
- modules_disabled = []
+ def __getattr__(self, item):
+ return getattr(self.job, item)
- self.first_run = True
- # set configuration directory
- self.configs = modules_configs
+ def __repr__(self):
+ return self.job.__repr__()
- # load modules
- loaded_modules = self._load_modules(modules_path, modules, modules_disabled, modules_enabled, default_run)
+ def is_dead(self):
+ return bool(self.ident) and not self.is_alive()
- # load configuration files
- configured_modules = self._load_configs(loaded_modules)
+ def not_launched(self):
+ return not bool(self.ident)
- # good economy and prosperity:
- self.jobs = self._create_jobs(configured_modules) # type <list>
+ def is_autodetect(self):
+ return self.recheck_every
- # enable timetable override like `python.d.plugin mysql debug 1`
- if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
- for job in self.jobs:
- job.create_timetable(BASE_CONFIG['update_every'])
- @staticmethod
- def _import_module(path, name=None):
+class Module(object):
+ def __init__(self, service, config):
"""
- Try to import module using only its path.
- :param path: str
- :param name: str
- :return: object
+ :param service: <Module>
+ :param config: <dict>
"""
+ self.service = service
+ self.name = service.__name__
+ self.config = self.jobs_configurations_builder(config)
+ self.jobs = OrderedDict()
+ self.counter = 1
- if name is None:
- name = path.split('/')[-1]
- if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
- return None
- name = name[:-len(MODULE_EXTENSION)]
- try:
- if PY_VERSION == 3:
- return importlib.machinery.SourceFileLoader(name, path).load_module()
- else:
- return imp.load_source(name, path)
- except Exception as e:
- msg.error("Problem loading", name, str(e))
- return None
+ self.initialize_jobs()
+
+ def __repr__(self):
+ return "<Class Module '{name}'>".format(name=self.name)
+
+ def __iter__(self):
+ return iter(OrderedDict(self.jobs).values())
+
+ def __getitem__(self, item):
+ return self.jobs[item]
+
+ def __delitem__(self, key):
+ del self.jobs[key]
+
+ def __len__(self):
+ return len(self.jobs)
+
+ def __bool__(self):
+ return bool(self.jobs)
- def _load_modules(self, path, modules, disabled, enabled, default_run):
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def jobs_configurations_builder(self, config):
"""
- Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
- :param path: str
- :param modules: list
- :param disabled: list
- :return: list
+ :param config: <dict>
+ :return:
"""
+ counter = 0
+ job_base_config = dict()
- # check if plugin directory exists
- if not os.path.isdir(path):
- msg.fatal("cannot find charts directory ", path)
-
- # load modules
- loaded = []
- if len(modules) > 0:
- for m in modules:
- if m in disabled:
- continue
- mod = self._import_module(path + m + MODULE_EXTENSION)
- if mod is not None:
- loaded.append(mod)
- else: # exit if plugin is not found
- msg.fatal('no modules found.')
- else:
- # scan directory specified in path and load all modules from there
- if default_run is False:
- names = [module for module in os.listdir(path) if module[:-9] in enabled]
- else:
- names = os.listdir(path)
- for mod in names:
- if mod.replace(MODULE_EXTENSION, "") in disabled:
- msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
- continue
- m = self._import_module(path + mod)
- if m is not None:
- msg.debug(mod + ": loading module '" + path + mod + "'")
- loaded.append(m)
- return loaded
-
- def _load_configs(self, modules):
+ for attr in BASE_CONFIG:
+ job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))
+
+ if not config:
+ config = {str(): dict()}
+ elif not multi_job_check(config):
+ config = {str(): config}
+
+ for job_name in config:
+ if not isinstance(config[job_name], dict):
+ continue
+
+ job_config = setdefault_values(config[job_name], base_dict=job_base_config)
+ job_name = sub(r'\s+', '_', job_name)
+ config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
+ counter += 1
+ job_id = 'job' + str(counter).zfill(3)
+
+ yield job_id, job_name, job_config
+
+ def initialize_jobs(self):
"""
- Append configuration in list named `config` to every module.
- For multi-job modules `config` list is created in _parse_config,
- otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
- :param modules: list
- :return: list
+ :return:
"""
- for mod in modules:
- configfile = self.configs + mod.__name__ + ".conf"
- if os.path.isfile(configfile):
- msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
- try:
- if not hasattr(mod, 'config'):
- mod.config = {}
- setattr(mod,
- 'config',
- self._parse_config(mod, read_config(configfile)))
- except Exception as e:
- msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
+ for job_id, job_name, job_config in self.config:
+ job_config['job_name'] = job_name
+ job_config['override_name'] = job_config.pop('name')
+
+ try:
+ initialized_job = self.service.Service(configuration=job_config)
+ except Exception as error:
+ Logger.error("job initialization: '{module_name} {job_name}' "
+ "=> ['FAILED'] ({error})".format(module_name=self.name,
+ job_name=job_name,
+ error=error))
+ continue
+ else:
+ Logger.debug("job initialization: '{module_name} {job_name}' "
+ "=> ['OK']".format(module_name=self.name,
+ job_name=job_name or self.name))
+ self.jobs[job_id] = Job(initialized_job=initialized_job,
+ job_id=job_id)
+ del self.config
+ del self.service
+
+
+class Plugin(object):
+ def __init__(self):
+ self.loader = ModuleAndConfigLoader()
+ self.modules = OrderedDict()
+ self.sleep_time = 1
+ self.runs_counter = 0
+ self.config, error = self.loader.load_config_from_file(PLUGIN_CONFIG_DIR + 'python.d.conf')
+ if error:
+ run_and_exit(Logger.error)(error)
+
+ if not self.config.get('enabled', True):
+ run_and_exit(Logger.info)('DISABLED in configuration file.')
+
+ self.load_and_initialize_modules()
+ if not self.modules:
+ run_and_exit(Logger.info)('No modules to run. Exit...')
+
+ def __iter__(self):
+ return iter(OrderedDict(self.modules).values())
+
+ @property
+ def jobs(self):
+ return (job for mod in self for job in mod)
+
+ @property
+ def dead_jobs(self):
+ return (job for job in self.jobs if job.is_dead())
+
+ @property
+ def autodetect_jobs(self):
+ return [job for job in self.jobs if job.not_launched()]
+
+ def enabled_modules(self):
+ for mod in MODULES_TO_RUN:
+ mod_name = mod[:-len(MODULE_EXTENSION)]
+ mod_path = CHARTS_PY_DIR + mod
+ conf_path = ''.join([CHARTS_PY_CONFIG_DIR, mod_name, '.conf'])
+
+ if DEBUG:
+ yield mod, mod_name, mod_path, conf_path
else:
- msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
- # set config if not found
- if not hasattr(mod, 'config'):
- msg.debug(mod.__name__ + ": setting configuration for only one job")
- mod.config = {None: {}}
- for var in BASE_CONFIG:
- try:
- mod.config[None][var] = getattr(mod, var)
- except AttributeError:
- mod.config[None][var] = BASE_CONFIG[var]
- return modules
+ if all([self.config.get('default_run', True),
+ self.config.get(mod_name, True)]):
+ yield mod, mod_name, mod_path, conf_path
+
+ elif all([not self.config.get('default_run'),
+ self.config.get(mod_name)]):
+ yield mod, mod_name, mod_path, conf_path
+
+ def load_and_initialize_modules(self):
+ for mod, mod_name, mod_path, conf_path in self.enabled_modules():
+
+ # Load module from file ------------------------------------------------------------
+ loaded_module, error = self.loader.load_module_from_file(mod_name, mod_path)
+ log = Logger.error if error else Logger.debug
+ log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
+ module_name=mod_name))
+ if error:
+ Logger.error("load source error : {0}".format(error))
+ continue
+
+ # Load module config from file ------------------------------------------------------
+ loaded_config, error = self.loader.load_config_from_file(conf_path)
+ log = Logger.error if error else Logger.debug
+ log("module load config: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
+ module_name=mod_name))
+ if error:
+ Logger.error('load config error : {0}'.format(error))
+
+ # Service instance initialization ---------------------------------------------------
+ initialized_module = Module(service=loaded_module, config=loaded_config)
+ Logger.debug("module status: '{module_name}' => [{status}] "
+ "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
+ module_name=initialized_module.name,
+ jobs_number=len(initialized_module)))
+
+ if initialized_module:
+ self.modules[initialized_module.name] = initialized_module
@staticmethod
- def _parse_config(module, config):
+ def check_job(job):
"""
- Parse configuration file or extract configuration from module file.
- Example of returned dictionary:
- config = {'name': {
- 'update_every': 2,
- 'retries': 3,
- 'priority': 30000
- 'other_val': 123}}
- :param module: object
- :param config: dict
- :return: dict
+ :param job: <Job>
+ :return:
"""
- if config is None:
- config = {}
- # get default values
- defaults = {}
- msg.debug(module.__name__ + ": reading configuration")
- for key in BASE_CONFIG:
- try:
- # get defaults from module config
- defaults[key] = int(config.pop(key))
- except (KeyError, ValueError):
- try:
- # get defaults from module source code
- defaults[key] = getattr(module, key)
- except (KeyError, ValueError, AttributeError):
- # if above failed, get defaults from global dict
- defaults[key] = BASE_CONFIG[key]
-
- # check if there are dict in config dict
- many_jobs = False
- for name in config:
- if isinstance(config[name], DICT):
- many_jobs = True
- break
-
- # assign variables needed by supervisor to every job configuration
- if many_jobs:
- for name in config:
- for key in defaults:
- if key not in config[name]:
- config[name][key] = defaults[key]
- # if only one job is needed, values doesn't have to be in dict (in YAML)
+ try:
+ check_ok = bool(job.check())
+ except Exception as error:
+ job.error('check() unhandled exception: {error}'.format(error=error))
+ return None
else:
- config = {None: config.copy()}
- config[None].update(defaults)
-
- # return dictionary of jobs where every job has BASE_CONFIG variables
- return config
+ return check_ok
@staticmethod
- def _create_jobs(modules):
- """
- Create jobs based on module.config dictionary and module.Service class definition.
- :param modules: list
- :return: list
+ def create_job_charts(job):
"""
- jobs = []
- for module in modules:
- for name in module.config:
- # register a new job
- conf = module.config[name]
- try:
- job = module.Service(configuration=conf, name=name)
- except Exception as e:
- msg.error(module.__name__ +
- ("/" + str(name) if name is not None else "") +
- ": cannot start job: '" +
- str(e))
- continue
- else:
- # set chart_name (needed to plot run time graphs)
- job.chart_name = module.__name__
- if name is not None:
- job.chart_name += "_" + name
- jobs.append(job)
- msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
-
- return [j for j in jobs if j is not None]
-
- def _stop(self, job, reason=None):
+ :param job: <Job>
+ :return:
"""
- Stop specified job and remove it from self.jobs list
- Also notifies user about job failure if DEBUG_FLAG is set
- :param job: object
- :param reason: str
- """
- prefix = job.__module__
- if job.name is not None and len(job.name) != 0:
- prefix += "/" + job.name
try:
- msg.error("DISABLED:", prefix)
- self.jobs.remove(job)
- except Exception as e:
- msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
-
- # TODO remove section below and remove `reason`.
- prefix += ": "
- if reason is None:
- return
- elif reason[:3] == "no ":
- msg.error(prefix +
- "does not seem to have " +
- reason[3:] +
- "() function. Disabling it.")
- elif reason[:7] == "failed ":
- msg.error(prefix +
- reason[7:] +
- "() function reports failure.")
- elif reason[:13] == "configuration":
- msg.error(prefix +
- "configuration file '" +
- self.configs +
- job.__module__ +
- ".conf' not found. Using defaults.")
- elif reason[:11] == "misbehaving":
- msg.error(prefix + "is " + reason)
-
- def check(self):
- """
- Tries to execute check() on every job.
- This cannot fail thus it is catching every exception
- If job.check() fails job is stopped
- """
- i = 0
- overridden = []
- msg.debug("all job objects", str(self.jobs))
- while i < len(self.jobs):
- job = self.jobs[i]
- try:
- if not job.check():
- msg.error(job.chart_name, "check() failed - disabling job")
- self._stop(job)
- else:
- msg.info("CHECKED OK:", job.chart_name)
- i += 1
- try:
- if job.override_name is not None:
- new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
- if new_name in overridden:
- msg.info("DROPPED:", job.name, ", job '" + job.override_name +
- "' is already served by another job.")
- self._stop(job)
- i -= 1
- else:
- job.name = job.override_name
- msg.info("RENAMED:", new_name, ", from " + job.chart_name)
- job.chart_name = new_name
- overridden.append(job.chart_name)
- except Exception:
- pass
- except AttributeError as e:
- self._stop(job)
- msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
- msg.debug(str(e))
- except (UnboundLocalError, Exception) as e:
- msg.error(job.chart_name, str(e))
- self._stop(job)
- msg.debug("overridden job names:", str(overridden))
- msg.debug("all remaining job objects:", str(self.jobs))
-
- def create(self):
- """
- Tries to execute create() on every job.
- This cannot fail thus it is catching every exception.
- If job.create() fails job is stopped.
- This is also creating job run time chart.
- """
- i = 0
- while i < len(self.jobs):
- job = self.jobs[i]
- try:
- if not job.create():
- msg.error(job.chart_name, "create function failed.")
- self._stop(job)
- else:
- chart = job.chart_name
- sys.stdout.write(
- "CHART netdata.plugin_pythond_" +
- chart +
- " '' 'Execution time for " +
- chart +
- " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
- str(job.timetable['freq']) +
- '\n')
- sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
- msg.debug("created charts for", job.chart_name)
- # sys.stdout.flush()
- i += 1
- except AttributeError:
- msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
- self._stop(job)
- except (UnboundLocalError, Exception) as e:
- msg.error(job.chart_name, str(e))
- self._stop(job)
-
- def update(self):
+ create_ok = job.create()
+ except Exception as error:
+ job.error('create() unhandled exception: {error}'.format(error=error))
+ return False
+ else:
+ return create_ok
+
+ def delete_job(self, job):
"""
- Creates and supervises every job thread.
- This will stay forever and ever and ever forever and ever it'll be the one...
+ :param job: <Job>
+ :return:
"""
- for job in self.jobs:
- job.start()
+ del self.modules[job.module_name][job.id]
- while True:
- if threading.active_count() <= 1:
- msg.fatal("no more jobs")
- time.sleep(1)
-
-
-def read_config(path):
- """
- Read YAML configuration from specified file
- :param path: str
- :return: dict
- """
- try:
- with open(path, 'r') as stream:
- if ORDERED:
- config = ordered_load(stream, yaml.SafeLoader)
+ def run_check(self):
+ checked = list()
+ for job in self.jobs:
+ if job.name in checked:
+ job.info('check() => [DROPPED] (already served by another job)')
+ self.delete_job(job)
+ continue
+ ok = self.check_job(job)
+ if ok:
+ job.info('check() => [OK]')
+ checked.append(job.name)
+ job.checked = True
+ continue
+ if not job.is_autodetect() or ok is None:
+ job.error('check() => [FAILED]')
+ self.delete_job(job)
else:
- config = yaml.load(stream)
- except (OSError, IOError) as error:
- msg.error(str(path), 'reading error:', str(error))
- return None
- except yaml.YAMLError as error:
- msg.error(str(path), "is malformed:", str(error))
- return None
- return config
-
-
-def parse_cmdline(directory, *commands):
- """
- Parse parameters from command line.
- :param directory: str
- :param commands: list of str
- :return: dict
- """
- global DEBUG_FLAG, TRACE_FLAG
- global OVERRIDE_UPDATE_EVERY
- global BASE_CONFIG
-
- changed_update = False
- mods = []
- for cmd in commands[1:]:
- if cmd == "check":
- pass
- elif cmd == "debug" or cmd == "all":
- DEBUG_FLAG = True
- # redirect stderr to stdout?
- elif cmd == "trace" or cmd == "all":
- TRACE_FLAG = True
- elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
- # DEBUG_FLAG = True
- mods.append(cmd.replace(".chart.py", ""))
- else:
- try:
- BASE_CONFIG['update_every'] = int(cmd)
- changed_update = True
- except ValueError:
- pass
- if changed_update and DEBUG_FLAG:
- OVERRIDE_UPDATE_EVERY = True
- msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
-
- msg.debug("started from", commands[0], "with options:", *commands[1:])
-
- return mods
-
-
-# if __name__ == '__main__':
-def run():
- """
- Main program.
- """
- global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
-
- # read configuration file
- disabled = ['nginx_log', 'gunicorn_log', 'apache_cache']
- enabled = list()
- default_run = True
- configfile = CONFIG_DIR + "python.d.conf"
- msg.PROGRAM = PROGRAM
- msg.info("reading configuration file:", configfile)
- log_throttle = 200
- log_interval = 3600
-
- conf = read_config(configfile)
- if conf is not None:
- try:
- # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
- if conf['enabled'] is False:
- msg.fatal('disabled in configuration file.\n')
- except (KeyError, TypeError):
- pass
-
- try:
- for param in BASE_CONFIG:
- BASE_CONFIG[param] = conf[param]
- except (KeyError, TypeError):
- pass # use default update_every from NETDATA_UPDATE_EVERY
-
- try:
- DEBUG_FLAG = conf['debug']
- except (KeyError, TypeError):
- pass
-
- try:
- TRACE_FLAG = conf['trace']
- except (KeyError, TypeError):
- pass
-
- try:
- log_throttle = conf['logs_per_interval']
- except (KeyError, TypeError):
- pass
+ job.error('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
- try:
- log_interval = conf['log_interval']
- except (KeyError, TypeError):
- pass
+ def run_create(self):
+ for job in self.jobs:
+ if not job.checked:
+ # skip autodetection_retry jobs
+ continue
+ ok = self.create_job_charts(job)
+ if ok:
+ job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
+ job.created = True
+ continue
+ job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
+ self.delete_job(job)
- default_run = True if ('default_run' not in conf or conf.get('default_run')) else False
+ def start(self):
+ self.run_check()
+ self.run_create()
+ for job in self.jobs:
+ if job.created:
+ job.start()
- for k, v in conf.items():
- if k in ("update_every", "debug", "enabled", "default_run"):
- continue
- if default_run:
- if v is False:
- disabled.append(k)
- else:
- if v is True:
- enabled.append(k)
- # parse passed command line arguments
- modules = parse_cmdline(MODULES_DIR, *sys.argv)
- msg.DEBUG_FLAG = DEBUG_FLAG
- msg.TRACE_FLAG = TRACE_FLAG
- msg.LOG_THROTTLE = log_throttle
- msg.LOG_INTERVAL = log_interval
- msg.LOG_COUNTER = 0
- msg.LOG_NEXT_CHECK = 0
- msg.info("MODULES_DIR='" + MODULES_DIR +
- "', CONFIG_DIR='" + CONFIG_DIR +
- "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
- ", ONLY_MODULES=" + str(modules))
-
- # run plugins
- charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled, enabled, default_run)
- charts.check()
- charts.create()
- charts.update()
- msg.fatal("finished")
+ while True:
+ if threading.active_count() <= 1 and not self.autodetect_jobs:
+ run_and_exit(Logger.info)('FINISHED')
+
+ sleep(self.sleep_time)
+ self.cleanup()
+ self.autodetect_retry()
+
+ def cleanup(self):
+ for job in self.dead_jobs:
+ self.delete_job(job)
+ for mod in self:
+ if not mod:
+ del self.modules[mod.name]
+
+ def autodetect_retry(self):
+ self.runs_counter += self.sleep_time
+ for job in self.autodetect_jobs:
+ if self.runs_counter % job.recheck_every == 0:
+ checked = self.check_job(job)
+ if checked:
+ created = self.create_job_charts(job)
+ if not created:
+ self.delete_job(job)
+ continue
+ job.start()
if __name__ == '__main__':
- run()
+ DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
+ Logger = PythonDLogger()
+ if DEBUG:
+ Logger.logger.severity = 'DEBUG'
+ if TRACE:
+ Logger.log_traceback = True
+ Logger.info('Using python {version}'.format(version=PY_VERSION[0]))
+
+ plugin = Plugin()
+ plugin.start()