summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c26
1 files changed, 15 insertions, 11 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 95652942..ff7a9562 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "parser/parser.h"
// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
@@ -340,10 +339,14 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
.host = rpt->host,
.opaque = rpt,
.cd = cd,
- .trust_durations = 1
+ .trust_durations = 1,
+ .capabilities = rpt->capabilities,
};
- PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+ PARSER *parser = parser_init(&user, NULL, NULL, fd,
+ PARSER_INPUT_SPLIT, ssl);
+
+ pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);
rrd_collector_started();
@@ -416,7 +419,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
}
done:
- result = user.count;
+ result = user.data_collections_count;
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -434,7 +437,7 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) {
rrdhost_receiver_replicating_charts_zero(host);
}
-bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
+static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
@@ -469,6 +472,7 @@ bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
rrdpush_receiver_replication_reset(host);
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+ aclk_queue_node_info(rpt->host, true);
set_this = true;
}
@@ -689,6 +693,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
return 1;
}
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
+ rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
+ close(rpt->fd);
+ return 1;
+ }
+
// system_info has been consumed by the host structure
rpt->system_info = NULL;
@@ -724,16 +734,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
struct plugind cd = {
.update_every = default_rrd_update_every,
- .serial_failures = 0,
- .successful_collections = 0,
.unsafe = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.running = true,
.enabled = true,
},
.started_t = now_realtime_sec(),
- .next = NULL,
- .capabilities = 0,
};
// put the client IP and port into the buffers used by plugins.d
@@ -804,8 +810,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
- cd.capabilities = rpt->capabilities;
-
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected