diff options
Diffstat (limited to 'src/streaming/stream-capabilities.c')
-rw-r--r-- | src/streaming/stream-capabilities.c | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/src/streaming/stream-capabilities.c b/src/streaming/stream-capabilities.c new file mode 100644 index 000000000..b089e8f9d --- /dev/null +++ b/src/streaming/stream-capabilities.c @@ -0,0 +1,169 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdpush.h" + +static STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE; + +static struct { + STREAM_CAPABILITIES cap; + const char *str; +} capability_names[] = { + {STREAM_CAP_V1, "V1" }, + {STREAM_CAP_V2, "V2" }, + {STREAM_CAP_VN, "VN" }, + {STREAM_CAP_VCAPS, "VCAPS" }, + {STREAM_CAP_HLABELS, "HLABELS" }, + {STREAM_CAP_CLAIM, "CLAIM" }, + {STREAM_CAP_CLABELS, "CLABELS" }, + {STREAM_CAP_LZ4, "LZ4" }, + {STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, + {STREAM_CAP_REPLICATION, "REPLICATION" }, + {STREAM_CAP_BINARY, "BINARY" }, + {STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, + {STREAM_CAP_IEEE754, "IEEE754" }, + {STREAM_CAP_DATA_WITH_ML, "ML" }, + {STREAM_CAP_DYNCFG, "DYNCFG" }, + {STREAM_CAP_SLOTS, "SLOTS" }, + {STREAM_CAP_ZSTD, "ZSTD" }, + {STREAM_CAP_GZIP, "GZIP" }, + {STREAM_CAP_BROTLI, "BROTLI" }, + {STREAM_CAP_PROGRESS, "PROGRESS" }, + {STREAM_CAP_NODE_ID, "NODEID" }, + {STREAM_CAP_PATHS, "PATHS" }, + {0 , NULL }, +}; + +STREAM_CAPABILITIES stream_capabilities_parse_one(const char *str) { + if (!str || !*str) + return STREAM_CAP_NONE; + + for (size_t i = 0; capability_names[i].str; i++) { + if (strcmp(capability_names[i].str, str) == 0) + return capability_names[i].cap; + } + + return STREAM_CAP_NONE; +} + +void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { + for(size_t i = 0; capability_names[i].str ; i++) { + if(caps & capability_names[i].cap) { + buffer_strcat(wb, capability_names[i].str); + buffer_strcat(wb, " "); + } + } +} + +void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) { + if(key) + buffer_json_member_add_array(wb, key); + else + buffer_json_add_array_item_array(wb); + + for(size_t i = 0; capability_names[i].str ; i++) { + if(caps & capability_names[i].cap) + buffer_json_add_array_item_string(wb, capability_names[i].str); + } + + buffer_json_array_close(wb); +} + +void log_receiver_capabilities(struct receiver_state *rpt) { + BUFFER *wb = buffer_create(100, NULL); + stream_capabilities_to_string(wb, rpt->capabilities); + + nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); + + buffer_free(wb); +} + +void log_sender_capabilities(struct sender_state *s) { + BUFFER *wb = buffer_create(100, NULL); + stream_capabilities_to_string(wb, s->capabilities); + + nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s", + rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); + + buffer_free(wb); +} + +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { + STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities; + + if(host && sender) { + // we have DATA_WITH_ML capability + // we should remove the DATA_WITH_ML capability if our database does not have anomaly info + // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML + spinlock_lock(&host->receiver_lock); + + if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) + disabled_capabilities |= STREAM_CAP_DATA_WITH_ML; + + spinlock_unlock(&host->receiver_lock); + + if(host->sender) + disabled_capabilities |= host->sender->disabled_capabilities; + } + + return (STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_CAP_SLOTS | + STREAM_CAP_PROGRESS | + STREAM_CAP_COMPRESSIONS_AVAILABLE | + STREAM_CAP_DYNCFG | + STREAM_CAP_NODE_ID | + STREAM_CAP_PATHS | + STREAM_CAP_IEEE754 | + STREAM_CAP_DATA_WITH_ML | + 0) & ~disabled_capabilities; +} + +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) { + STREAM_CAPABILITIES caps = 0; + + if(version <= 1) caps = STREAM_CAP_V1; + else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS; + else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM; + else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS; + else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE; + else caps = version; + + if(caps & STREAM_CAP_VCAPS) + caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN); + + if(caps & STREAM_CAP_VN) + caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2); + + if(caps & STREAM_CAP_V2) + caps &= ~(STREAM_CAP_V1); + + STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender); + + if(!(common_caps & STREAM_CAP_INTERPOLATED)) + // DATA WITH ML requires INTERPOLATED + common_caps &= ~STREAM_CAP_DATA_WITH_ML; + + return common_caps; +} + +int32_t stream_capabilities_to_vn(uint32_t caps) { + if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4; + if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS; + return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM) +} + +void check_local_streaming_capabilities(void) { + ieee754_doubles = is_system_ieee754_double(); + if(!ieee754_doubles) + globally_disabled_capabilities |= STREAM_CAP_IEEE754; +} |