diff options
Diffstat (limited to 'src/aclk/aclk_util.c')
-rw-r--r-- | src/aclk/aclk_util.c | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/src/aclk/aclk_util.c b/src/aclk/aclk_util.c new file mode 100644 index 000000000..3bf2e3f18 --- /dev/null +++ b/src/aclk/aclk_util.c @@ -0,0 +1,484 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_util.h" + +#ifdef ENABLE_ACLK + +#include "aclk_proxy.h" + +#include "daemon/common.h" + +usec_t aclk_session_newarch = 0; + +aclk_env_t *aclk_env = NULL; + +int chart_batch_id; + +aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) { + if (!strcmp(str, "json")) { + return ACLK_ENC_JSON; + } + if (!strcmp(str, "proto")) { + return ACLK_ENC_PROTO; + } + return ACLK_ENC_UNKNOWN; +} + +aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) { + if (!strcmp(str, "MQTTv3")) { + return ACLK_TRP_MQTT_3_1_1; + } + if (!strcmp(str, "MQTTv5")) { + return ACLK_TRP_MQTT_5; + } + return ACLK_TRP_UNKNOWN; +} + +void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) { + freez(trp_desc->endpoint); +} + +void aclk_env_t_destroy(aclk_env_t *env) { + freez(env->auth_endpoint); + if (env->transports) { + for (size_t i = 0; i < env->transport_count; i++) { + if(env->transports[i]) { + aclk_transport_desc_t_destroy(env->transports[i]); + freez(env->transports[i]); + env->transports[i] = NULL; + } + } + freez(env->transports); + } + if (env->capabilities) { + for (size_t i = 0; i < env->capability_count; i++) + freez(env->capabilities[i]); + freez(env->capabilities); + } +} + +int aclk_env_has_capa(const char *capa) +{ + for (int i = 0; i < (int) aclk_env->capability_count; i++) { + if (!strcasecmp(capa, aclk_env->capabilities[i])) + return 1; + } + return 0; +} + +#ifdef ACLK_LOG_CONVERSATION_DIR +volatile int aclk_conversation_log_counter = 0; +#endif + +#define ACLK_TOPIC_PREFIX "/agent/" + +struct aclk_topic { + enum aclk_topics topic_id; + // as received from cloud - we keep this for + // eventual topic list update when claim_id changes + char *topic_recvd; + // constructed topic + char *topic; +}; + +// This helps to cache finalized topics (assembled with claim_id) +// to not have to alloc or create buffer and construct topic every +// time message is sent as in old ACLK +static struct aclk_topic **aclk_topic_cache = NULL; +static size_t aclk_topic_cache_items = 0; + +void free_topic_cache(void) +{ + if (aclk_topic_cache) { + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + freez(aclk_topic_cache[i]->topic); + freez(aclk_topic_cache[i]->topic_recvd); + freez(aclk_topic_cache[i]); + } + freez(aclk_topic_cache); + aclk_topic_cache = NULL; + aclk_topic_cache_items = 0; + } +} + +#define JSON_TOPIC_KEY_TOPIC "topic" +#define JSON_TOPIC_KEY_NAME "name" + +struct topic_name { + enum aclk_topics id; + // cloud name - how is it called + // in answer to /password endpoint + const char *name; +} topic_names[] = { + { .id = ACLK_TOPICID_CHART, .name = "chart" }, + { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, + { .id = ACLK_TOPICID_METADATA, .name = "meta" }, + { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, + { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" }, + { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" }, + { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" }, + { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" }, + { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" }, + { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" }, + { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" }, + { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" }, + { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" }, + { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log-v2" }, + { .id = ACLK_TOPICID_ALARM_CHECKPOINT, .name = "alarm-checkpoint" }, + { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, + { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot-v2" }, + { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, + { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, + { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, + { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } +}; + +enum aclk_topics compulsory_topics[] = { +// TODO remove old topics once not needed anymore + ACLK_TOPICID_CHART, //TODO from legacy + ACLK_TOPICID_ALARMS, //TODO from legacy + ACLK_TOPICID_METADATA, //TODO from legacy + ACLK_TOPICID_COMMAND, + ACLK_TOPICID_AGENT_CONN, + ACLK_TOPICID_CMD_NG_V1, + ACLK_TOPICID_CREATE_NODE, + ACLK_TOPICID_NODE_CONN, + ACLK_TOPICID_CHART_DIMS, + ACLK_TOPICID_CHART_CONFIGS_UPDATED, + ACLK_TOPICID_CHART_RESET, + ACLK_TOPICID_RETENTION_UPDATED, + ACLK_TOPICID_NODE_INFO, + ACLK_TOPICID_ALARM_LOG, + ACLK_TOPICID_ALARM_CHECKPOINT, + ACLK_TOPICID_ALARM_CONFIG, + ACLK_TOPICID_ALARM_SNAPSHOT, + ACLK_TOPICID_NODE_COLLECTORS, + ACLK_TOPICID_CTXS_SNAPSHOT, + ACLK_TOPICID_CTXS_UPDATED, + ACLK_TOPICID_UNKNOWN +}; + +static enum aclk_topics topic_name_to_id(const char *name) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (!strcmp(topic->name, name)) { + return topic->id; + } + topic++; + } + return ACLK_TOPICID_UNKNOWN; +} + +static const char *topic_id_to_name(enum aclk_topics tid) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (topic->id == tid) + return topic->name; + topic++; + } + return "unknown"; +} + +#define CLAIM_ID_REPLACE_TAG "#{claim_id}" +static void topic_generate_final(struct aclk_topic *t) { + char *dest; + char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG); + if (!replace_tag) + return; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + netdata_log_error("This should never be called if agent not claimed"); + rrdhost_aclk_state_unlock(localhost); + return; + } + + t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id)); + memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd); + dest = t->topic + (replace_tag - t->topic_recvd); + + memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id)); + dest += strlen(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + replace_tag += strlen(CLAIM_ID_REPLACE_TAG); + strcpy(dest, replace_tag); + dest += strlen(replace_tag); + *dest = 0; +} + +static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic) +{ + struct json_object_iterator it; + struct json_object_iterator itEnd; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + netdata_log_error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string"); + return 1; + } + topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); + if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { + netdata_log_debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + } + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + netdata_log_error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string"); + return 1; + } + topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + netdata_log_error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + if (!topic->topic_recvd) { + netdata_log_error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC); + return 1; + } + + topic_generate_final(topic); + aclk_topic_cache_items++; + + return 0; +} + +int aclk_generate_topic_cache(struct json_object *json) +{ + json_object *obj; + + size_t array_size = json_object_array_length(json); + if (!array_size) { + netdata_log_error("Empty topic list!"); + return 1; + } + + if (aclk_topic_cache) + free_topic_cache(); + + aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *)); + + for (size_t i = 0; i < array_size; i++) { + obj = json_object_array_get_idx(json, i); + if (json_object_get_type(obj) != json_type_object) { + netdata_log_error("expected json_type_object"); + return 1; + } + aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic)); + if (topic_cache_add_topic(obj, aclk_topic_cache[i])) { + netdata_log_error("failed to parse topic @idx=%d", (int)i); + return 1; + } + } + + for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { + if (!aclk_get_topic(compulsory_topics[i])) { + netdata_log_error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); + return 1; + } + } + + return 0; +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ +const char *aclk_get_topic(enum aclk_topics topic) +{ + if (!aclk_topic_cache) { + netdata_log_error("Topic cache not initialized"); + return NULL; + } + + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + if (aclk_topic_cache[i]->topic_id == topic) + return aclk_topic_cache[i]->topic; + } + netdata_log_error("Unknown topic"); + return NULL; +} + +/* + * Allows iterating all topics in topic cache without + * having to resort to callbacks. + */ + +const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter) +{ + if (!aclk_topic_cache) { + netdata_log_error("Topic cache not initialized when %s was called.", __FUNCTION__); + return NULL; + } + + if (*iter >= aclk_topic_cache_items) + return NULL; + + return aclk_topic_cache[(*iter)++]->topic; +} + +/* + * TBEB with randomness + * + * @param reset 1 - to reset the delay, + * 0 - to advance a step and calculate sleep time in ms + * @param min, max in seconds + * @returns delay in ms + * + */ + +unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) { + static int attempt = -1; + + if (reset) { + attempt = -1; + return 0; + } + + attempt++; + + if (attempt == 0) { + srandom(time(NULL)); + return 0; + } + + unsigned long int delay = pow(base, attempt - 1); + delay *= MSEC_PER_SEC; + + delay += (random() % (MAX(1000, delay/2))); + + if (delay <= min * MSEC_PER_SEC) + return min; + + if (delay >= max * MSEC_PER_SEC) + return max; + + return delay; +} + +static inline int aclk_parse_pair(const char *src, const char c, char **a, char **b) +{ + const char *ptr = strchr(src, c); + if (ptr == NULL) + return 1; + +// allow empty string +/* if (!*(ptr+1)) + return 1;*/ + + *a = callocz(1, ptr - src + 1); + memcpy (*a, src, ptr - src); + + *b = strdupz(ptr+1); + + return 0; +} + +#define HTTP_PROXY_PREFIX "http://" +void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type) +{ + ACLK_PROXY_TYPE pt; + const char *ptr = aclk_get_proxy(&pt); + char *tmp; + + if (pt != PROXY_TYPE_HTTP) + return; + + *uname = NULL; + *pwd = NULL; + *port = 0; + + char *proxy = strdupz(ptr); + ptr = proxy; + + if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) + ptr += strlen(HTTP_PROXY_PREFIX); + + if ((tmp = strchr(ptr, '@'))) { + *tmp = 0; + if(aclk_parse_pair(ptr, ':', uname, pwd)) { + error_report("Failed to get username and password for proxy. Will attempt connection without authentication"); + } + ptr = tmp+1; + } + + if (!*ptr) { + freez(proxy); + freez(*uname); + freez(*pwd); + return; + } + + if ((tmp = strchr(ptr, ':'))) { + *tmp = 0; + tmp++; + if(*tmp) + *port = atoi(tmp); + } + *ohost = strdupz(ptr); + + if (*port <= 0 || *port > 65535) + *port = 8080; + + if (type) + *type = MQTT_WSS_PROXY_HTTP; + else { + freez(*uname); + freez(*pwd); + } + + freez(proxy); +} +#endif /* ENABLE_ACLK */ + +#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 +static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) +{ + EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); + + if (ctx != NULL) { + memset(ctx, 0, sizeof(*ctx)); + } + return ctx; +} +static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) +{ + OPENSSL_free(ctx); + return; +} +#endif + +int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) +{ + int len; + unsigned char *str = out; + EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); + EVP_EncodeInit(ctx); + EVP_EncodeUpdate(ctx, str, outl, in, in_len); + str += *outl; + EVP_EncodeFinal(ctx, str, &len); + *outl += len; + + str = out; + while(*str) { + if (*str != 0x0D && *str != 0x0A) + *out++ = *str++; + else + str++; + } + *out = 0; + + EVP_ENCODE_CTX_free(ctx); + return 0; +} |