summaryrefslogtreecommitdiffstats
path: root/src/streaming/stream-capabilities.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/streaming/stream-capabilities.c')
-rw-r--r--src/streaming/stream-capabilities.c169
1 files changed, 169 insertions, 0 deletions
diff --git a/src/streaming/stream-capabilities.c b/src/streaming/stream-capabilities.c
new file mode 100644
index 000000000..b089e8f9d
--- /dev/null
+++ b/src/streaming/stream-capabilities.c
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+
+static STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE;
+
+static struct {
+ STREAM_CAPABILITIES cap;
+ const char *str;
+} capability_names[] = {
+ {STREAM_CAP_V1, "V1" },
+ {STREAM_CAP_V2, "V2" },
+ {STREAM_CAP_VN, "VN" },
+ {STREAM_CAP_VCAPS, "VCAPS" },
+ {STREAM_CAP_HLABELS, "HLABELS" },
+ {STREAM_CAP_CLAIM, "CLAIM" },
+ {STREAM_CAP_CLABELS, "CLABELS" },
+ {STREAM_CAP_LZ4, "LZ4" },
+ {STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
+ {STREAM_CAP_REPLICATION, "REPLICATION" },
+ {STREAM_CAP_BINARY, "BINARY" },
+ {STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
+ {STREAM_CAP_IEEE754, "IEEE754" },
+ {STREAM_CAP_DATA_WITH_ML, "ML" },
+ {STREAM_CAP_DYNCFG, "DYNCFG" },
+ {STREAM_CAP_SLOTS, "SLOTS" },
+ {STREAM_CAP_ZSTD, "ZSTD" },
+ {STREAM_CAP_GZIP, "GZIP" },
+ {STREAM_CAP_BROTLI, "BROTLI" },
+ {STREAM_CAP_PROGRESS, "PROGRESS" },
+ {STREAM_CAP_NODE_ID, "NODEID" },
+ {STREAM_CAP_PATHS, "PATHS" },
+ {0 , NULL },
+};
+
+STREAM_CAPABILITIES stream_capabilities_parse_one(const char *str) {
+ if (!str || !*str)
+ return STREAM_CAP_NONE;
+
+ for (size_t i = 0; capability_names[i].str; i++) {
+ if (strcmp(capability_names[i].str, str) == 0)
+ return capability_names[i].cap;
+ }
+
+ return STREAM_CAP_NONE;
+}
+
+void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
+ for(size_t i = 0; capability_names[i].str ; i++) {
+ if(caps & capability_names[i].cap) {
+ buffer_strcat(wb, capability_names[i].str);
+ buffer_strcat(wb, " ");
+ }
+ }
+}
+
+void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
+ if(key)
+ buffer_json_member_add_array(wb, key);
+ else
+ buffer_json_add_array_item_array(wb);
+
+ for(size_t i = 0; capability_names[i].str ; i++) {
+ if(caps & capability_names[i].cap)
+ buffer_json_add_array_item_string(wb, capability_names[i].str);
+ }
+
+ buffer_json_array_close(wb);
+}
+
+void log_receiver_capabilities(struct receiver_state *rpt) {
+ BUFFER *wb = buffer_create(100, NULL);
+ stream_capabilities_to_string(wb, rpt->capabilities);
+
+ nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+void log_sender_capabilities(struct sender_state *s) {
+ BUFFER *wb = buffer_create(100, NULL);
+ stream_capabilities_to_string(wb, s->capabilities);
+
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
+ STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities;
+
+ if(host && sender) {
+ // we have DATA_WITH_ML capability
+ // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
+ // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
+ spinlock_lock(&host->receiver_lock);
+
+ if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
+ disabled_capabilities |= STREAM_CAP_DATA_WITH_ML;
+
+ spinlock_unlock(&host->receiver_lock);
+
+ if(host->sender)
+ disabled_capabilities |= host->sender->disabled_capabilities;
+ }
+
+ return (STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_CAP_SLOTS |
+ STREAM_CAP_PROGRESS |
+ STREAM_CAP_COMPRESSIONS_AVAILABLE |
+ STREAM_CAP_DYNCFG |
+ STREAM_CAP_NODE_ID |
+ STREAM_CAP_PATHS |
+ STREAM_CAP_IEEE754 |
+ STREAM_CAP_DATA_WITH_ML |
+ 0) & ~disabled_capabilities;
+}
+
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
+ STREAM_CAPABILITIES caps = 0;
+
+ if(version <= 1) caps = STREAM_CAP_V1;
+ else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
+ else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
+ else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
+ else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE;
+ else caps = version;
+
+ if(caps & STREAM_CAP_VCAPS)
+ caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN);
+
+ if(caps & STREAM_CAP_VN)
+ caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2);
+
+ if(caps & STREAM_CAP_V2)
+ caps &= ~(STREAM_CAP_V1);
+
+ STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender);
+
+ if(!(common_caps & STREAM_CAP_INTERPOLATED))
+ // DATA WITH ML requires INTERPOLATED
+ common_caps &= ~STREAM_CAP_DATA_WITH_ML;
+
+ return common_caps;
+}
+
+int32_t stream_capabilities_to_vn(uint32_t caps) {
+ if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4;
+ if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
+ return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
+}
+
+void check_local_streaming_capabilities(void) {
+ ieee754_doubles = is_system_ieee754_double();
+ if(!ieee754_doubles)
+ globally_disabled_capabilities |= STREAM_CAP_IEEE754;
+}