diff options
Diffstat (limited to 'src/streaming/protocol/command-chart-definition.c')
-rw-r--r-- | src/streaming/protocol/command-chart-definition.c | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/src/streaming/protocol/command-chart-definition.c b/src/streaming/protocol/command-chart-definition.c new file mode 100644 index 000000000..864d13242 --- /dev/null +++ b/src/streaming/protocol/command-chart-definition.c @@ -0,0 +1,206 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +// chart labels +static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL " \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL)); + return 1; +} + +static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { + if (st->rrdlabels) { + if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0) + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL_COMMIT "\n"); + } +} + +// Send the current chart definition. +// Assumes that collector thread has already called sender_start for mutex / buffer state. +bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { + uint32_t version = rrdset_metadata_version(st); + + RRDHOST *host = st->rrdhost; + NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false; + + bool replication_progress = false; + + // properly set the name for the remote end to parse it + char *name = ""; + if(likely(st->name)) { + if(unlikely(st->id != st->name)) { + // they differ + name = strchr(rrdset_name(st), '.'); + if(name) + name++; + else + name = ""; + } + } + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); + } + + // send the chart + buffer_sprintf( + wb + , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\" \"%s\" \"%s\"\n" + , rrdset_id(st) + , name + , rrdset_title(st) + , rrdset_units(st) + , rrdset_family(st) + , rrdset_context(st) + , rrdset_type_name(st->chart_type) + , st->priority + , st->update_every + , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":"" + , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":"" + , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":"" + , rrdset_plugin_name(st) + , rrdset_module_name(st) + ); + + // send the chart labels + if (stream_has_capability(host->sender, STREAM_CAP_CLABELS)) + rrdpush_send_clabels(wb, st); + + // send the dimensions + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_sprintf( + wb + , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" + , rrddim_id(rd) + , rrddim_name(rd) + , rrd_algorithm_name(rd->algorithm) + , rd->multiplier + , rd->divisor + , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" + , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" + , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" + ); + } + rrddim_foreach_done(rd); + + // send the chart functions + if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + rrd_chart_functions_expose_rrdpush(st, wb); + + // send the chart local custom variables + rrdvar_print_to_streaming_custom_chart_variables(st, wb); + + if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { + time_t db_first_time_t, db_last_time_t; + + time_t now = now_realtime_sec(); + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n", + (unsigned long long)db_first_time_t, + (unsigned long long)db_last_time_t, + (unsigned long long)now); + + if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + } + replication_progress = true; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + // we can set the exposed flag, after we commit the buffer + // because replication may pick it up prematurely + rrddim_foreach_read(rd, st) { + rrddim_metadata_exposed_upstream(rd, version); + } + rrddim_foreach_done(rd); + rrdset_metadata_exposed_upstream(st, version); + + st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (stream_conf_initial_clock_resync_iterations * st->update_every); + return replication_progress; +} + +bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) { + if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) + return false; + + if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) { + RRDHOST *host = st->rrdhost; + + if (flags & RRDSET_FLAG_ANOMALY_DETECTION) { + if(ml_streaming_enabled()) + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + else + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + } + else { + int negative = 0, positive = 0; + SIMPLE_PATTERN_RESULT r; + + r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->context, NULL, 0); + if(r == SP_MATCHED_POSITIVE) positive++; + else if(r == SP_MATCHED_NEGATIVE) negative++; + + if(!negative) { + r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->name, NULL, 0); + if (r == SP_MATCHED_POSITIVE) positive++; + else if (r == SP_MATCHED_NEGATIVE) negative++; + } + + if(!negative) { + r = simple_pattern_matches_string_extract(host->rrdpush.send.charts_matching, st->id, NULL, 0); + if (r == SP_MATCHED_POSITIVE) positive++; + else if (r == SP_MATCHED_NEGATIVE) negative++; + } + + if(!negative && positive) + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + else + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + } + + // get the flags again, to know how to respond + flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE); + } + + return flags & RRDSET_FLAG_UPSTREAM_SEND; +} + +// Called from the internal collectors to mark a chart obsolete. +bool rrdset_push_chart_definition_now(RRDSET *st) { + RRDHOST *host = st->rrdhost; + + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !should_send_chart_matching(st, rrdset_flag_get(st)))) { + return false; + } + + BUFFER *wb = sender_start(host->sender); + rrdpush_send_chart_definition(wb, st); + sender_thread_buffer_free(); + + return true; +} + |