summaryrefslogtreecommitdiffstats
path: root/src/streaming/protocol/commands.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/streaming/protocol/commands.c')
-rw-r--r--src/streaming/protocol/commands.c58
1 files changed, 58 insertions, 0 deletions
diff --git a/src/streaming/protocol/commands.c b/src/streaming/protocol/commands.c
new file mode 100644
index 000000000..e9e16bdac
--- /dev/null
+++ b/src/streaming/protocol/commands.c
@@ -0,0 +1,58 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "commands.h"
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
+ RRDHOST *host = st->rrdhost;
+
+ // fetch the flags we need to check with one atomic operation
+ RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
+
+ // check if we are not connected
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
+
+ if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
+ rrdpush_sender_thread_spawn(host);
+
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ }
+
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
+ }
+ else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ }
+
+ if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
+ }
+
+ bool exposed_upstream = rrdset_check_upstream_exposed(st);
+ RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st);
+ bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ if(unlikely((exposed_upstream && replication_in_progress) ||
+ !should_send_chart_matching(st, rrdset_flags)))
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
+
+ if(unlikely(!exposed_upstream)) {
+ BUFFER *wb = sender_start(host->sender);
+ replication_in_progress = rrdpush_send_chart_definition(wb, st);
+ }
+
+ if(replication_in_progress)
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
+
+ return (RRDSET_STREAM_BUFFER) {
+ .capabilities = host->sender->capabilities,
+ .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
+ .rrdset_flags = rrdset_flags,
+ .wb = sender_start(host->sender),
+ .wall_clock_time = wall_clock_time,
+ };
+}