summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c142
1 files changed, 118 insertions, 24 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 256fa8282..62b537f0c 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "parser/parser.h"
/*
* rrdpush
@@ -69,6 +68,23 @@ static void load_stream_conf() {
freez(filename);
}
+STREAM_CAPABILITIES stream_our_capabilities() {
+ return STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_HAS_COMPRESSION |
+ (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
+ 0;
+}
+
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
@@ -174,8 +190,8 @@ static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
else
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
}
- else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
- simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+ else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) ||
+ simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name))
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
else
@@ -305,9 +321,11 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
(unsigned long long)db_last_time_t,
(unsigned long long)now);
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ }
replication_progress = true;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -327,7 +345,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "\" ", 2);
if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
- buffer_print_llu(wb, st->usec_since_last_update);
+ buffer_print_uint64(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -342,7 +360,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
- buffer_print_ll(wb, rd->collected_value);
+ buffer_print_int64(wb, rd->collected_value);
buffer_fast_strcat(wb, "\n", 1);
}
else {
@@ -378,7 +396,74 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
return true;
}
-void rrdset_done_push(RRDSET *st) {
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+ rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags);
+}
+
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+ if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
+ return;
+
+ NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ BUFFER *wb = rsb->wb;
+ time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
+ if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
+
+ if(unlikely(rsb->begin_v2_added))
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
+ buffer_fast_strcat(wb, " ", 1);
+ if(point_end_time_s == rsb->wall_clock_time)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
+
+ rsb->last_point_end_time_s = point_end_time_s;
+ rsb->begin_v2_added = true;
+ }
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+
+ if((NETDATA_DOUBLE)rd->last_collected_value == n)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
+
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+}
+
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ if(!rsb->wb)
+ return;
+
+ if(rsb->v2 && rsb->begin_v2_added) {
+ if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
+
+ buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+ }
+
+ sender_commit(st->rrdhost->sender, rsb->wb);
+
+ *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
+}
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
// fetch the flags we need to check with one atomic operation
@@ -395,7 +480,7 @@ void rrdset_done_push(RRDSET *st) {
error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
- return;
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
@@ -408,17 +493,24 @@ void rrdset_done_push(RRDSET *st) {
if(unlikely((exposed_upstream && replication_in_progress) ||
!should_send_chart_matching(st, rrdset_flags)))
- return;
-
- BUFFER *wb = sender_start(host->sender);
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
- if(unlikely(!exposed_upstream))
+ if(unlikely(!exposed_upstream)) {
+ BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
+ sender_commit(host->sender, wb);
+ }
- if (likely(!replication_in_progress))
- rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
+ if(replication_in_progress)
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
- sender_commit(host->sender, wb);
+ return (RRDSET_STREAM_BUFFER) {
+ .capabilities = host->sender->capabilities,
+ .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
+ .rrdset_flags = rrdset_flags,
+ .wb = sender_start(host->sender),
+ .wall_clock_time = wall_clock_time,
+ };
}
// labels
@@ -633,7 +725,7 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
}
void *rrdpush_receiver_thread(void *ptr);
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
if(!service_running(ABILITY_STREAMING_CONNECTIONS))
return rrdpush_receiver_too_busy_now(w);
@@ -665,11 +757,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
// parse the parameters and fill rpt and rpt->system_info
- while(url) {
- char *value = mystrsep(&url, "&");
+ while(decoded_query_string) {
+ char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&");
if(!value || !*value) continue;
- char *name = mystrsep(&value, "=");
+ char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@@ -851,7 +943,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
{
SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
appconfig_get(&stream_config, rpt->key, "allow from", "*"),
- NULL, SIMPLE_PATTERN_EXACT);
+ NULL, SIMPLE_PATTERN_EXACT, true);
if(key_allow_from) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
@@ -898,7 +990,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
{
SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
- NULL, SIMPLE_PATTERN_EXACT);
+ NULL, SIMPLE_PATTERN_EXACT, true);
if(machine_allow_from) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
@@ -1077,6 +1169,8 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
+ if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
+ if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 ");
}
void log_receiver_capabilities(struct receiver_state *rpt) {
@@ -1118,7 +1212,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
if(caps & STREAM_CAP_V2)
caps &= ~(STREAM_CAP_V1);
- return caps & STREAM_OUR_CAPABILITIES;
+ return caps & stream_our_capabilities();
}
int32_t stream_capabilities_to_vn(uint32_t caps) {