diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/sender.c | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 854b57fc5..179c2dc60 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" -#include "parser/parser.h" #define WORKER_SENDER_JOB_CONNECT 0 #define WORKER_SENDER_JOB_PIPE_READ 1 @@ -104,6 +103,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { netdata_mutex_lock(&s->mutex); +// FILE *fp = fopen("/tmp/stream.txt", "a"); +// fprintf(fp, +// "\n--- SEND BEGIN: %s ----\n" +// "%s" +// "--- SEND END ----------------------------------------\n" +// , rrdhost_hostname(s->host), src); +// fclose(fp); + if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); @@ -171,8 +178,16 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { replication_recalculate_buffer_used_ratio_unsafe(s); + bool signal_sender = false; + if(!rrdpush_sender_pipe_has_pending_data(s)) { + rrdpush_sender_pipe_set_pending_data(s); + signal_sender = true; + } + netdata_mutex_unlock(&s->mutex); - rrdpush_signal_sender_to_wake_up(s); + + if(signal_sender) + rrdpush_signal_sender_to_wake_up(s); } static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) { @@ -522,7 +537,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p #endif // reset our capabilities to default - s->capabilities = STREAM_OUR_CAPABILITIES; + s->capabilities = stream_our_capabilities(); #ifdef ENABLE_COMPRESSION // If we don't want compression, remove it from our capabilities @@ -894,7 +909,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { pluginsd_function_result_begin_to_buffer(wb , string2str(tmp->transaction) , code - , functions_content_type_to_format(func_wb->contenttype) + , functions_content_type_to_format(func_wb->content_type) , func_wb->expires); buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb)); @@ -929,7 +944,7 @@ void execute_commands(struct sender_state *s) { // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS); const char *keyword = get_word(words, num_words, 0); @@ -1009,7 +1024,6 @@ void execute_commands(struct sender_state *s) { } struct rrdpush_sender_thread_data { - struct sender_state *sender_state; RRDHOST *host; char *pipe_buffer; }; @@ -1242,7 +1256,6 @@ void *rrdpush_sender_thread(void *ptr) { struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data)); thread_data->pipe_buffer = mallocz(pipe_buffer_size); - thread_data->sender_state = s; thread_data->host = s->host; netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data); @@ -1298,8 +1311,10 @@ void *rrdpush_sender_thread(void *ptr) { netdata_mutex_lock(&s->mutex); size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL); size_t available = cbuffer_available_size_unsafe(s->buffer); - if (unlikely(!outstanding)) + if (unlikely(!outstanding)) { + rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); + } netdata_mutex_unlock(&s->mutex); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); |