diff options
Diffstat (limited to 'src/streaming/protocol')
-rw-r--r-- | src/streaming/protocol/command-begin-set-end.c | 126 | ||||
-rw-r--r-- | src/streaming/protocol/command-chart-definition.c | 206 | ||||
-rw-r--r-- | src/streaming/protocol/command-claimed_id.c | 78 | ||||
-rw-r--r-- | src/streaming/protocol/command-function.c | 20 | ||||
-rw-r--r-- | src/streaming/protocol/command-host-labels.c | 25 | ||||
-rw-r--r-- | src/streaming/protocol/command-host-variables.c | 52 | ||||
-rw-r--r-- | src/streaming/protocol/command-nodeid.c | 128 | ||||
-rw-r--r-- | src/streaming/protocol/commands.c | 58 | ||||
-rw-r--r-- | src/streaming/protocol/commands.h | 41 |
9 files changed, 734 insertions, 0 deletions
diff --git a/src/streaming/protocol/command-begin-set-end.c b/src/streaming/protocol/command-begin-set-end.c new file mode 100644 index 000000000..17daef776 --- /dev/null +++ b/src/streaming/protocol/command-begin-set-end.c @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) { + buffer_fast_strcat(wb, "BEGIN \"", 7); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "\" ", 2); + + if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s) + buffer_print_uint64(wb, st->usec_since_last_update); + else + buffer_fast_strcat(wb, "0", 1); + + buffer_fast_strcat(wb, "\n", 1); + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(unlikely(!rrddim_check_updated(rd))) + continue; + + if(likely(rrddim_check_upstream_exposed_collector(rd))) { + buffer_fast_strcat(wb, "SET \"", 5); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "\" = ", 4); + buffer_print_int64(wb, rd->collector.collected_value); + buffer_fast_strcat(wb, "\n", 1); + } + else { + internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); + // we will include it in the next iteration + rrddim_metadata_updated(rd); + } + } + rrddim_foreach_done(rd); + + if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdvar_print_to_streaming_custom_chart_variables(st, wb); + + buffer_fast_strcat(wb, "END\n", 4); +} + +void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { + RRDHOST *host = st->rrdhost; + rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags); +} + +void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) { + if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags)) + return; + + bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + BUFFER *wb = rsb->wb; + time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC); + if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) { + + if(unlikely(rsb->begin_v2_added)) + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s); + buffer_fast_strcat(wb, " ", 1); + if(point_end_time_s == rsb->wall_clock_time) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); + + rsb->last_point_end_time_s = point_end_time_s; + rsb->begin_v2_added = true; + } + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); + buffer_fast_strcat(wb, " ", 1); + + if((NETDATA_DOUBLE)rd->collector.last_collected_value == n) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_netdata_double_encoded(wb, doubles_encoding, n); + + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, flags, true); + buffer_fast_strcat(wb, "\n", 1); +} + +void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { + if(!rsb->wb) + return; + + if(rsb->v2 && rsb->begin_v2_added) { + if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdvar_print_to_streaming_custom_chart_variables(st, rsb->wb); + + buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + } + + sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA); + + *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; +} + diff --git a/src/streaming/protocol/command-chart-definition.c b/src/streaming/protocol/command-chart-definition.c new file mode 100644 index 000000000..864d13242 --- /dev/null +++ b/src/streaming/protocol/command-chart-definition.c @@ -0,0 +1,206 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +// chart labels +static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL " \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL)); + return 1; +} + +static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { + if (st->rrdlabels) { + if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0) + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL_COMMIT "\n"); + } +} + +// Send the current chart definition. +// Assumes that collector thread has already called sender_start for mutex / buffer state. +bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { + uint32_t version = rrdset_metadata_version(st); + + RRDHOST *host = st->rrdhost; + NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false; + + bool replication_progress = false; + + // properly set the name for the remote end to parse it + char *name = ""; + if(likely(st->name)) { + if(unlikely(st->id != st->name)) { + // they differ + name = strchr(rrdset_name(st), '.'); + if(name) + name++; + else + name = ""; + } + } + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); + } + + // send the chart + buffer_sprintf( + wb + , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\" \"%s\" \"%s\"\n" + , rrdset_id(st) + , name + , rrdset_title(st) + , rrdset_units(st) + , rrdset_family(st) + , rrdset_context(st) + , rrdset_type_name(st->chart_type) + , st->priority + , st->update_every + , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":"" + , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":"" + , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":"" + , rrdset_plugin_name(st) + , rrdset_module_name(st) + ); + + // send the chart labels + if (stream_has_capability(host->sender, STREAM_CAP_CLABELS)) + rrdpush_send_clabels(wb, st); + + // send the dimensions + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_sprintf( + wb + , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" + , rrddim_id(rd) + , rrddim_name(rd) + , rrd_algorithm_name(rd->algorithm) + , rd->multiplier + , rd->divisor + , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" + , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" + , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" + ); + } + rrddim_foreach_done(rd); + + // send the chart functions + if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + rrd_chart_functions_expose_rrdpush(st, wb); + + // send the chart local custom variables + rrdvar_print_to_streaming_custom_chart_variables(st, wb); + + if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { + time_t db_first_time_t, db_last_time_t; + + time_t now = now_realtime_sec(); + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n", + (unsigned long long)db_first_time_t, + (unsigned long long)db_last_time_t, + (unsigned long long)now); + + if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + } + replication_progress = true; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + // we can set the exposed flag, after we commit the buffer + // because replication may pick it up prematurely + rrddim_foreach_read(rd, st) { + rrddim_metadata_exposed_upstream(rd, version); + } + rrddim_foreach_done(rd); + rrdset_metadata_exposed_upstream(st, version); + + st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (stream_conf_initial_clock_resync_iterations * st->update_every); + return replication_progress; +} + +bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) { + if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) + return false; + + if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) { + RRDHOST *host = st->rrdhost; + + if (flags & RRDSET_FLAG_ANOMALY_DETECTION) { + if(ml_streaming_enabled()) + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + else + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + } + else { + int negative = 0, positive = 0; + SIMPLE_PATTERN_RESULT r; + + r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->context, NULL, 0); + if(r == SP_MATCHED_POSITIVE) positive++; + else if(r == SP_MATCHED_NEGATIVE) negative++; + + if(!negative) { + r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->name, NULL, 0); + if (r == SP_MATCHED_POSITIVE) positive++; + else if (r == SP_MATCHED_NEGATIVE) negative++; + } + + if(!negative) { + r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->id, NULL, 0); + if (r == SP_MATCHED_POSITIVE) positive++; + else if (r == SP_MATCHED_NEGATIVE) negative++; + } + + if(!negative && positive) + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + else + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + } + + // get the flags again, to know how to respond + flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE); + } + + return flags & RRDSET_FLAG_UPSTREAM_SEND; +} + +// Called from the internal collectors to mark a chart obsolete. +bool rrdset_push_chart_definition_now(RRDSET *st) { + RRDHOST *host = st->rrdhost; + + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !should_send_chart_matching(st, rrdset_flag_get(st)))) { + return false; + } + + BUFFER *wb = sender_start(host->sender); + rrdpush_send_chart_definition(wb, st); + sender_thread_buffer_free(); + + return true; +} + diff --git a/src/streaming/protocol/command-claimed_id.c b/src/streaming/protocol/command-claimed_id.c new file mode 100644 index 000000000..5392e1d3b --- /dev/null +++ b/src/streaming/protocol/command-claimed_id.c @@ -0,0 +1,78 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +PARSER_RC rrdpush_receiver_pluginsd_claimed_id(char **words, size_t num_words, PARSER *parser) { + const char *machine_guid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); + + if (!machine_guid_str || !claim_id_str) { + netdata_log_error("PLUGINSD: command CLAIMED_ID came malformed, machine_guid '%s', claim_id '%s'", + machine_guid_str ? machine_guid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); + return PARSER_RC_ERROR; + } + + RRDHOST *host = parser->user.host; + + nd_uuid_t machine_uuid; + if(uuid_parse(machine_guid_str, machine_uuid)) { + netdata_log_error("PLUGINSD: parameter machine guid to CLAIMED_ID command is not valid UUID. " + "Received: '%s'.", machine_guid_str); + return PARSER_RC_ERROR; + } + + nd_uuid_t claim_uuid; + if(strcmp(claim_id_str, "NULL") == 0) + uuid_clear(claim_uuid); + + else if(uuid_parse(claim_id_str, claim_uuid) != 0) { + netdata_log_error("PLUGINSD: parameter claim id to CLAIMED_ID command is not valid UUID. " + "Received: '%s'.", claim_id_str); + return PARSER_RC_ERROR; + } + + if(strcmp(machine_guid_str, host->machine_guid) != 0) { + netdata_log_error("PLUGINSD: received claim id for host '%s' but it came over the connection of '%s'", + machine_guid_str, host->machine_guid); + return PARSER_RC_OK; //the message is OK problem must be somewhere else + } + + if(host == localhost) { + netdata_log_error("PLUGINSD: CLAIMED_ID command cannot be used to set the claimed id of localhost. " + "Received: '%s'.", claim_id_str); + return PARSER_RC_OK; + } + + if(!uuid_is_null(claim_uuid)) { + uuid_copy(host->aclk.claim_id_of_origin.uuid, claim_uuid); + rrdpush_sender_send_claimed_id(host); + } + + return PARSER_RC_OK; +} + +void rrdpush_sender_send_claimed_id(RRDHOST *host) { + if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) + return; + + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) + return; + + BUFFER *wb = sender_start(host->sender); + + char str[UUID_STR_LEN] = ""; + ND_UUID uuid = host->aclk.claim_id_of_origin; + if(!UUIDiszero(uuid)) + uuid_unparse_lower(uuid.uuid, str); + else + strncpyz(str, "NULL", sizeof(str) - 1); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLAIMED_ID " '%s' '%s'\n", + host->machine_guid, str); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} diff --git a/src/streaming/protocol/command-function.c b/src/streaming/protocol/command-function.c new file mode 100644 index 000000000..d9b28eb4e --- /dev/null +++ b/src/streaming/protocol/command-function.c @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +void rrdpush_send_global_functions(RRDHOST *host) { + if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + return; + + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) + return; + + BUFFER *wb = sender_start(host->sender); + + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); + + sender_thread_buffer_free(); +} diff --git a/src/streaming/protocol/command-host-labels.c b/src/streaming/protocol/command-host-labels.c new file mode 100644 index 000000000..7c2a2d0dd --- /dev/null +++ b/src/streaming/protocol/command-host-labels.c @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); + return 1; +} + +void rrdpush_send_host_labels(RRDHOST *host) { + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !stream_has_capability(host->sender, STREAM_CAP_HLABELS))) + return; + + BUFFER *wb = sender_start(host->sender); + + rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb); + buffer_sprintf(wb, "OVERWRITE %s\n", "labels"); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} diff --git a/src/streaming/protocol/command-host-variables.c b/src/streaming/protocol/command-host-variables.c new file mode 100644 index 000000000..83e4990d6 --- /dev/null +++ b/src/streaming/protocol/command-host-variables.c @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) { + buffer_sprintf( + wb + , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n" + , rrdvar_name(rva) + , rrdvar2number(rva) + ); + + netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva)); +} + +void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) { + if(rrdhost_can_send_definitions_to_parent(host)) { + BUFFER *wb = sender_start(host->sender); + rrdpush_sender_add_host_variable_to_buffer(wb, rva); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_thread_buffer_free(); + } +} + +struct custom_host_variables_callback { + BUFFER *wb; +}; + +static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) { + const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item; + struct custom_host_variables_callback *tmp = struct_ptr; + BUFFER *wb = tmp->wb; + + rrdpush_sender_add_host_variable_to_buffer(wb, rv); + return 1; +} + +void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { + if(rrdhost_can_send_definitions_to_parent(host)) { + BUFFER *wb = sender_start(host->sender); + struct custom_host_variables_callback tmp = { + .wb = wb + }; + int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp); + (void)ret; + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_thread_buffer_free(); + + netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); + } +} diff --git a/src/streaming/protocol/command-nodeid.c b/src/streaming/protocol/command-nodeid.c new file mode 100644 index 000000000..85ace83c8 --- /dev/null +++ b/src/streaming/protocol/command-nodeid.c @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +// the child disconnected from the parent, and it has to clear the parent's claim id +void rrdpush_sender_clear_parent_claim_id(RRDHOST *host) { + host->aclk.claim_id_of_parent = UUID_ZERO; +} + +// the parent sends to the child its claim id, node id and cloud url +void rrdpush_receiver_send_node_and_claim_id_to_child(RRDHOST *host) { + if(host == localhost || UUIDiszero(host->node_id)) return; + + spinlock_lock(&host->receiver_lock); + if(host->receiver && stream_has_capability(host->receiver, STREAM_CAP_NODE_ID)) { + char node_id_str[UUID_STR_LEN] = ""; + uuid_unparse_lower(host->node_id.uuid, node_id_str); + + CLAIM_ID claim_id = claim_id_get(); + + if((!claim_id_is_set(claim_id) || !aclk_online())) { + // the agent is not claimed or not connected, just use parent claim id + // to allow the connection flow. + // this may be zero and it is ok. + claim_id.uuid = host->aclk.claim_id_of_parent; + uuid_unparse_lower(claim_id.uuid.uuid, claim_id.str); + } + + char buf[4096]; + snprintfz(buf, sizeof(buf), + PLUGINSD_KEYWORD_NODE_ID " '%s' '%s' '%s'\n", + claim_id.str, node_id_str, cloud_config_url_get()); + + send_to_plugin(buf, __atomic_load_n(&host->receiver->parser, __ATOMIC_RELAXED)); + } + spinlock_unlock(&host->receiver_lock); +} + +// the sender of the child receives node id, claim id and cloud url from the receiver of the parent +void rrdpush_sender_get_node_and_claim_id_from_parent(struct sender_state *s) { + char *claim_id_str = get_word(s->line.words, s->line.num_words, 1); + char *node_id_str = get_word(s->line.words, s->line.num_words, 2); + char *url = get_word(s->line.words, s->line.num_words, 3); + + bool claimed = is_agent_claimed(); + bool update_node_id = false; + + ND_UUID claim_id; + if (uuid_parse(claim_id_str ? claim_id_str : "", claim_id.uuid) != 0) { + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s] received invalid claim id '%s'", + rrdhost_hostname(s->host), s->connected_to, + claim_id_str ? claim_id_str : "(unset)"); + return; + } + + ND_UUID node_id; + if(uuid_parse(node_id_str ? node_id_str : "", node_id.uuid) != 0) { + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s] received an invalid node id '%s'", + rrdhost_hostname(s->host), s->connected_to, + node_id_str ? node_id_str : "(unset)"); + return; + } + + if (!UUIDiszero(s->host->aclk.claim_id_of_parent) && !UUIDeq(s->host->aclk.claim_id_of_parent, claim_id)) + nd_log(NDLS_DAEMON, NDLP_INFO, + "STREAM %s [send to %s] changed parent's claim id to %s", + rrdhost_hostname(s->host), s->connected_to, + claim_id_str ? claim_id_str : "(unset)"); + + if(!UUIDiszero(s->host->node_id) && !UUIDeq(s->host->node_id, node_id)) { + if(claimed) { + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s] parent reports different node id '%s', but we are claimed. Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + node_id_str ? node_id_str : "(unset)"); + return; + } + else { + update_node_id = true; + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s] changed node id to %s", + rrdhost_hostname(s->host), s->connected_to, + node_id_str ? node_id_str : "(unset)"); + } + } + + if(!url || !*url) { + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s] received an invalid cloud URL '%s'", + rrdhost_hostname(s->host), s->connected_to, + url ? url : "(unset)"); + return; + } + + s->host->aclk.claim_id_of_parent = claim_id; + + // There are some very strange corner cases here: + // + // - Agent is claimed but offline, and it receives node_id and cloud_url from a different Netdata Cloud. + // - Agent is configured to talk to an on-prem Netdata Cloud, it is offline, but the parent is connected + // to a different Netdata Cloud. + // + // The solution below, tries to get the agent online, using the latest information. + // So, if the agent is not claimed or not connected, we inherit whatever information sent from the parent, + // to allow the user to work with it. + + if(claimed && aclk_online()) + // we are directly claimed and connected, ignore node id and cloud url + return; + + bool node_id_updated = false; + if(UUIDiszero(s->host->node_id) || update_node_id) { + s->host->node_id = node_id; + node_id_updated = true; + } + + // we change the URL, to allow the agent dashboard to work with Netdata Cloud on-prem, if any. + cloud_config_url_set(url); + + // send it down the line (to children) + rrdpush_receiver_send_node_and_claim_id_to_child(s->host); + + if(node_id_updated) + stream_path_node_id_updated(s->host); +} diff --git a/src/streaming/protocol/commands.c b/src/streaming/protocol/commands.c new file mode 100644 index 000000000..e9e16bdac --- /dev/null +++ b/src/streaming/protocol/commands.c @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" + +RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { + RRDHOST *host = st->rrdhost; + + // fetch the flags we need to check with one atomic operation + RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST); + + // check if we are not connected + if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) { + + if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED)))) + rrdpush_sender_thread_spawn(host); + + if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); + nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + } + + return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; + } + else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { + nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); + } + + if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { + BUFFER *wb = sender_start(host->sender); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); + } + + bool exposed_upstream = rrdset_check_upstream_exposed(st); + RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st); + bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + + if(unlikely((exposed_upstream && replication_in_progress) || + !should_send_chart_matching(st, rrdset_flags))) + return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; + + if(unlikely(!exposed_upstream)) { + BUFFER *wb = sender_start(host->sender); + replication_in_progress = rrdpush_send_chart_definition(wb, st); + } + + if(replication_in_progress) + return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; + + return (RRDSET_STREAM_BUFFER) { + .capabilities = host->sender->capabilities, + .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED), + .rrdset_flags = rrdset_flags, + .wb = sender_start(host->sender), + .wall_clock_time = wall_clock_time, + }; +} diff --git a/src/streaming/protocol/commands.h b/src/streaming/protocol/commands.h new file mode 100644 index 000000000..81344175c --- /dev/null +++ b/src/streaming/protocol/commands.h @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_STREAMING_PROTCOL_COMMANDS_H +#define NETDATA_STREAMING_PROTCOL_COMMANDS_H + +#include "database/rrd.h" +#include "../rrdpush.h" + +typedef struct rrdset_stream_buffer { + STREAM_CAPABILITIES capabilities; + bool v2; + bool begin_v2_added; + time_t wall_clock_time; + RRDSET_FLAGS rrdset_flags; + time_t last_point_end_time_s; + BUFFER *wb; +} RRDSET_STREAM_BUFFER; + +RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time); + +void rrdpush_sender_get_node_and_claim_id_from_parent(struct sender_state *s); +void rrdpush_receiver_send_node_and_claim_id_to_child(RRDHOST *host); +void rrdpush_sender_clear_parent_claim_id(RRDHOST *host); + +void rrdpush_sender_send_claimed_id(RRDHOST *host); + +void rrdpush_send_global_functions(RRDHOST *host); +void rrdpush_send_host_labels(RRDHOST *host); + +void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host); +void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); + +bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st); +bool rrdset_push_chart_definition_now(RRDSET *st); +bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags); + +void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st); +void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags); +void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st); + +#endif //NETDATA_STREAMING_PROTCOL_COMMANDS_H |