diff options
Diffstat (limited to 'src/streaming/protocol/command-begin-set-end.c')
-rw-r--r-- | src/streaming/protocol/command-begin-set-end.c | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/src/streaming/protocol/command-begin-set-end.c b/src/streaming/protocol/command-begin-set-end.c new file mode 100644 index 000000000..17daef776 --- /dev/null +++ b/src/streaming/protocol/command-begin-set-end.c @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "commands.h" +#include "plugins.d/pluginsd_internals.h" + +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) { + buffer_fast_strcat(wb, "BEGIN \"", 7); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "\" ", 2); + + if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s) + buffer_print_uint64(wb, st->usec_since_last_update); + else + buffer_fast_strcat(wb, "0", 1); + + buffer_fast_strcat(wb, "\n", 1); + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(unlikely(!rrddim_check_updated(rd))) + continue; + + if(likely(rrddim_check_upstream_exposed_collector(rd))) { + buffer_fast_strcat(wb, "SET \"", 5); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "\" = ", 4); + buffer_print_int64(wb, rd->collector.collected_value); + buffer_fast_strcat(wb, "\n", 1); + } + else { + internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); + // we will include it in the next iteration + rrddim_metadata_updated(rd); + } + } + rrddim_foreach_done(rd); + + if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdvar_print_to_streaming_custom_chart_variables(st, wb); + + buffer_fast_strcat(wb, "END\n", 4); +} + +void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { + RRDHOST *host = st->rrdhost; + rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags); +} + +void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) { + if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags)) + return; + + bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + BUFFER *wb = rsb->wb; + time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC); + if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) { + + if(unlikely(rsb->begin_v2_added)) + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s); + buffer_fast_strcat(wb, " ", 1); + if(point_end_time_s == rsb->wall_clock_time) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); + + rsb->last_point_end_time_s = point_end_time_s; + rsb->begin_v2_added = true; + } + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); + buffer_fast_strcat(wb, " ", 1); + + if((NETDATA_DOUBLE)rd->collector.last_collected_value == n) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_netdata_double_encoded(wb, doubles_encoding, n); + + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, flags, true); + buffer_fast_strcat(wb, "\n", 1); +} + +void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { + if(!rsb->wb) + return; + + if(rsb->v2 && rsb->begin_v2_added) { + if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdvar_print_to_streaming_custom_chart_variables(st, rsb->wb); + + buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + } + + sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA); + + *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; +} + |