diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/rrdpush.c | 142 |
1 files changed, 118 insertions, 24 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 256fa8282..62b537f0c 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" -#include "parser/parser.h" /* * rrdpush @@ -69,6 +68,23 @@ static void load_stream_conf() { freez(filename); } +STREAM_CAPABILITIES stream_our_capabilities() { + return STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_HAS_COMPRESSION | + (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | + 0; +} + bool rrdpush_receiver_needs_dbengine() { struct section *co; @@ -174,8 +190,8 @@ static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) { else rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); } - else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) || - simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) + else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) || + simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name)) rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); else @@ -305,9 +321,11 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { (unsigned long long)db_last_time_t, (unsigned long long)now); - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); - rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + } replication_progress = true; #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -327,7 +345,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta buffer_fast_strcat(wb, "\" ", 2); if(st->last_collected_time.tv_sec > st->upstream_resync_time_s) - buffer_print_llu(wb, st->usec_since_last_update); + buffer_print_uint64(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); @@ -342,7 +360,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta buffer_fast_strcat(wb, "SET \"", 5); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "\" = ", 4); - buffer_print_ll(wb, rd->collected_value); + buffer_print_int64(wb, rd->collected_value); buffer_fast_strcat(wb, "\n", 1); } else { @@ -378,7 +396,74 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { return true; } -void rrdset_done_push(RRDSET *st) { +void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { + RRDHOST *host = st->rrdhost; + rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags); +} + +void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) { + if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags)) + return; + + NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + BUFFER *wb = rsb->wb; + time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC); + if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) { + + if(unlikely(rsb->begin_v2_added)) + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); + buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s); + buffer_fast_strcat(wb, " ", 1); + if(point_end_time_s == rsb->wall_clock_time) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); + + rsb->last_point_end_time_s = point_end_time_s; + rsb->begin_v2_added = true; + } + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value); + buffer_fast_strcat(wb, " ", 1); + + if((NETDATA_DOUBLE)rd->last_collected_value == n) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_netdata_double_encoded(wb, doubles_encoding, n); + + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, flags, true); + buffer_fast_strcat(wb, "\n", 1); +} + +void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { + if(!rsb->wb) + return; + + if(rsb->v2 && rsb->begin_v2_added) { + if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb); + + buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + } + + sender_commit(st->rrdhost->sender, rsb->wb); + + *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; +} + +RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; // fetch the flags we need to check with one atomic operation @@ -395,7 +480,7 @@ void rrdset_done_push(RRDSET *st) { error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); } - return; + return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; } else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); @@ -408,17 +493,24 @@ void rrdset_done_push(RRDSET *st) { if(unlikely((exposed_upstream && replication_in_progress) || !should_send_chart_matching(st, rrdset_flags))) - return; - - BUFFER *wb = sender_start(host->sender); + return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; - if(unlikely(!exposed_upstream)) + if(unlikely(!exposed_upstream)) { + BUFFER *wb = sender_start(host->sender); replication_in_progress = rrdpush_send_chart_definition(wb, st); + sender_commit(host->sender, wb); + } - if (likely(!replication_in_progress)) - rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags); + if(replication_in_progress) + return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; - sender_commit(host->sender, wb); + return (RRDSET_STREAM_BUFFER) { + .capabilities = host->sender->capabilities, + .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED), + .rrdset_flags = rrdset_flags, + .wb = sender_start(host->sender), + .wall_clock_time = wall_clock_time, + }; } // labels @@ -633,7 +725,7 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) { } void *rrdpush_receiver_thread(void *ptr); -int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) { if(!service_running(ABILITY_STREAMING_CONNECTIONS)) return rrdpush_receiver_too_busy_now(w); @@ -665,11 +757,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { // parse the parameters and fill rpt and rpt->system_info - while(url) { - char *value = mystrsep(&url, "&"); + while(decoded_query_string) { + char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&"); if(!value || !*value) continue; - char *name = mystrsep(&value, "="); + char *name = strsep_skip_consecutive_separators(&value, "="); if(!name || !*name) continue; if(!value || !*value) continue; @@ -851,7 +943,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { { SIMPLE_PATTERN *key_allow_from = simple_pattern_create( appconfig_get(&stream_config, rpt->key, "allow from", "*"), - NULL, SIMPLE_PATTERN_EXACT); + NULL, SIMPLE_PATTERN_EXACT, true); if(key_allow_from) { if(!simple_pattern_matches(key_allow_from, w->client_ip)) { @@ -898,7 +990,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { { SIMPLE_PATTERN *machine_allow_from = simple_pattern_create( appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"), - NULL, SIMPLE_PATTERN_EXACT); + NULL, SIMPLE_PATTERN_EXACT, true); if(machine_allow_from) { if(!simple_pattern_matches(machine_allow_from, w->client_ip)) { @@ -1077,6 +1169,8 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS "); if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION "); if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY "); + if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED "); + if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 "); } void log_receiver_capabilities(struct receiver_state *rpt) { @@ -1118,7 +1212,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) { if(caps & STREAM_CAP_V2) caps &= ~(STREAM_CAP_V1); - return caps & STREAM_OUR_CAPABILITIES; + return caps & stream_our_capabilities(); } int32_t stream_capabilities_to_vn(uint32_t caps) { |