summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python.d.plugin.in
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/python.d.plugin/python.d.plugin.in')
-rw-r--r--collectors/python.d.plugin/python.d.plugin.in86
1 files changed, 77 insertions, 9 deletions
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in
index 44b6671cb..9d575d86f 100644
--- a/collectors/python.d.plugin/python.d.plugin.in
+++ b/collectors/python.d.plugin/python.d.plugin.in
@@ -42,18 +42,17 @@ except ImportError:
PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0)
-
if PY_VERSION > (3, 1):
from importlib.machinery import SourceFileLoader
else:
from imp import load_source as SourceFileLoader
-
ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
+ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'
def add_pythond_packages():
@@ -65,10 +64,10 @@ def add_pythond_packages():
add_pythond_packages()
-
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
+from third_party import filelock
try:
from collections import OrderedDict
@@ -93,6 +92,10 @@ def dirs():
ENV_NETDATA_PLUGINS_DIR,
os.path.dirname(__file__),
)
+ locks = os.getenv(
+ ENV_NETDATA_LOCK_DIR,
+ os.path.join('@varlibdir_POST@', 'lock')
+ )
modules_user_config = os.path.join(plugin_user_config, 'python.d')
modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
modules = os.path.abspath(pluginsd + '/../python.d')
@@ -106,6 +109,7 @@ def dirs():
'modules_stock_config',
'modules',
'var_lib',
+ 'locks',
]
)
return Dirs(
@@ -115,6 +119,7 @@ def dirs():
modules_stock_config,
modules,
var_lib,
+ locks,
)
@@ -173,7 +178,7 @@ def multi_path_find(name, *paths):
def load_module(name):
abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
- module = SourceFileLoader(name, abs_path)
+ module = SourceFileLoader('pythond_' + name, abs_path)
if isinstance(module, types.ModuleType):
return module
return module.load_module()
@@ -307,6 +312,9 @@ class Job(threading.Thread):
def init(self):
self.job = self.service(configuration=copy.deepcopy(self.config))
+ def full_name(self):
+ return self.job.name
+
def check(self):
ok = self.job.check()
self.checks -= self.checks != self.inf and not ok
@@ -448,15 +456,45 @@ class PluginConfig(dict):
return self['default_run']
+class FileLockRegistry:
+ def __init__(self, path):
+ self.path = path
+ self.locks = dict()
+
+ def register(self, name):
+ if name in self.locks:
+ return
+ file = os.path.join(self.path, '{0}.collector.lock'.format(name))
+ lock = filelock.FileLock(file)
+ lock.acquire(timeout=0)
+ self.locks[name] = lock
+
+ def unregister(self, name):
+ if name not in self.locks:
+ return
+ lock = self.locks[name]
+ lock.release()
+ del self.locks[name]
+
+
+class DummyRegistry:
+ def register(self, name):
+ pass
+
+ def unregister(self, name):
+ pass
+
+
class Plugin:
config_name = 'python.d.conf'
jobs_status_dump_name = 'pythond-jobs-statuses.json'
- def __init__(self, modules_to_run, min_update_every):
+ def __init__(self, modules_to_run, min_update_every, registry):
self.modules_to_run = modules_to_run
self.min_update_every = min_update_every
self.config = PluginConfig(PLUGIN_BASE_CONF)
self.log = PythonDLogger()
+ self.registry = registry
self.started_jobs = collections.defaultdict(dict)
self.jobs = list()
self.saver = None
@@ -590,7 +628,7 @@ class Plugin:
job.init()
except Exception as error:
self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
- job.module_name, job.real_name, repr(error)))
+ job.module_name, job.real_name, repr(error)))
job.status = JOB_STATUS_DROPPED
continue
@@ -598,7 +636,7 @@ class Plugin:
ok = job.check()
except Exception as error:
self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
- job.module_name, job.real_name, repr(error)))
+ job.module_name, job.real_name, repr(error)))
job.status = JOB_STATUS_DROPPED
continue
if not ok:
@@ -608,11 +646,29 @@ class Plugin:
self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
try:
+ self.registry.register(job.full_name())
+ except filelock.Timeout as error:
+ self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
+ job.module_name, job.real_name, error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+ except Exception as error:
+ self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
+ job.module_name, job.real_name, error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+
+ try:
job.create()
except Exception as error:
self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
- job.module_name, job.real_name, repr(error)))
+ job.module_name, job.real_name, repr(error)))
job.status = JOB_STATUS_DROPPED
+ try:
+ self.registry.unregister(job.full_name())
+ except Exception as error:
+ self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
+ job.module_name, job.real_name, error))
continue
self.started_jobs[job.module_name] = job.actual_name
@@ -686,6 +742,7 @@ def parse_command_line():
debug = False
trace = False
+ nolock = False
update_every = 1
modules_to_run = list()
@@ -702,6 +759,9 @@ def parse_command_line():
if 'trace' in opts:
trace = True
opts.remove('trace')
+ if 'nolock' in opts:
+ nolock = True
+ opts.remove('nolock')
if opts:
modules_to_run = list(opts)
@@ -711,13 +771,15 @@ def parse_command_line():
'update_every',
'debug',
'trace',
+ 'nolock',
'modules_to_run',
])
return cmd(
update_every,
debug,
trace,
- modules_to_run
+ nolock,
+ modules_to_run,
)
@@ -765,9 +827,15 @@ def main():
log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
return
+ if DIRS.locks and not cmd.nolock:
+ registry = FileLockRegistry(DIRS.locks)
+ else:
+ registry = DummyRegistry()
+
p = Plugin(
cmd.modules_to_run or AVAILABLE_MODULES,
cmd.update_every,
+ registry,
)
try: