From 8a7b72f7cd1ccd547a03eb4243294e741d661d3f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 8 Feb 2019 08:30:37 +0100 Subject: Adding upstream version 1.12.0. Signed-off-by: Daniel Baumann --- .../bases/FrameworkServices/MySQLService.py | 30 +++--- .../bases/FrameworkServices/SimpleService.py | 109 ++++++++++----------- .../bases/FrameworkServices/SocketService.py | 10 +- .../bases/FrameworkServices/UrlService.py | 19 ++-- 4 files changed, 85 insertions(+), 83 deletions(-) (limited to 'collectors/python.d.plugin/python_modules/bases/FrameworkServices') diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py index 53807e2c4..9a694aa82 100644 --- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/MySQLService.py @@ -131,20 +131,22 @@ class MySQLService(SimpleService): raw_data = dict() queries = dict(self.queries) try: - with self.__connection as cursor: - for name, query in queries.items(): - try: - cursor.execute(query) - except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error: - if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)): - raise RuntimeError - self.error('Removed query: {name}[{query}]. Error: error'.format(name=name, - query=query, - error=error)) - self.queries.pop(name) - continue - else: - raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall() + cursor = self.__connection.cursor() + for name, query in queries.items(): + try: + cursor.execute(query) + except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error: + if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)): + cursor.close() + raise RuntimeError + self.error('Removed query: {name}[{query}]. Error: error'.format(name=name, + query=query, + error=error)) + self.queries.pop(name) + continue + else: + raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall() + cursor.close() self.__connection.commit() except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError): self.__connection.close() diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py index dd53fbc14..c7ab7f244 100644 --- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SimpleService.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later from threading import Thread -from time import sleep +from time import sleep, time from third_party.monotonic import monotonic @@ -17,25 +17,42 @@ RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \ 'SET run_time = {elapsed}\n' \ 'END\n' +PENALTY_EVERY = 5 +MAX_PENALTY = 10 * 60 # 10 minutes + class RuntimeCounters: def __init__(self, configuration): """ :param configuration: """ - self.FREQ = int(configuration.pop('update_every')) - self.START_RUN = 0 - self.NEXT_RUN = 0 - self.PREV_UPDATE = 0 - self.SINCE_UPDATE = 0 - self.ELAPSED = 0 - self.RETRIES = 0 - self.RETRIES_MAX = configuration.pop('retries') - self.PENALTY = 0 - self.RUNS = 1 + self.update_every = int(configuration.pop('update_every')) + self.do_penalty = configuration.pop('penalty') + + self.start_mono = 0 + self.start_real = 0 + self.retries = 0 + self.penalty = 0 + self.elapsed = 0 + self.prev_update = 0 + + self.runs = 1 - def is_sleep_time(self): - return self.START_RUN < self.NEXT_RUN + def calc_next(self): + self.start_mono = monotonic() + return self.start_mono - (self.start_mono % self.update_every) + self.update_every + self.penalty + + def sleep_until_next(self): + next_time = self.calc_next() + while self.start_mono < next_time: + sleep(next_time - self.start_mono) + self.start_mono = monotonic() + self.start_real = time() + + def handle_retries(self): + self.retries += 1 + if self.do_penalty and self.retries % PENALTY_EVERY == 0: + self.penalty = round(min(self.retries * self.update_every / 2, MAX_PENALTY)) class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, object): @@ -83,11 +100,11 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec @property def runs_counter(self): - return self._runtime_counters.RUNS + return self._runtime_counters.runs @property def update_every(self): - return self._runtime_counters.FREQ + return self._runtime_counters.update_every @update_every.setter def update_every(self, value): @@ -95,7 +112,7 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec :param value: :return: """ - self._runtime_counters.FREQ = value + self._runtime_counters.update_every = value def get_update_every(self): return self.update_every @@ -163,41 +180,36 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec :return: None """ job = self._runtime_counters - self.debug('started, update frequency: {freq}, ' - 'retries: {retries}'.format(freq=job.FREQ, retries=job.RETRIES_MAX - job.RETRIES)) + self.debug('started, update frequency: {freq}'.format(freq=job.update_every)) while True: - job.START_RUN = monotonic() - - job.NEXT_RUN = job.START_RUN - (job.START_RUN % job.FREQ) + job.FREQ + job.PENALTY + job.sleep_until_next() - self.sleep_until_next_run() - - if job.PREV_UPDATE: - job.SINCE_UPDATE = int((job.START_RUN - job.PREV_UPDATE) * 1e6) + since = 0 + if job.prev_update: + since = int((job.start_real - job.prev_update) * 1e6) try: - updated = self.update(interval=job.SINCE_UPDATE) + updated = self.update(interval=since) except Exception as error: self.error('update() unhandled exception: {error}'.format(error=error)) updated = False - job.RUNS += 1 + job.runs += 1 if not updated: - if not self.manage_retries(): - return + job.handle_retries() else: - job.ELAPSED = int((monotonic() - job.START_RUN) * 1e3) - job.PREV_UPDATE = job.START_RUN - job.RETRIES, job.PENALTY = 0, 0 + job.elapsed = int((monotonic() - job.start_mono) * 1e3) + job.prev_update = job.start_real + job.retries, job.penalty = 0, 0 safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name, - since_last=job.SINCE_UPDATE, - elapsed=job.ELAPSED)) - self.debug('update => [{status}] (elapsed time: {elapsed}, ' - 'retries left: {retries})'.format(status='OK' if updated else 'FAILED', - elapsed=job.ELAPSED if updated else '-', - retries=job.RETRIES_MAX - job.RETRIES)) + since_last=since, + elapsed=job.elapsed)) + self.debug('update => [{status}] (elapsed time: {elapsed}, failed retries in a row: {retries})'.format( + status='OK' if updated else 'FAILED', + elapsed=job.elapsed if updated else '-', + retries=job.retries)) def update(self, interval): """ @@ -233,27 +245,6 @@ class SimpleService(Thread, PythonDLimitedLogger, OldVersionCompatibility, objec return updated - def manage_retries(self): - rc = self._runtime_counters - rc.RETRIES += 1 - if rc.RETRIES % 5 == 0: - rc.PENALTY = int(rc.RETRIES * self.update_every / 2) - if rc.RETRIES >= rc.RETRIES_MAX: - self.error('stopped after {0} data collection failures in a row'.format(rc.RETRIES_MAX)) - return False - return True - - def sleep_until_next_run(self): - job = self._runtime_counters - - # sleep() is interruptable - while job.is_sleep_time(): - sleep_time = job.NEXT_RUN - job.START_RUN - self.debug('sleeping for {sleep_time} to reach frequency of {freq} sec'.format(sleep_time=sleep_time, - freq=job.FREQ + job.PENALTY)) - sleep(sleep_time) - job.START_RUN = monotonic() - def get_data(self): return self._get_data() diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py index e85455307..f5e6380b8 100644 --- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/SocketService.py @@ -75,9 +75,11 @@ class SocketService(SimpleService): keyfile=self.key, certfile=self.cert, server_side=False, - cert_reqs=ssl.CERT_NONE) + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLS, + ) except (socket.error, ssl.SSLError) as error: - self.error('Failed to wrap socket.') + self.error('failed to wrap socket : {0}'.format(error)) self._disconnect() self.__socket_config = None return False @@ -169,8 +171,8 @@ class SocketService(SimpleService): self.debug('closing socket') self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all self._sock.close() - except Exception: - pass + except Exception as error: + self.error(error) self._sock = None def _send(self, request=None): diff --git a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py index 856f38851..011efff9e 100644 --- a/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py +++ b/collectors/python.d.plugin/python_modules/bases/FrameworkServices/UrlService.py @@ -26,6 +26,7 @@ class UrlService(SimpleService): self.method = self.configuration.get('method', 'GET') self.header = self.configuration.get('header') self.request_timeout = self.configuration.get('timeout', 1) + self.respect_retry_after_header = self.configuration.get('respect_retry_after_header') self.tls_verify = self.configuration.get('tls_verify') self.tls_ca_file = self.configuration.get('tls_ca_file') self.tls_key_file = self.configuration.get('tls_key_file') @@ -111,12 +112,18 @@ class UrlService(SimpleService): """ url = url or self.url manager = manager or self._manager - response = manager.request(method=self.method, - url=url, - timeout=self.request_timeout, - retries=retries, - headers=manager.headers, - redirect=redirect) + retry = urllib3.Retry(retries) + if hasattr(retry, 'respect_retry_after_header'): + retry.respect_retry_after_header = bool(self.respect_retry_after_header) + + response = manager.request( + method=self.method, + url=url, + timeout=self.request_timeout, + retries=retry, + headers=manager.headers, + redirect=redirect, + ) if isinstance(response.data, str): return response.status, response.data return response.status, response.data.decode() -- cgit v1.2.3