diff options
Diffstat (limited to '')
-rw-r--r-- | collectors/python.d.plugin/python.d.plugin.in | 86 |
1 files changed, 77 insertions, 9 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in index 44b6671cb..9d575d86f 100644 --- a/collectors/python.d.plugin/python.d.plugin.in +++ b/collectors/python.d.plugin/python.d.plugin.in @@ -42,18 +42,17 @@ except ImportError: PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0) - if PY_VERSION > (3, 1): from importlib.machinery import SourceFileLoader else: from imp import load_source as SourceFileLoader - ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR' ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR' ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR' ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR' ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY' +ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR' def add_pythond_packages(): @@ -65,10 +64,10 @@ def add_pythond_packages(): add_pythond_packages() - from bases.collection import safe_print from bases.loggers import PythonDLogger from bases.loaders import load_config +from third_party import filelock try: from collections import OrderedDict @@ -93,6 +92,10 @@ def dirs(): ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__), ) + locks = os.getenv( + ENV_NETDATA_LOCK_DIR, + os.path.join('@varlibdir_POST@', 'lock') + ) modules_user_config = os.path.join(plugin_user_config, 'python.d') modules_stock_config = os.path.join(plugin_stock_config, 'python.d') modules = os.path.abspath(pluginsd + '/../python.d') @@ -106,6 +109,7 @@ def dirs(): 'modules_stock_config', 'modules', 'var_lib', + 'locks', ] ) return Dirs( @@ -115,6 +119,7 @@ def dirs(): modules_stock_config, modules, var_lib, + locks, ) @@ -173,7 +178,7 @@ def multi_path_find(name, *paths): def load_module(name): abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX)) - module = SourceFileLoader(name, abs_path) + module = SourceFileLoader('pythond_' + name, abs_path) if isinstance(module, types.ModuleType): return module return module.load_module() @@ -307,6 +312,9 @@ class Job(threading.Thread): def init(self): self.job = self.service(configuration=copy.deepcopy(self.config)) + def full_name(self): + return self.job.name + def check(self): ok = self.job.check() self.checks -= self.checks != self.inf and not ok @@ -448,15 +456,45 @@ class PluginConfig(dict): return self['default_run'] +class FileLockRegistry: + def __init__(self, path): + self.path = path + self.locks = dict() + + def register(self, name): + if name in self.locks: + return + file = os.path.join(self.path, '{0}.collector.lock'.format(name)) + lock = filelock.FileLock(file) + lock.acquire(timeout=0) + self.locks[name] = lock + + def unregister(self, name): + if name not in self.locks: + return + lock = self.locks[name] + lock.release() + del self.locks[name] + + +class DummyRegistry: + def register(self, name): + pass + + def unregister(self, name): + pass + + class Plugin: config_name = 'python.d.conf' jobs_status_dump_name = 'pythond-jobs-statuses.json' - def __init__(self, modules_to_run, min_update_every): + def __init__(self, modules_to_run, min_update_every, registry): self.modules_to_run = modules_to_run self.min_update_every = min_update_every self.config = PluginConfig(PLUGIN_BASE_CONF) self.log = PythonDLogger() + self.registry = registry self.started_jobs = collections.defaultdict(dict) self.jobs = list() self.saver = None @@ -590,7 +628,7 @@ class Plugin: job.init() except Exception as error: self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format( - job.module_name, job.real_name, repr(error))) + job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue @@ -598,7 +636,7 @@ class Plugin: ok = job.check() except Exception as error: self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format( - job.module_name, job.real_name, repr(error))) + job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED continue if not ok: @@ -608,11 +646,29 @@ class Plugin: self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name)) try: + self.registry.register(job.full_name()) + except filelock.Timeout as error: + self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format( + job.module_name, job.real_name, error)) + job.status = JOB_STATUS_DROPPED + continue + except Exception as error: + self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format( + job.module_name, job.real_name, error)) + job.status = JOB_STATUS_DROPPED + continue + + try: job.create() except Exception as error: self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format( - job.module_name, job.real_name, repr(error))) + job.module_name, job.real_name, repr(error))) job.status = JOB_STATUS_DROPPED + try: + self.registry.unregister(job.full_name()) + except Exception as error: + self.log.warning('{0}[{1}] : deregistration failed: {2}'.format( + job.module_name, job.real_name, error)) continue self.started_jobs[job.module_name] = job.actual_name @@ -686,6 +742,7 @@ def parse_command_line(): debug = False trace = False + nolock = False update_every = 1 modules_to_run = list() @@ -702,6 +759,9 @@ def parse_command_line(): if 'trace' in opts: trace = True opts.remove('trace') + if 'nolock' in opts: + nolock = True + opts.remove('nolock') if opts: modules_to_run = list(opts) @@ -711,13 +771,15 @@ def parse_command_line(): 'update_every', 'debug', 'trace', + 'nolock', 'modules_to_run', ]) return cmd( update_every, debug, trace, - modules_to_run + nolock, + modules_to_run, ) @@ -765,9 +827,15 @@ def main(): log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1))) return + if DIRS.locks and not cmd.nolock: + registry = FileLockRegistry(DIRS.locks) + else: + registry = DummyRegistry() + p = Plugin( cmd.modules_to_run or AVAILABLE_MODULES, cmd.update_every, + registry, ) try: |