diff options
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 226 | ||||
-rw-r--r-- | aclk/aclk.h | 28 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 746 | ||||
-rw-r--r-- | aclk/aclk_otp.h | 5 | ||||
-rw-r--r-- | aclk/aclk_query.c | 19 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 88 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 1 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 2 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 64 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 2 | ||||
-rw-r--r-- | aclk/aclk_util.c | 365 | ||||
-rw-r--r-- | aclk/aclk_util.h | 59 | ||||
-rw-r--r-- | aclk/https_client.c | 556 | ||||
-rw-r--r-- | aclk/https_client.h | 73 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.c | 8 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.c | 4 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.h | 6 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.c | 2 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.c | 4 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.c | 4 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.c | 10 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.h | 4 | ||||
-rw-r--r-- | aclk/legacy/mqtt.c | 6 | ||||
-rw-r--r-- | aclk/legacy/tests/paho-inspection.py | 2 |
24 files changed, 1779 insertions, 505 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 889fa1e4..35549cfe 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "aclk.h" #include "aclk_stats.h" @@ -9,6 +11,7 @@ #include "aclk_util.h" #include "aclk_rx_msgs.h" #include "aclk_collector_list.h" +#include "https_client.h" #ifdef ACLK_LOG_CONVERSATION_DIR #include <sys/types.h> @@ -26,9 +29,13 @@ int aclk_kill_link = 0; int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +time_t aclk_block_until = 0; + usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer +aclk_env_t *aclk_env = NULL; + mqtt_wss_client mqttwss_client; netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; @@ -38,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; struct aclk_shared_state aclk_shared_state = { .agent_state = AGENT_INITIALIZING, .last_popcorn_interrupt = 0, - .version_neg = 0, - .version_neg_wait_till = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; @@ -138,8 +143,7 @@ static int wait_till_agent_claimed(void) */ static int wait_till_agent_claim_ready() { - int port; - char *hostname = NULL; + url_t url; while (!netdata_exit) { if (wait_till_agent_claimed()) return 1; @@ -154,15 +158,14 @@ static int wait_till_agent_claim_ready() // We just check configuration is valid here // TODO make it without malloc/free - if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) { + memset(&url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &url)) { error("Agent is claimed but the configuration is invalid, please fix"); - freez(hostname); - hostname = NULL; + url_t_destroy(&url); sleep(5); continue; } - freez(hostname); - hostname = NULL; + url_t_destroy(&url); if (!load_private_key()) { sleep(5); @@ -198,6 +201,11 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int { char cmsg[RX_MSGLEN_MAX]; size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); + const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + if (!cmd_topic) { + error("Error retrieving command topic"); + return; + } if (msglen > RX_MSGLEN_MAX - 1) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -221,7 +229,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg); - if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic)) + if (strcmp(cmd_topic, topic)) error("Received message on unexpected topic %s", topic); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { @@ -235,7 +243,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) - aclk_reconnect_delay(0); + aclk_tbeb_reset(); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_puback(packet_id); @@ -320,15 +328,20 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_session_sec = now / USEC_PER_SEC; aclk_session_us = now % USEC_PER_SEC; - mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1); + const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + + if (!topic) + error("Unable to fetch topic for COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - aclk_hello_msg(client); + ACLK_SHARED_STATE_LOCK; if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { - error("Sending `connect` payload immediatelly as popcorning was finished already."); + error("Sending `connect` payload immediately as popcorning was finished already."); queue_connect_payloads(); } ACLK_SHARED_STATE_UNLOCK; @@ -393,16 +406,41 @@ void aclk_graceful_disconnect(mqtt_wss_client client) mqtt_wss_disconnect(client, 1000); } +static unsigned long aclk_reconnect_delay() { + unsigned long recon_delay; + time_t now; + + if (aclk_disable_runtime) { + aclk_tbeb_reset(); + return 60 * MSEC_PER_SEC; + } + + now = now_monotonic_sec(); + if (aclk_block_until) { + if (now < aclk_block_until) { + recon_delay = aclk_block_until - now; + recon_delay *= MSEC_PER_SEC; + aclk_block_until = 0; + aclk_tbeb_reset(); + return recon_delay; + } + aclk_block_until = 0; + } + + if (!aclk_env || !aclk_env->backoff.base) + return aclk_tbeb_delay(0, 2, 0, 1024); + + return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s); +} + /* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled * @return 0 - Go ahead and connect (delay expired) * 1 - netdata_exit */ #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4) static int aclk_block_till_recon_allowed() { - // Handle reconnect exponential backoff - // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss - // but has been modifed slightly (more randomness) - unsigned long recon_delay = aclk_reconnect_delay(1); + unsigned long recon_delay = aclk_reconnect_delay(); + info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) @@ -420,44 +458,22 @@ static int aclk_block_till_recon_allowed() { return 0; } -#define HTTP_PROXY_PREFIX "http://" -static void set_proxy(struct mqtt_wss_proxy *out) -{ - ACLK_PROXY_TYPE pt; - const char *ptr = aclk_get_proxy(&pt); - char *tmp; - char *host; - if (pt != PROXY_TYPE_HTTP) - return; - - out->port = 0; - - if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) - ptr += strlen(HTTP_PROXY_PREFIX); - - if ((tmp = strchr(ptr, '@'))) - ptr = tmp; - - if ((tmp = strchr(ptr, '/'))) { - host = mallocz((tmp - ptr) + 1); - memcpy(host, ptr, (tmp - ptr)); - host[tmp - ptr] = 0; - } else - host = strdupz(ptr); - - if ((tmp = strchr(host, ':'))) { - *tmp = 0; - tmp++; - out->port = atoi(tmp); +#ifndef ACLK_DISABLE_CHALLENGE +/* Cloud returns transport list ordered with highest + * priority first. This function selects highest prio + * transport that we can actually use (support) + */ +static int aclk_get_transport_idx(aclk_env_t *env) { + for (size_t i = 0; i < env->transport_count; i++) { + // currently we support only MQTT 3 + // therefore select first transport that matches + if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) { + return i; + } } - - if (out->port <= 0 || out->port > 65535) - out->port = 8080; - - out->host = host; - - out->type = MQTT_WSS_PROXY_HTTP; + return -1; } +#endif /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client @@ -473,12 +489,13 @@ static void set_proxy(struct mqtt_wss_proxy *out) #endif static int aclk_attempt_to_connect(mqtt_wss_client client) { - char *aclk_hostname = NULL; - int aclk_port; + int ret; + + url_t base_url; #ifndef ACLK_DISABLE_CHALLENGE - char *mqtt_otp_user = NULL; - char *mqtt_otp_pass = NULL; + url_t auth_url; + url_t mqtt_url; #endif json_object *lwt; @@ -494,48 +511,103 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) return 1; info("Attempting connection now"); - if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { + memset(&base_url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &base_url)) { error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); + url_t_destroy(&base_url); continue; } - struct mqtt_wss_proxy proxy_conf; - proxy_conf.type = MQTT_WSS_DIRECT; - set_proxy(&proxy_conf); + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT }; + aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); struct mqtt_connect_params mqtt_conn_params = { .clientid = "anon", .username = "anon", .password = "anon", - .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA), + .will_topic = "lwt", .will_msg = NULL, .will_flags = MQTT_WSS_PUB_QOS2, .keep_alive = 60 }; + #ifndef ACLK_DISABLE_CHALLENGE - aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass); - mqtt_conn_params.clientid = mqtt_otp_user; - mqtt_conn_params.username = mqtt_otp_user; - mqtt_conn_params.password = mqtt_otp_pass; + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } + aclk_env = callocz(1, sizeof(aclk_env_t)); + + ret = aclk_get_env(aclk_env, base_url.host, base_url.port); + url_t_destroy(&base_url); + if (ret) { + error("Failed to Get ACLK environment"); + // delay handled by aclk_block_till_recon_allowed + continue; + } + + memset(&auth_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + url_t_destroy(&auth_url); + continue; + } + + ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); + url_t_destroy(&auth_url); + if (ret) { + error("Error passing Challenge/Response to get OTP"); + continue; + } + + // aclk_get_topic moved here as during OTP we + // generate the topic cache + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (!mqtt_conn_params.will_topic) { + error("Couldn't get LWT topic. Will not send LWT."); + continue; + } + + // Do the MQTT connection + ret = aclk_get_transport_idx(aclk_env); + if (ret < 0) { + error("Cloud /env endpoint didn't return any transport usable by this Agent."); + continue; + } + + memset(&mqtt_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + url_t_destroy(&mqtt_url); + continue; + } #endif lwt = aclk_generate_disconnect(NULL); mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); - if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) { - json_object_put(lwt); - freez(aclk_hostname); - aclk_hostname = NULL; + +#ifdef ACLK_DISABLE_CHALLENGE + ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&base_url); +#else + ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&mqtt_url); + + freez((char*)mqtt_conn_params.clientid); + freez((char*)mqtt_conn_params.password); + freez((char*)mqtt_conn_params.username); +#endif + + json_object_put(lwt); + + if (!ret) { info("MQTTWSS connection succeeded"); mqtt_connected_actions(client); return 0; } - freez(aclk_hostname); - aclk_hostname = NULL; - json_object_put(lwt); error("Connect failed\n"); } @@ -637,6 +709,10 @@ exit_full: free_topic_cache(); mqtt_wss_destroy(mqttwss_client); exit: + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; } diff --git a/aclk/aclk.h b/aclk/aclk.h index 29626c7f..b02b93d7 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -9,23 +9,8 @@ typedef struct aclk_rrdhost_state { #include "../daemon/common.h" #include "aclk_util.h" -// minimum and maximum supported version of ACLK -// in this version of agent -#define ACLK_VERSION_MIN 2 -#define ACLK_VERSION_MAX 2 - -// Version negotiation messages have they own versioning -// this is also used for LWT message as we set that up -// before version negotiation -#define ACLK_VERSION_NEG_VERSION 1 - -// Maximum time to wait for version negotiation before aborting -// and defaulting to oldest supported version -#define VERSION_NEG_TIMEOUT 3 - -#if ACLK_VERSION_MIN > ACLK_VERSION_MAX -#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" -#endif +// version for aclk legacy (old cloud arch) +#define ACLK_VERSION 2 // Define ACLK Feature Version Boundaries Here #define ACLK_V_COMPRESSION 2 @@ -40,9 +25,13 @@ extern int aclk_disable_single_updates; extern int aclk_kill_link; extern int aclk_connected; +extern time_t aclk_block_until; + extern usec_t aclk_session_us; extern time_t aclk_session_sec; +extern aclk_env_t *aclk_env; + void *aclk_main(void *ptr); void aclk_single_update_disable(); void aclk_single_update_enable(); @@ -68,11 +57,6 @@ extern struct aclk_shared_state { ACLK_AGENT_STATE agent_state; time_t last_popcorn_interrupt; - // read only while ACLK connected - // protect by lock otherwise - int version_neg; - usec_t version_neg_wait_till; - // To wait for `disconnect` message PUBACK // when shuting down // at the same time if > 0 we know link is diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index fcb9d600..411a5f89 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -3,12 +3,16 @@ #include "aclk_otp.h" -#include "https_client.h" - #include "../daemon/common.h" #include "../mqtt_websockets/c-rbuf/include/ringbuffer.h" +// CentOS 7 has older version that doesn't define this +// same goes for MacOS +#ifndef UUID_STR_LEN +#define UUID_STR_LEN 37 +#endif + struct dictionary_singleton { char *key; char *result; @@ -167,54 +171,321 @@ static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, u return result; } -// aclk_get_mqtt_otp is slightly modified original code from @amoss -void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass) -{ - char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - debug(D_ACLK, "Performing challenge-response sequence"); - if (*mqtt_pass != NULL) - { - freez(*mqtt_pass); - *mqtt_pass = NULL; +static int aclk_https_request(https_req_t *request, https_req_response_t *response) { + int rc; + // wrapper for ACLK only which loads ACLK specific proxy settings + // then only calls https_request + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT }; + aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); + + if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) { + request->proxy_host = (char*)proxy_conf.host; // TODO make it const as well + request->proxy_port = proxy_conf.port; + } + + rc = https_request(request, response); + freez((char*)proxy_conf.host); + return rc; +} + +struct auth_data { + char *client_id; + char *username; + char *passwd; +}; + +#define PARSE_ENV_JSON_CHK_TYPE(it, type, name) \ + if (json_object_get_type(json_object_iter_peek_value(it)) != type) { \ + error("value of key \"%s\" should be %s", name, #type); \ + goto exit; \ + } + +#define JSON_KEY_CLIENTID "clientID" +#define JSON_KEY_USER "username" +#define JSON_KEY_PASS "password" +#define JSON_KEY_TOPICS "topics" + +static int parse_passwd_response(const char *json_str, struct auth_data *auth) { + int rc = 1; + json_object *json; + struct json_object_iterator it; + struct json_object_iterator itEnd; + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http respons of /env endpoint"); + return 1; + } + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CLIENTID)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_CLIENTID) + + auth->client_id = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_USER)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_USER) + + auth->username = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_PASS)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_PASS) + + auth->passwd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TOPICS)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TOPICS) + + if (aclk_generate_topic_cache(json_object_iter_peek_value(&it))) { + error("Failed to generate topic cache!"); + goto exit; + } + json_object_iter_next(&it); + continue; + } + error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + if (!auth->client_id) { + error(JSON_KEY_CLIENTID " is compulsory key in /password response"); + goto exit; + } + if (!auth->passwd) { + error(JSON_KEY_PASS " is compulsory in /password response"); + goto exit; + } + if (!auth->username) { + error(JSON_KEY_USER " is compulsory in /password response"); + goto exit; + } + + rc = 0; +exit: + json_object_put(json); + return rc; +} + +#define JSON_KEY_ERTRY "errorNonRetryable" +#define JSON_KEY_EDELAY "errorRetryDelaySeconds" +#define JSON_KEY_EEC "errorCode" +#define JSON_KEY_EMSGKEY "errorMsgKey" +#define JSON_KEY_EMSG "errorMessage" +#if JSON_C_MINOR_VERSION >= 13 +static const char *get_json_str_by_path(json_object *json, const char *path) { + json_object *ptr; + if (json_pointer_get(json, path, &ptr)) { + error("Missing compulsory key \"%s\" in error response", path); + return NULL; + } + if (json_object_get_type(ptr) != json_type_string) { + error("Value of Key \"%s\" in error response should be string", path); + return NULL; + } + return json_object_get_string(ptr); +} + +static int aclk_parse_otp_error(const char *json_str) { + int rc = 1; + json_object *json, *ptr; + const char *ec; + const char *ek; + const char *emsg; + int block_retry = -1, backoff = -1; + + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http response of /env endpoint"); + return 1; + } + + if ((ec = get_json_str_by_path(json, "/" JSON_KEY_EEC)) == NULL) + goto exit; + + if ((ek = get_json_str_by_path(json, "/" JSON_KEY_EMSGKEY)) == NULL) + goto exit; + + if ((emsg = get_json_str_by_path(json, "/" JSON_KEY_EMSG)) == NULL) + goto exit; + + // optional field + if (!json_pointer_get(json, "/" JSON_KEY_ERTRY, &ptr)) { + if (json_object_get_type(ptr) != json_type_boolean) { + error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type"); + goto exit; + } + block_retry = json_object_get_boolean(ptr); + } + + // optional field + if (!json_pointer_get(json, "/" JSON_KEY_EDELAY, &ptr)) { + if (json_object_get_type(ptr) != json_type_int) { + error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type"); + goto exit; + } + backoff = json_object_get_int(ptr); + } + + if (block_retry > 0) + aclk_disable_runtime = 1; + + if (backoff > 0) + aclk_block_until = now_monotonic_sec() + backoff; + + error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); + rc = 0; +exit: + json_object_put(json); + return rc; +} +#else +static int aclk_parse_otp_error(const char *json_str) { + int rc = 1; + int block_retry = -1, backoff = -1; + + const char *ec = NULL; + const char *ek = NULL; + const char *emsg = NULL; + + json_object *json; + struct json_object_iterator it; + struct json_object_iterator itEnd; + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http respons of /env endpoint"); + return 1; + } + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSG)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSG) + + emsg = json_object_get_string(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSGKEY)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSGKEY) + + ek = json_object_get_string(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EEC)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EEC) + + ec = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EDELAY)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_int) { + error("value of key " JSON_KEY_EDELAY " should be integer"); + goto exit; + } + + backoff = json_object_get_int(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ERTRY)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_boolean) { + error("value of key " JSON_KEY_ERTRY " should be integer"); + goto exit; + } + + block_retry = json_object_get_boolean(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); } - // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge - // TODO - target host? + + if (block_retry > 0) + aclk_disable_runtime = 1; + + if (backoff > 0) + aclk_block_until = now_monotonic_sec() + backoff; + + error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); + rc = 0; +exit: + json_object_put(json); + return rc; +} +#endif + +#define OTP_URL_PREFIX "/api/v1/auth/node/" +int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) { + // TODO this fnc will be rewritten and simplified in following PRs + // still carries lot of baggage from ACLK Legacy + int rc = 1; + BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); + + https_req_t req = HTTPS_REQ_T_INITIALIZER; + https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; + char *agent_id = is_agent_claimed(); if (agent_id == NULL) { error("Agent was not claimed - cannot perform challenge/response"); - goto CLEANUP; + goto cleanup; } - char url[1024]; - sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); - info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url); - if (https_request(HTTP_REQ_GET, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) - { - error("Challenge failed: %s", data_buffer); - goto CLEANUP; + + // GET Challenge + req.host = target->host; + req.port = target->port; + buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id); + req.url = url->buffer; + + if (aclk_https_request(&req, &resp)) { + error ("ACLK_OTP Challenge failed"); + goto cleanup; } + if (resp.http_code != 200) { + error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code); + if (resp.payload_size) + aclk_parse_otp_error(resp.payload); + goto cleanup_resp; + } + info ("ACLK_OTP Got Challenge from Cloud"); + struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; - debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); - if (json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) + if (json_parse(resp.payload, &challenge, json_extract_singleton) != JSON_OK) { freez(challenge.result); - error("Could not parse the json response with the challenge: %s", data_buffer); - goto CLEANUP; + error("Could not parse the the challenge"); + goto cleanup_resp; } if (challenge.result == NULL) { - error("Could not retrieve challenge from auth response: %s", data_buffer); - goto CLEANUP; + error("Could not retrieve challenge JSON key from challenge response"); + goto cleanup_resp; } - + // Decrypt the Challenge and Calculate Response size_t challenge_len = strlen(challenge.result); unsigned char decoded[512]; size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); + freez(challenge.result); unsigned char plaintext[4096]={}; int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext); - freez(challenge.result); char encoded[512]; size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); encoded[encoded_len] = 0; @@ -223,39 +494,394 @@ void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_us char response_json[4096]={}; sprintf(response_json, "{\"response\":\"%s\"}", encoded); debug(D_ACLK, "Password phase: %s",response_json); - // TODO - host - sprintf(url, "/api/v1/auth/node/%s/password", agent_id); - if (https_request(HTTP_REQ_POST, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) - { - error("Challenge-response failed: %s", data_buffer); - goto CLEANUP; + + https_req_response_free(&resp); + https_req_response_init(&resp); + + // POST password + req.request_type = HTTP_REQ_POST; + buffer_flush(url); + buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id); + req.url = url->buffer; + req.payload = response_json; + req.payload_size = strlen(response_json); + + if (aclk_https_request(&req, &resp)) { + error ("ACLK_OTP Password error trying to post result to password"); + goto cleanup; + } + if (resp.http_code != 201) { + error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code); + if (resp.payload_size) + aclk_parse_otp_error(resp.payload); + goto cleanup_resp; + } + info ("ACLK_OTP Got Password from Cloud"); + + struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL }; + + if (parse_passwd_response(resp.payload, &data)){ + error("Error parsing response of password endpoint"); + goto cleanup_resp; + } + + *mqtt_pass = data.passwd; + *mqtt_usr = data.username; + *mqtt_id = data.client_id; + + rc = 0; +cleanup_resp: + https_req_response_free(&resp); +cleanup: + freez(agent_id); + buffer_free(url); + return rc; +} + +#define JSON_KEY_ENC "encoding" +#define JSON_KEY_AUTH_ENDPOINT "authEndpoint" +#define JSON_KEY_TRP "transports" +#define JSON_KEY_TRP_TYPE "type" +#define JSON_KEY_TRP_ENDPOINT "endpoint" +#define JSON_KEY_BACKOFF "backoff" +#define JSON_KEY_BACKOFF_BASE "base" +#define JSON_KEY_BACKOFF_MAX "maxSeconds" +#define JSON_KEY_BACKOFF_MIN "minSeconds" +#define JSON_KEY_CAPS "capabilities" + +static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *trp) { + struct json_object_iterator it; + struct json_object_iterator itEnd; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_TYPE)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_TYPE) + if (trp->type != ACLK_TRP_UNKNOWN) { + error(JSON_KEY_TRP_TYPE " set already"); + goto exit; + } + trp->type = aclk_transport_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it))); + if (trp->type == ACLK_TRP_UNKNOWN) { + error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + goto exit; + } + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_ENDPOINT)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_ENDPOINT) + if (trp->endpoint) { + error(JSON_KEY_TRP_ENDPOINT " set already"); + goto exit; + } + trp->endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); } - debug(D_ACLK, "Password response from cloud: %s", data_buffer); + if (!trp->endpoint) { + error (JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary"); + goto exit; + } + + if (trp->type == ACLK_TRP_UNKNOWN) { + error ("transport type not set"); + goto exit; + } + + return 0; + +exit: + aclk_transport_desc_t_destroy(trp); + return 1; +} + +static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) { + aclk_transport_desc_t *trp; + json_object *obj; + + if (env->transports) { + error("transports have been set already"); + return 1; + } + + env->transport_count = json_object_array_length(json_array); + + env->transports = callocz(env->transport_count , sizeof(aclk_transport_desc_t *)); + + for (size_t i = 0; i < env->transport_count; i++) { + trp = callocz(1, sizeof(aclk_transport_desc_t)); + obj = json_object_array_get_idx(json_array, i); + if (parse_json_env_transport(obj, trp)) { + error("error parsing transport idx %d", (int)i); + freez(trp); + return 1; + } + env->transports[i] = trp; + } + + return 0; +} + +#define MATCHED_CORRECT 1 +#define MATCHED_ERROR -1 +#define NOT_MATCHED 0 +static int parse_json_backoff_int(struct json_object_iterator *it, int *out, const char* name, int min, int max) { + if (!strcmp(json_object_iter_peek_name(it), name)) { + if (json_object_get_type(json_object_iter_peek_value(it)) != json_type_int) { + error("Could not parse \"%s\". Not an integer as expected.", name); + return MATCHED_ERROR; + } + + *out = json_object_get_int(json_object_iter_peek_value(it)); + + if (*out < min || *out > max) { + error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max); + return MATCHED_ERROR; + } + + return MATCHED_CORRECT; + } + return NOT_MATCHED; +} + +static int parse_json_backoff(json_object *json, aclk_backoff_t *backoff) { + struct json_object_iterator it; + struct json_object_iterator itEnd; + int ret; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if ( (ret = parse_json_backoff_int(&it, &backoff->base, JSON_KEY_BACKOFF_BASE, 1, 10)) ) { + if (ret == MATCHED_ERROR) { + return 1; + } + json_object_iter_next(&it); + continue; + } + + if ( (ret = parse_json_backoff_int(&it, &backoff->max_s, JSON_KEY_BACKOFF_MAX, 500, INT_MAX)) ) { + if (ret == MATCHED_ERROR) { + return 1; + } + json_object_iter_next(&it); + continue; + } + + if ( (ret = parse_json_backoff_int(&it, &backoff->min_s, JSON_KEY_BACKOFF_MIN, 0, INT_MAX)) ) { + if (ret == MATCHED_ERROR) { + return 1; + } + json_object_iter_next(&it); + continue; + } + + error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + return 0; +} + +static int parse_json_env_caps(json_object *json, aclk_env_t *env) { + json_object *obj; + const char *str; + + if (env->capabilities) { + error("transports have been set already"); + return 1; + } + + env->capability_count = json_object_array_length(json); + + // empty capabilities list is allowed + if (!env->capability_count) + return 0; + + env->capabilities = callocz(env->capability_count , sizeof(char *)); + + for (size_t i = 0; i < env->capability_count; i++) { + obj = json_object_array_get_idx(json, i); + if (json_object_get_type(obj) != json_type_string) { + error("Capability at index %d not a string!", (int)i); + return 1; + } + str = json_object_get_string(obj); + if (!str) { + error("Error parsing capabilities"); + return 1; + } + env->capabilities[i] = strdupz(str); + } + + return 0; +} - struct dictionary_singleton password = { .key = "password", .result = NULL }; - if (json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) +static int parse_json_env(const char *json_str, aclk_env_t *env) { + json_object *json; + struct json_object_iterator it; + struct json_object_iterator itEnd; + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http respons of /env endpoint"); + return 1; + } + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_AUTH_ENDPOINT)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_AUTH_ENDPOINT) + if (env->auth_endpoint) { + error("authEndpoint set already"); + goto exit; + } + env->auth_endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ENC)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_ENC) + if (env->encoding != ACLK_ENC_UNKNOWN) { + error(JSON_KEY_ENC " set already"); + goto exit; + } + env->encoding = aclk_encoding_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TRP) + + json_object *now = json_object_iter_peek_value(&it); + parse_json_env_transports(now, env); + + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_BACKOFF)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_object, JSON_KEY_BACKOFF) + + if (parse_json_backoff(json_object_iter_peek_value(&it), &env->backoff)) { + env->backoff.base = 0; + error("Error parsing Backoff parameters in env"); + goto exit; + } + + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CAPS)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_CAPS) + + if (parse_json_env_caps(json_object_iter_peek_value(&it), env)) { + error("Error parsing capabilities list"); + goto exit; + } + + json_object_iter_next(&it); + continue; + } + + error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + // Check all compulsory keys have been set + if (env->transport_count < 1) { + error("env has to return at least one transport"); + goto exit; + } + if (!env->auth_endpoint) { + error(JSON_KEY_AUTH_ENDPOINT " is compulsory"); + goto exit; + } + if (env->encoding == ACLK_ENC_UNKNOWN) { + error(JSON_KEY_ENC " is compulsory"); + goto exit; + } + if (!env->backoff.base) { + error(JSON_KEY_BACKOFF " is compulsory"); + goto exit; + } + + json_object_put(json); + return 0; + +exit: + aclk_env_t_destroy(env); + json_object_put(json); + return 1; +} + +int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { + BUFFER *buf = buffer_create(1024); + + https_req_t req = HTTPS_REQ_T_INITIALIZER; + https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; + + req.request_type = HTTP_REQ_GET; + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) { - freez(password.result); - error("Could not parse the json response with the password: %s", data_buffer); - goto CLEANUP; - } - - if (password.result == NULL ) { - error("Could not retrieve password from auth response"); - goto CLEANUP; - } - if (*mqtt_pass != NULL ) - freez(*mqtt_pass); - *mqtt_pass = password.result; - if (*mqtt_usr != NULL) - freez(*mqtt_usr); - *mqtt_usr = agent_id; - agent_id = NULL; - -CLEANUP: - if (agent_id != NULL) - freez(agent_id); - freez(data_buffer); - return; + error("Agent was not claimed - cannot perform challenge/response"); + buffer_free(buf); + return 1; + } + + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json$claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + freez(agent_id); + + req.host = (char*)aclk_hostname; + req.port = aclk_port; + req.url = buf->buffer; + if (aclk_https_request(&req, &resp)) { + error("Error trying to contact env endpoint"); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + if (resp.http_code != 200) { + error("The HTTP code not 200 OK (Got %d)", resp.http_code); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + + if (!resp.payload || !resp.payload_size) { + error("Unexpected empty payload as response to /env call"); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + + if (parse_json_env(resp.payload, env)) { + error ("error parsing /env message"); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + + info("Getting Cloud /env successful"); + + https_req_response_free(&resp); + buffer_free(buf); + return 0; } diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h index 31e81c5a..d2044f6f 100644 --- a/aclk/aclk_otp.h +++ b/aclk/aclk_otp.h @@ -5,6 +5,9 @@ #include "../daemon/common.h" -void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass); +#include "https_client.h" + +int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target); +int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port); #endif /* ACLK_OTP_H */ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 71c63f64..3e2f88e4 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "aclk_query.h" #include "aclk_stats.h" #include "aclk_query_queue.h" @@ -233,23 +235,6 @@ void *aclk_query_main_thread(void *ptr) { struct aclk_query_thread *info = ptr; while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (unlikely(!aclk_shared_state.version_neg)) { - if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { - ACLK_SHARED_STATE_UNLOCK; - info("Waiting for ACLK Version Negotiation message from Cloud"); - sleep(1); - continue; - } - errno = 0; - error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." - " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); - aclk_shared_state.version_neg = ACLK_VERSION_MIN; -// When ACLK v3 is implemented you will need this -// aclk_set_rx_handlers(aclk_shared_state.version_neg); - } - ACLK_SHARED_STATE_UNLOCK; - aclk_query_process_msgs(info); QUERY_THREAD_LOCK; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index fcb8d996..3d3ab5e2 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -166,81 +166,6 @@ error: return 1; } -// This handles `version` message from cloud used to negotiate -// protocol version we will use -static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) -{ - UNUSED(raw_payload); - int version = -1; - errno = 0; - - if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { - error( - "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", - ACLK_VERSION_NEG_VERSION, - cloud_to_agent->version); - return 1; - } - if (unlikely(!cloud_to_agent->min_version)) { - error("Min version missing or 0"); - return 1; - } - if (unlikely(!cloud_to_agent->max_version)) { - error("Max version missing or 0"); - return 1; - } - if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { - error( - "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, - cloud_to_agent->min_version); - return 1; - } - - if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { - error( - "Agent too old for this cloud. Minimum version required by cloud %d." - " Maximum version supported by this agent %d.", - cloud_to_agent->min_version, ACLK_VERSION_MAX); - aclk_kill_link = 1; - aclk_disable_runtime = 1; - return 1; - } - if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { - error( - "Cloud version is too old for this agent. Maximum version supported by cloud %d." - " Minimum (oldest) version supported by this agent %d.", - cloud_to_agent->max_version, ACLK_VERSION_MIN); - aclk_kill_link = 1; - return 1; - } - - version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - - ACLK_SHARED_STATE_LOCK; - if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { - errno = 0; - error("The \"version\" message came too late ignoring."); - goto err_cleanup; - } - if (unlikely(aclk_shared_state.version_neg)) { - errno = 0; - error("Version has already been set to %d", aclk_shared_state.version_neg); - goto err_cleanup; - } - aclk_shared_state.version_neg = version; - ACLK_SHARED_STATE_UNLOCK; - - info("Choosing version %d of ACLK", version); - - aclk_set_rx_handlers(version); - - return 0; - -err_cleanup: - ACLK_SHARED_STATE_UNLOCK; - return 1; -} - typedef struct aclk_incoming_msg_type{ char *name; int(*fnc)(struct aclk_request *, char *); @@ -248,20 +173,11 @@ typedef struct aclk_incoming_msg_type{ aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, - { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression; -void aclk_set_rx_handlers(int version) -{ -// ACLK_NG ACLK version support starts at 2 -// TODO ACLK v3 - UNUSED(version); - aclk_incoming_msg_types = aclk_incoming_msg_types_compression; -} - int aclk_handle_cloud_message(char *payload) { struct aclk_request cloud_to_agent; @@ -295,10 +211,6 @@ int aclk_handle_cloud_message(char *payload) goto err_cleanup; } - if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { - error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); - goto err_cleanup; - } for (int i = 0; aclk_incoming_msg_types[i].name; i++) { if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index c9f0bd37..e24252be 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -9,6 +9,5 @@ #include "libnetdata/libnetdata.h" int aclk_handle_cloud_message(char *payload); -void aclk_set_rx_handlers(int version); #endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index b61ac05f..a599cfda 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "aclk_stats.h" netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 158fc4e2..144008e4 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -13,8 +13,14 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, { uint16_t packet_id; const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + const char *topic = aclk_get_topic(subtopic); - mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting mesage send"); + return; + } + + mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -30,8 +36,14 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje { uint16_t packet_id; const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + const char *topic = aclk_get_topic(subtopic); - mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting mesage send"); + return 0; + } + + mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -199,9 +211,9 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRD // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (metadata_submitted) - msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION); else - msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); payload = json_object_new_object(); json_object_object_add(msg, "payload", payload); @@ -241,9 +253,9 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) // session. if (metadata_submitted) - msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION); else - msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); payload = json_object_new_object(); json_object_object_add(msg, "payload", payload); @@ -265,39 +277,6 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) buffer_free(local_buffer); } -void aclk_hello_msg(mqtt_wss_client client) -{ - json_object *tmp, *msg; - - char *msg_id = create_uuid(); - - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - ACLK_SHARED_STATE_UNLOCK; - - //Hello message is versioned separatelly from the rest of the protocol - msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); - - tmp = json_object_new_int(ACLK_VERSION_MIN); - json_object_object_add(msg, "min-version", tmp); - - tmp = json_object_new_int(ACLK_VERSION_MAX); - json_object_object_add(msg, "max-version", tmp); - -#ifdef ACLK_NG - tmp = json_object_new_string("Next Generation"); -#else - tmp = json_object_new_string("Legacy"); -#endif - json_object_object_add(msg, "aclk-implementation", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); - - json_object_put(msg); - freez(msg_id); -} - void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -340,7 +319,7 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) return; } - msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION); json_object_object_add(msg, "payload", payload); aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); @@ -352,11 +331,10 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) { // we create header here on purpose (and not send message with it already as `msg` param) - // one is version_neg is guaranteed to be done here - // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy + // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy // send message with timestamps already to Query Queue they would be incorrect at time // when query queue would get to send them) - json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg); + json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION); json_object_object_add(obj, "payload", msg); aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index cb4d44c9..50c98169 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -10,8 +10,6 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); -void aclk_hello_msg(mqtt_wss_client client); - void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart); diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index a5347c46..b8ac6675 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "aclk_util.h" #include <stdio.h> @@ -10,6 +12,48 @@ #define UUID_STR_LEN 37 #endif +aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) { + if (!strcmp(str, "json")) { + return ACLK_ENC_JSON; + } + if (!strcmp(str, "proto")) { + return ACLK_ENC_PROTO; + } + return ACLK_ENC_UNKNOWN; +} + +aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) { + if (!strcmp(str, "MQTTv3")) { + return ACLK_TRP_MQTT_3_1_1; + } + if (!strcmp(str, "MQTTv5")) { + return ACLK_TRP_MQTT_5; + } + return ACLK_TRP_UNKNOWN; +} + +void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) { + freez(trp_desc->endpoint); +} + +void aclk_env_t_destroy(aclk_env_t *env) { + freez(env->auth_endpoint); + if (env->transports) { + for (size_t i = 0; i < env->transport_count; i++) { + if(env->transports[i]) { + aclk_transport_desc_t_destroy(env->transports[i]); + env->transports[i] = NULL; + } + } + freez(env->transports); + } + if (env->capabilities) { + for (size_t i = 0; i < env->capability_count; i++) + freez(env->capabilities[i]); + freez(env->capabilities); + } +} + #ifdef ACLK_LOG_CONVERSATION_DIR volatile int aclk_conversation_log_counter = 0; #if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS) @@ -28,137 +72,246 @@ int aclk_get_conv_log_next() #define ACLK_TOPIC_PREFIX "/agent/" struct aclk_topic { - const char *topic_suffix; + enum aclk_topics topic_id; + // as received from cloud - we keep this for + // eventual topic list update when claim_id changes + char *topic_recvd; + // constructed topic char *topic; }; // This helps to cache finalized topics (assembled with claim_id) // to not have to alloc or create buffer and construct topic every // time message is sent as in old ACLK -static struct aclk_topic aclk_topic_cache[] = { - { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART - { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS - { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA - { .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND - { .topic_suffix = NULL, .topic = NULL } -}; +static struct aclk_topic **aclk_topic_cache = NULL; +static size_t aclk_topic_cache_items = 0; void free_topic_cache(void) { - struct aclk_topic *tc = aclk_topic_cache; - while (tc->topic_suffix) { - if (tc->topic) { - freez(tc->topic); - tc->topic = NULL; + if (aclk_topic_cache) { + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + freez(aclk_topic_cache[i]->topic); + freez(aclk_topic_cache[i]->topic_recvd); + freez(aclk_topic_cache[i]); } - tc++; + freez(aclk_topic_cache); + aclk_topic_cache = NULL; + aclk_topic_cache_items = 0; } } -static inline void generate_topic_cache(void) -{ - struct aclk_topic *tc = aclk_topic_cache; - char *ptr; - if (unlikely(!tc->topic)) { - rrdhost_aclk_state_lock(localhost); - while(tc->topic_suffix) { - tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix)); - ptr = tc->topic; - strcpy(ptr, ACLK_TOPIC_PREFIX); - ptr += strlen(ACLK_TOPIC_PREFIX); - strcpy(ptr, localhost->aclk_state.claimed_id); - ptr += (UUID_STR_LEN - 1); - *ptr++ = '/'; - strcpy(ptr, tc->topic_suffix); - tc++; +#define JSON_TOPIC_KEY_TOPIC "topic" +#define JSON_TOPIC_KEY_NAME "name" + +struct topic_name { + enum aclk_topics id; + // cloud name - how is it called + // in answer to /password endpoint + const char *name; +} topic_names[] = { + { .id = ACLK_TOPICID_CHART, .name = "chart" }, + { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, + { .id = ACLK_TOPICID_METADATA, .name = "meta" }, + { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, + { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } +}; + +enum aclk_topics compulsory_topics[] = { + ACLK_TOPICID_CHART, + ACLK_TOPICID_ALARMS, + ACLK_TOPICID_METADATA, + ACLK_TOPICID_COMMAND, + ACLK_TOPICID_UNKNOWN +}; + +static enum aclk_topics topic_name_to_id(const char *name) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (!strcmp(topic->name, name)) { + return topic->id; } + topic++; + } + return ACLK_TOPICID_UNKNOWN; +} + +static const char *topic_id_to_name(enum aclk_topics tid) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (topic->id == tid) + return topic->name; + topic++; + } + return "unknown"; +} + +#define CLAIM_ID_REPLACE_TAG "#{claim_id}" +static void topic_generate_final(struct aclk_topic *t) { + char *dest; + char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG); + if (!replace_tag) + return; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("This should never be called if agent not claimed"); rrdhost_aclk_state_unlock(localhost); + return; } + + t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id)); + memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd); + dest = t->topic + (replace_tag - t->topic_recvd); + + memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id)); + dest += strlen(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + replace_tag += strlen(CLAIM_ID_REPLACE_TAG); + strcpy(dest, replace_tag); + dest += strlen(replace_tag); + *dest = 0; } -/* - * Build a topic based on sub_topic and final_topic - * if the sub topic starts with / assume that is an absolute topic - * - */ -const char *aclk_get_topic(enum aclk_topics topic) +static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic) { - generate_topic_cache(); + struct json_object_iterator it; + struct json_object_iterator itEnd; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string"); + return 1; + } + topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); + if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { + info("topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + } + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string"); + return 1; + } + topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } - return aclk_topic_cache[topic].topic; + if (!topic->topic_recvd) { + error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC); + return 1; + } + + topic_generate_final(topic); + aclk_topic_cache_items++; + + return 0; } -int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) +int aclk_generate_topic_cache(struct json_object *json) { - int pos = 0; - if (!strncmp("https://", url, 8)) { - pos = 8; - } else if (!strncmp("http://", url, 7)) { - error("Cannot connect ACLK over %s -> unencrypted link is not supported", url); + json_object *obj; + + size_t array_size = json_object_array_length(json); + if (!array_size) { + error("Empty topic list!"); return 1; } - int host_end = pos; - while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':') - host_end++; - if (url[host_end] == 0) { - *aclk_hostname = strdupz(url + pos); - *aclk_port = 443; - info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); - return 0; - } - if (url[host_end] == ':') { - *aclk_hostname = callocz(host_end - pos + 1, 1); - strncpy(*aclk_hostname, url + pos, host_end - pos); - int port_end = host_end + 1; - while (url[port_end] >= '0' && url[port_end] <= '9') - port_end++; - if (port_end - host_end > 6) { - error("Port specified in %s is invalid", url); - freez(*aclk_hostname); - *aclk_hostname = NULL; + + if (aclk_topic_cache) + free_topic_cache(); + + aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *)); + + for (size_t i = 0; i < array_size; i++) { + obj = json_object_array_get_idx(json, i); + if (json_object_get_type(obj) != json_type_object) { + error("expected json_type_object"); + return 1; + } + aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic)); + if (topic_cache_add_topic(obj, aclk_topic_cache[i])) { + error("failed to parse topic @idx=%d", (int)i); return 1; } - *aclk_port = atoi(&url[host_end+1]); } - if (url[host_end] == '/') { - *aclk_port = 443; - *aclk_hostname = callocz(1, host_end - pos + 1); - strncpy(*aclk_hostname, url+pos, host_end - pos); + + for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { + if (!aclk_get_topic(compulsory_topics[i])) { + error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); + return 1; + } } - info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; } /* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ +const char *aclk_get_topic(enum aclk_topics topic) +{ + if (!aclk_topic_cache) { + error("Topic cache not initialized"); + return NULL; + } + + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + if (aclk_topic_cache[i]->topic_id == topic) + return aclk_topic_cache[i]->topic; + } + error("Unknown topic"); + return NULL; +} + +/* * TBEB with randomness * - * @param mode 0 - to reset the delay, - * 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * @param reset 1 - to reset the delay, + * 0 - to advance a step and calculate sleep time in ms + * @param min, max in seconds * @returns delay in ms * */ -#define ACLK_MAX_BACKOFF_DELAY 1024 -unsigned long int aclk_reconnect_delay(int mode) -{ - static int fail = -1; - unsigned long int delay; - if (!mode || fail == -1) { - srandom(time(NULL)); - fail = mode - 1; +unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) { + static int attempt = -1; + + if (reset) { + attempt = -1; return 0; } - delay = (1 << fail); + attempt++; - if (delay >= ACLK_MAX_BACKOFF_DELAY) { - delay = ACLK_MAX_BACKOFF_DELAY * 1000; - } else { - fail++; - delay *= 1000; - delay += (random() % (MAX(1000, delay/2))); + if (attempt == 0) { + srandom(time(NULL)); + return 0; } + unsigned long int delay = pow(base, attempt - 1); + delay *= MSEC_PER_SEC; + + delay += (random() % (MAX(1000, delay/2))); + + if (delay <= min * MSEC_PER_SEC) + return min; + + if (delay >= max * MSEC_PER_SEC) + return max; + return delay; } @@ -345,3 +498,43 @@ const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) *type = proxy_type; return proxy; } + +#define HTTP_PROXY_PREFIX "http://" +void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type) +{ + ACLK_PROXY_TYPE pt; + const char *ptr = aclk_get_proxy(&pt); + char *tmp; + char *host; + if (pt != PROXY_TYPE_HTTP) + return; + + *port = 0; + + if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) + ptr += strlen(HTTP_PROXY_PREFIX); + + if ((tmp = strchr(ptr, '@'))) + ptr = tmp; + + if ((tmp = strchr(ptr, '/'))) { + host = mallocz((tmp - ptr) + 1); + memcpy(host, ptr, (tmp - ptr)); + host[tmp - ptr] = 0; + } else + host = strdupz(ptr); + + if ((tmp = strchr(host, ':'))) { + *tmp = 0; + tmp++; + *port = atoi(tmp); + } + + if (*port <= 0 || *port > 65535) + *port = 8080; + + *ohost = host; + + if (type) + *type = MQTT_WSS_PROXY_HTTP; +} diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index c7232979..03b22e40 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -3,20 +3,63 @@ #define ACLK_UTIL_H #include "libnetdata/libnetdata.h" +#include "mqtt_wss_client.h" // Helper stuff which should not have any further inside ACLK dependency // and are supposed not to be needed outside of ACLK -int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); +typedef enum { + ACLK_ENC_UNKNOWN = 0, + ACLK_ENC_JSON, + ACLK_ENC_PROTO +} aclk_encoding_type_t; + +typedef enum { + ACLK_TRP_UNKNOWN = 0, + ACLK_TRP_MQTT_3_1_1, + ACLK_TRP_MQTT_5 +} aclk_transport_type_t; + +typedef struct { + char *endpoint; + aclk_transport_type_t type; +} aclk_transport_desc_t; + +typedef struct { + int base; + int max_s; + int min_s; +} aclk_backoff_t; + +typedef struct { + char *auth_endpoint; + aclk_encoding_type_t encoding; + + aclk_transport_desc_t **transports; + size_t transport_count; + + char **capabilities; + size_t capability_count; + + aclk_backoff_t backoff; +} aclk_env_t; + +aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str); +aclk_transport_type_t aclk_transport_type_t_from_str(const char *str); + +void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc); +void aclk_env_t_destroy(aclk_env_t *env); enum aclk_topics { - ACLK_TOPICID_CHART = 0, - ACLK_TOPICID_ALARMS = 1, - ACLK_TOPICID_METADATA = 2, - ACLK_TOPICID_COMMAND = 3, + ACLK_TOPICID_UNKNOWN = 0, + ACLK_TOPICID_CHART = 1, + ACLK_TOPICID_ALARMS = 2, + ACLK_TOPICID_METADATA = 3, + ACLK_TOPICID_COMMAND = 4 }; const char *aclk_get_topic(enum aclk_topics topic); +int aclk_generate_topic_cache(struct json_object *json); void free_topic_cache(void); // TODO // aclk_topics_reload //when claim id changes @@ -32,7 +75,8 @@ int aclk_get_conv_log_next(); #endif #endif -unsigned long int aclk_reconnect_delay(int mode); +unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max); +#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0) typedef enum aclk_proxy_type { PROXY_TYPE_UNKNOWN = 0, @@ -46,7 +90,8 @@ const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); void safe_log_proxy_censor(char *proxy); -int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); +void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type); + #endif /* ACLK_UTIL_H */ diff --git a/aclk/https_client.c b/aclk/https_client.c index 1b9546d7..907f512b 100644 --- a/aclk/https_client.c +++ b/aclk/https_client.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "libnetdata/libnetdata.h" #include "https_client.h" @@ -10,6 +12,19 @@ enum http_parse_state { HTTP_PARSE_CONTENT }; +static const char *http_req_type_to_str(http_req_type_t req) { + switch (req) { + case HTTP_REQ_GET: + return "GET"; + case HTTP_REQ_POST: + return "POST"; + case HTTP_REQ_CONNECT: + return "CONNECT"; + default: + return "unknown"; + } +} + typedef struct { enum http_parse_state state; int content_length; @@ -17,6 +32,13 @@ typedef struct { } http_parse_ctx; #define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 } +static inline void http_parse_ctx_clear(http_parse_ctx *ctx) { + ctx->state = HTTP_PARSE_INITIAL; + ctx->content_length = -1; + ctx->http_code = 0; +} + +#define POLL_TO_MS 100 #define NEED_MORE_DATA 0 #define PARSE_SUCCESS 1 @@ -71,8 +93,6 @@ static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx) rbuf_pop(buf, buf_val, idx_end); buf_val[idx_end] = 0; - rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR)); - for (ptr = buf_key; *ptr; ptr++) *ptr = tolower(*ptr); @@ -129,10 +149,10 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM)); break; case HTTP_PARSE_CONTENT: - if (parse_ctx->content_length < 0) { - error("content-length missing and http headers ended"); - return PARSE_ERROR; - } + // replies like CONNECT etc. do not have content + if (parse_ctx->content_length < 0) + return PARSE_SUCCESS; + if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length) return PARSE_SUCCESS; return NEED_MORE_DATA; @@ -140,107 +160,493 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) } while(1); } -int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload) +typedef struct https_req_ctx { + https_req_t *request; + + int sock; + rbuf_t buf_rx; + + struct pollfd poll_fd; + + SSL_CTX *ssl_ctx; + SSL *ssl; + + size_t written; + + int self_signed_allowed; + + http_parse_ctx parse_ctx; + + time_t req_start_time; +} https_req_ctx_t; + +static int https_req_check_timedout(https_req_ctx_t *ctx) { + if (now_realtime_sec() > ctx->req_start_time + ctx->request->timeout_s) { + error("request timed out"); + return 1; + } + return 0; +} + +static char *_ssl_err_tos(int err) { - struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; - char sport[PORT_STR_MAX_BYTES]; - size_t len = 0; - int rc = 1; + switch(err){ + case SSL_ERROR_SSL: + return "SSL_ERROR_SSL"; + case SSL_ERROR_WANT_READ: + return "SSL_ERROR_WANT_READ"; + case SSL_ERROR_WANT_WRITE: + return "SSL_ERROR_WANT_WRITE"; + case SSL_ERROR_NONE: + return "SSL_ERROR_NONE"; + case SSL_ERROR_ZERO_RETURN: + return "SSL_ERROR_ZERO_RETURN"; + case SSL_ERROR_WANT_CONNECT: + return "SSL_ERROR_WANT_CONNECT"; + case SSL_ERROR_WANT_ACCEPT: + return "SSL_ERROR_WANT_ACCEPT"; + } + return "Unknown!!!"; +} + +static int socket_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { + ctx->written = 0; + ctx->poll_fd.events = POLLOUT; + + do { + int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); + if (ret < 0) { + error("poll error"); + return 1; + } + if (ret == 0) { + if (https_req_check_timedout(ctx)) { + error("Poll timed out"); + return 2; + } + continue; + } + + ret = write(ctx->sock, &data[ctx->written], data_len - ctx->written); + if (ret > 0) { + ctx->written += ret; + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { + error("Error writing to socket"); + return 3; + } + } while (ctx->written < data_len); + + return 0; +} + +static int ssl_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { + ctx->written = 0; + ctx->poll_fd.events |= POLLOUT; + + do { + int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); + if (ret < 0) { + error("poll error"); + return 1; + } + if (ret == 0) { + if (https_req_check_timedout(ctx)) { + error("Poll timed out"); + return 2; + } + continue; + } + ctx->poll_fd.events = 0; + + ret = SSL_write(ctx->ssl, &data[ctx->written], data_len - ctx->written); + if (ret > 0) { + ctx->written += ret; + } else { + ret = SSL_get_error(ctx->ssl, ret); + switch (ret) { + case SSL_ERROR_WANT_READ: + ctx->poll_fd.events |= POLLIN; + break; + case SSL_ERROR_WANT_WRITE: + ctx->poll_fd.events |= POLLOUT; + break; + default: + error("SSL_write Err: %s", _ssl_err_tos(ret)); + return 3; + } + } + } while (ctx->written < data_len); + + return 0; +} + +static inline int https_client_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { + if (ctx->ssl_ctx) + return ssl_write_all(ctx, data, data_len); + return socket_write_all(ctx, data, data_len); +} + +static int read_parse_response(https_req_ctx_t *ctx) { int ret; char *ptr; - http_parse_ctx parse_ctx = HTTP_PARSE_CTX_INITIALIZER; + size_t size; - rbuf_t buffer = rbuf_create(b_size); - if (!buffer) + ctx->poll_fd.events = POLLIN; + do { + ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); + if (ret < 0) { + error("poll error"); + return 1; + } + if (ret == 0) { + if (https_req_check_timedout(ctx)) { + error("Poll timed out"); + return 2; + } + continue; + } + ctx->poll_fd.events = 0; + + ptr = rbuf_get_linear_insert_range(ctx->buf_rx, &size); + + if (ctx->ssl_ctx) + ret = SSL_read(ctx->ssl, ptr, size); + else + ret = read(ctx->sock, ptr, size); + + if (ret > 0) { + rbuf_bump_head(ctx->buf_rx, ret); + } else { + if (ctx->ssl_ctx) { + ret = SSL_get_error(ctx->ssl, ret); + switch (ret) { + case SSL_ERROR_WANT_READ: + ctx->poll_fd.events |= POLLIN; + break; + case SSL_ERROR_WANT_WRITE: + ctx->poll_fd.events |= POLLOUT; + break; + default: + error("SSL_read Err: %s", _ssl_err_tos(ret)); + return 3; + } + } else { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + error("write error"); + return 3; + } + ctx->poll_fd.events |= POLLIN; + } + } + } while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx))); + + if (ret != PARSE_SUCCESS) { + error("Error parsing HTTP response"); return 1; + } - snprintf(sport, PORT_STR_MAX_BYTES, "%d", port); + return 0; +} - if (payload != NULL) - len = strlen(payload); +#define TX_BUFFER_SIZE 8192 +#define RX_BUFFER_SIZE (TX_BUFFER_SIZE*2) +static int handle_http_request(https_req_ctx_t *ctx) { + BUFFER *hdr = buffer_create(TX_BUFFER_SIZE); + int rc = 0; + + http_parse_ctx_clear(&ctx->parse_ctx); + + // Prepare data to send + switch (ctx->request->request_type) { + case HTTP_REQ_CONNECT: + buffer_strcat(hdr, "CONNECT "); + break; + case HTTP_REQ_GET: + buffer_strcat(hdr, "GET "); + break; + case HTTP_REQ_POST: + buffer_strcat(hdr, "POST "); + break; + default: + error("Unknown HTTPS request type!"); + rc = 1; + goto err_exit; + } + + if (ctx->request->request_type == HTTP_REQ_CONNECT) { + buffer_strcat(hdr, ctx->request->host); + buffer_sprintf(hdr, ":%d", ctx->request->port); + } else { + buffer_strcat(hdr, ctx->request->url); + } + + buffer_strcat(hdr, " HTTP/1.1\x0D\x0A"); - snprintf( - b, - b_size, - "%s %s HTTP/1.1\r\nHost: %s\r\nAccept: application/json\r\nContent-length: %zu\r\nAccept-Language: en-us\r\n" - "User-Agent: Netdata/rocks\r\n\r\n", - (method == HTTP_REQ_GET ? "GET" : "POST"), url, host, len); + //TODO Headers! + if (ctx->request->request_type != HTTP_REQ_CONNECT) { + buffer_sprintf(hdr, "Host: %s\x0D\x0A", ctx->request->host); + } + buffer_strcat(hdr, "User-Agent: Netdata/rocks newhttpclient\x0D\x0A"); - if (payload != NULL) - strncat(b, payload, b_size - len); + if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) { + buffer_sprintf(hdr, "Content-Length: %zu\x0D\x0A", ctx->request->payload_size); + } - len = strlen(b); + buffer_strcat(hdr, "\x0D\x0A"); - debug(D_ACLK, "Sending HTTPS req (%zu bytes): '%s'", len, b); - int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, sport, &timeout); + // Send the request + if (https_client_write_all(ctx, hdr->buffer, hdr->len)) { + error("Couldn't write HTTP request header into SSL connection"); + rc = 2; + goto err_exit; + } - if (unlikely(sock == -1)) { - error("Handshake failed"); - goto exit_buf; + if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) { + if (https_client_write_all(ctx, ctx->request->payload, ctx->request->payload_size)) { + error("Couldn't write payload into SSL connection"); + rc = 3; + goto err_exit; + } } - SSL_CTX *ctx = security_initialize_openssl_client(); - if (ctx==NULL) { + // Read The Response + if (read_parse_response(ctx)) { + error("Error reading or parsing response from server"); + rc = 4; + goto err_exit; + } + +err_exit: + buffer_free(hdr); + return rc; +} + +int https_request(https_req_t *request, https_req_response_t *response) { + int rc = 1, ret; + char connect_port_str[PORT_STR_MAX_BYTES]; + + const char *connect_host = request->proxy_host ? request->proxy_host : request->host; + int connect_port = request->proxy_host ? request->proxy_port : request->port; + struct timeval timeout = { .tv_sec = request->timeout_s, .tv_usec = 0 }; + + https_req_ctx_t *ctx = callocz(1, sizeof(https_req_ctx_t)); + ctx->req_start_time = now_realtime_sec(); + + ctx->buf_rx = rbuf_create(RX_BUFFER_SIZE); + if (!ctx->buf_rx) { + error("Couldn't allocate buffer for RX data"); + goto exit_req_ctx; + } + + snprintf(connect_port_str, PORT_STR_MAX_BYTES, "%d", connect_port); + + ctx->sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, connect_host, 0, connect_port_str, &timeout); + if (ctx->sock < 0) { + error("Error connecting TCP socket to \"%s\"", connect_host); + goto exit_buf_rx; + } + + if (fcntl(ctx->sock, F_SETFL, fcntl(ctx->sock, F_GETFL, 0) | O_NONBLOCK) == -1) { + error("Error setting O_NONBLOCK to TCP socket."); + goto exit_sock; + } + + ctx->poll_fd.fd = ctx->sock; + + // Do the CONNECT if proxy is used + if (request->proxy_host) { + https_req_t req = HTTPS_REQ_T_INITIALIZER; + req.request_type = HTTP_REQ_CONNECT; + req.timeout_s = request->timeout_s; + req.host = request->host; + req.port = request->port; + req.url = request->url; + ctx->request = &req; + if (handle_http_request(ctx)) { + error("Failed to CONNECT with proxy"); + goto exit_sock; + } + if (ctx->parse_ctx.http_code != 200) { + error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code); + goto exit_sock; + } + info("Proxy accepted CONNECT upgrade"); + } + ctx->request = request; + + ctx->ssl_ctx = security_initialize_openssl_client(); + if (ctx->ssl_ctx==NULL) { error("Cannot allocate SSL context"); goto exit_sock; } - // Certificate chain: not updating the stores - do we need private CA roots? - // Calls to SSL_CTX_load_verify_locations would go here. - SSL *ssl = SSL_new(ctx); - if (ssl==NULL) { + + ctx->ssl = SSL_new(ctx->ssl_ctx); + if (ctx->ssl==NULL) { error("Cannot allocate SSL"); goto exit_CTX; } - SSL_set_fd(ssl, sock); - ret = SSL_connect(ssl); - if (ret != 1) { - error("SSL_connect() failed with err=%d", ret); + + SSL_set_fd(ctx->ssl, ctx->sock); + ret = SSL_connect(ctx->ssl); + if (ret != -1 && ret != 1) { + error("SSL could not connect"); goto exit_SSL; } + if (ret == -1) { + // expected as underlying socket is non blocking! + // consult SSL_connect documentation for details + int ec = SSL_get_error(ctx->ssl, ret); + if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) { + error("Failed to start SSL connection"); + goto exit_SSL; + } + } - ret = SSL_write(ssl, b, len); - if (ret <= 0) - { - error("SSL_write() failed with err=%d", ret); + // The actual request here + if (handle_http_request(ctx)) { + error("Couldn't process request"); goto exit_SSL; } + response->http_code = ctx->parse_ctx.http_code; + if (ctx->parse_ctx.content_length > 0) { + response->payload_size = ctx->parse_ctx.content_length; + response->payload = mallocz(response->payload_size + 1); + ret = rbuf_pop(ctx->buf_rx, response->payload, response->payload_size); + if (ret != (int)response->payload_size) { + error("Payload size doesn't match remaining data on the buffer!"); + response->payload_size = ret; + } + // normally we take payload as it is and copy it + // but for convenience in cases where payload is sth. like + // json we add terminating zero so that user of the data + // doesn't have to convert to C string (0 terminated) + // other uses still have correct payload_size and can copy + // only exact data without affixed 0x00 + ((char*)response->payload)[response->payload_size] = 0; // mallocz(response->payload_size + 1); + } + info("HTTPS \"%s\" request to \"%s\" finished with HTTP code: %d", http_req_type_to_str(ctx->request->request_type), ctx->request->host, response->http_code); - b[0] = 0; + rc = 0; - do { - ptr = rbuf_get_linear_insert_range(buffer, &len); - ret = SSL_read(ssl, ptr, len - 1); - if (ret) - rbuf_bump_head(buffer, ret); - if (ret <= 0) - { - error("No response available - SSL_read()=%d", ret); - goto exit_FULL; +exit_SSL: + SSL_free(ctx->ssl); +exit_CTX: + SSL_CTX_free(ctx->ssl_ctx); +exit_sock: + close(ctx->sock); +exit_buf_rx: + rbuf_free(ctx->buf_rx); +exit_req_ctx: + freez(ctx); + return rc; +} + +void https_req_response_free(https_req_response_t *res) { + freez(res->payload); +} + +void https_req_response_init(https_req_response_t *res) { + res->http_code = 0; + res->payload = NULL; + res->payload_size = 0; +} + +static inline char *min_non_null(char *a, char *b) { + if (!a) + return b; + if (!b) + return a; + return (a < b ? a : b); +} + +#define URI_PROTO_SEPARATOR "://" +#define URL_PARSER_LOG_PREFIX "url_parser " + +static int parse_host_port(url_t *url) { + char *ptr = strrchr(url->host, ':'); + if (ptr) { + size_t port_len = strlen(ptr + 1); + if (!port_len) { + error(URL_PARSER_LOG_PREFIX ": specified but no port number"); + return 1; + } + if (port_len > 5 /* MAX port lenght is 5digit long in decimal */) { + error(URL_PARSER_LOG_PREFIX "port # is too long"); + return 1; } - } while (!(ret = parse_http_response(buffer, &parse_ctx))); + *ptr = 0; + if (!strlen(url->host)) { + error(URL_PARSER_LOG_PREFIX "host empty after removing port"); + return 1; + } + url->port = atoi (ptr + 1); + } + return 0; +} - if (ret != PARSE_SUCCESS) { - error("Error parsing HTTP response"); - goto exit_FULL; +static inline void port_by_proto(url_t *url) { + if (url->port) + return; + if (!url->proto) + return; + if (!strcmp(url->proto, "http")) { + url->port = 80; + return; + } + if (!strcmp(url->proto, "https")) { + url->port = 443; + return; } +} - if (parse_ctx.http_code < 200 || parse_ctx.http_code >= 300) { - error("HTTP Response not Success (got %d)", parse_ctx.http_code); - goto exit_FULL; +#define STRDUPZ_2PTR(dest, start, end) \ + { \ + dest = mallocz(1 + end - start); \ + memcpy(dest, start, end - start); \ + dest[end - start] = 0; \ } - len = rbuf_pop(buffer, b, b_size); - b[MIN(len, b_size-1)] = 0; +int url_parse(const char *url, url_t *parsed) { + const char *start = url; + const char *end = strstr(url, URI_PROTO_SEPARATOR); - rc = 0; -exit_FULL: -exit_SSL: - SSL_free(ssl); -exit_CTX: - SSL_CTX_free(ctx); -exit_sock: - close(sock); -exit_buf: - rbuf_free(buffer); - return rc; + if (end) { + if (end == start) { + error (URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified"); + return 1; + } + + STRDUPZ_2PTR(parsed->proto, start, end) + start = end + strlen(URI_PROTO_SEPARATOR); + } + + end = strchr(start, '/'); + if (!end) + end = start + strlen(start); + + if (start == end) { + error(URL_PARSER_LOG_PREFIX "Host empty"); + return 1; + } + + STRDUPZ_2PTR(parsed->host, start, end); + + if (parse_host_port(parsed)) + return 1; + + if (!*end) { + parsed->path = strdupz("/"); + port_by_proto(parsed); + return 0; + } + + parsed->path = strdupz(end); + port_by_proto(parsed); + return 0; +} + +void url_t_destroy(url_t *url) { + freez(url->host); + freez(url->path); + freez(url->proto); } diff --git a/aclk/https_client.h b/aclk/https_client.h index 0d2e0dba..f7bc3d43 100644 --- a/aclk/https_client.h +++ b/aclk/https_client.h @@ -1,11 +1,78 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #ifndef NETDATA_HTTPS_CLIENT_H #define NETDATA_HTTPS_CLIENT_H +#include "libnetdata/libnetdata.h" + typedef enum http_req_type { - HTTP_REQ_GET, - HTTP_REQ_POST + HTTP_REQ_GET = 0, + HTTP_REQ_POST, + HTTP_REQ_CONNECT } http_req_type_t; -int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload); +typedef struct { + http_req_type_t request_type; + + char *host; + int port; + char *url; + + time_t timeout_s; //timeout in seconds for the network operation (send/recv) + + void *payload; + size_t payload_size; + + char *proxy_host; + int proxy_port; +} https_req_t; + +typedef struct { + int http_code; + + void *payload; + size_t payload_size; +} https_req_response_t; + + +// Non feature complete URL parser +// feel free to extend when needed +// currently implements only what ACLK +// needs +// proto://host[:port]/path +typedef struct { + char *proto; + char *host; + int port; + char* path; +} url_t; + +int url_parse(const char *url, url_t *parsed); +void url_t_destroy(url_t *url); + +void https_req_response_free(https_req_response_t *res); +void https_req_response_init(https_req_response_t *res); + +#define HTTPS_REQ_RESPONSE_T_INITIALIZER \ + { \ + .http_code = 0, \ + .payload = NULL, \ + .payload_size = 0 \ + } + +#define HTTPS_REQ_T_INITIALIZER \ + { \ + .request_type = HTTP_REQ_GET, \ + .host = NULL, \ + .port = 443, \ + .url = NULL, \ + .timeout_s = 30, \ + .payload = NULL, \ + .payload_size = 0, \ + .proxy_host = NULL, \ + .proxy_port = 8080 \ + } + +int https_request(https_req_t *request, https_req_response_t *response); #endif /* NETDATA_HTTPS_CLIENT_H */ diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c index 43455393..96f95545 100644 --- a/aclk/legacy/aclk_common.c +++ b/aclk/legacy/aclk_common.c @@ -100,7 +100,7 @@ static inline void safe_log_proxy_error(char *str, const char *proxy) freez(log); } -static inline int check_socks_enviroment(const char **proxy) +static inline int check_socks_environment(const char **proxy) { char *tmp = getenv("socks_proxy"); @@ -118,7 +118,7 @@ static inline int check_socks_enviroment(const char **proxy) return 1; } -static inline int check_http_enviroment(const char **proxy) +static inline int check_http_environment(const char **proxy) { char *tmp = getenv("http_proxy"); @@ -145,7 +145,7 @@ const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) return proxy; if (strcmp(proxy, ACLK_PROXY_ENV) == 0) { - if (check_socks_enviroment(&proxy) == 0) { + if (check_socks_environment(&proxy) == 0) { #ifdef LWS_WITH_SOCKS5 *type = PROXY_TYPE_SOCKS5; return proxy; @@ -156,7 +156,7 @@ const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) proxy); #endif } - if (check_http_enviroment(&proxy) == 0) + if (check_http_environment(&proxy) == 0) *type = PROXY_TYPE_HTTP; return proxy; } diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c index df221dd6..f73902b3 100644 --- a/aclk/legacy/aclk_lws_wss_client.c +++ b/aclk/legacy/aclk_lws_wss_client.c @@ -428,7 +428,7 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas // Callback servicing is forced when we are closed from above. if (engine_instance->upstream_reconnect_request) { - error("Closing lws connectino due to libmosquitto error."); + error("Closing lws connection due to libmosquitto error."); char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; lws_close_reason( wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error, @@ -609,7 +609,7 @@ void aclk_lws_wss_service_loop() // in case the MQTT connection disconnect while lws transport is still operational // we should drop connection and reconnect // this function should be called when that happens to notify lws of that situation -void aclk_lws_wss_mqtt_layer_disconect_notif() +void aclk_lws_wss_mqtt_layer_disconnect_notif() { if (!engine_instance) return; diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h index 584a3cf4..eb99ee02 100644 --- a/aclk/legacy/aclk_lws_wss_client.h +++ b/aclk/legacy/aclk_lws_wss_client.h @@ -8,9 +8,9 @@ #include "libnetdata/libnetdata.h" // This is as define because ideally the ACLK at high level -// can do mosqitto writes and reads only from one thread +// can do mosquitto writes and reads only from one thread // which is cleaner implementation IMHO -// in such case this mutexes are not necessarry and life +// in such case this mutexes are not necessary and life // is simpler #define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1 @@ -78,7 +78,7 @@ int aclk_lws_wss_client_write(void *buf, size_t count); int aclk_lws_wss_client_read(void *buf, size_t count); void aclk_lws_wss_service_loop(); -void aclk_lws_wss_mqtt_layer_disconect_notif(); +void aclk_lws_wss_mqtt_layer_disconnect_notif(); // Notifications inside the layer above void aclk_lws_connection_established(); diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c index 27ad9ac1..040068e8 100644 --- a/aclk/legacy/aclk_query.c +++ b/aclk/legacy/aclk_query.c @@ -498,7 +498,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) z_buffer->len += bytes_to_cpy; } while(z_ret != Z_STREAM_END); // so that web_client_build_http_header - // puts correct content lenght into header + // puts correct content length into header buffer_free(w->response.data); w->response.data = z_buffer; z_buffer = NULL; diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c index 2681445b..68dad81e 100644 --- a/aclk/legacy/aclk_rx_msgs.c +++ b/aclk/legacy/aclk_rx_msgs.c @@ -218,7 +218,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { error( - "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", + "Unsupported version of \"version\" message from cloud. Expected %d, Got %d", ACLK_VERSION_NEG_VERSION, cloud_to_agent->version); return 1; @@ -353,7 +353,7 @@ int aclk_handle_cloud_message(char *payload) // see what `aclk_queue_query` parameter `internal` does // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! - // msg handlers (namely aclk_handle_version_responce) + // msg handlers (namely aclk_handle_version_response) // can freely change what aclk_incoming_msg_types points to // so either exit or restart this for loop freez(cloud_to_agent.type_id); diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c index 7124380a..88679cb3 100644 --- a/aclk/legacy/aclk_stats.c +++ b/aclk/legacy/aclk_stats.c @@ -123,7 +123,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st)) { st = rrdset_create_localhost( - "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s", + "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "KiB/s", "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA); rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); @@ -145,7 +145,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st)) { st = rrdset_create_localhost( - "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s", + "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "KiB/s", "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA); rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c index 5767df3a..5ed7e66a 100644 --- a/aclk/legacy/agent_cloud_link.c +++ b/aclk/legacy/agent_cloud_link.c @@ -653,7 +653,7 @@ static void aclk_graceful_disconnect() aclk_shutting_down = 1; _link_shutdown(); - aclk_lws_wss_mqtt_layer_disconect_notif(); + aclk_lws_wss_mqtt_layer_disconnect_notif(); write_q = 1; event_loop_timeout = now_realtime_sec() + 5; @@ -937,7 +937,7 @@ static void aclk_try_to_connect(char *hostname, int port) { int rc; -// this is usefull for developers working on ACLK +// this is useful for developers working on ACLK // allows connecting agent to any MQTT broker // for debugging, development and testing purposes #ifndef ACLK_DISABLE_CHALLENGE @@ -986,7 +986,7 @@ static inline void aclk_hello_msg() aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; ACLK_SHARED_STATE_UNLOCK; - //Hello message is versioned separatelly from the rest of the protocol + //Hello message is versioned separately from the rest of the protocol aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX); aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id); @@ -1211,7 +1211,7 @@ exited: /* * this must be last -> if all static threads signal * THREAD_EXITED rrdengine will dealloc the RRDSETs - * and RRDDIMs that are used by still runing stat thread. + * and RRDDIMs that are used by still running stat thread. * see netdata_cleanup_and_exit() for reference */ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; @@ -1555,7 +1555,7 @@ void aclk_single_update_enable() aclk_disable_single_updates = 0; } -// Trigged by a health reload, sends the alarm metadata +// Triggered by a health reload, sends the alarm metadata void aclk_alarm_reload() { if (unlikely(aclk_host_initializing(localhost))) diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h index e777e0b1..bfcfef8e 100644 --- a/aclk/legacy/agent_cloud_link.h +++ b/aclk/legacy/agent_cloud_link.h @@ -24,7 +24,7 @@ #define ACLK_MAX_TOPIC 255 -#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now +#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff strategy for now #define ACLK_DEFAULT_PORT 9002 #define ACLK_DEFAULT_HOST "localhost" @@ -57,7 +57,7 @@ extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id); extern char *is_agent_claimed(void); -extern void aclk_lws_wss_mqtt_layer_disconect_notif(); +extern void aclk_lws_wss_mqtt_layer_disconnect_notif(); char *create_uuid(); // callbacks for agent cloud link diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c index 6f38a20d..74f77455 100644 --- a/aclk/legacy/mqtt.c +++ b/aclk/legacy/mqtt.c @@ -55,7 +55,7 @@ void connect_callback(struct mosquitto *mosq, void *obj, int rc) UNUSED(obj); UNUSED(rc); - info("Connection to cloud estabilished"); + info("Connection to cloud established"); aclk_connect(); return; @@ -75,7 +75,7 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) } aclk_disconnect(); - aclk_lws_wss_mqtt_layer_disconect_notif(); + aclk_lws_wss_mqtt_layer_disconnect_notif(); return; } @@ -170,7 +170,7 @@ static int _mqtt_create_connection(char *username, char *password) int rc = mosquitto_threaded_set(mosq, 1); if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc)); + error("Failed to tune the thread model for libmosquitto (%s)", mosquitto_strerror(rc)); #if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000 rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0); diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py index 20ab523d..14e99b65 100644 --- a/aclk/legacy/tests/paho-inspection.py +++ b/aclk/legacy/tests/paho-inspection.py @@ -55,5 +55,5 @@ mqttc.connect(sys.argv[1], 8443, 60) #mqttc.publish("/agent/mine","Test1") #mqttc.subscribe("$SYS/#", 0) -print("Connected succesfully, monitoring /agent/#", flush=True) +print("Connected successfully, monitoring /agent/#", flush=True) mqttc.loop_forever() |