diff options
author | Lennart Weller <lhw@ring0.de> | 2017-01-24 15:21:09 +0000 |
---|---|---|
committer | Lennart Weller <lhw@ring0.de> | 2017-01-24 15:21:09 +0000 |
commit | 3ed3b02ed96ddab1c084811f3579b3a2aec83e04 (patch) | |
tree | 7a61ab288ae47800c4f11be5677d6ad8288dcd98 /python.d/python_modules/base.py | |
parent | New upstream version 1.4.0+dfsg (diff) | |
download | netdata-3ed3b02ed96ddab1c084811f3579b3a2aec83e04.tar.xz netdata-3ed3b02ed96ddab1c084811f3579b3a2aec83e04.zip |
New upstream version 1.5.0+dfsgupstream/1.5.0+dfsg
Diffstat (limited to 'python.d/python_modules/base.py')
-rw-r--r-- | python.d/python_modules/base.py | 297 |
1 files changed, 194 insertions, 103 deletions
diff --git a/python.d/python_modules/base.py b/python.d/python_modules/base.py index 1508e0965..320c54bae 100644 --- a/python.d/python_modules/base.py +++ b/python.d/python_modules/base.py @@ -116,33 +116,21 @@ class SimpleService(threading.Thread): Return value presents exit status of update() :return: boolean """ - t_start = time.time() - timetable = self.timetable + t_start = float(time.time()) chart_name = self.chart_name - # check if it is time to execute job update() function - if timetable['next'] > t_start: - self.debug(chart_name, "will be run in", str(int((timetable['next'] - t_start) * 1000)), "ms") - return True - since_last = int((t_start - timetable['last']) * 1000000) - self.debug(chart_name, - "ready to run, after", str(int((t_start - timetable['last']) * 1000)), - "ms (update_every:", str(timetable['freq'] * 1000), - "ms, latency:", str(int((t_start - timetable['next']) * 1000)), "ms") + since_last = int((t_start - self.timetable['last']) * 1000000) if self.__first_run: since_last = 0 + if not self.update(since_last): self.error("update function failed.") return False - t_end = time.time() - self.timetable['next'] = t_end - (t_end % timetable['freq']) + timetable['freq'] + # draw performance graph - run_time = str(int((t_end - t_start) * 1000)) - # noinspection SqlNoDataSourceInspection + run_time = int((time.time() - t_start) * 1000) print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % - (self.chart_name, str(since_last), run_time)) - # sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % - # (self.chart_name, str(since_last), run_time)) + (self.chart_name, str(since_last), str(run_time))) self.debug(chart_name, "updated in", str(run_time), "ms") self.timetable['last'] = t_start @@ -155,23 +143,48 @@ class SimpleService(threading.Thread): Exits when job failed or timed out. :return: None """ - self.timetable['last'] = time.time() + step = float(self.timetable['freq']) + penalty = 0 + self.timetable['last'] = float(time.time() - step) + self.debug("starting data collection - update frequency:", str(step), " retries allowed:", str(self.retries)) while True: # run forever, unless something is wrong + now = float(time.time()) + next = self.timetable['next'] = now - (now % step) + step + penalty + + # it is important to do this in a loop + # sleep() is interruptable + while now < next: + self.debug("sleeping for", str(next - now), "secs to reach frequency of", str(step), "secs, now:", str(now), " next:", str(next), " penalty:", str(penalty)) + time.sleep(next - now) + now = float(time.time()) + + # do the job try: status = self._run_once() except Exception as e: - self.error("Something wrong: ", str(e)) - return - if status: # handle retries if update failed - time.sleep(self.timetable['next'] - time.time()) + status = False + + if status: + # it is good self.retries_left = self.retries + penalty = 0 else: + # it failed self.retries_left -= 1 if self.retries_left <= 0: - self.error("no more retries. Exiting") - return + if penalty == 0: + penalty = float(self.retries * step) / 2 + else: + penalty *= 1.5 + + if penalty > 600: + penalty = 600 + + self.retries_left = self.retries + self.alert("failed to collect data for " + str(self.retries) + " times - increasing penalty to " + str(penalty) + " sec and trying again") + else: - time.sleep(self.timetable['freq']) + self.error("failed to collect data - " + str(self.retries_left) + " retries left - penalty: " + str(penalty) + " sec") # --- CHART --- @@ -307,7 +320,10 @@ class SimpleService(threading.Thread): """ Upload new data to netdata. """ - print(self._data_stream) + try: + print(self._data_stream) + except Exception as e: + msg.fatal('cannot send data to netdata:', str(e)) self._data_stream = "" # --- ERROR HANDLING --- @@ -318,6 +334,12 @@ class SimpleService(threading.Thread): """ msg.error(self.chart_name, *params) + def alert(self, *params): + """ + Show error message on stderr + """ + msg.alert(self.chart_name, *params) + def debug(self, *params): """ Show debug message on stderr @@ -345,10 +367,18 @@ class SimpleService(threading.Thread): :return: boolean """ self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.") - if self._get_data() is None or len(self._get_data()) == 0: + data = self._get_data() + + if data is None: + self.debug("failed to receive data during check().") return False - else: - return True + + if len(data) == 0: + self.debug("empty data during check().") + return False + + self.debug("successfully received data during check(): '" + str(data) + "'") + return True def create(self): """ @@ -357,6 +387,7 @@ class SimpleService(threading.Thread): """ data = self._get_data() if data is None: + self.debug("failed to receive data during create().") return False idx = 0 @@ -380,7 +411,7 @@ class SimpleService(threading.Thread): """ data = self._get_data() if data is None: - self.debug("_get_data() returned no data") + self.debug("failed to receive data during update().") return False updated = False @@ -458,7 +489,7 @@ class UrlService(SimpleService): return None try: - raw = f.read().decode('utf-8') + raw = f.read().decode('utf-8', 'ignore') except Exception as e: self.error(str(e)) finally: @@ -506,8 +537,83 @@ class SocketService(SimpleService): self.unix_socket = None self.request = "" self.__socket_config = None + self.__empty_request = "".encode() SimpleService.__init__(self, configuration=configuration, name=name) + def _socketerror(self, message=None): + if self.unix_socket is not None: + self.error("unix socket '" + self.unix_socket + "':", message) + else: + if self.__socket_config is not None: + af, socktype, proto, canonname, sa = self.__socket_config + self.error("socket to '" + str(sa[0]) + "' port " + str(sa[1]) + ":", message) + else: + self.error("unknown socket:", message) + + def _connect2socket(self, res=None): + """ + Connect to a socket, passing the result of getaddrinfo() + :return: boolean + """ + if res is None: + res = self.__socket_config + if res is None: + self.error("Cannot create socket to 'None':") + return False + + af, socktype, proto, canonname, sa = res + try: + self.debug("creating socket to '" + str(sa[0]) + "', port " + str(sa[1])) + self._sock = socket.socket(af, socktype, proto) + except socket.error as e: + self.error("Failed to create socket to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e)) + self._sock = None + self.__socket_config = None + return False + + try: + self.debug("connecting socket to '" + str(sa[0]) + "', port " + str(sa[1])) + self._sock.connect(sa) + except socket.error as e: + self.error("Failed to connect to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e)) + self._disconnect() + self.__socket_config = None + return False + + self.debug("connected to '" + str(sa[0]) + "', port " + str(sa[1])) + self.__socket_config = res + return True + + def _connect2unixsocket(self): + """ + Connect to a unix socket, given its filename + :return: boolean + """ + if self.unix_socket is None: + self.error("cannot connect to unix socket 'None'") + return False + + try: + self.debug("attempting DGRAM unix socket '" + str(self.unix_socket) + "'") + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.connect(self.unix_socket) + self.debug("connected DGRAM unix socket '" + str(self.unix_socket) + "'") + return True + except socket.error as e: + self.debug("Failed to connect DGRAM unix socket '" + str(self.unix_socket) + "':", str(e)) + + try: + self.debug("attempting STREAM unix socket '" + str(self.unix_socket) + "'") + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.connect(self.unix_socket) + self.debug("connected STREAM unix socket '" + str(self.unix_socket) + "'") + return True + except socket.error as e: + self.debug("Failed to connect STREAM unix socket '" + str(self.unix_socket) + "':", str(e)) + self.error("Failed to connect to unix socket '" + str(self.unix_socket) + "':", str(e)) + self._sock = None + return False + def _connect(self): """ Recreate socket and connect to it since sockets cannot be reused after closing @@ -515,63 +621,38 @@ class SocketService(SimpleService): :return: """ try: - if self.unix_socket is None: - if self.__socket_config is None: - # establish ipv6 or ipv4 connection. - for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): - try: - # noinspection SpellCheckingInspection - af, socktype, proto, canonname, sa = res - self._sock = socket.socket(af, socktype, proto) - except socket.error as e: - self.debug("Cannot create socket:", str(e)) - self._sock = None - continue - try: - self._sock.connect(sa) - except socket.error as e: - self.debug("Cannot connect to socket:", str(e)) - self._disconnect() - continue - self.__socket_config = res - break - else: - # connect to socket with previously established configuration - try: - af, socktype, proto, canonname, sa = self.__socket_config - self._sock = socket.socket(af, socktype, proto) - self._sock.connect(sa) - except socket.error as e: - self.debug("Cannot create or connect to socket:", str(e)) - self._disconnect() + if self.unix_socket is not None: + self._connect2unixsocket() + else: - # connect to unix socket - try: - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self._sock.connect(self.unix_socket) - except socket.error: - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._sock.connect(self.unix_socket) + if self.__socket_config is not None: + self._connect2socket() + else: + for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): + if self._connect2socket(res): break except Exception as e: - self.error(str(e), - "Cannot create socket with following configuration: host:", str(self.host), - "port:", str(self.port), - "socket:", str(self.unix_socket)) self._sock = None - self._sock.setblocking(0) + self.__socket_config = None + + if self._sock is not None: + self._sock.setblocking(0) + self._sock.settimeout(5) + self.debug("set socket timeout to: " + str(self._sock.gettimeout())) def _disconnect(self): """ Close socket connection :return: """ - try: - self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all - self._sock.close() - except Exception: - pass - self._sock = None + if self._sock is not None: + try: + self.debug("closing socket") + self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all + self._sock.close() + except Exception: + pass + self._sock = None def _send(self): """ @@ -579,15 +660,13 @@ class SocketService(SimpleService): :return: boolean """ # Send request if it is needed - if self.request != "".encode(): + if self.request != self.__empty_request: try: + self.debug("sending request:", str(self.request)) self._sock.send(self.request) except Exception as e: + self._socketerror("error sending request:" + str(e)) self._disconnect() - self.error(str(e), - "used configuration: host:", str(self.host), - "port:", str(self.port), - "socket:", str(self.unix_socket)) return False return True @@ -598,24 +677,28 @@ class SocketService(SimpleService): """ data = "" while True: + self.debug("receiving response") try: - ready_to_read, _, in_error = select.select([self._sock], [], [], 5) + buf = self._sock.recv(4096) except Exception as e: - self.debug("SELECT", str(e)) + self._socketerror("failed to receive response:" + str(e)) self._disconnect() break - if len(ready_to_read) > 0: - buf = self._sock.recv(4096) - if len(buf) == 0 or buf is None: # handle server disconnect - break - data += buf.decode() - if self._check_raw_data(data): - break - else: - self.error("Socket timed out.") + + if buf is None or len(buf) == 0: # handle server disconnect + if data == "": + self._socketerror("unexpectedly disconnected") + else: + self.debug("server closed the connection") self._disconnect() break + self.debug("received data:", str(buf)) + data += buf.decode('utf-8', 'ignore') + if self._check_raw_data(data): + break + + self.debug("final response:", str(data)) return data def _get_raw_data(self): @@ -625,6 +708,8 @@ class SocketService(SimpleService): """ if self._sock is None: self._connect() + if self._sock is None: + return None # Send request if it is needed if not self._send(): @@ -654,6 +739,7 @@ class SocketService(SimpleService): self.name = "" else: self.name = str(self.name) + try: self.unix_socket = str(self.configuration['socket']) except (KeyError, TypeError): @@ -667,10 +753,12 @@ class SocketService(SimpleService): self.port = int(self.configuration['port']) except (KeyError, TypeError): self.debug("No port specified. Using: '" + str(self.port) + "'") + try: self.request = str(self.configuration['request']) except (KeyError, TypeError): self.debug("No request specified. Using: '" + str(self.request) + "'") + self.request = self.request.encode() def check(self): @@ -724,7 +812,7 @@ class LogService(SimpleService): try: self.log_path = str(self.configuration['path']) except (KeyError, TypeError): - self.error("No path to log specified. Using: '" + self.log_path + "'") + self.info("No path to log specified. Using: '" + self.log_path + "'") if os.access(self.log_path, os.R_OK): return True @@ -779,22 +867,25 @@ class ExecutableService(SimpleService): try: self.command = str(self.configuration['command']) except (KeyError, TypeError): - self.error("No command specified. Using: '" + self.command + "'") - command = self.command.split(' ') + self.info("No command specified. Using: '" + self.command + "'") + # Splitting self.command on every space so subprocess.Popen reads it properly + self.command = self.command.split(' ') - for arg in command[1:]: + for arg in self.command[1:]: if any(st in arg for st in self.bad_substrings): self.error("Bad command argument:" + " ".join(self.command[1:])) return False + # test command and search for it in /usr/sbin or /sbin when failed - base = command[0].split('/')[-1] + base = self.command[0].split('/')[-1] if self._get_raw_data() is None: for prefix in ['/sbin/', '/usr/sbin/']: - command[0] = prefix + base - if os.path.isfile(command[0]): + self.command[0] = prefix + base + if os.path.isfile(self.command[0]): break - self.command = command + if self._get_data() is None or len(self._get_data()) == 0: self.error("Command", self.command, "returned no data") return False + return True |