diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:34 +0000 |
commit | d079b656b4719739b2247dcd9d46e9bec793095a (patch) | |
tree | d2c950c70a776bcf697c963151c5bd959f8a9f03 /streaming/sender.c | |
parent | Releasing debian version 1.37.1-2. (diff) | |
download | netdata-d079b656b4719739b2247dcd9d46e9bec793095a.tar.xz netdata-d079b656b4719739b2247dcd9d46e9bec793095a.zip |
Merging upstream version 1.38.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 275 |
1 files changed, 184 insertions, 91 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 62097e39f..854b57fc5 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -36,36 +36,41 @@ extern char *netdata_ssl_ca_file; static __thread BUFFER *sender_thread_buffer = NULL; static __thread bool sender_thread_buffer_used = false; +static __thread time_t sender_thread_buffer_last_reset_s = 0; void sender_thread_buffer_free(void) { - if(sender_thread_buffer) { - buffer_free(sender_thread_buffer); - sender_thread_buffer = NULL; - } + buffer_free(sender_thread_buffer); + sender_thread_buffer = NULL; + sender_thread_buffer_used = false; } // Collector thread starting a transmission -BUFFER *sender_start(struct sender_state *s __maybe_unused) { - if(!sender_thread_buffer) - sender_thread_buffer = buffer_create(1024); - - if(sender_thread_buffer_used) +BUFFER *sender_start(struct sender_state *s) { + if(unlikely(sender_thread_buffer_used)) fatal("STREAMING: thread buffer is used multiple times concurrently."); + if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) { + if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) { + buffer_free(sender_thread_buffer); + sender_thread_buffer = NULL; + } + } + + if(unlikely(!sender_thread_buffer)) { + sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming); + sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s); + } + sender_thread_buffer_used = true; buffer_flush(sender_thread_buffer); return sender_thread_buffer; } -void sender_cancel(struct sender_state *s __maybe_unused) { - sender_thread_buffer_used = false; -} - static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); #ifdef ENABLE_COMPRESSION /* -* In case of stream compression buffer oveflow +* In case of stream compression buffer overflow * Inform the user through the error log file and * deactivate compression by downgrading the stream protocol. */ @@ -99,11 +104,11 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { netdata_mutex_lock(&s->mutex); - if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { + if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", - rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); + rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); - s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; + s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } #ifdef ENABLE_COMPRESSION @@ -150,17 +155,17 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { } } - if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len)) + if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) s->flags |= SENDER_FLAG_OVERFLOW; src = src + size_to_compress; src_len -= size_to_compress; } } - else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + else if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; #else - if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; #endif @@ -186,6 +191,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU BUFFER *wb = sender_start(host->sender); rrdpush_sender_add_host_variable_to_buffer(wb, rva); sender_commit(host->sender, wb); + sender_thread_buffer_free(); } } @@ -214,6 +220,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp); (void)ret; sender_commit(host->sender, wb); + sender_thread_buffer_free(); debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); } @@ -222,14 +229,12 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { // resets all the chart, so that their definitions // will be resent to the central netdata static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { - error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host)); - RRDSET *st; rrdset_foreach_read(st, host) { rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - st->upstream_resync_time = 0; + st->upstream_resync_time_s = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) @@ -241,6 +246,30 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { rrdhost_sender_replicating_charts_zero(host); } +static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) { + static __thread time_t last_reset_time_s = 0; + + if(!force && now_s - last_reset_time_s < 300) + return; + + if(!have_mutex) + netdata_mutex_lock(&s->mutex); + + rrdpush_sender_last_buffer_recreate_set(s, now_s); + last_reset_time_s = now_s; + + if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) { + size_t max = s->buffer->max_size; + cbuffer_free(s->buffer); + s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming); + } + + sender_thread_buffer_free(); + + if(!have_mutex) + netdata_mutex_unlock(&s->mutex); +} + static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { rrdpush_sender_set_flush_time(host->sender); @@ -248,6 +277,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { // flush the output buffer from any data it may have cbuffer_flush(host->sender->buffer); + rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true); replication_recalculate_buffer_used_ratio_unsafe(host->sender); netdata_mutex_unlock(&host->sender->mutex); @@ -268,6 +298,9 @@ static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { static void rrdpush_sender_on_connect(RRDHOST *host) { rrdpush_sender_cbuffer_flush(host); rrdpush_sender_charts_and_replication_reset(host); +} + +static void rrdpush_sender_after_connect(RRDHOST *host) { rrdpush_sender_thread_send_custom_host_variables(host); } @@ -418,13 +451,17 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } - error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error); - worker_is_busy(worker_job_id); rrdpush_sender_thread_close_socket(host); host->destination->last_error = error; host->destination->last_handshake = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; + + char buf[LOG_DATE_LENGTH]; + log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); + error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); + return false; } @@ -449,11 +486,11 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p ); if(unlikely(s->rrdpush_sender_socket == -1)) { - error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); + // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); return false; } - info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); + // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); #ifdef ENABLE_HTTPS if(netdata_ssl_client_ctx){ @@ -499,6 +536,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p stream_encoded_t se; rrdpush_encode_variable(&se, host); + host->sender->hops = host->system_info->hops + 1; + char http[HTTP_HEADER_SIZE + 1]; int eol = snprintfz(http, HTTP_HEADER_SIZE, "STREAM " @@ -557,7 +596,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p , rrdhost_timezone(host) , rrdhost_abbrev_timezone(host) , host->utc_offset - , host->system_info->hops + 1 + , host->sender->hops , host->system_info->ml_capable , host->system_info->ml_enabled , host->system_info->mc_version @@ -657,7 +696,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p return false; } - info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to); + // info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to); bytes = recv_timeout( #ifdef ENABLE_HTTPS @@ -726,6 +765,8 @@ static bool attempt_to_connect(struct sender_state *state) // let the data collection threads know we are ready rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + rrdpush_sender_after_connect(state->host); + return true; } @@ -738,7 +779,13 @@ static bool attempt_to_connect(struct sender_state *state) state->sent_bytes_on_this_connection = 0; // slow re-connection on repeating errors - sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds + usec_t now_ut = now_monotonic_usec(); + usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay; + while(now_ut < end_ut) { + netdata_thread_testcancel(); + sleep_usec(500 * USEC_PER_MS); // seconds + now_ut = now_monotonic_usec(); + } return false; } @@ -757,8 +804,8 @@ static ssize_t attempt_to_send(struct sender_state *s) { debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); #ifdef ENABLE_HTTPS - SSL *conn = s->host->sender->ssl.conn ; - if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + SSL *conn = s->ssl.conn ; + if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) ret = netdata_ssl_write(conn, chunk, outstanding); else ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); @@ -793,16 +840,18 @@ static ssize_t attempt_read(struct sender_state *s) { ssize_t ret = 0; #ifdef ENABLE_HTTPS - if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { + if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { size_t desired = sizeof(s->read_buffer) - s->read_len - 1; - ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired); + ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired); if (ret > 0 ) { s->read_len += (int)ret; return ret; } - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - rrdpush_sender_thread_close_socket(s->host); + if (ret == -1) { + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + rrdpush_sender_thread_close_socket(s->host); + } return ret; } #endif @@ -852,6 +901,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { pluginsd_function_result_end_to_buffer(wb); sender_commit(s, wb); + sender_thread_buffer_free(); internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", rrdhost_hostname(s->host), s->connected_to, @@ -876,7 +926,7 @@ void execute_commands(struct sender_state *s) { log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); - internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); + // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); char *words[PLUGINSD_MAX_WORDS] = { NULL }; size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); @@ -906,7 +956,7 @@ void execute_commands(struct sender_state *s) { tmp->received_ut = now_realtime_usec(); tmp->sender = s; tmp->transaction = string_strdupz(transaction); - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1); + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp); if(code != HTTP_RESP_OK) { @@ -1019,59 +1069,83 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { } } -static void rrdpush_sender_thread_cleanup_callback(void *ptr) { - struct rrdpush_sender_thread_data *data = ptr; - worker_unregister(); - - RRDHOST *host = data->host; +static bool rrdhost_set_sender(RRDHOST *host) { + if(unlikely(!host->sender)) return false; + bool ret = false; netdata_mutex_lock(&host->sender->mutex); + if(!host->sender->tid) { + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + host->sender->tid = gettid(); + ret = true; + } + netdata_mutex_unlock(&host->sender->mutex); + + return ret; +} - info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host)); +static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) { + if(unlikely(!host->sender)) return; - rrdpush_sender_thread_close_socket(host); - rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); + if(host->sender->tid == gettid()) { + host->sender->tid = 0; + host->sender->exit.shutdown = false; + host->sender->exit.reason = NULL; + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + } +} - if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN)) { - info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host)); - netdata_thread_detach(netdata_thread_self()); +static bool rrdhost_sender_should_exit(struct sender_state *s) { + // check for outstanding cancellation requests + netdata_thread_testcancel(); + + if(unlikely(!service_running(SERVICE_STREAMING))) { + if(!s->exit.reason) + s->exit.reason = "NETDATA EXIT"; + return true; } - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) { + if(!s->exit.reason) + s->exit.reason = "NON STREAMABLE HOST"; + return true; + } - info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host)); + if(unlikely(s->exit.shutdown)) { + if(!s->exit.reason) + s->exit.reason = "SENDER SHUTDOWN REQUESTED"; + return true; + } - netdata_mutex_unlock(&host->sender->mutex); + if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) { + if(!s->exit.reason) + s->exit.reason = "RECEIVER LEFT"; + return true; + } - freez(data->pipe_buffer); - freez(data); + return false; } -void sender_init(RRDHOST *host) -{ - if (host->sender) - return; +static void rrdpush_sender_thread_cleanup_callback(void *ptr) { + struct rrdpush_sender_thread_data *s = ptr; + worker_unregister(); + + RRDHOST *host = s->host; - host->sender = callocz(1, sizeof(*host->sender)); - host->sender->host = host; - host->sender->buffer = cbuffer_new(1024, 1024 * 1024); - host->sender->capabilities = STREAM_OUR_CAPABILITIES; + netdata_mutex_lock(&host->sender->mutex); + info("STREAM %s [send]: sending thread exits %s", + rrdhost_hostname(host), + host->sender->exit.reason ? host->sender->exit.reason : ""); - host->sender->rrdpush_sender_pipe[PIPE_READ] = -1; - host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1; - host->sender->rrdpush_sender_socket = -1; + rrdpush_sender_thread_close_socket(host); + rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); -#ifdef ENABLE_COMPRESSION - if(default_compression_enabled) { - host->sender->flags |= SENDER_FLAG_COMPRESSION; - host->sender->compressor = create_compressor(); - } - else - host->sender->flags &= ~SENDER_FLAG_COMPRESSION; -#endif + rrdhost_clear_sender___while_having_sender_mutex(host); + netdata_mutex_unlock(&host->sender->mutex); - netdata_mutex_init(&host->sender->mutex); - replication_init_sender(host->sender); + freez(s->pipe_buffer); + freez(s); } void *rrdpush_sender_thread(void *ptr) { @@ -1103,13 +1177,18 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); struct sender_state *s = ptr; - s->tid = gettid(); if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", - rrdhost_hostname(s->host), s->tid); + rrdhost_hostname(s->host), gettid()); + return NULL; + } + + if(!rrdhost_set_sender(s->host)) { + error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.", + rrdhost_hostname(s->host), gettid()); return NULL; } @@ -1125,7 +1204,7 @@ void *rrdpush_sender_thread(void *ptr) { } #endif - info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid); + info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); s->timeout = (int)appconfig_get_number( &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600); @@ -1166,28 +1245,33 @@ void *rrdpush_sender_thread(void *ptr) { thread_data->sender_state = s; thread_data->host = s->host; - // reset our cleanup flags - rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN); - netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data); - for(; rrdhost_has_rrdpush_sender_enabled(s->host) && !netdata_exit ;) { - // check for outstanding cancellation requests - netdata_thread_testcancel(); + size_t iterations = 0; + time_t now_s = now_monotonic_sec(); + while(!rrdhost_sender_should_exit(s)) { + iterations++; // The connection attempt blocks (after which we use the socket in nonblocking) if(unlikely(s->rrdpush_sender_socket == -1)) { worker_is_busy(WORKER_SENDER_JOB_CONNECT); + + now_s = now_monotonic_sec(); + rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true); + rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); s->flags &= ~SENDER_FLAG_OVERFLOW; s->read_len = 0; s->buffer->read = 0; s->buffer->write = 0; - if(unlikely(!attempt_to_connect(s))) + if(!attempt_to_connect(s)) continue; - s->last_traffic_seen_t = now_monotonic_sec(); + if(rrdhost_sender_should_exit(s)) + break; + + now_s = s->last_traffic_seen_t = now_monotonic_sec(); rrdpush_claimed_id(s->host); rrdpush_send_host_labels(s->host); @@ -1197,8 +1281,11 @@ void *rrdpush_sender_thread(void *ptr) { continue; } + if(iterations % 1000 == 0) + now_s = now_monotonic_sec(); + // If the TCP window never opened then something is wrong, restart connection - if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout && + if(unlikely(now_s - s->last_traffic_seen_t > s->timeout && !rrdpush_sender_pending_replication_requests(s) && !rrdpush_sender_replicating_charts(s) )) { @@ -1209,11 +1296,13 @@ void *rrdpush_sender_thread(void *ptr) { } netdata_mutex_lock(&s->mutex); - size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL); - size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); + size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL); + size_t available = cbuffer_available_size_unsafe(s->buffer); + if (unlikely(!outstanding)) + rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); netdata_mutex_unlock(&s->mutex); - worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size); + worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); if(outstanding) s->send_attempts++; @@ -1246,12 +1335,14 @@ void *rrdpush_sender_thread(void *ptr) { .revents = 0, } }; + int poll_rc = poll(fds, 2, 1000); debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); - if(unlikely(netdata_exit)) break; + if(unlikely(rrdhost_sender_should_exit(s))) + break; internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ], "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to); @@ -1261,7 +1352,9 @@ void *rrdpush_sender_thread(void *ptr) { // Spurious wake-ups without error - loop again if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) { + netdata_thread_testcancel(); debug(D_STREAM, "Spurious wakeup"); + now_s = now_monotonic_sec(); continue; } |