summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py')
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py261
1 files changed, 261 insertions, 0 deletions
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
new file mode 100644
index 00000000..3f122e1d
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
@@ -0,0 +1,261 @@
+# -*- coding: utf-8 -*-
+# Description:
+# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (ilyam8)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import os
+
+from bases.charts import Charts, ChartError, create_runtime_chart
+from bases.collection import safe_print
+from bases.loggers import PythonDLogger
+from third_party.monotonic import monotonic
+from time import sleep, time
+
+RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \
+ 'SET run_time = {elapsed}\n' \
+ 'END\n'
+
+PENALTY_EVERY = 5
+MAX_PENALTY = 10 * 60 # 10 minutes
+
+ND_INTERNAL_MONITORING_DISABLED = os.getenv("NETDATA_INTERNALS_MONITORING") == "NO"
+
+
+class RuntimeCounters:
+ def __init__(self, configuration):
+ """
+ :param configuration: <dict>
+ """
+ self.update_every = int(configuration.pop('update_every'))
+ self.do_penalty = configuration.pop('penalty')
+
+ self.start_mono = 0
+ self.start_real = 0
+ self.retries = 0
+ self.penalty = 0
+ self.elapsed = 0
+ self.prev_update = 0
+
+ self.runs = 1
+
+ def calc_next(self):
+ self.start_mono = monotonic()
+ return self.start_mono - (self.start_mono % self.update_every) + self.update_every + self.penalty
+
+ def sleep_until_next(self):
+ next_time = self.calc_next()
+ while self.start_mono < next_time:
+ sleep(next_time - self.start_mono)
+ self.start_mono = monotonic()
+ self.start_real = time()
+
+ def handle_retries(self):
+ self.retries += 1
+ if self.do_penalty and self.retries % PENALTY_EVERY == 0:
+ self.penalty = round(min(self.retries * self.update_every / 2, MAX_PENALTY))
+
+
+def clean_module_name(name):
+ if name.startswith('pythond_'):
+ return name[8:]
+ return name
+
+
+class SimpleService(PythonDLogger, object):
+ """
+ Prototype of Service class.
+ Implemented basic functionality to run jobs by `python.d.plugin`
+ """
+
+ def __init__(self, configuration, name=''):
+ """
+ :param configuration: <dict>
+ :param name: <str>
+ """
+ PythonDLogger.__init__(self)
+ self.configuration = configuration
+ self.order = list()
+ self.definitions = dict()
+
+ self.module_name = clean_module_name(self.__module__)
+ self.job_name = configuration.pop('job_name')
+ self.actual_job_name = self.job_name or self.module_name
+ self.override_name = configuration.pop('override_name')
+ self.fake_name = None
+
+ self._runtime_counters = RuntimeCounters(configuration=configuration)
+ self.charts = Charts(job_name=self.actual_name,
+ actual_job_name=self.actual_job_name,
+ priority=configuration.pop('priority'),
+ cleanup=configuration.pop('chart_cleanup'),
+ get_update_every=self.get_update_every,
+ module_name=self.module_name)
+
+ def __repr__(self):
+ return '<{cls_bases}: {name}>'.format(cls_bases=', '.join(c.__name__ for c in self.__class__.__bases__),
+ name=self.name)
+
+ @property
+ def name(self):
+ name = self.override_name or self.job_name
+ if name and name != self.module_name:
+ return '_'.join([self.module_name, name])
+ return self.module_name
+
+ def actual_name(self):
+ return self.fake_name or self.name
+
+ @property
+ def runs_counter(self):
+ return self._runtime_counters.runs
+
+ @property
+ def update_every(self):
+ return self._runtime_counters.update_every
+
+ @update_every.setter
+ def update_every(self, value):
+ """
+ :param value: <int>
+ :return:
+ """
+ self._runtime_counters.update_every = value
+
+ def get_update_every(self):
+ return self.update_every
+
+ def check(self):
+ """
+ check() prototype
+ :return: boolean
+ """
+ self.debug("job doesn't implement check() method. Using default which simply invokes get_data().")
+ data = self.get_data()
+ if data and isinstance(data, dict):
+ return True
+ self.debug('returned value is wrong: {0}'.format(data))
+ return False
+
+ @create_runtime_chart
+ def create(self):
+ for chart_name in self.order:
+ chart_config = self.definitions.get(chart_name)
+
+ if not chart_config:
+ self.debug("create() => [NOT ADDED] chart '{chart_name}' not in definitions. "
+ "Skipping it.".format(chart_name=chart_name))
+ continue
+
+ # create chart
+ chart_params = [chart_name] + chart_config['options']
+ try:
+ self.charts.add_chart(params=chart_params)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (chart '{chart}': {error})".format(chart=chart_name,
+ error=error))
+ continue
+
+ # add dimensions to chart
+ for dimension in chart_config['lines']:
+ try:
+ self.charts[chart_name].add_dimension(dimension)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (dimension '{dimension}': {error})".format(dimension=dimension,
+ error=error))
+ continue
+
+ # add variables to chart
+ if 'variables' in chart_config:
+ for variable in chart_config['variables']:
+ try:
+ self.charts[chart_name].add_variable(variable)
+ except ChartError as error:
+ self.error("create() => [NOT ADDED] (variable '{var}': {error})".format(var=variable,
+ error=error))
+ continue
+
+ del self.order
+ del self.definitions
+
+ # True if job has at least 1 chart else False
+ return bool(self.charts)
+
+ def run(self):
+ """
+ Runs job in thread. Handles retries.
+ Exits when job failed or timed out.
+ :return: None
+ """
+ job = self._runtime_counters
+ self.debug('started, update frequency: {freq}'.format(freq=job.update_every))
+
+ while True:
+ job.sleep_until_next()
+
+ since = 0
+ if job.prev_update:
+ since = int((job.start_real - job.prev_update) * 1e6)
+
+ try:
+ updated = self.update(interval=since)
+ except Exception as error:
+ self.error('update() unhandled exception: {error}'.format(error=error))
+ updated = False
+
+ job.runs += 1
+
+ if not updated:
+ job.handle_retries()
+ else:
+ job.elapsed = int((monotonic() - job.start_mono) * 1e3)
+ job.prev_update = job.start_real
+ job.retries, job.penalty = 0, 0
+ if not ND_INTERNAL_MONITORING_DISABLED:
+ safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name,
+ since_last=since,
+ elapsed=job.elapsed))
+ self.debug('update => [{status}] (elapsed time: {elapsed}, failed retries in a row: {retries})'.format(
+ status='OK' if updated else 'FAILED',
+ elapsed=job.elapsed if updated else '-',
+ retries=job.retries))
+
+ def update(self, interval):
+ """
+ :return:
+ """
+ data = self.get_data()
+ if not data:
+ self.debug('get_data() returned no data')
+ return False
+ elif not isinstance(data, dict):
+ self.debug('get_data() returned incorrect type data')
+ return False
+
+ updated = False
+
+ for chart in self.charts:
+ if chart.flags.obsoleted:
+ if chart.can_be_updated(data):
+ chart.refresh()
+ else:
+ continue
+ elif self.charts.cleanup and chart.penalty >= self.charts.cleanup:
+ chart.obsolete()
+ self.info("chart '{0}' was suppressed due to non updating".format(chart.name))
+ continue
+
+ ok = chart.update(data, interval)
+ if ok:
+ updated = True
+
+ if not updated:
+ self.debug('none of the charts has been updated')
+
+ return updated
+
+ def get_data(self):
+ return self._get_data()
+
+ def _get_data(self):
+ raise NotImplementedError