summaryrefslogtreecommitdiffstats
path: root/src/streaming/stream-path.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/streaming/stream-path.c')
-rw-r--r--src/streaming/stream-path.c353
1 files changed, 353 insertions, 0 deletions
diff --git a/src/streaming/stream-path.c b/src/streaming/stream-path.c
new file mode 100644
index 000000000..7aad9a0bf
--- /dev/null
+++ b/src/streaming/stream-path.c
@@ -0,0 +1,353 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "stream-path.h"
+#include "rrdpush.h"
+#include "plugins.d/pluginsd_internals.h"
+
+ENUM_STR_MAP_DEFINE(STREAM_PATH_FLAGS) = {
+ { .id = STREAM_PATH_FLAG_ACLK, .name = "aclk" },
+
+ // terminator
+ { . id = 0, .name = NULL }
+};
+
+BITMAP_STR_DEFINE_FUNCTIONS(STREAM_PATH_FLAGS, STREAM_PATH_FLAG_NONE, "");
+
+static void stream_path_clear(STREAM_PATH *p) {
+ string_freez(p->hostname);
+ p->hostname = NULL;
+ p->host_id = UUID_ZERO;
+ p->node_id = UUID_ZERO;
+ p->claim_id = UUID_ZERO;
+ p->hops = 0;
+ p->since = 0;
+ p->first_time_t = 0;
+ p->capabilities = 0;
+ p->flags = STREAM_PATH_FLAG_NONE;
+ p->start_time = 0;
+ p->shutdown_time = 0;
+}
+
+static void rrdhost_stream_path_clear_unsafe(RRDHOST *host, bool destroy) {
+ for(size_t i = 0; i < host->rrdpush.path.used ; i++)
+ stream_path_clear(&host->rrdpush.path.array[i]);
+
+ host->rrdpush.path.used = 0;
+
+ if(destroy) {
+ freez(host->rrdpush.path.array);
+ host->rrdpush.path.array = NULL;
+ host->rrdpush.path.size = 0;
+ }
+}
+
+void rrdhost_stream_path_clear(RRDHOST *host, bool destroy) {
+ spinlock_lock(&host->rrdpush.path.spinlock);
+ rrdhost_stream_path_clear_unsafe(host, destroy);
+ spinlock_unlock(&host->rrdpush.path.spinlock);
+}
+
+static void stream_path_to_json_object(BUFFER *wb, STREAM_PATH *p) {
+ buffer_json_add_array_item_object(wb);
+ buffer_json_member_add_string(wb, "hostname", string2str(p->hostname));
+ buffer_json_member_add_uuid(wb, "host_id", p->host_id.uuid);
+ buffer_json_member_add_uuid(wb, "node_id", p->node_id.uuid);
+ buffer_json_member_add_uuid(wb, "claim_id", p->claim_id.uuid);
+ buffer_json_member_add_int64(wb, "hops", p->hops);
+ buffer_json_member_add_uint64(wb, "since", p->since);
+ buffer_json_member_add_uint64(wb, "first_time_t", p->first_time_t);
+ buffer_json_member_add_uint64(wb, "start_time", p->start_time);
+ buffer_json_member_add_uint64(wb, "shutdown_time", p->shutdown_time);
+ stream_capabilities_to_json_array(wb, p->capabilities, "capabilities");
+ STREAM_PATH_FLAGS_2json(wb, "flags", p->flags);
+ buffer_json_object_close(wb);
+}
+
+static STREAM_PATH rrdhost_stream_path_self(RRDHOST *host) {
+ STREAM_PATH p = { 0 };
+
+ bool is_localhost = host == localhost || rrdhost_option_check(host, RRDHOST_OPTION_VIRTUAL_HOST);
+
+ p.hostname = string_dup(localhost->hostname);
+ p.host_id = localhost->host_id;
+ p.node_id = localhost->node_id;
+ p.claim_id = claim_id_get_uuid();
+ p.start_time = get_agent_event_time_median(EVENT_AGENT_START_TIME) / USEC_PER_MS;
+ p.shutdown_time = get_agent_event_time_median(EVENT_AGENT_SHUTDOWN_TIME) / USEC_PER_MS;
+
+ p.flags = STREAM_PATH_FLAG_NONE;
+ if(!UUIDiszero(p.claim_id))
+ p.flags |= STREAM_PATH_FLAG_ACLK;
+
+ bool has_receiver = false;
+ spinlock_lock(&host->receiver_lock);
+ if(host->receiver) {
+ has_receiver = true;
+ p.hops = (int16_t)host->receiver->hops;
+ p.since = host->receiver->connected_since_s;
+ }
+ spinlock_unlock(&host->receiver_lock);
+
+ if(!has_receiver) {
+ p.hops = (is_localhost) ? 0 : -1; // -1 for stale nodes
+ p.since = netdata_start_time;
+ }
+
+ // the following may get the receiver lock again!
+ p.capabilities = stream_our_capabilities(host, true);
+
+ rrdhost_retention(host, 0, false, &p.first_time_t, NULL);
+
+ return p;
+}
+
+STREAM_PATH rrdhost_stream_path_fetch(RRDHOST *host) {
+ STREAM_PATH p = { 0 };
+
+ spinlock_lock(&host->rrdpush.path.spinlock);
+ for (size_t i = 0; i < host->rrdpush.path.used; i++) {
+ STREAM_PATH *tmp_path = &host->rrdpush.path.array[i];
+ if(UUIDeq(host->host_id, tmp_path->host_id)) {
+ p = *tmp_path;
+ break;
+ }
+ }
+ spinlock_unlock(&host->rrdpush.path.spinlock);
+ return p;
+}
+
+void rrdhost_stream_path_to_json(BUFFER *wb, struct rrdhost *host, const char *key, bool add_version) {
+ if(add_version)
+ buffer_json_member_add_uint64(wb, "version", 1);
+
+ spinlock_lock(&host->rrdpush.path.spinlock);
+ buffer_json_member_add_array(wb, key);
+ {
+ {
+ STREAM_PATH tmp = rrdhost_stream_path_self(host);
+
+ bool found_self = false;
+ for (size_t i = 0; i < host->rrdpush.path.used; i++) {
+ STREAM_PATH *p = &host->rrdpush.path.array[i];
+ if(UUIDeq(localhost->host_id, p->host_id)) {
+ // this is us, use the current data
+ p = &tmp;
+ found_self = true;
+ }
+ stream_path_to_json_object(wb, p);
+ }
+
+ if(!found_self) {
+ // we didn't find ourselves in the list.
+ // append us.
+ stream_path_to_json_object(wb, &tmp);
+ }
+
+ stream_path_clear(&tmp);
+ }
+ }
+ buffer_json_array_close(wb); // key
+ spinlock_unlock(&host->rrdpush.path.spinlock);
+}
+
+static BUFFER *stream_path_payload(RRDHOST *host) {
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
+ rrdhost_stream_path_to_json(wb, host, STREAM_PATH_JSON_MEMBER, true);
+ buffer_json_finalize(wb);
+ return wb;
+}
+
+void stream_path_send_to_parent(RRDHOST *host) {
+ struct sender_state *s = host->sender;
+ if(!s || !stream_has_capability(s, STREAM_CAP_PATHS)) return;
+
+ CLEAN_BUFFER *payload = stream_path_payload(host);
+
+ BUFFER *wb = sender_start(s);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_JSON " " PLUGINSD_KEYWORD_STREAM_PATH "\n%s\n" PLUGINSD_KEYWORD_JSON_END "\n", buffer_tostring(payload));
+ sender_commit(s, wb, STREAM_TRAFFIC_TYPE_METADATA);
+}
+
+void stream_path_send_to_child(RRDHOST *host) {
+ if(host == localhost)
+ return;
+
+ CLEAN_BUFFER *payload = stream_path_payload(host);
+
+ spinlock_lock(&host->receiver_lock);
+ if(host->receiver && stream_has_capability(host->receiver, STREAM_CAP_PATHS)) {
+
+ CLEAN_BUFFER *wb = buffer_create(0, NULL);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_JSON " " PLUGINSD_KEYWORD_STREAM_PATH "\n%s\n" PLUGINSD_KEYWORD_JSON_END "\n", buffer_tostring(payload));
+ send_to_plugin(buffer_tostring(wb), __atomic_load_n(&host->receiver->parser, __ATOMIC_RELAXED));
+ }
+ spinlock_unlock(&host->receiver_lock);
+}
+
+void stream_path_child_disconnected(RRDHOST *host) {
+ rrdhost_stream_path_clear(host, true);
+}
+
+void stream_path_parent_disconnected(RRDHOST *host) {
+ spinlock_lock(&host->rrdpush.path.spinlock);
+
+ size_t cleared = 0;
+ size_t used = host->rrdpush.path.used;
+ for (size_t i = 0; i < used; i++) {
+ STREAM_PATH *p = &host->rrdpush.path.array[i];
+ if(UUIDeq(localhost->host_id, p->host_id)) {
+ host->rrdpush.path.used = i + 1;
+
+ for(size_t j = i + 1; j < used ;j++) {
+ stream_path_clear(&host->rrdpush.path.array[j]);
+ cleared++;
+ }
+
+ break;
+ }
+ }
+
+ spinlock_unlock(&host->rrdpush.path.spinlock);
+
+ if(cleared)
+ stream_path_send_to_child(host);
+}
+
+void stream_path_retention_updated(RRDHOST *host) {
+ if(!host || !localhost) return;
+ stream_path_send_to_parent(host);
+ stream_path_send_to_child(host);
+}
+
+void stream_path_node_id_updated(RRDHOST *host) {
+ if(!host || !localhost) return;
+ stream_path_send_to_parent(host);
+ stream_path_send_to_child(host);
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+
+static bool parse_single_path(json_object *jobj, const char *path, STREAM_PATH *p, BUFFER *error) {
+ JSONC_PARSE_TXT2STRING_OR_ERROR_AND_RETURN(jobj, path, "hostname", p->hostname, error, true);
+ JSONC_PARSE_TXT2UUID_OR_ERROR_AND_RETURN(jobj, path, "host_id", p->host_id.uuid, error, true);
+ JSONC_PARSE_TXT2UUID_OR_ERROR_AND_RETURN(jobj, path, "node_id", p->node_id.uuid, error, true);
+ JSONC_PARSE_TXT2UUID_OR_ERROR_AND_RETURN(jobj, path, "claim_id", p->claim_id.uuid, error, true);
+ JSONC_PARSE_INT64_OR_ERROR_AND_RETURN(jobj, path, "hops", p->hops, error, true);
+ JSONC_PARSE_UINT64_OR_ERROR_AND_RETURN(jobj, path, "since", p->since, error, true);
+ JSONC_PARSE_UINT64_OR_ERROR_AND_RETURN(jobj, path, "first_time_t", p->first_time_t, error, true);
+ JSONC_PARSE_INT64_OR_ERROR_AND_RETURN(jobj, path, "start_time", p->start_time, error, true);
+ JSONC_PARSE_INT64_OR_ERROR_AND_RETURN(jobj, path, "shutdown_time", p->shutdown_time, error, true);
+ JSONC_PARSE_ARRAY_OF_TXT2BITMAP_OR_ERROR_AND_RETURN(jobj, path, "flags", STREAM_PATH_FLAGS_2id_one, p->flags, error, true);
+ JSONC_PARSE_ARRAY_OF_TXT2BITMAP_OR_ERROR_AND_RETURN(jobj, path, "capabilities", stream_capabilities_parse_one, p->capabilities, error, true);
+
+ if(!p->hostname) {
+ buffer_strcat(error, "hostname cannot be empty");
+ return false;
+ }
+
+ if(UUIDiszero(p->host_id)) {
+ buffer_strcat(error, "host_id cannot be zero");
+ return false;
+ }
+
+ if(p->hops < 0) {
+ buffer_strcat(error, "hops cannot be negative");
+ return false;
+ }
+
+ if(p->capabilities == STREAM_CAP_NONE) {
+ buffer_strcat(error, "capabilities cannot be empty");
+ return false;
+ }
+
+ if(p->since <= 0) {
+ buffer_strcat(error, "since cannot be <= 0");
+ return false;
+ }
+
+ return true;
+}
+
+static XXH128_hash_t stream_path_hash_unsafe(RRDHOST *host) {
+ if(!host->rrdpush.path.used)
+ return (XXH128_hash_t){ 0 };
+
+ return XXH3_128bits(host->rrdpush.path.array, sizeof(*host->rrdpush.path.array) * host->rrdpush.path.used);
+}
+
+static int compare_by_hops(const void *a, const void *b) {
+ const STREAM_PATH *path1 = a;
+ const STREAM_PATH *path2 = b;
+
+ if (path1->hops < path2->hops)
+ return -1;
+ else if (path1->hops > path2->hops)
+ return 1;
+
+ return 0;
+}
+
+bool stream_path_set_from_json(RRDHOST *host, const char *json, bool from_parent) {
+ if(!json || !*json)
+ return false;
+
+ CLEAN_JSON_OBJECT *jobj = json_tokener_parse(json);
+ if(!jobj) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM PATH: Cannot parse json: %s", json);
+ return false;
+ }
+
+ spinlock_lock(&host->rrdpush.path.spinlock);
+ XXH128_hash_t old_hash = stream_path_hash_unsafe(host);
+ rrdhost_stream_path_clear_unsafe(host, true);
+
+ CLEAN_BUFFER *error = buffer_create(0, NULL);
+
+ json_object *_jarray;
+ if (json_object_object_get_ex(jobj, STREAM_PATH_JSON_MEMBER, &_jarray) &&
+ json_object_is_type(_jarray, json_type_array)) {
+ size_t items = json_object_array_length(_jarray);
+ host->rrdpush.path.array = callocz(items, sizeof(*host->rrdpush.path.array));
+ host->rrdpush.path.size = items;
+
+ for (size_t i = 0; i < items; ++i) {
+ json_object *joption = json_object_array_get_idx(_jarray, i);
+ if (!json_object_is_type(joption, json_type_object)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM PATH: Array item No %zu is not an object: %s", i, json);
+ continue;
+ }
+
+ if(!parse_single_path(joption, "", &host->rrdpush.path.array[host->rrdpush.path.used], error)) {
+ stream_path_clear(&host->rrdpush.path.array[host->rrdpush.path.used]);
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM PATH: Array item No %zu cannot be parsed: %s: %s", i, buffer_tostring(error), json);
+ }
+ else
+ host->rrdpush.path.used++;
+ }
+ }
+
+ if(host->rrdpush.path.used > 1) {
+ // sorting is required in order to support stream_path_parent_disconnected()
+ qsort(host->rrdpush.path.array, host->rrdpush.path.used,
+ sizeof(*host->rrdpush.path.array), compare_by_hops);
+ }
+
+ XXH128_hash_t new_hash = stream_path_hash_unsafe(host);
+ spinlock_unlock(&host->rrdpush.path.spinlock);
+
+ if(!XXH128_isEqual(old_hash, new_hash)) {
+ if(!from_parent)
+ stream_path_send_to_parent(host);
+
+ // when it comes from the child, we still need to send it back to the child
+ // including our own entry in it.
+ stream_path_send_to_child(host);
+ }
+
+ return host->rrdpush.path.used > 0;
+}