diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/receiver.c | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 95652942e..ff7a95629 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" -#include "parser/parser.h" // IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly #define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) @@ -340,10 +339,14 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i .host = rpt->host, .opaque = rpt, .cd = cd, - .trust_durations = 1 + .trust_durations = 1, + .capabilities = rpt->capabilities, }; - PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + PARSER *parser = parser_init(&user, NULL, NULL, fd, + PARSER_INPUT_SPLIT, ssl); + + pluginsd_keywords_init(parser, PARSER_INIT_STREAMING); rrd_collector_started(); @@ -416,7 +419,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i } done: - result = user.count; + result = user.data_collections_count; // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -434,7 +437,7 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) { rrdhost_receiver_replicating_charts_zero(host); } -bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { +static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { bool signal_rrdcontext = false; bool set_this = false; @@ -469,6 +472,7 @@ bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { rrdpush_receiver_replication_reset(host); rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + aclk_queue_node_info(rpt->host, true); set_this = true; } @@ -689,6 +693,12 @@ static int rrdpush_receive(struct receiver_state *rpt) return 1; } + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { + rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER"); + close(rpt->fd); + return 1; + } + // system_info has been consumed by the host structure rpt->system_info = NULL; @@ -724,16 +734,12 @@ static int rrdpush_receive(struct receiver_state *rpt) struct plugind cd = { .update_every = default_rrd_update_every, - .serial_failures = 0, - .successful_collections = 0, .unsafe = { .spinlock = NETDATA_SPINLOCK_INITIALIZER, .running = true, .enabled = true, }, .started_t = now_realtime_sec(), - .next = NULL, - .capabilities = 0, }; // put the client IP and port into the buffers used by plugins.d @@ -804,8 +810,6 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED"); - cd.capabilities = rpt->capabilities; - #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected |