summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/sender.c31
1 files changed, 23 insertions, 8 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 854b57fc..179c2dc6 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "parser/parser.h"
#define WORKER_SENDER_JOB_CONNECT 0
#define WORKER_SENDER_JOB_PIPE_READ 1
@@ -104,6 +103,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
+// FILE *fp = fopen("/tmp/stream.txt", "a");
+// fprintf(fp,
+// "\n--- SEND BEGIN: %s ----\n"
+// "%s"
+// "--- SEND END ----------------------------------------\n"
+// , rrdhost_hostname(s->host), src);
+// fclose(fp);
+
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
@@ -171,8 +178,16 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
replication_recalculate_buffer_used_ratio_unsafe(s);
+ bool signal_sender = false;
+ if(!rrdpush_sender_pipe_has_pending_data(s)) {
+ rrdpush_sender_pipe_set_pending_data(s);
+ signal_sender = true;
+ }
+
netdata_mutex_unlock(&s->mutex);
- rrdpush_signal_sender_to_wake_up(s);
+
+ if(signal_sender)
+ rrdpush_signal_sender_to_wake_up(s);
}
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
@@ -522,7 +537,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#endif
// reset our capabilities to default
- s->capabilities = STREAM_OUR_CAPABILITIES;
+ s->capabilities = stream_our_capabilities();
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
@@ -894,7 +909,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
pluginsd_function_result_begin_to_buffer(wb
, string2str(tmp->transaction)
, code
- , functions_content_type_to_format(func_wb->contenttype)
+ , functions_content_type_to_format(func_wb->content_type)
, func_wb->expires);
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
@@ -929,7 +944,7 @@ void execute_commands(struct sender_state *s) {
// internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+ size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
@@ -1009,7 +1024,6 @@ void execute_commands(struct sender_state *s) {
}
struct rrdpush_sender_thread_data {
- struct sender_state *sender_state;
RRDHOST *host;
char *pipe_buffer;
};
@@ -1242,7 +1256,6 @@ void *rrdpush_sender_thread(void *ptr) {
struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
thread_data->pipe_buffer = mallocz(pipe_buffer_size);
- thread_data->sender_state = s;
thread_data->host = s->host;
netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
@@ -1298,8 +1311,10 @@ void *rrdpush_sender_thread(void *ptr) {
netdata_mutex_lock(&s->mutex);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
- if (unlikely(!outstanding))
+ if (unlikely(!outstanding)) {
+ rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
+ }
netdata_mutex_unlock(&s->mutex);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);