diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-05-19 12:33:27 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-05-19 12:33:27 +0000 |
commit | 841395dd16f470e3c051a0a4fff5b91efc983c30 (patch) | |
tree | 4115f6eedcddda75067130b80acaff9e51612f49 /aclk/aclk_util.c | |
parent | Adding upstream version 1.30.1. (diff) | |
download | netdata-841395dd16f470e3c051a0a4fff5b91efc983c30.tar.xz netdata-841395dd16f470e3c051a0a4fff5b91efc983c30.zip |
Adding upstream version 1.31.0.upstream/1.31.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_util.c')
-rw-r--r-- | aclk/aclk_util.c | 365 |
1 files changed, 279 insertions, 86 deletions
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index a5347c466..b8ac66756 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "aclk_util.h" #include <stdio.h> @@ -10,6 +12,48 @@ #define UUID_STR_LEN 37 #endif +aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) { + if (!strcmp(str, "json")) { + return ACLK_ENC_JSON; + } + if (!strcmp(str, "proto")) { + return ACLK_ENC_PROTO; + } + return ACLK_ENC_UNKNOWN; +} + +aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) { + if (!strcmp(str, "MQTTv3")) { + return ACLK_TRP_MQTT_3_1_1; + } + if (!strcmp(str, "MQTTv5")) { + return ACLK_TRP_MQTT_5; + } + return ACLK_TRP_UNKNOWN; +} + +void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) { + freez(trp_desc->endpoint); +} + +void aclk_env_t_destroy(aclk_env_t *env) { + freez(env->auth_endpoint); + if (env->transports) { + for (size_t i = 0; i < env->transport_count; i++) { + if(env->transports[i]) { + aclk_transport_desc_t_destroy(env->transports[i]); + env->transports[i] = NULL; + } + } + freez(env->transports); + } + if (env->capabilities) { + for (size_t i = 0; i < env->capability_count; i++) + freez(env->capabilities[i]); + freez(env->capabilities); + } +} + #ifdef ACLK_LOG_CONVERSATION_DIR volatile int aclk_conversation_log_counter = 0; #if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS) @@ -28,137 +72,246 @@ int aclk_get_conv_log_next() #define ACLK_TOPIC_PREFIX "/agent/" struct aclk_topic { - const char *topic_suffix; + enum aclk_topics topic_id; + // as received from cloud - we keep this for + // eventual topic list update when claim_id changes + char *topic_recvd; + // constructed topic char *topic; }; // This helps to cache finalized topics (assembled with claim_id) // to not have to alloc or create buffer and construct topic every // time message is sent as in old ACLK -static struct aclk_topic aclk_topic_cache[] = { - { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART - { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS - { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA - { .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND - { .topic_suffix = NULL, .topic = NULL } -}; +static struct aclk_topic **aclk_topic_cache = NULL; +static size_t aclk_topic_cache_items = 0; void free_topic_cache(void) { - struct aclk_topic *tc = aclk_topic_cache; - while (tc->topic_suffix) { - if (tc->topic) { - freez(tc->topic); - tc->topic = NULL; + if (aclk_topic_cache) { + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + freez(aclk_topic_cache[i]->topic); + freez(aclk_topic_cache[i]->topic_recvd); + freez(aclk_topic_cache[i]); } - tc++; + freez(aclk_topic_cache); + aclk_topic_cache = NULL; + aclk_topic_cache_items = 0; } } -static inline void generate_topic_cache(void) -{ - struct aclk_topic *tc = aclk_topic_cache; - char *ptr; - if (unlikely(!tc->topic)) { - rrdhost_aclk_state_lock(localhost); - while(tc->topic_suffix) { - tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix)); - ptr = tc->topic; - strcpy(ptr, ACLK_TOPIC_PREFIX); - ptr += strlen(ACLK_TOPIC_PREFIX); - strcpy(ptr, localhost->aclk_state.claimed_id); - ptr += (UUID_STR_LEN - 1); - *ptr++ = '/'; - strcpy(ptr, tc->topic_suffix); - tc++; +#define JSON_TOPIC_KEY_TOPIC "topic" +#define JSON_TOPIC_KEY_NAME "name" + +struct topic_name { + enum aclk_topics id; + // cloud name - how is it called + // in answer to /password endpoint + const char *name; +} topic_names[] = { + { .id = ACLK_TOPICID_CHART, .name = "chart" }, + { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, + { .id = ACLK_TOPICID_METADATA, .name = "meta" }, + { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, + { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } +}; + +enum aclk_topics compulsory_topics[] = { + ACLK_TOPICID_CHART, + ACLK_TOPICID_ALARMS, + ACLK_TOPICID_METADATA, + ACLK_TOPICID_COMMAND, + ACLK_TOPICID_UNKNOWN +}; + +static enum aclk_topics topic_name_to_id(const char *name) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (!strcmp(topic->name, name)) { + return topic->id; } + topic++; + } + return ACLK_TOPICID_UNKNOWN; +} + +static const char *topic_id_to_name(enum aclk_topics tid) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (topic->id == tid) + return topic->name; + topic++; + } + return "unknown"; +} + +#define CLAIM_ID_REPLACE_TAG "#{claim_id}" +static void topic_generate_final(struct aclk_topic *t) { + char *dest; + char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG); + if (!replace_tag) + return; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("This should never be called if agent not claimed"); rrdhost_aclk_state_unlock(localhost); + return; } + + t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id)); + memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd); + dest = t->topic + (replace_tag - t->topic_recvd); + + memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id)); + dest += strlen(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + replace_tag += strlen(CLAIM_ID_REPLACE_TAG); + strcpy(dest, replace_tag); + dest += strlen(replace_tag); + *dest = 0; } -/* - * Build a topic based on sub_topic and final_topic - * if the sub topic starts with / assume that is an absolute topic - * - */ -const char *aclk_get_topic(enum aclk_topics topic) +static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic) { - generate_topic_cache(); + struct json_object_iterator it; + struct json_object_iterator itEnd; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string"); + return 1; + } + topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); + if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { + info("topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + } + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string"); + return 1; + } + topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } - return aclk_topic_cache[topic].topic; + if (!topic->topic_recvd) { + error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC); + return 1; + } + + topic_generate_final(topic); + aclk_topic_cache_items++; + + return 0; } -int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) +int aclk_generate_topic_cache(struct json_object *json) { - int pos = 0; - if (!strncmp("https://", url, 8)) { - pos = 8; - } else if (!strncmp("http://", url, 7)) { - error("Cannot connect ACLK over %s -> unencrypted link is not supported", url); + json_object *obj; + + size_t array_size = json_object_array_length(json); + if (!array_size) { + error("Empty topic list!"); return 1; } - int host_end = pos; - while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':') - host_end++; - if (url[host_end] == 0) { - *aclk_hostname = strdupz(url + pos); - *aclk_port = 443; - info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); - return 0; - } - if (url[host_end] == ':') { - *aclk_hostname = callocz(host_end - pos + 1, 1); - strncpy(*aclk_hostname, url + pos, host_end - pos); - int port_end = host_end + 1; - while (url[port_end] >= '0' && url[port_end] <= '9') - port_end++; - if (port_end - host_end > 6) { - error("Port specified in %s is invalid", url); - freez(*aclk_hostname); - *aclk_hostname = NULL; + + if (aclk_topic_cache) + free_topic_cache(); + + aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *)); + + for (size_t i = 0; i < array_size; i++) { + obj = json_object_array_get_idx(json, i); + if (json_object_get_type(obj) != json_type_object) { + error("expected json_type_object"); + return 1; + } + aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic)); + if (topic_cache_add_topic(obj, aclk_topic_cache[i])) { + error("failed to parse topic @idx=%d", (int)i); return 1; } - *aclk_port = atoi(&url[host_end+1]); } - if (url[host_end] == '/') { - *aclk_port = 443; - *aclk_hostname = callocz(1, host_end - pos + 1); - strncpy(*aclk_hostname, url+pos, host_end - pos); + + for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { + if (!aclk_get_topic(compulsory_topics[i])) { + error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); + return 1; + } } - info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; } /* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ +const char *aclk_get_topic(enum aclk_topics topic) +{ + if (!aclk_topic_cache) { + error("Topic cache not initialized"); + return NULL; + } + + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + if (aclk_topic_cache[i]->topic_id == topic) + return aclk_topic_cache[i]->topic; + } + error("Unknown topic"); + return NULL; +} + +/* * TBEB with randomness * - * @param mode 0 - to reset the delay, - * 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * @param reset 1 - to reset the delay, + * 0 - to advance a step and calculate sleep time in ms + * @param min, max in seconds * @returns delay in ms * */ -#define ACLK_MAX_BACKOFF_DELAY 1024 -unsigned long int aclk_reconnect_delay(int mode) -{ - static int fail = -1; - unsigned long int delay; - if (!mode || fail == -1) { - srandom(time(NULL)); - fail = mode - 1; +unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) { + static int attempt = -1; + + if (reset) { + attempt = -1; return 0; } - delay = (1 << fail); + attempt++; - if (delay >= ACLK_MAX_BACKOFF_DELAY) { - delay = ACLK_MAX_BACKOFF_DELAY * 1000; - } else { - fail++; - delay *= 1000; - delay += (random() % (MAX(1000, delay/2))); + if (attempt == 0) { + srandom(time(NULL)); + return 0; } + unsigned long int delay = pow(base, attempt - 1); + delay *= MSEC_PER_SEC; + + delay += (random() % (MAX(1000, delay/2))); + + if (delay <= min * MSEC_PER_SEC) + return min; + + if (delay >= max * MSEC_PER_SEC) + return max; + return delay; } @@ -345,3 +498,43 @@ const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) *type = proxy_type; return proxy; } + +#define HTTP_PROXY_PREFIX "http://" +void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type) +{ + ACLK_PROXY_TYPE pt; + const char *ptr = aclk_get_proxy(&pt); + char *tmp; + char *host; + if (pt != PROXY_TYPE_HTTP) + return; + + *port = 0; + + if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) + ptr += strlen(HTTP_PROXY_PREFIX); + + if ((tmp = strchr(ptr, '@'))) + ptr = tmp; + + if ((tmp = strchr(ptr, '/'))) { + host = mallocz((tmp - ptr) + 1); + memcpy(host, ptr, (tmp - ptr)); + host[tmp - ptr] = 0; + } else + host = strdupz(ptr); + + if ((tmp = strchr(host, ':'))) { + *tmp = 0; + tmp++; + *port = atoi(tmp); + } + + if (*port <= 0 || *port > 65535) + *port = 8080; + + *ohost = host; + + if (type) + *type = MQTT_WSS_PROXY_HTTP; +} |