summaryrefslogtreecommitdiffstats
path: root/src/streaming/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/streaming/protocol')
-rw-r--r--src/streaming/protocol/command-begin-set-end.c126
-rw-r--r--src/streaming/protocol/command-chart-definition.c206
-rw-r--r--src/streaming/protocol/command-claimed_id.c78
-rw-r--r--src/streaming/protocol/command-function.c20
-rw-r--r--src/streaming/protocol/command-host-labels.c25
-rw-r--r--src/streaming/protocol/command-host-variables.c52
-rw-r--r--src/streaming/protocol/command-nodeid.c128
-rw-r--r--src/streaming/protocol/commands.c58
-rw-r--r--src/streaming/protocol/commands.h41
9 files changed, 734 insertions, 0 deletions
diff --git a/src/streaming/protocol/command-begin-set-end.c b/src/streaming/protocol/command-begin-set-end.c
new file mode 100644
index 000000000..17daef776
--- /dev/null
+++ b/src/streaming/protocol/command-begin-set-end.c
@@ -0,0 +1,126 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) {
+ buffer_fast_strcat(wb, "BEGIN \"", 7);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+
+ if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s)
+ buffer_print_uint64(wb, st->usec_since_last_update);
+ else
+ buffer_fast_strcat(wb, "0", 1);
+
+ buffer_fast_strcat(wb, "\n", 1);
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(unlikely(!rrddim_check_updated(rd)))
+ continue;
+
+ if(likely(rrddim_check_upstream_exposed_collector(rd))) {
+ buffer_fast_strcat(wb, "SET \"", 5);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "\" = ", 4);
+ buffer_print_int64(wb, rd->collector.collected_value);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
+ else {
+ internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+ // we will include it in the next iteration
+ rrddim_metadata_updated(rd);
+ }
+ }
+ rrddim_foreach_done(rd);
+
+ if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdvar_print_to_streaming_custom_chart_variables(st, wb);
+
+ buffer_fast_strcat(wb, "END\n", 4);
+}
+
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+ rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags);
+}
+
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+ if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
+ return;
+
+ bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ BUFFER *wb = rsb->wb;
+ time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
+ if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
+
+ if(unlikely(rsb->begin_v2_added))
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
+ buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
+ buffer_fast_strcat(wb, " ", 1);
+ if(point_end_time_s == rsb->wall_clock_time)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
+
+ rsb->last_point_end_time_s = point_end_time_s;
+ rsb->begin_v2_added = true;
+ }
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+
+ if((NETDATA_DOUBLE)rd->collector.last_collected_value == n)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
+
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+}
+
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ if(!rsb->wb)
+ return;
+
+ if(rsb->v2 && rsb->begin_v2_added) {
+ if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
+
+ buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+ }
+
+ sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA);
+
+ *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
+}
+
diff --git a/src/streaming/protocol/command-chart-definition.c b/src/streaming/protocol/command-chart-definition.c
new file mode 100644
index 000000000..864d13242
--- /dev/null
+++ b/src/streaming/protocol/command-chart-definition.c
@@ -0,0 +1,206 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+// chart labels
+static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL " \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL));
+ return 1;
+}
+
+static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
+ if (st->rrdlabels) {
+ if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL_COMMIT "\n");
+ }
+}
+
+// Send the current chart definition.
+// Assumes that collector thread has already called sender_start for mutex / buffer state.
+bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
+ uint32_t version = rrdset_metadata_version(st);
+
+ RRDHOST *host = st->rrdhost;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false;
+
+ bool replication_progress = false;
+
+ // properly set the name for the remote end to parse it
+ char *name = "";
+ if(likely(st->name)) {
+ if(unlikely(st->id != st->name)) {
+ // they differ
+ name = strchr(rrdset_name(st), '.');
+ if(name)
+ name++;
+ else
+ name = "";
+ }
+ }
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
+ }
+
+ // send the chart
+ buffer_sprintf(
+ wb
+ , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\" \"%s\" \"%s\"\n"
+ , rrdset_id(st)
+ , name
+ , rrdset_title(st)
+ , rrdset_units(st)
+ , rrdset_family(st)
+ , rrdset_context(st)
+ , rrdset_type_name(st->chart_type)
+ , st->priority
+ , st->update_every
+ , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
+ , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
+ , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
+ , rrdset_plugin_name(st)
+ , rrdset_module_name(st)
+ );
+
+ // send the chart labels
+ if (stream_has_capability(host->sender, STREAM_CAP_CLABELS))
+ rrdpush_send_clabels(wb, st);
+
+ // send the dimensions
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_sprintf(
+ wb
+ , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
+ , rrddim_id(rd)
+ , rrddim_name(rd)
+ , rrd_algorithm_name(rd->algorithm)
+ , rd->multiplier
+ , rd->divisor
+ , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
+ );
+ }
+ rrddim_foreach_done(rd);
+
+ // send the chart functions
+ if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ rrd_chart_functions_expose_rrdpush(st, wb);
+
+ // send the chart local custom variables
+ rrdvar_print_to_streaming_custom_chart_variables(st, wb);
+
+ if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
+ time_t db_first_time_t, db_last_time_t;
+
+ time_t now = now_realtime_sec();
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
+ (unsigned long long)db_first_time_t,
+ (unsigned long long)db_last_time_t,
+ (unsigned long long)now);
+
+ if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ }
+ replication_progress = true;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
+ }
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ // we can set the exposed flag, after we commit the buffer
+ // because replication may pick it up prematurely
+ rrddim_foreach_read(rd, st) {
+ rrddim_metadata_exposed_upstream(rd, version);
+ }
+ rrddim_foreach_done(rd);
+ rrdset_metadata_exposed_upstream(st, version);
+
+ st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (stream_conf_initial_clock_resync_iterations * st->update_every);
+ return replication_progress;
+}
+
+bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
+ if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
+ return false;
+
+ if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) {
+ RRDHOST *host = st->rrdhost;
+
+ if (flags & RRDSET_FLAG_ANOMALY_DETECTION) {
+ if(ml_streaming_enabled())
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+ else {
+ int negative = 0, positive = 0;
+ SIMPLE_PATTERN_RESULT r;
+
+ r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->context, NULL, 0);
+ if(r == SP_MATCHED_POSITIVE) positive++;
+ else if(r == SP_MATCHED_NEGATIVE) negative++;
+
+ if(!negative) {
+ r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->name, NULL, 0);
+ if (r == SP_MATCHED_POSITIVE) positive++;
+ else if (r == SP_MATCHED_NEGATIVE) negative++;
+ }
+
+ if(!negative) {
+ r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->id, NULL, 0);
+ if (r == SP_MATCHED_POSITIVE) positive++;
+ else if (r == SP_MATCHED_NEGATIVE) negative++;
+ }
+
+ if(!negative && positive)
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+
+ // get the flags again, to know how to respond
+ flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+
+ return flags & RRDSET_FLAG_UPSTREAM_SEND;
+}
+
+// Called from the internal collectors to mark a chart obsolete.
+bool rrdset_push_chart_definition_now(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !should_send_chart_matching(st, rrdset_flag_get(st)))) {
+ return false;
+ }
+
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_send_chart_definition(wb, st);
+ sender_thread_buffer_free();
+
+ return true;
+}
+
diff --git a/src/streaming/protocol/command-claimed_id.c b/src/streaming/protocol/command-claimed_id.c
new file mode 100644
index 000000000..5392e1d3b
--- /dev/null
+++ b/src/streaming/protocol/command-claimed_id.c
@@ -0,0 +1,78 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+PARSER_RC rrdpush_receiver_pluginsd_claimed_id(char **words, size_t num_words, PARSER *parser) {
+ const char *machine_guid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
+
+ if (!machine_guid_str || !claim_id_str) {
+ netdata_log_error("PLUGINSD: command CLAIMED_ID came malformed, machine_guid '%s', claim_id '%s'",
+ machine_guid_str ? machine_guid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
+ return PARSER_RC_ERROR;
+ }
+
+ RRDHOST *host = parser->user.host;
+
+ nd_uuid_t machine_uuid;
+ if(uuid_parse(machine_guid_str, machine_uuid)) {
+ netdata_log_error("PLUGINSD: parameter machine guid to CLAIMED_ID command is not valid UUID. "
+ "Received: '%s'.", machine_guid_str);
+ return PARSER_RC_ERROR;
+ }
+
+ nd_uuid_t claim_uuid;
+ if(strcmp(claim_id_str, "NULL") == 0)
+ uuid_clear(claim_uuid);
+
+ else if(uuid_parse(claim_id_str, claim_uuid) != 0) {
+ netdata_log_error("PLUGINSD: parameter claim id to CLAIMED_ID command is not valid UUID. "
+ "Received: '%s'.", claim_id_str);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(machine_guid_str, host->machine_guid) != 0) {
+ netdata_log_error("PLUGINSD: received claim id for host '%s' but it came over the connection of '%s'",
+ machine_guid_str, host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewhere else
+ }
+
+ if(host == localhost) {
+ netdata_log_error("PLUGINSD: CLAIMED_ID command cannot be used to set the claimed id of localhost. "
+ "Received: '%s'.", claim_id_str);
+ return PARSER_RC_OK;
+ }
+
+ if(!uuid_is_null(claim_uuid)) {
+ uuid_copy(host->aclk.claim_id_of_origin.uuid, claim_uuid);
+ rrdpush_sender_send_claimed_id(host);
+ }
+
+ return PARSER_RC_OK;
+}
+
+void rrdpush_sender_send_claimed_id(RRDHOST *host) {
+ if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
+ return;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ char str[UUID_STR_LEN] = "";
+ ND_UUID uuid = host->aclk.claim_id_of_origin;
+ if(!UUIDiszero(uuid))
+ uuid_unparse_lower(uuid.uuid, str);
+ else
+ strncpyz(str, "NULL", sizeof(str) - 1);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CLAIMED_ID " '%s' '%s'\n",
+ host->machine_guid, str);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
diff --git a/src/streaming/protocol/command-function.c b/src/streaming/protocol/command-function.c
new file mode 100644
index 000000000..d9b28eb4e
--- /dev/null
+++ b/src/streaming/protocol/command-function.c
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+void rrdpush_send_global_functions(RRDHOST *host) {
+ if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ return;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
+
+ sender_thread_buffer_free();
+}
diff --git a/src/streaming/protocol/command-host-labels.c b/src/streaming/protocol/command-host-labels.c
new file mode 100644
index 000000000..7c2a2d0dd
--- /dev/null
+++ b/src/streaming/protocol/command-host-labels.c
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
+ return 1;
+}
+
+void rrdpush_send_host_labels(RRDHOST *host) {
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
+ buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
diff --git a/src/streaming/protocol/command-host-variables.c b/src/streaming/protocol/command-host-variables.c
new file mode 100644
index 000000000..83e4990d6
--- /dev/null
+++ b/src/streaming/protocol/command-host-variables.c
@@ -0,0 +1,52 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
+ buffer_sprintf(
+ wb
+ , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
+ , rrdvar_name(rva)
+ , rrdvar2number(rva)
+ );
+
+ netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
+}
+
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_sender_add_host_variable_to_buffer(wb, rva);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_thread_buffer_free();
+ }
+}
+
+struct custom_host_variables_callback {
+ BUFFER *wb;
+};
+
+static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
+ const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
+ struct custom_host_variables_callback *tmp = struct_ptr;
+ BUFFER *wb = tmp->wb;
+
+ rrdpush_sender_add_host_variable_to_buffer(wb, rv);
+ return 1;
+}
+
+void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ struct custom_host_variables_callback tmp = {
+ .wb = wb
+ };
+ int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
+ (void)ret;
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_thread_buffer_free();
+
+ netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ }
+}
diff --git a/src/streaming/protocol/command-nodeid.c b/src/streaming/protocol/command-nodeid.c
new file mode 100644
index 000000000..85ace83c8
--- /dev/null
+++ b/src/streaming/protocol/command-nodeid.c
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+#include "plugins.d/pluginsd_internals.h"
+
+// the child disconnected from the parent, and it has to clear the parent's claim id
+void rrdpush_sender_clear_parent_claim_id(RRDHOST *host) {
+ host->aclk.claim_id_of_parent = UUID_ZERO;
+}
+
+// the parent sends to the child its claim id, node id and cloud url
+void rrdpush_receiver_send_node_and_claim_id_to_child(RRDHOST *host) {
+ if(host == localhost || UUIDiszero(host->node_id)) return;
+
+ spinlock_lock(&host->receiver_lock);
+ if(host->receiver && stream_has_capability(host->receiver, STREAM_CAP_NODE_ID)) {
+ char node_id_str[UUID_STR_LEN] = "";
+ uuid_unparse_lower(host->node_id.uuid, node_id_str);
+
+ CLAIM_ID claim_id = claim_id_get();
+
+ if((!claim_id_is_set(claim_id) || !aclk_online())) {
+ // the agent is not claimed or not connected, just use parent claim id
+ // to allow the connection flow.
+ // this may be zero and it is ok.
+ claim_id.uuid = host->aclk.claim_id_of_parent;
+ uuid_unparse_lower(claim_id.uuid.uuid, claim_id.str);
+ }
+
+ char buf[4096];
+ snprintfz(buf, sizeof(buf),
+ PLUGINSD_KEYWORD_NODE_ID " '%s' '%s' '%s'\n",
+ claim_id.str, node_id_str, cloud_config_url_get());
+
+ send_to_plugin(buf, __atomic_load_n(&host->receiver->parser, __ATOMIC_RELAXED));
+ }
+ spinlock_unlock(&host->receiver_lock);
+}
+
+// the sender of the child receives node id, claim id and cloud url from the receiver of the parent
+void rrdpush_sender_get_node_and_claim_id_from_parent(struct sender_state *s) {
+ char *claim_id_str = get_word(s->line.words, s->line.num_words, 1);
+ char *node_id_str = get_word(s->line.words, s->line.num_words, 2);
+ char *url = get_word(s->line.words, s->line.num_words, 3);
+
+ bool claimed = is_agent_claimed();
+ bool update_node_id = false;
+
+ ND_UUID claim_id;
+ if (uuid_parse(claim_id_str ? claim_id_str : "", claim_id.uuid) != 0) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s] received invalid claim id '%s'",
+ rrdhost_hostname(s->host), s->connected_to,
+ claim_id_str ? claim_id_str : "(unset)");
+ return;
+ }
+
+ ND_UUID node_id;
+ if(uuid_parse(node_id_str ? node_id_str : "", node_id.uuid) != 0) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s] received an invalid node id '%s'",
+ rrdhost_hostname(s->host), s->connected_to,
+ node_id_str ? node_id_str : "(unset)");
+ return;
+ }
+
+ if (!UUIDiszero(s->host->aclk.claim_id_of_parent) && !UUIDeq(s->host->aclk.claim_id_of_parent, claim_id))
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "STREAM %s [send to %s] changed parent's claim id to %s",
+ rrdhost_hostname(s->host), s->connected_to,
+ claim_id_str ? claim_id_str : "(unset)");
+
+ if(!UUIDiszero(s->host->node_id) && !UUIDeq(s->host->node_id, node_id)) {
+ if(claimed) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s] parent reports different node id '%s', but we are claimed. Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ node_id_str ? node_id_str : "(unset)");
+ return;
+ }
+ else {
+ update_node_id = true;
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s] changed node id to %s",
+ rrdhost_hostname(s->host), s->connected_to,
+ node_id_str ? node_id_str : "(unset)");
+ }
+ }
+
+ if(!url || !*url) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s] received an invalid cloud URL '%s'",
+ rrdhost_hostname(s->host), s->connected_to,
+ url ? url : "(unset)");
+ return;
+ }
+
+ s->host->aclk.claim_id_of_parent = claim_id;
+
+ // There are some very strange corner cases here:
+ //
+ // - Agent is claimed but offline, and it receives node_id and cloud_url from a different Netdata Cloud.
+ // - Agent is configured to talk to an on-prem Netdata Cloud, it is offline, but the parent is connected
+ // to a different Netdata Cloud.
+ //
+ // The solution below, tries to get the agent online, using the latest information.
+ // So, if the agent is not claimed or not connected, we inherit whatever information sent from the parent,
+ // to allow the user to work with it.
+
+ if(claimed && aclk_online())
+ // we are directly claimed and connected, ignore node id and cloud url
+ return;
+
+ bool node_id_updated = false;
+ if(UUIDiszero(s->host->node_id) || update_node_id) {
+ s->host->node_id = node_id;
+ node_id_updated = true;
+ }
+
+ // we change the URL, to allow the agent dashboard to work with Netdata Cloud on-prem, if any.
+ cloud_config_url_set(url);
+
+ // send it down the line (to children)
+ rrdpush_receiver_send_node_and_claim_id_to_child(s->host);
+
+ if(node_id_updated)
+ stream_path_node_id_updated(s->host);
+}
diff --git a/src/streaming/protocol/commands.c b/src/streaming/protocol/commands.c
new file mode 100644
index 000000000..e9e16bdac
--- /dev/null
+++ b/src/streaming/protocol/commands.c
@@ -0,0 +1,58 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
+ RRDHOST *host = st->rrdhost;
+
+ // fetch the flags we need to check with one atomic operation
+ RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
+
+ // check if we are not connected
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
+
+ if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
+ rrdpush_sender_thread_spawn(host);
+
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ }
+
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
+ }
+ else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ }
+
+ if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
+ }
+
+ bool exposed_upstream = rrdset_check_upstream_exposed(st);
+ RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st);
+ bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ if(unlikely((exposed_upstream && replication_in_progress) ||
+ !should_send_chart_matching(st, rrdset_flags)))
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
+
+ if(unlikely(!exposed_upstream)) {
+ BUFFER *wb = sender_start(host->sender);
+ replication_in_progress = rrdpush_send_chart_definition(wb, st);
+ }
+
+ if(replication_in_progress)
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
+
+ return (RRDSET_STREAM_BUFFER) {
+ .capabilities = host->sender->capabilities,
+ .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
+ .rrdset_flags = rrdset_flags,
+ .wb = sender_start(host->sender),
+ .wall_clock_time = wall_clock_time,
+ };
+}
diff --git a/src/streaming/protocol/commands.h b/src/streaming/protocol/commands.h
new file mode 100644
index 000000000..81344175c
--- /dev/null
+++ b/src/streaming/protocol/commands.h
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_STREAMING_PROTCOL_COMMANDS_H
+#define NETDATA_STREAMING_PROTCOL_COMMANDS_H
+
+#include "database/rrd.h"
+#include "../rrdpush.h"
+
+typedef struct rrdset_stream_buffer {
+ STREAM_CAPABILITIES capabilities;
+ bool v2;
+ bool begin_v2_added;
+ time_t wall_clock_time;
+ RRDSET_FLAGS rrdset_flags;
+ time_t last_point_end_time_s;
+ BUFFER *wb;
+} RRDSET_STREAM_BUFFER;
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
+
+void rrdpush_sender_get_node_and_claim_id_from_parent(struct sender_state *s);
+void rrdpush_receiver_send_node_and_claim_id_to_child(RRDHOST *host);
+void rrdpush_sender_clear_parent_claim_id(RRDHOST *host);
+
+void rrdpush_sender_send_claimed_id(RRDHOST *host);
+
+void rrdpush_send_global_functions(RRDHOST *host);
+void rrdpush_send_host_labels(RRDHOST *host);
+
+void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host);
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
+
+bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st);
+bool rrdset_push_chart_definition_now(RRDSET *st);
+bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags);
+
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+
+#endif //NETDATA_STREAMING_PROTCOL_COMMANDS_H