// SPDX-License-Identifier: GPL-3.0-or-later #include "commands.h" #include "plugins.d/pluginsd_internals.h" static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s) buffer_print_uint64(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); buffer_fast_strcat(wb, "\n", 1); RRDDIM *rd; rrddim_foreach_read(rd, st) { if(unlikely(!rrddim_check_updated(rd))) continue; if(likely(rrddim_check_upstream_exposed_collector(rd))) { buffer_fast_strcat(wb, "SET \"", 5); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "\" = ", 4); buffer_print_int64(wb, rd->collector.collected_value); buffer_fast_strcat(wb, "\n", 1); } else { internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); // we will include it in the next iteration rrddim_metadata_updated(rd); } } rrddim_foreach_done(rd); if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) rrdvar_print_to_streaming_custom_chart_variables(st, wb); buffer_fast_strcat(wb, "END\n", 4); } void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { RRDHOST *host = st->rrdhost; rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags); } void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) { if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags)) return; bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; BUFFER *wb = rsb->wb; time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC); if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) { if(unlikely(rsb->begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); if(with_slots) { buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot); } buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s); buffer_fast_strcat(wb, " ", 1); if(point_end_time_s == rsb->wall_clock_time) buffer_fast_strcat(wb, "#", 1); else buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time); buffer_fast_strcat(wb, "\n", 1); rsb->last_point_end_time_s = point_end_time_s; rsb->begin_v2_added = true; } buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); if(with_slots) { buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); } buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); if((NETDATA_DOUBLE)rd->collector.last_collected_value == n) buffer_fast_strcat(wb, "#", 1); else buffer_print_netdata_double_encoded(wb, doubles_encoding, n); buffer_fast_strcat(wb, " ", 1); buffer_print_sn_flags(wb, flags, true); buffer_fast_strcat(wb, "\n", 1); } void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { if(!rsb->wb) return; if(rsb->v2 && rsb->begin_v2_added) { if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) rrdvar_print_to_streaming_custom_chart_variables(st, rsb->wb); buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); } sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA); *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; }