summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-05-19 12:33:27 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-05-19 12:33:27 +0000
commit841395dd16f470e3c051a0a4fff5b91efc983c30 (patch)
tree4115f6eedcddda75067130b80acaff9e51612f49 /aclk/aclk.c
parentAdding upstream version 1.30.1. (diff)
downloadnetdata-841395dd16f470e3c051a0a4fff5b91efc983c30.tar.xz
netdata-841395dd16f470e3c051a0a4fff5b91efc983c30.zip
Adding upstream version 1.31.0.upstream/1.31.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c226
1 files changed, 151 insertions, 75 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 889fa1e4..35549cfe 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "aclk.h"
#include "aclk_stats.h"
@@ -9,6 +11,7 @@
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
+#include "https_client.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
@@ -26,9 +29,13 @@ int aclk_kill_link = 0;
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+time_t aclk_block_until = 0;
+
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
+aclk_env_t *aclk_env = NULL;
+
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -38,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
struct aclk_shared_state aclk_shared_state = {
.agent_state = AGENT_INITIALIZING,
.last_popcorn_interrupt = 0,
- .version_neg = 0,
- .version_neg_wait_till = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
@@ -138,8 +143,7 @@ static int wait_till_agent_claimed(void)
*/
static int wait_till_agent_claim_ready()
{
- int port;
- char *hostname = NULL;
+ url_t url;
while (!netdata_exit) {
if (wait_till_agent_claimed())
return 1;
@@ -154,15 +158,14 @@ static int wait_till_agent_claim_ready()
// We just check configuration is valid here
// TODO make it without malloc/free
- if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) {
+ memset(&url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &url)) {
error("Agent is claimed but the configuration is invalid, please fix");
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
sleep(5);
continue;
}
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
if (!load_private_key()) {
sleep(5);
@@ -198,6 +201,11 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
{
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
+ const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+ if (!cmd_topic) {
+ error("Error retrieving command topic");
+ return;
+ }
if (msglen > RX_MSGLEN_MAX - 1)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@@ -221,7 +229,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg);
- if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic))
+ if (strcmp(cmd_topic, topic))
error("Received message on unexpected topic %s", topic);
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
@@ -235,7 +243,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
- aclk_reconnect_delay(0);
+ aclk_tbeb_reset();
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_puback(packet_id);
@@ -320,15 +328,20 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_session_sec = now / USEC_PER_SEC;
aclk_session_us = now % USEC_PER_SEC;
- mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1);
+ const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+
+ if (!topic)
+ error("Unable to fetch topic for COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- aclk_hello_msg(client);
+
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
- error("Sending `connect` payload immediatelly as popcorning was finished already.");
+ error("Sending `connect` payload immediately as popcorning was finished already.");
queue_connect_payloads();
}
ACLK_SHARED_STATE_UNLOCK;
@@ -393,16 +406,41 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
mqtt_wss_disconnect(client, 1000);
}
+static unsigned long aclk_reconnect_delay() {
+ unsigned long recon_delay;
+ time_t now;
+
+ if (aclk_disable_runtime) {
+ aclk_tbeb_reset();
+ return 60 * MSEC_PER_SEC;
+ }
+
+ now = now_monotonic_sec();
+ if (aclk_block_until) {
+ if (now < aclk_block_until) {
+ recon_delay = aclk_block_until - now;
+ recon_delay *= MSEC_PER_SEC;
+ aclk_block_until = 0;
+ aclk_tbeb_reset();
+ return recon_delay;
+ }
+ aclk_block_until = 0;
+ }
+
+ if (!aclk_env || !aclk_env->backoff.base)
+ return aclk_tbeb_delay(0, 2, 0, 1024);
+
+ return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
+}
+
/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled
* @return 0 - Go ahead and connect (delay expired)
* 1 - netdata_exit
*/
#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
static int aclk_block_till_recon_allowed() {
- // Handle reconnect exponential backoff
- // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss
- // but has been modifed slightly (more randomness)
- unsigned long recon_delay = aclk_reconnect_delay(1);
+ unsigned long recon_delay = aclk_reconnect_delay();
+
info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
@@ -420,44 +458,22 @@ static int aclk_block_till_recon_allowed() {
return 0;
}
-#define HTTP_PROXY_PREFIX "http://"
-static void set_proxy(struct mqtt_wss_proxy *out)
-{
- ACLK_PROXY_TYPE pt;
- const char *ptr = aclk_get_proxy(&pt);
- char *tmp;
- char *host;
- if (pt != PROXY_TYPE_HTTP)
- return;
-
- out->port = 0;
-
- if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
- ptr += strlen(HTTP_PROXY_PREFIX);
-
- if ((tmp = strchr(ptr, '@')))
- ptr = tmp;
-
- if ((tmp = strchr(ptr, '/'))) {
- host = mallocz((tmp - ptr) + 1);
- memcpy(host, ptr, (tmp - ptr));
- host[tmp - ptr] = 0;
- } else
- host = strdupz(ptr);
-
- if ((tmp = strchr(host, ':'))) {
- *tmp = 0;
- tmp++;
- out->port = atoi(tmp);
+#ifndef ACLK_DISABLE_CHALLENGE
+/* Cloud returns transport list ordered with highest
+ * priority first. This function selects highest prio
+ * transport that we can actually use (support)
+ */
+static int aclk_get_transport_idx(aclk_env_t *env) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ // currently we support only MQTT 3
+ // therefore select first transport that matches
+ if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ return i;
+ }
}
-
- if (out->port <= 0 || out->port > 65535)
- out->port = 8080;
-
- out->host = host;
-
- out->type = MQTT_WSS_PROXY_HTTP;
+ return -1;
}
+#endif
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
@@ -473,12 +489,13 @@ static void set_proxy(struct mqtt_wss_proxy *out)
#endif
static int aclk_attempt_to_connect(mqtt_wss_client client)
{
- char *aclk_hostname = NULL;
- int aclk_port;
+ int ret;
+
+ url_t base_url;
#ifndef ACLK_DISABLE_CHALLENGE
- char *mqtt_otp_user = NULL;
- char *mqtt_otp_pass = NULL;
+ url_t auth_url;
+ url_t mqtt_url;
#endif
json_object *lwt;
@@ -494,48 +511,103 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
return 1;
info("Attempting connection now");
- if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
+ memset(&base_url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &base_url)) {
error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
+ url_t_destroy(&base_url);
continue;
}
- struct mqtt_wss_proxy proxy_conf;
- proxy_conf.type = MQTT_WSS_DIRECT;
- set_proxy(&proxy_conf);
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
struct mqtt_connect_params mqtt_conn_params = {
.clientid = "anon",
.username = "anon",
.password = "anon",
- .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA),
+ .will_topic = "lwt",
.will_msg = NULL,
.will_flags = MQTT_WSS_PUB_QOS2,
.keep_alive = 60
};
+
#ifndef ACLK_DISABLE_CHALLENGE
- aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass);
- mqtt_conn_params.clientid = mqtt_otp_user;
- mqtt_conn_params.username = mqtt_otp_user;
- mqtt_conn_params.password = mqtt_otp_pass;
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ aclk_env = callocz(1, sizeof(aclk_env_t));
+
+ ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
+ url_t_destroy(&base_url);
+ if (ret) {
+ error("Failed to Get ACLK environment");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+ }
+
+ memset(&auth_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ url_t_destroy(&auth_url);
+ continue;
+ }
+
+ ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
+ url_t_destroy(&auth_url);
+ if (ret) {
+ error("Error passing Challenge/Response to get OTP");
+ continue;
+ }
+
+ // aclk_get_topic moved here as during OTP we
+ // generate the topic cache
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ if (!mqtt_conn_params.will_topic) {
+ error("Couldn't get LWT topic. Will not send LWT.");
+ continue;
+ }
+
+ // Do the MQTT connection
+ ret = aclk_get_transport_idx(aclk_env);
+ if (ret < 0) {
+ error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ continue;
+ }
+
+ memset(&mqtt_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ url_t_destroy(&mqtt_url);
+ continue;
+ }
#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
-
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
- if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) {
- json_object_put(lwt);
- freez(aclk_hostname);
- aclk_hostname = NULL;
+
+#ifdef ACLK_DISABLE_CHALLENGE
+ ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&base_url);
+#else
+ ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&mqtt_url);
+
+ freez((char*)mqtt_conn_params.clientid);
+ freez((char*)mqtt_conn_params.password);
+ freez((char*)mqtt_conn_params.username);
+#endif
+
+ json_object_put(lwt);
+
+ if (!ret) {
info("MQTTWSS connection succeeded");
mqtt_connected_actions(client);
return 0;
}
- freez(aclk_hostname);
- aclk_hostname = NULL;
- json_object_put(lwt);
error("Connect failed\n");
}
@@ -637,6 +709,10 @@ exit_full:
free_topic_cache();
mqtt_wss_destroy(mqttwss_client);
exit:
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}