summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c342
1 files changed, 114 insertions, 228 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 6e58d9a21..76843518e 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -11,7 +11,7 @@
#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
-#define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
@@ -66,7 +66,7 @@ BUFFER *sender_start(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
/*
* In case of stream compression buffer overflow
* Inform the user through the error log file and
@@ -74,9 +74,9 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
*/
static inline void deactivate_compression(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
- error("STREAM_COMPRESSION: Compression returned error, disabling it.");
+ netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
s->flags &= ~SENDER_FLAG_COMPRESSION;
- error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_thread_close_socket(s->host);
}
#endif
@@ -100,7 +100,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
if(unlikely(!src || !src_len))
return;
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
// FILE *fp = fopen("/tmp/stream.txt", "a");
// fprintf(fp,
@@ -111,14 +111,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
// fclose(fp);
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
- info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
+ netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
-#ifdef ENABLE_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -144,19 +144,19 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
char *dst;
- size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
- error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
+ netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
- s->compressor->reset(s->compressor);
- dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ rrdpush_compressor_reset(&s->compressor);
+ dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
- error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
+ netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
rrdhost_hostname(s->host), s->connected_to);
deactivate_compression(s);
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
return;
}
}
@@ -189,7 +189,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
signal_sender = true;
}
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
if(signal_sender)
rrdpush_signal_sender_to_wake_up(s);
@@ -203,7 +203,7 @@ static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const
, rrdvar2number(rva)
);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
+ netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
}
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
@@ -242,7 +242,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
- debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
}
}
@@ -258,7 +258,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rd->exposed = 0;
+ rrddim_clear_exposed(rd);
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
@@ -273,7 +273,7 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t
return;
if(!have_mutex)
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
rrdpush_sender_last_buffer_recreate_set(s, now_s);
last_reset_time_s = now_s;
@@ -287,20 +287,20 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t
sender_thread_buffer_free();
if(!have_mutex)
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
}
static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
rrdpush_sender_set_flush_time(host->sender);
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
// flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
@@ -490,27 +490,26 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
break;
}
}
- const char *error = stream_responses[i].error;
- int worker_job_id = stream_responses[i].worker_job_id;
- time_t delay = stream_responses[i].postpone_reconnect_seconds;
if(version >= STREAM_HANDSHAKE_OK_V1) {
- host->destination->last_error = NULL;
- host->destination->last_handshake = version;
- host->destination->postpone_reconnection_until = 0;
- s->capabilities = convert_stream_version_to_capabilities(version);
+ host->destination->reason = version;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
+ s->capabilities = convert_stream_version_to_capabilities(version, host, true);
return true;
}
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
worker_is_busy(worker_job_id);
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = error;
- host->destination->last_handshake = version;
+ host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
char buf[LOG_DATE_LENGTH];
log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
- error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
@@ -532,8 +531,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "SSL error";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
return false;
}
@@ -543,10 +541,9 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
// certificate is not valid
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
+ netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "invalid SSL certificate";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
return false;
}
@@ -554,7 +551,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
return true;
}
- error("SSL: failed to establish connection.");
+ netdata_log_error("SSL: failed to establish connection.");
return false;
#else
@@ -584,20 +581,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
);
if(unlikely(s->rrdpush_sender_socket == -1)) {
- // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
+ // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
return false;
}
- // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
+ // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
// reset our capabilities to default
- s->capabilities = stream_our_capabilities();
+ s->capabilities = stream_our_capabilities(host, true);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
// If we don't want compression, remove it from our capabilities
if(!(s->flags & SENDER_FLAG_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
@@ -708,7 +705,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_connect_ssl(s))
return false;
- ssize_t bytes, len = strlen(http);
+ ssize_t bytes, len = (ssize_t)strlen(http);
bytes = send_timeout(
#ifdef ENABLE_HTTPS
@@ -723,9 +720,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
- host->destination->last_error = "timeout while sending request";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
+ netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
return false;
}
@@ -743,36 +739,33 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
- host->destination->last_error = "timeout while expecting first response";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
+ netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
return false;
}
if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
http[bytes] = '\0';
- debug(D_STREAM, "Response to sender from far end: %s", http);
+ netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
-#ifdef ENABLE_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
- if(!s->compressor)
- s->compressor = create_compressor();
- else
- s->compressor->reset(s->compressor);
- }
-#endif //ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ rrdpush_compressor_reset(&s->compressor);
+ else
+ rrdpush_compressor_destroy(&s->compressor);
+#endif // ENABLE_RRDPUSH_COMPRESSION
log_sender_capabilities(s);
- debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
+ netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
return true;
}
@@ -820,18 +813,18 @@ static bool attempt_to_connect(struct sender_state *state)
return false;
}
-// TCP window is open and we have data to transmit.
+// TCP window is open, and we have data to transmit.
static ssize_t attempt_to_send(struct sender_state *s) {
- ssize_t ret = 0;
+ ssize_t ret;
#ifdef NETDATA_INTERNAL_CHECKS
struct circular_buffer *cb = s->buffer;
#endif
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
- debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+ netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
#ifdef ENABLE_HTTPS
if(SSL_connection(&s->ssl))
@@ -846,21 +839,21 @@ static ssize_t attempt_to_send(struct sender_state *s) {
cbuffer_remove_unsafe(s->buffer, ret);
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
- debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
+ netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
}
else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
- debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
else if (ret == -1) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
- debug(D_STREAM, "STREAM: Send failed - closing socket...");
- error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
+ netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket...");
+ netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
else
- debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
+ netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
replication_recalculate_buffer_used_ratio_unsafe(s);
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
return ret;
}
@@ -893,11 +886,11 @@ static ssize_t attempt_read(struct sender_state *s) {
if (ret == 0 || errno == ECONNRESET) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
- error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
}
else {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
- error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
+ netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
}
rrdpush_sender_thread_close_socket(s->host);
@@ -951,13 +944,13 @@ void execute_commands(struct sender_state *s) {
while( start < end && (newline = strchr(start, '\n')) ) {
*newline = '\0';
- log_access("STREAM: %d from '%s' for host '%s': %s",
+ netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
gettid(), s->connected_to, rrdhost_hostname(s->host), start);
// internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS);
+ size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
@@ -969,7 +962,7 @@ void execute_commands(struct sender_state *s) {
char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
keyword,
transaction?transaction:"(unset)",
@@ -1002,7 +995,7 @@ void execute_commands(struct sender_state *s) {
const char *before = get_word(words, num_words, 4);
if (!chart_id || !start_streaming || !after || !before) {
- error("STREAM %s [send to %s] %s command is incomplete"
+ netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
" (chart=%s, start_streaming=%s, after=%s, before=%s)",
rrdhost_hostname(s->host), s->connected_to,
keyword,
@@ -1020,7 +1013,7 @@ void execute_commands(struct sender_state *s) {
}
}
else {
- error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
}
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
@@ -1051,7 +1044,7 @@ static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen)
int new_pipe_fds[2];
if(reopen) {
if(pipe(new_pipe_fds) != 0) {
- error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
new_pipe_fds[PIPE_READ] = -1;
new_pipe_fds[PIPE_WRITE] = -1;
ret = false;
@@ -1091,138 +1084,26 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
// signal the sender there are more data
if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
- error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
}
}
-static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
- size_t charts = rrdhost_sender_replicating_charts(host);
- NETDATA_DOUBLE completion;
- if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t)
- completion = 100.0;
- else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
- completion = 0.0;
- else {
- time_t total = now - host->sender->replication.oldest_request_after_t;
- time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
- completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
- }
-
- *instances = charts;
-
- return completion;
-}
-
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
- bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- buffer_json_member_add_object(wb, key);
-
- if(host->sender)
- buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
-
- buffer_json_member_add_boolean(wb, "online", online);
-
- if(host->sender && host->sender->last_state_since_t) {
- buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
- buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
- }
-
- if(!online && host->sender && host->sender->exit.reason)
- buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
-
- buffer_json_member_add_object(wb, "replication");
- {
- size_t instances;
- NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
- buffer_json_member_add_boolean(wb, "in_progress", instances);
- buffer_json_member_add_double(wb, "completion", completion);
- buffer_json_member_add_uint64(wb, "instances", instances);
- }
- buffer_json_object_close(wb);
-
- if(host->sender) {
- netdata_mutex_lock(&host->sender->mutex);
-
- buffer_json_member_add_object(wb, "destination");
- {
- char buf[1024 + 1];
- if(online && host->sender->rrdpush_sender_socket != -1) {
- SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket);
- bool ssl = SSL_connection(&host->sender->ssl);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "local", buf);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "remote", buf);
-
- stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities");
-
- buffer_json_member_add_object(wb, "traffic");
- {
- bool compression = false;
-#ifdef ENABLE_COMPRESSION
- compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor);
-#endif
- buffer_json_member_add_boolean(wb, "compression", compression);
- buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]);
- buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
- buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
- buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
- }
- buffer_json_object_close(wb); // traffic
- }
-
- buffer_json_member_add_array(wb, "candidates");
- struct rrdpush_destinations *d;
- for (d = host->destinations; d; d = d->next) {
- buffer_json_add_array_item_object(wb);
- {
-
- if (d->ssl) {
- snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
- buffer_json_member_add_string(wb, "destination", buf);
- }
- else
- buffer_json_member_add_string(wb, "destination", string2str(d->destination));
-
- buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
- buffer_json_member_add_time_t(wb, "age", now - d->last_attempt);
- buffer_json_member_add_string(wb, "last_error", d->last_error);
- buffer_json_member_add_string(wb, "last_handshake",
- stream_handshake_error_to_string(d->last_handshake));
- buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
- buffer_json_member_add_time_t(wb, "next_in",
- (d->postpone_reconnection_until > now) ?
- d->postpone_reconnection_until - now : 0);
- }
- buffer_json_object_close(wb); // each candidate
- }
- buffer_json_array_close(wb); // candidates
- }
- buffer_json_object_close(wb); // destination
-
- netdata_mutex_unlock(&host->sender->mutex);
- }
-
- buffer_json_object_close(wb); // streaming
-}
-
static bool rrdhost_set_sender(RRDHOST *host) {
if(unlikely(!host->sender)) return false;
bool ret = false;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(!host->sender->tid) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+ host->rrdpush_sender_connection_counter++;
host->sender->tid = gettid();
host->sender->last_state_since_t = now_realtime_sec();
- host->sender->exit.reason = NULL;
+ host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
ret = true;
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
rrdpush_reset_destinations_postpone_time(host);
@@ -1237,6 +1118,10 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
host->sender->exit.shutdown = false;
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
host->sender->last_state_since_t = now_realtime_sec();
+ if(host->destination) {
+ host->destination->since = host->sender->last_state_since_t;
+ host->destination->reason = host->sender->exit.reason;
+ }
}
rrdpush_reset_destinations_postpone_time(host);
@@ -1248,25 +1133,25 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) {
if(unlikely(!service_running(SERVICE_STREAMING))) {
if(!s->exit.reason)
- s->exit.reason = "NETDATA EXIT";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT;
return true;
}
if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
if(!s->exit.reason)
- s->exit.reason = "NON STREAMABLE HOST";
+ s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST;
return true;
}
if(unlikely(s->exit.shutdown)) {
if(!s->exit.reason)
- s->exit.reason = "SENDER SHUTDOWN REQUESTED";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
return true;
}
if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
if(!s->exit.reason)
- s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST;
return true;
}
@@ -1279,16 +1164,16 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
RRDHOST *host = s->host;
- netdata_mutex_lock(&host->sender->mutex);
- info("STREAM %s [send]: sending thread exits %s",
+ sender_lock(host->sender);
+ netdata_log_info("STREAM %s [send]: sending thread exits %s",
rrdhost_hostname(host),
- host->sender->exit.reason ? host->sender->exit.reason : "");
+ host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
rrdpush_sender_thread_close_socket(host);
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
rrdhost_clear_sender___while_having_sender_mutex(host);
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
freez(s->pipe_buffer);
freez(s);
@@ -1297,10 +1182,10 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
#ifdef ENABLE_HTTPS
static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&sp);
+ spinlock_lock(&sp);
if(netdata_ssl_streaming_sender_ctx || !host) {
- netdata_spinlock_unlock(&sp);
+ spinlock_unlock(&sp);
return;
}
@@ -1316,7 +1201,7 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
}
}
- netdata_spinlock_unlock(&sp);
+ spinlock_unlock(&sp);
#endif
}
@@ -1331,7 +1216,7 @@ void *rrdpush_sender_thread(void *ptr) {
// disconnection reasons
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
@@ -1353,20 +1238,20 @@ void *rrdpush_sender_thread(void *ptr) {
if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
- error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
+ netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
rrdhost_hostname(s->host), gettid());
return NULL;
}
if(!rrdhost_set_sender(s->host)) {
- error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
+ netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
rrdhost_hostname(s->host), gettid());
return NULL;
}
rrdpush_initialize_ssl_ctx(s->host);
- info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
+ netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
s->timeout = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
@@ -1397,7 +1282,7 @@ void *rrdpush_sender_thread(void *ptr) {
pipe_buffer_size = 10 * 1024;
if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
- error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
rrdhost_hostname(s->host));
return NULL;
}
@@ -1433,12 +1318,13 @@ void *rrdpush_sender_thread(void *ptr) {
break;
now_s = s->last_traffic_seen_t = now_monotonic_sec();
- rrdpush_claimed_id(s->host);
+ rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
+ rrdpush_send_global_functions(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
continue;
}
@@ -1452,19 +1338,19 @@ void *rrdpush_sender_thread(void *ptr) {
!rrdpush_sender_replicating_charts(s)
)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);
continue;
}
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
if (unlikely(!outstanding)) {
rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
}
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
@@ -1473,7 +1359,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
- error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
rrdhost_hostname(s->host));
rrdpush_sender_thread_close_socket(s->host);
break;
@@ -1502,7 +1388,7 @@ void *rrdpush_sender_thread(void *ptr) {
int poll_rc = poll(fds, 2, 1000);
- debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
+ netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
fds[Collector].revents, fds[Socket].revents, outstanding);
if(unlikely(rrdhost_sender_should_exit(s)))
@@ -1517,7 +1403,7 @@ void *rrdpush_sender_thread(void *ptr) {
// Spurious wake-ups without error - loop again
if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
netdata_thread_testcancel();
- debug(D_STREAM, "Spurious wakeup");
+ netdata_log_debug(D_STREAM, "Spurious wakeup");
now_s = now_monotonic_sec();
continue;
}
@@ -1525,7 +1411,7 @@ void *rrdpush_sender_thread(void *ptr) {
// Only errors from poll() are internal, but try restarting the connection
if(unlikely(poll_rc == -1)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
- error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
rrdpush_sender_thread_close_socket(s->host);
continue;
@@ -1544,10 +1430,10 @@ void *rrdpush_sender_thread(void *ptr) {
// If the collector woke us up then empty the pipe to remove the signal
if (fds[Collector].revents & (POLLIN|POLLPRI)) {
worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
- debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
+ netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
- error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
}
// Read as much as possible to fill the buffer, split into full lines for execution.
@@ -1575,7 +1461,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(error) {
rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
- error("STREAM %s [send to %s]: restarting internal pipe: %s.",
+ netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.",
rrdhost_hostname(s->host), s->connected_to, error);
}
}
@@ -1591,8 +1477,8 @@ void *rrdpush_sender_thread(void *ptr) {
error = "connection is invalid (POLLNVAL)";
if(unlikely(error)) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
- error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
+ netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
@@ -1602,7 +1488,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
errno = 0;
- error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
+ netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}