diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-05-19 12:33:27 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-05-19 12:33:27 +0000 |
commit | 841395dd16f470e3c051a0a4fff5b91efc983c30 (patch) | |
tree | 4115f6eedcddda75067130b80acaff9e51612f49 /aclk/aclk.c | |
parent | Adding upstream version 1.30.1. (diff) | |
download | netdata-841395dd16f470e3c051a0a4fff5b91efc983c30.tar.xz netdata-841395dd16f470e3c051a0a4fff5b91efc983c30.zip |
Adding upstream version 1.31.0.upstream/1.31.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 226 |
1 files changed, 151 insertions, 75 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 889fa1e4..35549cfe 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #include "aclk.h" #include "aclk_stats.h" @@ -9,6 +11,7 @@ #include "aclk_util.h" #include "aclk_rx_msgs.h" #include "aclk_collector_list.h" +#include "https_client.h" #ifdef ACLK_LOG_CONVERSATION_DIR #include <sys/types.h> @@ -26,9 +29,13 @@ int aclk_kill_link = 0; int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +time_t aclk_block_until = 0; + usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer +aclk_env_t *aclk_env = NULL; + mqtt_wss_client mqttwss_client; netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; @@ -38,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; struct aclk_shared_state aclk_shared_state = { .agent_state = AGENT_INITIALIZING, .last_popcorn_interrupt = 0, - .version_neg = 0, - .version_neg_wait_till = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; @@ -138,8 +143,7 @@ static int wait_till_agent_claimed(void) */ static int wait_till_agent_claim_ready() { - int port; - char *hostname = NULL; + url_t url; while (!netdata_exit) { if (wait_till_agent_claimed()) return 1; @@ -154,15 +158,14 @@ static int wait_till_agent_claim_ready() // We just check configuration is valid here // TODO make it without malloc/free - if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) { + memset(&url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &url)) { error("Agent is claimed but the configuration is invalid, please fix"); - freez(hostname); - hostname = NULL; + url_t_destroy(&url); sleep(5); continue; } - freez(hostname); - hostname = NULL; + url_t_destroy(&url); if (!load_private_key()) { sleep(5); @@ -198,6 +201,11 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int { char cmsg[RX_MSGLEN_MAX]; size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); + const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + if (!cmd_topic) { + error("Error retrieving command topic"); + return; + } if (msglen > RX_MSGLEN_MAX - 1) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -221,7 +229,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg); - if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic)) + if (strcmp(cmd_topic, topic)) error("Received message on unexpected topic %s", topic); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { @@ -235,7 +243,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) - aclk_reconnect_delay(0); + aclk_tbeb_reset(); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_puback(packet_id); @@ -320,15 +328,20 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_session_sec = now / USEC_PER_SEC; aclk_session_us = now % USEC_PER_SEC; - mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1); + const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + + if (!topic) + error("Unable to fetch topic for COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - aclk_hello_msg(client); + ACLK_SHARED_STATE_LOCK; if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { - error("Sending `connect` payload immediatelly as popcorning was finished already."); + error("Sending `connect` payload immediately as popcorning was finished already."); queue_connect_payloads(); } ACLK_SHARED_STATE_UNLOCK; @@ -393,16 +406,41 @@ void aclk_graceful_disconnect(mqtt_wss_client client) mqtt_wss_disconnect(client, 1000); } +static unsigned long aclk_reconnect_delay() { + unsigned long recon_delay; + time_t now; + + if (aclk_disable_runtime) { + aclk_tbeb_reset(); + return 60 * MSEC_PER_SEC; + } + + now = now_monotonic_sec(); + if (aclk_block_until) { + if (now < aclk_block_until) { + recon_delay = aclk_block_until - now; + recon_delay *= MSEC_PER_SEC; + aclk_block_until = 0; + aclk_tbeb_reset(); + return recon_delay; + } + aclk_block_until = 0; + } + + if (!aclk_env || !aclk_env->backoff.base) + return aclk_tbeb_delay(0, 2, 0, 1024); + + return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s); +} + /* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled * @return 0 - Go ahead and connect (delay expired) * 1 - netdata_exit */ #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4) static int aclk_block_till_recon_allowed() { - // Handle reconnect exponential backoff - // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss - // but has been modifed slightly (more randomness) - unsigned long recon_delay = aclk_reconnect_delay(1); + unsigned long recon_delay = aclk_reconnect_delay(); + info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) @@ -420,44 +458,22 @@ static int aclk_block_till_recon_allowed() { return 0; } -#define HTTP_PROXY_PREFIX "http://" -static void set_proxy(struct mqtt_wss_proxy *out) -{ - ACLK_PROXY_TYPE pt; - const char *ptr = aclk_get_proxy(&pt); - char *tmp; - char *host; - if (pt != PROXY_TYPE_HTTP) - return; - - out->port = 0; - - if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) - ptr += strlen(HTTP_PROXY_PREFIX); - - if ((tmp = strchr(ptr, '@'))) - ptr = tmp; - - if ((tmp = strchr(ptr, '/'))) { - host = mallocz((tmp - ptr) + 1); - memcpy(host, ptr, (tmp - ptr)); - host[tmp - ptr] = 0; - } else - host = strdupz(ptr); - - if ((tmp = strchr(host, ':'))) { - *tmp = 0; - tmp++; - out->port = atoi(tmp); +#ifndef ACLK_DISABLE_CHALLENGE +/* Cloud returns transport list ordered with highest + * priority first. This function selects highest prio + * transport that we can actually use (support) + */ +static int aclk_get_transport_idx(aclk_env_t *env) { + for (size_t i = 0; i < env->transport_count; i++) { + // currently we support only MQTT 3 + // therefore select first transport that matches + if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) { + return i; + } } - - if (out->port <= 0 || out->port > 65535) - out->port = 8080; - - out->host = host; - - out->type = MQTT_WSS_PROXY_HTTP; + return -1; } +#endif /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client @@ -473,12 +489,13 @@ static void set_proxy(struct mqtt_wss_proxy *out) #endif static int aclk_attempt_to_connect(mqtt_wss_client client) { - char *aclk_hostname = NULL; - int aclk_port; + int ret; + + url_t base_url; #ifndef ACLK_DISABLE_CHALLENGE - char *mqtt_otp_user = NULL; - char *mqtt_otp_pass = NULL; + url_t auth_url; + url_t mqtt_url; #endif json_object *lwt; @@ -494,48 +511,103 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) return 1; info("Attempting connection now"); - if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { + memset(&base_url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &base_url)) { error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); + url_t_destroy(&base_url); continue; } - struct mqtt_wss_proxy proxy_conf; - proxy_conf.type = MQTT_WSS_DIRECT; - set_proxy(&proxy_conf); + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT }; + aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); struct mqtt_connect_params mqtt_conn_params = { .clientid = "anon", .username = "anon", .password = "anon", - .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA), + .will_topic = "lwt", .will_msg = NULL, .will_flags = MQTT_WSS_PUB_QOS2, .keep_alive = 60 }; + #ifndef ACLK_DISABLE_CHALLENGE - aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass); - mqtt_conn_params.clientid = mqtt_otp_user; - mqtt_conn_params.username = mqtt_otp_user; - mqtt_conn_params.password = mqtt_otp_pass; + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } + aclk_env = callocz(1, sizeof(aclk_env_t)); + + ret = aclk_get_env(aclk_env, base_url.host, base_url.port); + url_t_destroy(&base_url); + if (ret) { + error("Failed to Get ACLK environment"); + // delay handled by aclk_block_till_recon_allowed + continue; + } + + memset(&auth_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + url_t_destroy(&auth_url); + continue; + } + + ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); + url_t_destroy(&auth_url); + if (ret) { + error("Error passing Challenge/Response to get OTP"); + continue; + } + + // aclk_get_topic moved here as during OTP we + // generate the topic cache + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (!mqtt_conn_params.will_topic) { + error("Couldn't get LWT topic. Will not send LWT."); + continue; + } + + // Do the MQTT connection + ret = aclk_get_transport_idx(aclk_env); + if (ret < 0) { + error("Cloud /env endpoint didn't return any transport usable by this Agent."); + continue; + } + + memset(&mqtt_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + url_t_destroy(&mqtt_url); + continue; + } #endif lwt = aclk_generate_disconnect(NULL); mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); - if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) { - json_object_put(lwt); - freez(aclk_hostname); - aclk_hostname = NULL; + +#ifdef ACLK_DISABLE_CHALLENGE + ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&base_url); +#else + ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&mqtt_url); + + freez((char*)mqtt_conn_params.clientid); + freez((char*)mqtt_conn_params.password); + freez((char*)mqtt_conn_params.username); +#endif + + json_object_put(lwt); + + if (!ret) { info("MQTTWSS connection succeeded"); mqtt_connected_actions(client); return 0; } - freez(aclk_hostname); - aclk_hostname = NULL; - json_object_put(lwt); error("Connect failed\n"); } @@ -637,6 +709,10 @@ exit_full: free_topic_cache(); mqtt_wss_destroy(mqttwss_client); exit: + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; } |