summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_util.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_util.c')
-rw-r--r--aclk/aclk_util.c365
1 files changed, 279 insertions, 86 deletions
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index a5347c466..b8ac66756 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "aclk_util.h"
#include <stdio.h>
@@ -10,6 +12,48 @@
#define UUID_STR_LEN 37
#endif
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
+ if (!strcmp(str, "json")) {
+ return ACLK_ENC_JSON;
+ }
+ if (!strcmp(str, "proto")) {
+ return ACLK_ENC_PROTO;
+ }
+ return ACLK_ENC_UNKNOWN;
+}
+
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) {
+ if (!strcmp(str, "MQTTv3")) {
+ return ACLK_TRP_MQTT_3_1_1;
+ }
+ if (!strcmp(str, "MQTTv5")) {
+ return ACLK_TRP_MQTT_5;
+ }
+ return ACLK_TRP_UNKNOWN;
+}
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) {
+ freez(trp_desc->endpoint);
+}
+
+void aclk_env_t_destroy(aclk_env_t *env) {
+ freez(env->auth_endpoint);
+ if (env->transports) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ if(env->transports[i]) {
+ aclk_transport_desc_t_destroy(env->transports[i]);
+ env->transports[i] = NULL;
+ }
+ }
+ freez(env->transports);
+ }
+ if (env->capabilities) {
+ for (size_t i = 0; i < env->capability_count; i++)
+ freez(env->capabilities[i]);
+ freez(env->capabilities);
+ }
+}
+
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
@@ -28,137 +72,246 @@ int aclk_get_conv_log_next()
#define ACLK_TOPIC_PREFIX "/agent/"
struct aclk_topic {
- const char *topic_suffix;
+ enum aclk_topics topic_id;
+ // as received from cloud - we keep this for
+ // eventual topic list update when claim_id changes
+ char *topic_recvd;
+ // constructed topic
char *topic;
};
// This helps to cache finalized topics (assembled with claim_id)
// to not have to alloc or create buffer and construct topic every
// time message is sent as in old ACLK
-static struct aclk_topic aclk_topic_cache[] = {
- { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART
- { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS
- { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA
- { .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND
- { .topic_suffix = NULL, .topic = NULL }
-};
+static struct aclk_topic **aclk_topic_cache = NULL;
+static size_t aclk_topic_cache_items = 0;
void free_topic_cache(void)
{
- struct aclk_topic *tc = aclk_topic_cache;
- while (tc->topic_suffix) {
- if (tc->topic) {
- freez(tc->topic);
- tc->topic = NULL;
+ if (aclk_topic_cache) {
+ for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+ freez(aclk_topic_cache[i]->topic);
+ freez(aclk_topic_cache[i]->topic_recvd);
+ freez(aclk_topic_cache[i]);
}
- tc++;
+ freez(aclk_topic_cache);
+ aclk_topic_cache = NULL;
+ aclk_topic_cache_items = 0;
}
}
-static inline void generate_topic_cache(void)
-{
- struct aclk_topic *tc = aclk_topic_cache;
- char *ptr;
- if (unlikely(!tc->topic)) {
- rrdhost_aclk_state_lock(localhost);
- while(tc->topic_suffix) {
- tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix));
- ptr = tc->topic;
- strcpy(ptr, ACLK_TOPIC_PREFIX);
- ptr += strlen(ACLK_TOPIC_PREFIX);
- strcpy(ptr, localhost->aclk_state.claimed_id);
- ptr += (UUID_STR_LEN - 1);
- *ptr++ = '/';
- strcpy(ptr, tc->topic_suffix);
- tc++;
+#define JSON_TOPIC_KEY_TOPIC "topic"
+#define JSON_TOPIC_KEY_NAME "name"
+
+struct topic_name {
+ enum aclk_topics id;
+ // cloud name - how is it called
+ // in answer to /password endpoint
+ const char *name;
+} topic_names[] = {
+ { .id = ACLK_TOPICID_CHART, .name = "chart" },
+ { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
+ { .id = ACLK_TOPICID_METADATA, .name = "meta" },
+ { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
+ { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
+};
+
+enum aclk_topics compulsory_topics[] = {
+ ACLK_TOPICID_CHART,
+ ACLK_TOPICID_ALARMS,
+ ACLK_TOPICID_METADATA,
+ ACLK_TOPICID_COMMAND,
+ ACLK_TOPICID_UNKNOWN
+};
+
+static enum aclk_topics topic_name_to_id(const char *name) {
+ struct topic_name *topic = topic_names;
+ while (topic->name) {
+ if (!strcmp(topic->name, name)) {
+ return topic->id;
}
+ topic++;
+ }
+ return ACLK_TOPICID_UNKNOWN;
+}
+
+static const char *topic_id_to_name(enum aclk_topics tid) {
+ struct topic_name *topic = topic_names;
+ while (topic->name) {
+ if (topic->id == tid)
+ return topic->name;
+ topic++;
+ }
+ return "unknown";
+}
+
+#define CLAIM_ID_REPLACE_TAG "#{claim_id}"
+static void topic_generate_final(struct aclk_topic *t) {
+ char *dest;
+ char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG);
+ if (!replace_tag)
+ return;
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("This should never be called if agent not claimed");
rrdhost_aclk_state_unlock(localhost);
+ return;
}
+
+ t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id));
+ memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd);
+ dest = t->topic + (replace_tag - t->topic_recvd);
+
+ memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id));
+ dest += strlen(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ replace_tag += strlen(CLAIM_ID_REPLACE_TAG);
+ strcpy(dest, replace_tag);
+ dest += strlen(replace_tag);
+ *dest = 0;
}
-/*
- * Build a topic based on sub_topic and final_topic
- * if the sub topic starts with / assume that is an absolute topic
- *
- */
-const char *aclk_get_topic(enum aclk_topics topic)
+static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic)
{
- generate_topic_cache();
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+ error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string");
+ return 1;
+ }
+ topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it)));
+ if (topic->topic_id == ACLK_TOPICID_UNKNOWN) {
+ info("topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+ error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string");
+ return 1;
+ }
+ topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
- return aclk_topic_cache[topic].topic;
+ if (!topic->topic_recvd) {
+ error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC);
+ return 1;
+ }
+
+ topic_generate_final(topic);
+ aclk_topic_cache_items++;
+
+ return 0;
}
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
+int aclk_generate_topic_cache(struct json_object *json)
{
- int pos = 0;
- if (!strncmp("https://", url, 8)) {
- pos = 8;
- } else if (!strncmp("http://", url, 7)) {
- error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
+ json_object *obj;
+
+ size_t array_size = json_object_array_length(json);
+ if (!array_size) {
+ error("Empty topic list!");
return 1;
}
- int host_end = pos;
- while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
- host_end++;
- if (url[host_end] == 0) {
- *aclk_hostname = strdupz(url + pos);
- *aclk_port = 443;
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
- return 0;
- }
- if (url[host_end] == ':') {
- *aclk_hostname = callocz(host_end - pos + 1, 1);
- strncpy(*aclk_hostname, url + pos, host_end - pos);
- int port_end = host_end + 1;
- while (url[port_end] >= '0' && url[port_end] <= '9')
- port_end++;
- if (port_end - host_end > 6) {
- error("Port specified in %s is invalid", url);
- freez(*aclk_hostname);
- *aclk_hostname = NULL;
+
+ if (aclk_topic_cache)
+ free_topic_cache();
+
+ aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *));
+
+ for (size_t i = 0; i < array_size; i++) {
+ obj = json_object_array_get_idx(json, i);
+ if (json_object_get_type(obj) != json_type_object) {
+ error("expected json_type_object");
+ return 1;
+ }
+ aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic));
+ if (topic_cache_add_topic(obj, aclk_topic_cache[i])) {
+ error("failed to parse topic @idx=%d", (int)i);
return 1;
}
- *aclk_port = atoi(&url[host_end+1]);
}
- if (url[host_end] == '/') {
- *aclk_port = 443;
- *aclk_hostname = callocz(1, host_end - pos + 1);
- strncpy(*aclk_hostname, url+pos, host_end - pos);
+
+ for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
+ if (!aclk_get_topic(compulsory_topics[i])) {
+ error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));
+ return 1;
+ }
}
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
+
return 0;
}
/*
+ * Build a topic based on sub_topic and final_topic
+ * if the sub topic starts with / assume that is an absolute topic
+ *
+ */
+const char *aclk_get_topic(enum aclk_topics topic)
+{
+ if (!aclk_topic_cache) {
+ error("Topic cache not initialized");
+ return NULL;
+ }
+
+ for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+ if (aclk_topic_cache[i]->topic_id == topic)
+ return aclk_topic_cache[i]->topic;
+ }
+ error("Unknown topic");
+ return NULL;
+}
+
+/*
* TBEB with randomness
*
- * @param mode 0 - to reset the delay,
- * 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ * @param reset 1 - to reset the delay,
+ * 0 - to advance a step and calculate sleep time in ms
+ * @param min, max in seconds
* @returns delay in ms
*
*/
-#define ACLK_MAX_BACKOFF_DELAY 1024
-unsigned long int aclk_reconnect_delay(int mode)
-{
- static int fail = -1;
- unsigned long int delay;
- if (!mode || fail == -1) {
- srandom(time(NULL));
- fail = mode - 1;
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) {
+ static int attempt = -1;
+
+ if (reset) {
+ attempt = -1;
return 0;
}
- delay = (1 << fail);
+ attempt++;
- if (delay >= ACLK_MAX_BACKOFF_DELAY) {
- delay = ACLK_MAX_BACKOFF_DELAY * 1000;
- } else {
- fail++;
- delay *= 1000;
- delay += (random() % (MAX(1000, delay/2)));
+ if (attempt == 0) {
+ srandom(time(NULL));
+ return 0;
}
+ unsigned long int delay = pow(base, attempt - 1);
+ delay *= MSEC_PER_SEC;
+
+ delay += (random() % (MAX(1000, delay/2)));
+
+ if (delay <= min * MSEC_PER_SEC)
+ return min;
+
+ if (delay >= max * MSEC_PER_SEC)
+ return max;
+
return delay;
}
@@ -345,3 +498,43 @@ const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
*type = proxy_type;
return proxy;
}
+
+#define HTTP_PROXY_PREFIX "http://"
+void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type)
+{
+ ACLK_PROXY_TYPE pt;
+ const char *ptr = aclk_get_proxy(&pt);
+ char *tmp;
+ char *host;
+ if (pt != PROXY_TYPE_HTTP)
+ return;
+
+ *port = 0;
+
+ if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
+ ptr += strlen(HTTP_PROXY_PREFIX);
+
+ if ((tmp = strchr(ptr, '@')))
+ ptr = tmp;
+
+ if ((tmp = strchr(ptr, '/'))) {
+ host = mallocz((tmp - ptr) + 1);
+ memcpy(host, ptr, (tmp - ptr));
+ host[tmp - ptr] = 0;
+ } else
+ host = strdupz(ptr);
+
+ if ((tmp = strchr(host, ':'))) {
+ *tmp = 0;
+ tmp++;
+ *port = atoi(tmp);
+ }
+
+ if (*port <= 0 || *port > 65535)
+ *port = 8080;
+
+ *ohost = host;
+
+ if (type)
+ *type = MQTT_WSS_PROXY_HTTP;
+}