diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-07-20 04:49:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-07-20 04:49:55 +0000 |
commit | ab1bb5b7f1c3c3a7b240ab7fc8661459ecd7decb (patch) | |
tree | 7a900833aad3ccc685712c6c2a7d87576d54f427 /streaming/sender.c | |
parent | Adding upstream version 1.40.1. (diff) | |
download | netdata-878715cc218c4a71a71c4cb4b4634082aa9627ba.tar.xz netdata-878715cc218c4a71a71c4cb4b4634082aa9627ba.zip |
Adding upstream version 1.41.0.upstream/1.41.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 342 |
1 files changed, 114 insertions, 228 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 6e58d9a21..76843518e 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -11,7 +11,7 @@ #define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 #define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 #define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 -#define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9 +#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 #define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 #define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 @@ -66,7 +66,7 @@ BUFFER *sender_start(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION /* * In case of stream compression buffer overflow * Inform the user through the error log file and @@ -74,9 +74,9 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); */ static inline void deactivate_compression(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); - error("STREAM_COMPRESSION: Compression returned error, disabling it."); + netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it."); s->flags &= ~SENDER_FLAG_COMPRESSION; - error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); rrdpush_sender_thread_close_socket(s->host); } #endif @@ -100,7 +100,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) if(unlikely(!src || !src_len)) return; - netdata_mutex_lock(&s->mutex); + sender_lock(s); // FILE *fp = fopen("/tmp/stream.txt", "a"); // fprintf(fp, @@ -111,14 +111,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) // fclose(fp); if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { - info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", + netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } -#ifdef ENABLE_COMPRESSION - if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) { +#ifdef ENABLE_RRDPUSH_COMPRESSION + if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) { while(src_len) { size_t size_to_compress = src_len; @@ -144,19 +144,19 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) } char *dst; - size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst); + size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if (!dst_len) { - error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", + netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", rrdhost_hostname(s->host), s->connected_to); - s->compressor->reset(s->compressor); - dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst); + rrdpush_compressor_reset(&s->compressor); + dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if(!dst_len) { - error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", + netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", rrdhost_hostname(s->host), s->connected_to); deactivate_compression(s); - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); return; } } @@ -189,7 +189,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) signal_sender = true; } - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); if(signal_sender) rrdpush_signal_sender_to_wake_up(s); @@ -203,7 +203,7 @@ static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const , rrdvar2number(rva) ); - debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva)); + netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva)); } void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) { @@ -242,7 +242,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); - debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); + netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); } } @@ -258,7 +258,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDDIM *rd; rrddim_foreach_read(rd, st) - rd->exposed = 0; + rrddim_clear_exposed(rd); rrddim_foreach_done(rd); } rrdset_foreach_done(st); @@ -273,7 +273,7 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t return; if(!have_mutex) - netdata_mutex_lock(&s->mutex); + sender_lock(s); rrdpush_sender_last_buffer_recreate_set(s, now_s); last_reset_time_s = now_s; @@ -287,20 +287,20 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t sender_thread_buffer_free(); if(!have_mutex) - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); } static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { rrdpush_sender_set_flush_time(host->sender); - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); // flush the output buffer from any data it may have cbuffer_flush(host->sender->buffer); rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true); replication_recalculate_buffer_used_ratio_unsafe(host->sender); - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { @@ -490,27 +490,26 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender break; } } - const char *error = stream_responses[i].error; - int worker_job_id = stream_responses[i].worker_job_id; - time_t delay = stream_responses[i].postpone_reconnect_seconds; if(version >= STREAM_HANDSHAKE_OK_V1) { - host->destination->last_error = NULL; - host->destination->last_handshake = version; - host->destination->postpone_reconnection_until = 0; - s->capabilities = convert_stream_version_to_capabilities(version); + host->destination->reason = version; + host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay; + s->capabilities = convert_stream_version_to_capabilities(version, host, true); return true; } + const char *error = stream_responses[i].error; + int worker_job_id = stream_responses[i].worker_job_id; + time_t delay = stream_responses[i].postpone_reconnect_seconds; + worker_is_busy(worker_job_id); rrdpush_sender_thread_close_socket(host); - host->destination->last_error = error; - host->destination->last_handshake = version; + host->destination->reason = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; char buf[LOG_DATE_LENGTH]; log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); - error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; @@ -532,8 +531,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "SSL error"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; + host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR; host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; return false; } @@ -543,10 +541,9 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) { // certificate is not valid worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - error("SSL: closing the stream connection, because the server SSL certificate is not valid."); + netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "invalid SSL certificate"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; + host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; return false; } @@ -554,7 +551,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) { return true; } - error("SSL: failed to establish connection."); + netdata_log_error("SSL: failed to establish connection."); return false; #else @@ -584,20 +581,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p ); if(unlikely(s->rrdpush_sender_socket == -1)) { - // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); + // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); return false; } - // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); + // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); // reset our capabilities to default - s->capabilities = stream_our_capabilities(); + s->capabilities = stream_our_capabilities(host, true); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION // If we don't want compression, remove it from our capabilities if(!(s->flags & SENDER_FLAG_COMPRESSION)) s->capabilities &= ~STREAM_CAP_COMPRESSION; -#endif // ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. @@ -708,7 +705,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(!rrdpush_sender_connect_ssl(s)) return false; - ssize_t bytes, len = strlen(http); + ssize_t bytes, len = (ssize_t)strlen(http); bytes = send_timeout( #ifdef ENABLE_HTTPS @@ -723,9 +720,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(bytes <= 0) { // timeout is 0 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); - host->destination->last_error = "timeout while sending request"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; + netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; return false; } @@ -743,36 +739,33 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(bytes <= 0) { // timeout is 0 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); - host->destination->last_error = "timeout while expecting first response"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; + netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 30; return false; } if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); http[bytes] = '\0'; - debug(D_STREAM, "Response to sender from far end: %s", http); + netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) return false; -#ifdef ENABLE_COMPRESSION - if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) { - if(!s->compressor) - s->compressor = create_compressor(); - else - s->compressor->reset(s->compressor); - } -#endif //ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION + if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) + rrdpush_compressor_reset(&s->compressor); + else + rrdpush_compressor_destroy(&s->compressor); +#endif // ENABLE_RRDPUSH_COMPRESSION log_sender_capabilities(s); - debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); + netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); return true; } @@ -820,18 +813,18 @@ static bool attempt_to_connect(struct sender_state *state) return false; } -// TCP window is open and we have data to transmit. +// TCP window is open, and we have data to transmit. static ssize_t attempt_to_send(struct sender_state *s) { - ssize_t ret = 0; + ssize_t ret; #ifdef NETDATA_INTERNAL_CHECKS struct circular_buffer *cb = s->buffer; #endif - netdata_mutex_lock(&s->mutex); + sender_lock(s); char *chunk; size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk); - debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); + netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); #ifdef ENABLE_HTTPS if(SSL_connection(&s->ssl)) @@ -846,21 +839,21 @@ static ssize_t attempt_to_send(struct sender_state *s) { cbuffer_remove_unsafe(s->buffer, ret); s->sent_bytes_on_this_connection += ret; s->sent_bytes += ret; - debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret); + netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret); } else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)) - debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to); + netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to); else if (ret == -1) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR); - debug(D_STREAM, "STREAM: Send failed - closing socket..."); - error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection); + netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket..."); + netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } else - debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); + netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); replication_recalculate_buffer_used_ratio_unsafe(s); - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); return ret; } @@ -893,11 +886,11 @@ static ssize_t attempt_read(struct sender_state *s) { if (ret == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED); - error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); } else { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); - error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); + netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); } rrdpush_sender_thread_close_socket(s->host); @@ -951,13 +944,13 @@ void execute_commands(struct sender_state *s) { while( start < end && (newline = strchr(start, '\n')) ) { *newline = '\0'; - log_access("STREAM: %d from '%s' for host '%s': %s", + netdata_log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS); + size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS); const char *keyword = get_word(words, num_words, 0); @@ -969,7 +962,7 @@ void execute_commands(struct sender_state *s) { char *function = get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, keyword, transaction?transaction:"(unset)", @@ -1002,7 +995,7 @@ void execute_commands(struct sender_state *s) { const char *before = get_word(words, num_words, 4); if (!chart_id || !start_streaming || !after || !before) { - error("STREAM %s [send to %s] %s command is incomplete" + netdata_log_error("STREAM %s [send to %s] %s command is incomplete" " (chart=%s, start_streaming=%s, after=%s, before=%s)", rrdhost_hostname(s->host), s->connected_to, keyword, @@ -1020,7 +1013,7 @@ void execute_commands(struct sender_state *s) { } } else { - error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); + netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); } worker_is_busy(WORKER_SENDER_JOB_EXECUTE); @@ -1051,7 +1044,7 @@ static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) int new_pipe_fds[2]; if(reopen) { if(pipe(new_pipe_fds) != 0) { - error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host)); new_pipe_fds[PIPE_READ] = -1; new_pipe_fds[PIPE_WRITE] = -1; ret = false; @@ -1091,138 +1084,26 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { // signal the sender there are more data if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) { - error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host)); rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true); } } -static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) { - size_t charts = rrdhost_sender_replicating_charts(host); - NETDATA_DOUBLE completion; - if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t) - completion = 100.0; - else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t) - completion = 0.0; - else { - time_t total = now - host->sender->replication.oldest_request_after_t; - time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t; - completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total; - } - - *instances = charts; - - return completion; -} - -void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { - bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - buffer_json_member_add_object(wb, key); - - if(host->sender) - buffer_json_member_add_uint64(wb, "hops", host->sender->hops); - - buffer_json_member_add_boolean(wb, "online", online); - - if(host->sender && host->sender->last_state_since_t) { - buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t); - buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t); - } - - if(!online && host->sender && host->sender->exit.reason) - buffer_json_member_add_string(wb, "reason", host->sender->exit.reason); - - buffer_json_member_add_object(wb, "replication"); - { - size_t instances; - NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances); - buffer_json_member_add_boolean(wb, "in_progress", instances); - buffer_json_member_add_double(wb, "completion", completion); - buffer_json_member_add_uint64(wb, "instances", instances); - } - buffer_json_object_close(wb); - - if(host->sender) { - netdata_mutex_lock(&host->sender->mutex); - - buffer_json_member_add_object(wb, "destination"); - { - char buf[1024 + 1]; - if(online && host->sender->rrdpush_sender_socket != -1) { - SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket); - bool ssl = SSL_connection(&host->sender->ssl); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "local", buf); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "remote", buf); - - stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities"); - - buffer_json_member_add_object(wb, "traffic"); - { - bool compression = false; -#ifdef ENABLE_COMPRESSION - compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor); -#endif - buffer_json_member_add_boolean(wb, "compression", compression); - buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]); - buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]); - buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]); - buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]); - } - buffer_json_object_close(wb); // traffic - } - - buffer_json_member_add_array(wb, "candidates"); - struct rrdpush_destinations *d; - for (d = host->destinations; d; d = d->next) { - buffer_json_add_array_item_object(wb); - { - - if (d->ssl) { - snprintfz(buf, 1024, "%s:SSL", string2str(d->destination)); - buffer_json_member_add_string(wb, "destination", buf); - } - else - buffer_json_member_add_string(wb, "destination", string2str(d->destination)); - - buffer_json_member_add_time_t(wb, "last_check", d->last_attempt); - buffer_json_member_add_time_t(wb, "age", now - d->last_attempt); - buffer_json_member_add_string(wb, "last_error", d->last_error); - buffer_json_member_add_string(wb, "last_handshake", - stream_handshake_error_to_string(d->last_handshake)); - buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until); - buffer_json_member_add_time_t(wb, "next_in", - (d->postpone_reconnection_until > now) ? - d->postpone_reconnection_until - now : 0); - } - buffer_json_object_close(wb); // each candidate - } - buffer_json_array_close(wb); // candidates - } - buffer_json_object_close(wb); // destination - - netdata_mutex_unlock(&host->sender->mutex); - } - - buffer_json_object_close(wb); // streaming -} - static bool rrdhost_set_sender(RRDHOST *host) { if(unlikely(!host->sender)) return false; bool ret = false; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(!host->sender->tid) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + host->rrdpush_sender_connection_counter++; host->sender->tid = gettid(); host->sender->last_state_since_t = now_realtime_sec(); - host->sender->exit.reason = NULL; + host->sender->exit.reason = STREAM_HANDSHAKE_NEVER; ret = true; } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); rrdpush_reset_destinations_postpone_time(host); @@ -1237,6 +1118,10 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) { host->sender->exit.shutdown = false; rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); host->sender->last_state_since_t = now_realtime_sec(); + if(host->destination) { + host->destination->since = host->sender->last_state_since_t; + host->destination->reason = host->sender->exit.reason; + } } rrdpush_reset_destinations_postpone_time(host); @@ -1248,25 +1133,25 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) { if(unlikely(!service_running(SERVICE_STREAMING))) { if(!s->exit.reason) - s->exit.reason = "NETDATA EXIT"; + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT; return true; } if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) { if(!s->exit.reason) - s->exit.reason = "NON STREAMABLE HOST"; + s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST; return true; } if(unlikely(s->exit.shutdown)) { if(!s->exit.reason) - s->exit.reason = "SENDER SHUTDOWN REQUESTED"; + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN; return true; } if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) { if(!s->exit.reason) - s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)"; + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST; return true; } @@ -1279,16 +1164,16 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { RRDHOST *host = s->host; - netdata_mutex_lock(&host->sender->mutex); - info("STREAM %s [send]: sending thread exits %s", + sender_lock(host->sender); + netdata_log_info("STREAM %s [send]: sending thread exits %s", rrdhost_hostname(host), - host->sender->exit.reason ? host->sender->exit.reason : ""); + host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : ""); rrdpush_sender_thread_close_socket(host); rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); rrdhost_clear_sender___while_having_sender_mutex(host); - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); freez(s->pipe_buffer); freez(s); @@ -1297,10 +1182,10 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { void rrdpush_initialize_ssl_ctx(RRDHOST *host) { #ifdef ENABLE_HTTPS static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; - netdata_spinlock_lock(&sp); + spinlock_lock(&sp); if(netdata_ssl_streaming_sender_ctx || !host) { - netdata_spinlock_unlock(&sp); + spinlock_unlock(&sp); return; } @@ -1316,7 +1201,7 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host) { } } - netdata_spinlock_unlock(&sp); + spinlock_unlock(&sp); #endif } @@ -1331,7 +1216,7 @@ void *rrdpush_sender_thread(void *ptr) { // disconnection reasons worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed"); @@ -1353,20 +1238,20 @@ void *rrdpush_sender_thread(void *ptr) { if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { - error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", + netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", rrdhost_hostname(s->host), gettid()); return NULL; } if(!rrdhost_set_sender(s->host)) { - error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.", + netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.", rrdhost_hostname(s->host), gettid()); return NULL; } rrdpush_initialize_ssl_ctx(s->host); - info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); + netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); s->timeout = (int)appconfig_get_number( &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600); @@ -1397,7 +1282,7 @@ void *rrdpush_sender_thread(void *ptr) { pipe_buffer_size = 10 * 1024; if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { - error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", + netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", rrdhost_hostname(s->host)); return NULL; } @@ -1433,12 +1318,13 @@ void *rrdpush_sender_thread(void *ptr) { break; now_s = s->last_traffic_seen_t = now_monotonic_sec(); - rrdpush_claimed_id(s->host); + rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); + rrdpush_send_global_functions(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); + netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); continue; } @@ -1452,19 +1338,19 @@ void *rrdpush_sender_thread(void *ptr) { !rrdpush_sender_replicating_charts(s) )) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); - error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); + netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); rrdpush_sender_thread_close_socket(s->host); continue; } - netdata_mutex_lock(&s->mutex); + sender_lock(s); size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL); size_t available = cbuffer_available_size_unsafe(s->buffer); if (unlikely(!outstanding)) { rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); } - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); @@ -1473,7 +1359,7 @@ void *rrdpush_sender_thread(void *ptr) { if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) { if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { - error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", + netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", rrdhost_hostname(s->host)); rrdpush_sender_thread_close_socket(s->host); break; @@ -1502,7 +1388,7 @@ void *rrdpush_sender_thread(void *ptr) { int poll_rc = poll(fds, 2, 1000); - debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", + netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); if(unlikely(rrdhost_sender_should_exit(s))) @@ -1517,7 +1403,7 @@ void *rrdpush_sender_thread(void *ptr) { // Spurious wake-ups without error - loop again if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) { netdata_thread_testcancel(); - debug(D_STREAM, "Spurious wakeup"); + netdata_log_debug(D_STREAM, "Spurious wakeup"); now_s = now_monotonic_sec(); continue; } @@ -1525,7 +1411,7 @@ void *rrdpush_sender_thread(void *ptr) { // Only errors from poll() are internal, but try restarting the connection if(unlikely(poll_rc == -1)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR); - error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to); rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true); rrdpush_sender_thread_close_socket(s->host); continue; @@ -1544,10 +1430,10 @@ void *rrdpush_sender_thread(void *ptr) { // If the collector woke us up then empty the pipe to remove the signal if (fds[Collector].revents & (POLLIN|POLLPRI)) { worker_is_busy(WORKER_SENDER_JOB_PIPE_READ); - debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding); + netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding); if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1) - error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to); } // Read as much as possible to fill the buffer, split into full lines for execution. @@ -1575,7 +1461,7 @@ void *rrdpush_sender_thread(void *ptr) { if(error) { rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true); - error("STREAM %s [send to %s]: restarting internal pipe: %s.", + netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.", rrdhost_hostname(s->host), s->connected_to, error); } } @@ -1591,8 +1477,8 @@ void *rrdpush_sender_thread(void *ptr) { error = "connection is invalid (POLLNVAL)"; if(unlikely(error)) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR); - error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.", + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR); + netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.", rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } @@ -1602,7 +1488,7 @@ void *rrdpush_sender_thread(void *ptr) { if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW); errno = 0; - error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection", + netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection", rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } |