summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/sender.c275
1 files changed, 184 insertions, 91 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 62097e39f..854b57fc5 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -36,36 +36,41 @@ extern char *netdata_ssl_ca_file;
static __thread BUFFER *sender_thread_buffer = NULL;
static __thread bool sender_thread_buffer_used = false;
+static __thread time_t sender_thread_buffer_last_reset_s = 0;
void sender_thread_buffer_free(void) {
- if(sender_thread_buffer) {
- buffer_free(sender_thread_buffer);
- sender_thread_buffer = NULL;
- }
+ buffer_free(sender_thread_buffer);
+ sender_thread_buffer = NULL;
+ sender_thread_buffer_used = false;
}
// Collector thread starting a transmission
-BUFFER *sender_start(struct sender_state *s __maybe_unused) {
- if(!sender_thread_buffer)
- sender_thread_buffer = buffer_create(1024);
-
- if(sender_thread_buffer_used)
+BUFFER *sender_start(struct sender_state *s) {
+ if(unlikely(sender_thread_buffer_used))
fatal("STREAMING: thread buffer is used multiple times concurrently.");
+ if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
+ if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
+ buffer_free(sender_thread_buffer);
+ sender_thread_buffer = NULL;
+ }
+ }
+
+ if(unlikely(!sender_thread_buffer)) {
+ sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
+ sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
+ }
+
sender_thread_buffer_used = true;
buffer_flush(sender_thread_buffer);
return sender_thread_buffer;
}
-void sender_cancel(struct sender_state *s __maybe_unused) {
- sender_thread_buffer_used = false;
-}
-
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
#ifdef ENABLE_COMPRESSION
/*
-* In case of stream compression buffer oveflow
+* In case of stream compression buffer overflow
* Inform the user through the error log file and
* deactivate compression by downgrading the stream protocol.
*/
@@ -99,11 +104,11 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
- if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+ if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
- rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+ rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
- s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+ s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
#ifdef ENABLE_COMPRESSION
@@ -150,17 +155,17 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
}
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+ if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
s->flags |= SENDER_FLAG_OVERFLOW;
src = src + size_to_compress;
src_len -= size_to_compress;
}
}
- else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ else if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
#else
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
#endif
@@ -186,6 +191,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU
BUFFER *wb = sender_start(host->sender);
rrdpush_sender_add_host_variable_to_buffer(wb, rva);
sender_commit(host->sender, wb);
+ sender_thread_buffer_free();
}
}
@@ -214,6 +220,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
(void)ret;
sender_commit(host->sender, wb);
+ sender_thread_buffer_free();
debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
}
@@ -222,14 +229,12 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
// resets all the chart, so that their definitions
// will be resent to the central netdata
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
- error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host));
-
RRDSET *st;
rrdset_foreach_read(st, host) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- st->upstream_resync_time = 0;
+ st->upstream_resync_time_s = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st)
@@ -241,6 +246,30 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
rrdhost_sender_replicating_charts_zero(host);
}
+static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
+ static __thread time_t last_reset_time_s = 0;
+
+ if(!force && now_s - last_reset_time_s < 300)
+ return;
+
+ if(!have_mutex)
+ netdata_mutex_lock(&s->mutex);
+
+ rrdpush_sender_last_buffer_recreate_set(s, now_s);
+ last_reset_time_s = now_s;
+
+ if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
+ size_t max = s->buffer->max_size;
+ cbuffer_free(s->buffer);
+ s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
+ }
+
+ sender_thread_buffer_free();
+
+ if(!have_mutex)
+ netdata_mutex_unlock(&s->mutex);
+}
+
static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
rrdpush_sender_set_flush_time(host->sender);
@@ -248,6 +277,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
// flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
+ rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
netdata_mutex_unlock(&host->sender->mutex);
@@ -268,6 +298,9 @@ static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
static void rrdpush_sender_on_connect(RRDHOST *host) {
rrdpush_sender_cbuffer_flush(host);
rrdpush_sender_charts_and_replication_reset(host);
+}
+
+static void rrdpush_sender_after_connect(RRDHOST *host) {
rrdpush_sender_thread_send_custom_host_variables(host);
}
@@ -418,13 +451,17 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return true;
}
- error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error);
-
worker_is_busy(worker_job_id);
rrdpush_sender_thread_close_socket(host);
host->destination->last_error = error;
host->destination->last_handshake = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
+
+ char buf[LOG_DATE_LENGTH];
+ log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
+ error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
+
return false;
}
@@ -449,11 +486,11 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
);
if(unlikely(s->rrdpush_sender_socket == -1)) {
- error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
+ // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
return false;
}
- info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
+ // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
#ifdef ENABLE_HTTPS
if(netdata_ssl_client_ctx){
@@ -499,6 +536,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
stream_encoded_t se;
rrdpush_encode_variable(&se, host);
+ host->sender->hops = host->system_info->hops + 1;
+
char http[HTTP_HEADER_SIZE + 1];
int eol = snprintfz(http, HTTP_HEADER_SIZE,
"STREAM "
@@ -557,7 +596,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
, rrdhost_timezone(host)
, rrdhost_abbrev_timezone(host)
, host->utc_offset
- , host->system_info->hops + 1
+ , host->sender->hops
, host->system_info->ml_capable
, host->system_info->ml_enabled
, host->system_info->mc_version
@@ -657,7 +696,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
}
- info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to);
+ // info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to);
bytes = recv_timeout(
#ifdef ENABLE_HTTPS
@@ -726,6 +765,8 @@ static bool attempt_to_connect(struct sender_state *state)
// let the data collection threads know we are ready
rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+ rrdpush_sender_after_connect(state->host);
+
return true;
}
@@ -738,7 +779,13 @@ static bool attempt_to_connect(struct sender_state *state)
state->sent_bytes_on_this_connection = 0;
// slow re-connection on repeating errors
- sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds
+ usec_t now_ut = now_monotonic_usec();
+ usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
+ while(now_ut < end_ut) {
+ netdata_thread_testcancel();
+ sleep_usec(500 * USEC_PER_MS); // seconds
+ now_ut = now_monotonic_usec();
+ }
return false;
}
@@ -757,8 +804,8 @@ static ssize_t attempt_to_send(struct sender_state *s) {
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
#ifdef ENABLE_HTTPS
- SSL *conn = s->host->sender->ssl.conn ;
- if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ SSL *conn = s->ssl.conn ;
+ if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
ret = netdata_ssl_write(conn, chunk, outstanding);
else
ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
@@ -793,16 +840,18 @@ static ssize_t attempt_read(struct sender_state *s) {
ssize_t ret = 0;
#ifdef ENABLE_HTTPS
- if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+ if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
- ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired);
+ ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired);
if (ret > 0 ) {
s->read_len += (int)ret;
return ret;
}
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- rrdpush_sender_thread_close_socket(s->host);
+ if (ret == -1) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
return ret;
}
#endif
@@ -852,6 +901,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
pluginsd_function_result_end_to_buffer(wb);
sender_commit(s, wb);
+ sender_thread_buffer_free();
internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
rrdhost_hostname(s->host), s->connected_to,
@@ -876,7 +926,7 @@ void execute_commands(struct sender_state *s) {
log_access("STREAM: %d from '%s' for host '%s': %s",
gettid(), s->connected_to, rrdhost_hostname(s->host), start);
- internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
+ // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
@@ -906,7 +956,7 @@ void execute_commands(struct sender_state *s) {
tmp->received_ut = now_realtime_usec();
tmp->sender = s;
tmp->transaction = string_strdupz(transaction);
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1);
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
if(code != HTTP_RESP_OK) {
@@ -1019,59 +1069,83 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
}
}
-static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
- struct rrdpush_sender_thread_data *data = ptr;
- worker_unregister();
-
- RRDHOST *host = data->host;
+static bool rrdhost_set_sender(RRDHOST *host) {
+ if(unlikely(!host->sender)) return false;
+ bool ret = false;
netdata_mutex_lock(&host->sender->mutex);
+ if(!host->sender->tid) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+ host->sender->tid = gettid();
+ ret = true;
+ }
+ netdata_mutex_unlock(&host->sender->mutex);
+
+ return ret;
+}
- info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host));
+static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
+ if(unlikely(!host->sender)) return;
- rrdpush_sender_thread_close_socket(host);
- rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
+ if(host->sender->tid == gettid()) {
+ host->sender->tid = 0;
+ host->sender->exit.shutdown = false;
+ host->sender->exit.reason = NULL;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ }
+}
- if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN)) {
- info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host));
- netdata_thread_detach(netdata_thread_self());
+static bool rrdhost_sender_should_exit(struct sender_state *s) {
+ // check for outstanding cancellation requests
+ netdata_thread_testcancel();
+
+ if(unlikely(!service_running(SERVICE_STREAMING))) {
+ if(!s->exit.reason)
+ s->exit.reason = "NETDATA EXIT";
+ return true;
}
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+ if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
+ if(!s->exit.reason)
+ s->exit.reason = "NON STREAMABLE HOST";
+ return true;
+ }
- info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host));
+ if(unlikely(s->exit.shutdown)) {
+ if(!s->exit.reason)
+ s->exit.reason = "SENDER SHUTDOWN REQUESTED";
+ return true;
+ }
- netdata_mutex_unlock(&host->sender->mutex);
+ if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
+ if(!s->exit.reason)
+ s->exit.reason = "RECEIVER LEFT";
+ return true;
+ }
- freez(data->pipe_buffer);
- freez(data);
+ return false;
}
-void sender_init(RRDHOST *host)
-{
- if (host->sender)
- return;
+static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
+ struct rrdpush_sender_thread_data *s = ptr;
+ worker_unregister();
+
+ RRDHOST *host = s->host;
- host->sender = callocz(1, sizeof(*host->sender));
- host->sender->host = host;
- host->sender->buffer = cbuffer_new(1024, 1024 * 1024);
- host->sender->capabilities = STREAM_OUR_CAPABILITIES;
+ netdata_mutex_lock(&host->sender->mutex);
+ info("STREAM %s [send]: sending thread exits %s",
+ rrdhost_hostname(host),
+ host->sender->exit.reason ? host->sender->exit.reason : "");
- host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
- host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
- host->sender->rrdpush_sender_socket = -1;
+ rrdpush_sender_thread_close_socket(host);
+ rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
-#ifdef ENABLE_COMPRESSION
- if(default_compression_enabled) {
- host->sender->flags |= SENDER_FLAG_COMPRESSION;
- host->sender->compressor = create_compressor();
- }
- else
- host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
-#endif
+ rrdhost_clear_sender___while_having_sender_mutex(host);
+ netdata_mutex_unlock(&host->sender->mutex);
- netdata_mutex_init(&host->sender->mutex);
- replication_init_sender(host->sender);
+ freez(s->pipe_buffer);
+ freez(s);
}
void *rrdpush_sender_thread(void *ptr) {
@@ -1103,13 +1177,18 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
struct sender_state *s = ptr;
- s->tid = gettid();
if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
- rrdhost_hostname(s->host), s->tid);
+ rrdhost_hostname(s->host), gettid());
+ return NULL;
+ }
+
+ if(!rrdhost_set_sender(s->host)) {
+ error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
+ rrdhost_hostname(s->host), gettid());
return NULL;
}
@@ -1125,7 +1204,7 @@ void *rrdpush_sender_thread(void *ptr) {
}
#endif
- info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid);
+ info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
s->timeout = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
@@ -1166,28 +1245,33 @@ void *rrdpush_sender_thread(void *ptr) {
thread_data->sender_state = s;
thread_data->host = s->host;
- // reset our cleanup flags
- rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
-
netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
- for(; rrdhost_has_rrdpush_sender_enabled(s->host) && !netdata_exit ;) {
- // check for outstanding cancellation requests
- netdata_thread_testcancel();
+ size_t iterations = 0;
+ time_t now_s = now_monotonic_sec();
+ while(!rrdhost_sender_should_exit(s)) {
+ iterations++;
// The connection attempt blocks (after which we use the socket in nonblocking)
if(unlikely(s->rrdpush_sender_socket == -1)) {
worker_is_busy(WORKER_SENDER_JOB_CONNECT);
+
+ now_s = now_monotonic_sec();
+ rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
+
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
s->flags &= ~SENDER_FLAG_OVERFLOW;
s->read_len = 0;
s->buffer->read = 0;
s->buffer->write = 0;
- if(unlikely(!attempt_to_connect(s)))
+ if(!attempt_to_connect(s))
continue;
- s->last_traffic_seen_t = now_monotonic_sec();
+ if(rrdhost_sender_should_exit(s))
+ break;
+
+ now_s = s->last_traffic_seen_t = now_monotonic_sec();
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
@@ -1197,8 +1281,11 @@ void *rrdpush_sender_thread(void *ptr) {
continue;
}
+ if(iterations % 1000 == 0)
+ now_s = now_monotonic_sec();
+
// If the TCP window never opened then something is wrong, restart connection
- if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
+ if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
!rrdpush_sender_pending_replication_requests(s) &&
!rrdpush_sender_replicating_charts(s)
)) {
@@ -1209,11 +1296,13 @@ void *rrdpush_sender_thread(void *ptr) {
}
netdata_mutex_lock(&s->mutex);
- size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
- size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
+ size_t available = cbuffer_available_size_unsafe(s->buffer);
+ if (unlikely(!outstanding))
+ rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
netdata_mutex_unlock(&s->mutex);
- worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size);
+ worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
if(outstanding)
s->send_attempts++;
@@ -1246,12 +1335,14 @@ void *rrdpush_sender_thread(void *ptr) {
.revents = 0,
}
};
+
int poll_rc = poll(fds, 2, 1000);
debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
fds[Collector].revents, fds[Socket].revents, outstanding);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(rrdhost_sender_should_exit(s)))
+ break;
internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
"STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
@@ -1261,7 +1352,9 @@ void *rrdpush_sender_thread(void *ptr) {
// Spurious wake-ups without error - loop again
if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
+ netdata_thread_testcancel();
debug(D_STREAM, "Spurious wakeup");
+ now_s = now_monotonic_sec();
continue;
}