summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/python_modules/bases
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/python.d.plugin/python_modules/bases')
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py30
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py109
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py10
-rw-r--r--collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py19
-rw-r--r--collectors/python.d.plugin/python_modules/bases/charts.py2
-rw-r--r--collectors/python.d.plugin/python_modules/bases/loggers.py2
6 files changed, 87 insertions, 85 deletions
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py
index 53807e2c4..9a694aa82 100644
--- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py
@@ -131,20 +131,22 @@ class MySQLService(SimpleService):
raw_data = dict()
queries = dict(self.queries)
try:
- with self.__connection as cursor:
- for name, query in queries.items():
- try:
- cursor.execute(query)
- except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error:
- if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)):
- raise RuntimeError
- self.error('Removed query: {name}[{query}]. Error: error'.format(name=name,
- query=query,
- error=error))
- self.queries.pop(name)
- continue
- else:
- raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall()
+ cursor = self.__connection.cursor()
+ for name, query in queries.items():
+ try:
+ cursor.execute(query)
+ except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error:
+ if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)):
+ cursor.close()
+ raise RuntimeError
+ self.error('Removed query: {name}[{query}]. Error: error'.format(name=name,
+ query=query,
+ error=error))
+ self.queries.pop(name)
+ continue
+ else:
+ raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall()
+ cursor.close()
self.__connection.commit()
except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError):
self.__connection.close()
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
index dd53fbc14..c7ab7f244 100644
--- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py
@@ -5,7 +5,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from threading import Thread
-from time import sleep
+from time import sleep, time
from third_party.monotonic import monotonic
@@ -17,25 +17,42 @@ RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \
'SET run_time = {elapsed}\n' \
'END\n'
+PENALTY_EVERY = 5
+MAX_PENALTY = 10 * 60 # 10 minutes
+
class RuntimeCounters:
def __init__(self, configuration):
"""
:param configuration: <dict>
"""
- self.FREQ = int(configuration.pop('update_every'))
- self.START_RUN = 0
- self.NEXT_RUN = 0
- self.PREV_UPDATE = 0
- self.SINCE_UPDATE = 0
- self.ELAPSED = 0
- self.RETRIES = 0
- self.RETRIES_MAX = configuration.pop('retries')
- self.PENALTY = 0
- self.RUNS = 1
+ self.update_every = int(configuration.pop('update_every'))
+ self.do_penalty = configuration.pop('penalty')
+
+ self.start_mono = 0
+ self.start_real = 0
+ self.retries = 0
+ self.penalty = 0
+ self.elapsed = 0
+ self.prev_update = 0
+
+ self.runs = 1
- def is_sleep_time(self):
- return self.START_RUN < self.NEXT_RUN
+ def calc_next(self):
+ self.start_mono = monotonic()
+ return self.start_mono - (self.start_mono % self.update_every) + self.update_every + self.penalty
+
+ def sleep_until_next(self):
+ next_time = self.calc_next()
+ while self.start_mono < next_time:
+ sleep(next_time - self.start_mono)
+ self.start_mono = monotonic()
+ self.start_real = time()
+
+ def handle_retries(self):
+ self.retries += 1
+ if self.do_penalty and self.retries % PENALTY_EVERY == 0:
+ self.penalty = round(min(self.retries * self.update_every / 2, MAX_PENALTY))
class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, object):
@@ -83,11 +100,11 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec
@property
def runs_counter(self):
- return self._runtime_counters.RUNS
+ return self._runtime_counters.runs
@property
def update_every(self):
- return self._runtime_counters.FREQ
+ return self._runtime_counters.update_every
@update_every.setter
def update_every(self, value):
@@ -95,7 +112,7 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec
:param value: <int>
:return:
"""
- self._runtime_counters.FREQ = value
+ self._runtime_counters.update_every = value
def get_update_every(self):
return self.update_every
@@ -163,41 +180,36 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec
:return: None
"""
job = self._runtime_counters
- self.debug('started, update frequency: {freq}, '
- 'retries: {retries}'.format(freq=job.FREQ, retries=job.RETRIES_MAX - job.RETRIES))
+ self.debug('started, update frequency: {freq}'.format(freq=job.update_every))
while True:
- job.START_RUN = monotonic()
-
- job.NEXT_RUN = job.START_RUN - (job.START_RUN % job.FREQ) + job.FREQ + job.PENALTY
+ job.sleep_until_next()
- self.sleep_until_next_run()
-
- if job.PREV_UPDATE:
- job.SINCE_UPDATE = int((job.START_RUN - job.PREV_UPDATE) * 1e6)
+ since = 0
+ if job.prev_update:
+ since = int((job.start_real - job.prev_update) * 1e6)
try:
- updated = self.update(interval=job.SINCE_UPDATE)
+ updated = self.update(interval=since)
except Exception as error:
self.error('update() unhandled exception: {error}'.format(error=error))
updated = False
- job.RUNS += 1
+ job.runs += 1
if not updated:
- if not self.manage_retries():
- return
+ job.handle_retries()
else:
- job.ELAPSED = int((monotonic() - job.START_RUN) * 1e3)
- job.PREV_UPDATE = job.START_RUN
- job.RETRIES, job.PENALTY = 0, 0
+ job.elapsed = int((monotonic() - job.start_mono) * 1e3)
+ job.prev_update = job.start_real
+ job.retries, job.penalty = 0, 0
safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name,
- since_last=job.SINCE_UPDATE,
- elapsed=job.ELAPSED))
- self.debug('update => [{status}] (elapsed time: {elapsed}, '
- 'retries left: {retries})'.format(status='OK' if updated else 'FAILED',
- elapsed=job.ELAPSED if updated else '-',
- retries=job.RETRIES_MAX - job.RETRIES))
+ since_last=since,
+ elapsed=job.elapsed))
+ self.debug('update => [{status}] (elapsed time: {elapsed}, failed retries in a row: {retries})'.format(
+ status='OK' if updated else 'FAILED',
+ elapsed=job.elapsed if updated else '-',
+ retries=job.retries))
def update(self, interval):
"""
@@ -233,27 +245,6 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec
return updated
- def manage_retries(self):
- rc = self._runtime_counters
- rc.RETRIES += 1
- if rc.RETRIES % 5 == 0:
- rc.PENALTY = int(rc.RETRIES * self.update_every / 2)
- if rc.RETRIES >= rc.RETRIES_MAX:
- self.error('stopped after {0} data collection failures in a row'.format(rc.RETRIES_MAX))
- return False
- return True
-
- def sleep_until_next_run(self):
- job = self._runtime_counters
-
- # sleep() is interruptable
- while job.is_sleep_time():
- sleep_time = job.NEXT_RUN - job.START_RUN
- self.debug('sleeping for {sleep_time} to reach frequency of {freq} sec'.format(sleep_time=sleep_time,
- freq=job.FREQ + job.PENALTY))
- sleep(sleep_time)
- job.START_RUN = monotonic()
-
def get_data(self):
return self._get_data()
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py
index e85455307..f5e6380b8 100644
--- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py
@@ -75,9 +75,11 @@ class SocketService(SimpleService):
keyfile=self.key,
certfile=self.cert,
server_side=False,
- cert_reqs=ssl.CERT_NONE)
+ cert_reqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_TLS,
+ )
except (socket.error, ssl.SSLError) as error:
- self.error('Failed to wrap socket.')
+ self.error('failed to wrap socket : {0}'.format(error))
self._disconnect()
self.__socket_config = None
return False
@@ -169,8 +171,8 @@ class SocketService(SimpleService):
self.debug('closing socket')
self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all
self._sock.close()
- except Exception:
- pass
+ except Exception as error:
+ self.error(error)
self._sock = None
def _send(self, request=None):
diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py
index 856f38851..011efff9e 100644
--- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py
+++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py
@@ -26,6 +26,7 @@ class UrlService(SimpleService):
self.method = self.configuration.get('method', 'GET')
self.header = self.configuration.get('header')
self.request_timeout = self.configuration.get('timeout', 1)
+ self.respect_retry_after_header = self.configuration.get('respect_retry_after_header')
self.tls_verify = self.configuration.get('tls_verify')
self.tls_ca_file = self.configuration.get('tls_ca_file')
self.tls_key_file = self.configuration.get('tls_key_file')
@@ -111,12 +112,18 @@ class UrlService(SimpleService):
"""
url = url or self.url
manager = manager or self._manager
- response = manager.request(method=self.method,
- url=url,
- timeout=self.request_timeout,
- retries=retries,
- headers=manager.headers,
- redirect=redirect)
+ retry = urllib3.Retry(retries)
+ if hasattr(retry, 'respect_retry_after_header'):
+ retry.respect_retry_after_header = bool(self.respect_retry_after_header)
+
+ response = manager.request(
+ method=self.method,
+ url=url,
+ timeout=self.request_timeout,
+ retries=retry,
+ headers=manager.headers,
+ redirect=redirect,
+ )
if isinstance(response.data, str):
return response.status, response.data
return response.status, response.data.decode()
diff --git a/collectors/python.d.plugin/python_modules/bases/charts.py b/collectors/python.d.plugin/python_modules/bases/charts.py
index 2963739ec..0a0719056 100644
--- a/collectors/python.d.plugin/python_modules/bases/charts.py
+++ b/collectors/python.d.plugin/python_modules/bases/charts.py
@@ -45,7 +45,7 @@ def create_runtime_chart(func):
ok = func(*args, **kwargs)
if ok:
safe_print(RUNTIME_CHART_CREATE.format(job_name=self.name,
- update_every=self._runtime_counters.FREQ))
+ update_every=self._runtime_counters.update_every))
return ok
return wrapper
diff --git a/collectors/python.d.plugin/python_modules/bases/loggers.py b/collectors/python.d.plugin/python_modules/bases/loggers.py
index 39be77a79..098294d3e 100644
--- a/collectors/python.d.plugin/python_modules/bases/loggers.py
+++ b/collectors/python.d.plugin/python_modules/bases/loggers.py
@@ -34,7 +34,7 @@ def limiter(log_max_count=30, allowed_in_seconds=60):
def on_decorator(func):
def on_call(*args):
- current_time = args[0]._runtime_counters.START_RUN
+ current_time = args[0]._runtime_counters.start_mono
lc = args[0]._logger_counters
if lc.logged and lc.logged % log_max_count == 0: