summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c226
-rw-r--r--aclk/aclk.h28
-rw-r--r--aclk/aclk_otp.c746
-rw-r--r--aclk/aclk_otp.h5
-rw-r--r--aclk/aclk_query.c19
-rw-r--r--aclk/aclk_rx_msgs.c88
-rw-r--r--aclk/aclk_rx_msgs.h1
-rw-r--r--aclk/aclk_stats.c2
-rw-r--r--aclk/aclk_tx_msgs.c64
-rw-r--r--aclk/aclk_tx_msgs.h2
-rw-r--r--aclk/aclk_util.c365
-rw-r--r--aclk/aclk_util.h59
-rw-r--r--aclk/https_client.c556
-rw-r--r--aclk/https_client.h73
-rw-r--r--aclk/legacy/aclk_common.c8
-rw-r--r--aclk/legacy/aclk_lws_wss_client.c4
-rw-r--r--aclk/legacy/aclk_lws_wss_client.h6
-rw-r--r--aclk/legacy/aclk_query.c2
-rw-r--r--aclk/legacy/aclk_rx_msgs.c4
-rw-r--r--aclk/legacy/aclk_stats.c4
-rw-r--r--aclk/legacy/agent_cloud_link.c10
-rw-r--r--aclk/legacy/agent_cloud_link.h4
-rw-r--r--aclk/legacy/mqtt.c6
-rw-r--r--aclk/legacy/tests/paho-inspection.py2
24 files changed, 1779 insertions, 505 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 889fa1e4..35549cfe 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "aclk.h"
#include "aclk_stats.h"
@@ -9,6 +11,7 @@
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
+#include "https_client.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
@@ -26,9 +29,13 @@ int aclk_kill_link = 0;
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+time_t aclk_block_until = 0;
+
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
+aclk_env_t *aclk_env = NULL;
+
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -38,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
struct aclk_shared_state aclk_shared_state = {
.agent_state = AGENT_INITIALIZING,
.last_popcorn_interrupt = 0,
- .version_neg = 0,
- .version_neg_wait_till = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
@@ -138,8 +143,7 @@ static int wait_till_agent_claimed(void)
*/
static int wait_till_agent_claim_ready()
{
- int port;
- char *hostname = NULL;
+ url_t url;
while (!netdata_exit) {
if (wait_till_agent_claimed())
return 1;
@@ -154,15 +158,14 @@ static int wait_till_agent_claim_ready()
// We just check configuration is valid here
// TODO make it without malloc/free
- if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) {
+ memset(&url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &url)) {
error("Agent is claimed but the configuration is invalid, please fix");
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
sleep(5);
continue;
}
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
if (!load_private_key()) {
sleep(5);
@@ -198,6 +201,11 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
{
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
+ const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+ if (!cmd_topic) {
+ error("Error retrieving command topic");
+ return;
+ }
if (msglen > RX_MSGLEN_MAX - 1)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@@ -221,7 +229,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg);
- if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic))
+ if (strcmp(cmd_topic, topic))
error("Received message on unexpected topic %s", topic);
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
@@ -235,7 +243,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
- aclk_reconnect_delay(0);
+ aclk_tbeb_reset();
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_puback(packet_id);
@@ -320,15 +328,20 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_session_sec = now / USEC_PER_SEC;
aclk_session_us = now % USEC_PER_SEC;
- mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1);
+ const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+
+ if (!topic)
+ error("Unable to fetch topic for COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- aclk_hello_msg(client);
+
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
- error("Sending `connect` payload immediatelly as popcorning was finished already.");
+ error("Sending `connect` payload immediately as popcorning was finished already.");
queue_connect_payloads();
}
ACLK_SHARED_STATE_UNLOCK;
@@ -393,16 +406,41 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
mqtt_wss_disconnect(client, 1000);
}
+static unsigned long aclk_reconnect_delay() {
+ unsigned long recon_delay;
+ time_t now;
+
+ if (aclk_disable_runtime) {
+ aclk_tbeb_reset();
+ return 60 * MSEC_PER_SEC;
+ }
+
+ now = now_monotonic_sec();
+ if (aclk_block_until) {
+ if (now < aclk_block_until) {
+ recon_delay = aclk_block_until - now;
+ recon_delay *= MSEC_PER_SEC;
+ aclk_block_until = 0;
+ aclk_tbeb_reset();
+ return recon_delay;
+ }
+ aclk_block_until = 0;
+ }
+
+ if (!aclk_env || !aclk_env->backoff.base)
+ return aclk_tbeb_delay(0, 2, 0, 1024);
+
+ return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
+}
+
/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled
* @return 0 - Go ahead and connect (delay expired)
* 1 - netdata_exit
*/
#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
static int aclk_block_till_recon_allowed() {
- // Handle reconnect exponential backoff
- // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss
- // but has been modifed slightly (more randomness)
- unsigned long recon_delay = aclk_reconnect_delay(1);
+ unsigned long recon_delay = aclk_reconnect_delay();
+
info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
@@ -420,44 +458,22 @@ static int aclk_block_till_recon_allowed() {
return 0;
}
-#define HTTP_PROXY_PREFIX "http://"
-static void set_proxy(struct mqtt_wss_proxy *out)
-{
- ACLK_PROXY_TYPE pt;
- const char *ptr = aclk_get_proxy(&pt);
- char *tmp;
- char *host;
- if (pt != PROXY_TYPE_HTTP)
- return;
-
- out->port = 0;
-
- if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
- ptr += strlen(HTTP_PROXY_PREFIX);
-
- if ((tmp = strchr(ptr, '@')))
- ptr = tmp;
-
- if ((tmp = strchr(ptr, '/'))) {
- host = mallocz((tmp - ptr) + 1);
- memcpy(host, ptr, (tmp - ptr));
- host[tmp - ptr] = 0;
- } else
- host = strdupz(ptr);
-
- if ((tmp = strchr(host, ':'))) {
- *tmp = 0;
- tmp++;
- out->port = atoi(tmp);
+#ifndef ACLK_DISABLE_CHALLENGE
+/* Cloud returns transport list ordered with highest
+ * priority first. This function selects highest prio
+ * transport that we can actually use (support)
+ */
+static int aclk_get_transport_idx(aclk_env_t *env) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ // currently we support only MQTT 3
+ // therefore select first transport that matches
+ if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ return i;
+ }
}
-
- if (out->port <= 0 || out->port > 65535)
- out->port = 8080;
-
- out->host = host;
-
- out->type = MQTT_WSS_PROXY_HTTP;
+ return -1;
}
+#endif
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
@@ -473,12 +489,13 @@ static void set_proxy(struct mqtt_wss_proxy *out)
#endif
static int aclk_attempt_to_connect(mqtt_wss_client client)
{
- char *aclk_hostname = NULL;
- int aclk_port;
+ int ret;
+
+ url_t base_url;
#ifndef ACLK_DISABLE_CHALLENGE
- char *mqtt_otp_user = NULL;
- char *mqtt_otp_pass = NULL;
+ url_t auth_url;
+ url_t mqtt_url;
#endif
json_object *lwt;
@@ -494,48 +511,103 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
return 1;
info("Attempting connection now");
- if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
+ memset(&base_url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &base_url)) {
error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
+ url_t_destroy(&base_url);
continue;
}
- struct mqtt_wss_proxy proxy_conf;
- proxy_conf.type = MQTT_WSS_DIRECT;
- set_proxy(&proxy_conf);
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
struct mqtt_connect_params mqtt_conn_params = {
.clientid = "anon",
.username = "anon",
.password = "anon",
- .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA),
+ .will_topic = "lwt",
.will_msg = NULL,
.will_flags = MQTT_WSS_PUB_QOS2,
.keep_alive = 60
};
+
#ifndef ACLK_DISABLE_CHALLENGE
- aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass);
- mqtt_conn_params.clientid = mqtt_otp_user;
- mqtt_conn_params.username = mqtt_otp_user;
- mqtt_conn_params.password = mqtt_otp_pass;
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ aclk_env = callocz(1, sizeof(aclk_env_t));
+
+ ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
+ url_t_destroy(&base_url);
+ if (ret) {
+ error("Failed to Get ACLK environment");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+ }
+
+ memset(&auth_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ url_t_destroy(&auth_url);
+ continue;
+ }
+
+ ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
+ url_t_destroy(&auth_url);
+ if (ret) {
+ error("Error passing Challenge/Response to get OTP");
+ continue;
+ }
+
+ // aclk_get_topic moved here as during OTP we
+ // generate the topic cache
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ if (!mqtt_conn_params.will_topic) {
+ error("Couldn't get LWT topic. Will not send LWT.");
+ continue;
+ }
+
+ // Do the MQTT connection
+ ret = aclk_get_transport_idx(aclk_env);
+ if (ret < 0) {
+ error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ continue;
+ }
+
+ memset(&mqtt_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ url_t_destroy(&mqtt_url);
+ continue;
+ }
#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
-
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
- if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) {
- json_object_put(lwt);
- freez(aclk_hostname);
- aclk_hostname = NULL;
+
+#ifdef ACLK_DISABLE_CHALLENGE
+ ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&base_url);
+#else
+ ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&mqtt_url);
+
+ freez((char*)mqtt_conn_params.clientid);
+ freez((char*)mqtt_conn_params.password);
+ freez((char*)mqtt_conn_params.username);
+#endif
+
+ json_object_put(lwt);
+
+ if (!ret) {
info("MQTTWSS connection succeeded");
mqtt_connected_actions(client);
return 0;
}
- freez(aclk_hostname);
- aclk_hostname = NULL;
- json_object_put(lwt);
error("Connect failed\n");
}
@@ -637,6 +709,10 @@ exit_full:
free_topic_cache();
mqtt_wss_destroy(mqttwss_client);
exit:
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 29626c7f..b02b93d7 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -9,23 +9,8 @@ typedef struct aclk_rrdhost_state {
#include "../daemon/common.h"
#include "aclk_util.h"
-// minimum and maximum supported version of ACLK
-// in this version of agent
-#define ACLK_VERSION_MIN 2
-#define ACLK_VERSION_MAX 2
-
-// Version negotiation messages have they own versioning
-// this is also used for LWT message as we set that up
-// before version negotiation
-#define ACLK_VERSION_NEG_VERSION 1
-
-// Maximum time to wait for version negotiation before aborting
-// and defaulting to oldest supported version
-#define VERSION_NEG_TIMEOUT 3
-
-#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
-#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
-#endif
+// version for aclk legacy (old cloud arch)
+#define ACLK_VERSION 2
// Define ACLK Feature Version Boundaries Here
#define ACLK_V_COMPRESSION 2
@@ -40,9 +25,13 @@ extern int aclk_disable_single_updates;
extern int aclk_kill_link;
extern int aclk_connected;
+extern time_t aclk_block_until;
+
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
+extern aclk_env_t *aclk_env;
+
void *aclk_main(void *ptr);
void aclk_single_update_disable();
void aclk_single_update_enable();
@@ -68,11 +57,6 @@ extern struct aclk_shared_state {
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
- // read only while ACLK connected
- // protect by lock otherwise
- int version_neg;
- usec_t version_neg_wait_till;
-
// To wait for `disconnect` message PUBACK
// when shuting down
// at the same time if > 0 we know link is
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index fcb9d600..411a5f89 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -3,12 +3,16 @@
#include "aclk_otp.h"
-#include "https_client.h"
-
#include "../daemon/common.h"
#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h"
+// CentOS 7 has older version that doesn't define this
+// same goes for MacOS
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
struct dictionary_singleton {
char *key;
char *result;
@@ -167,54 +171,321 @@ static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, u
return result;
}
-// aclk_get_mqtt_otp is slightly modified original code from @amoss
-void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass)
-{
- char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- debug(D_ACLK, "Performing challenge-response sequence");
- if (*mqtt_pass != NULL)
- {
- freez(*mqtt_pass);
- *mqtt_pass = NULL;
+static int aclk_https_request(https_req_t *request, https_req_response_t *response) {
+ int rc;
+ // wrapper for ACLK only which loads ACLK specific proxy settings
+ // then only calls https_request
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
+
+ if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) {
+ request->proxy_host = (char*)proxy_conf.host; // TODO make it const as well
+ request->proxy_port = proxy_conf.port;
+ }
+
+ rc = https_request(request, response);
+ freez((char*)proxy_conf.host);
+ return rc;
+}
+
+struct auth_data {
+ char *client_id;
+ char *username;
+ char *passwd;
+};
+
+#define PARSE_ENV_JSON_CHK_TYPE(it, type, name) \
+ if (json_object_get_type(json_object_iter_peek_value(it)) != type) { \
+ error("value of key \"%s\" should be %s", name, #type); \
+ goto exit; \
+ }
+
+#define JSON_KEY_CLIENTID "clientID"
+#define JSON_KEY_USER "username"
+#define JSON_KEY_PASS "password"
+#define JSON_KEY_TOPICS "topics"
+
+static int parse_passwd_response(const char *json_str, struct auth_data *auth) {
+ int rc = 1;
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http respons of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CLIENTID)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_CLIENTID)
+
+ auth->client_id = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_USER)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_USER)
+
+ auth->username = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_PASS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_PASS)
+
+ auth->passwd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TOPICS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TOPICS)
+
+ if (aclk_generate_topic_cache(json_object_iter_peek_value(&it))) {
+ error("Failed to generate topic cache!");
+ goto exit;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+ error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ if (!auth->client_id) {
+ error(JSON_KEY_CLIENTID " is compulsory key in /password response");
+ goto exit;
+ }
+ if (!auth->passwd) {
+ error(JSON_KEY_PASS " is compulsory in /password response");
+ goto exit;
+ }
+ if (!auth->username) {
+ error(JSON_KEY_USER " is compulsory in /password response");
+ goto exit;
+ }
+
+ rc = 0;
+exit:
+ json_object_put(json);
+ return rc;
+}
+
+#define JSON_KEY_ERTRY "errorNonRetryable"
+#define JSON_KEY_EDELAY "errorRetryDelaySeconds"
+#define JSON_KEY_EEC "errorCode"
+#define JSON_KEY_EMSGKEY "errorMsgKey"
+#define JSON_KEY_EMSG "errorMessage"
+#if JSON_C_MINOR_VERSION >= 13
+static const char *get_json_str_by_path(json_object *json, const char *path) {
+ json_object *ptr;
+ if (json_pointer_get(json, path, &ptr)) {
+ error("Missing compulsory key \"%s\" in error response", path);
+ return NULL;
+ }
+ if (json_object_get_type(ptr) != json_type_string) {
+ error("Value of Key \"%s\" in error response should be string", path);
+ return NULL;
+ }
+ return json_object_get_string(ptr);
+}
+
+static int aclk_parse_otp_error(const char *json_str) {
+ int rc = 1;
+ json_object *json, *ptr;
+ const char *ec;
+ const char *ek;
+ const char *emsg;
+ int block_retry = -1, backoff = -1;
+
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http response of /env endpoint");
+ return 1;
+ }
+
+ if ((ec = get_json_str_by_path(json, "/" JSON_KEY_EEC)) == NULL)
+ goto exit;
+
+ if ((ek = get_json_str_by_path(json, "/" JSON_KEY_EMSGKEY)) == NULL)
+ goto exit;
+
+ if ((emsg = get_json_str_by_path(json, "/" JSON_KEY_EMSG)) == NULL)
+ goto exit;
+
+ // optional field
+ if (!json_pointer_get(json, "/" JSON_KEY_ERTRY, &ptr)) {
+ if (json_object_get_type(ptr) != json_type_boolean) {
+ error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type");
+ goto exit;
+ }
+ block_retry = json_object_get_boolean(ptr);
+ }
+
+ // optional field
+ if (!json_pointer_get(json, "/" JSON_KEY_EDELAY, &ptr)) {
+ if (json_object_get_type(ptr) != json_type_int) {
+ error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type");
+ goto exit;
+ }
+ backoff = json_object_get_int(ptr);
+ }
+
+ if (block_retry > 0)
+ aclk_disable_runtime = 1;
+
+ if (backoff > 0)
+ aclk_block_until = now_monotonic_sec() + backoff;
+
+ error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff);
+ rc = 0;
+exit:
+ json_object_put(json);
+ return rc;
+}
+#else
+static int aclk_parse_otp_error(const char *json_str) {
+ int rc = 1;
+ int block_retry = -1, backoff = -1;
+
+ const char *ec = NULL;
+ const char *ek = NULL;
+ const char *emsg = NULL;
+
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http respons of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSG)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSG)
+
+ emsg = json_object_get_string(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSGKEY)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSGKEY)
+
+ ek = json_object_get_string(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EEC)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EEC)
+
+ ec = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EDELAY)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_int) {
+ error("value of key " JSON_KEY_EDELAY " should be integer");
+ goto exit;
+ }
+
+ backoff = json_object_get_int(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ERTRY)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_boolean) {
+ error("value of key " JSON_KEY_ERTRY " should be integer");
+ goto exit;
+ }
+
+ block_retry = json_object_get_boolean(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
}
- // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
- // TODO - target host?
+
+ if (block_retry > 0)
+ aclk_disable_runtime = 1;
+
+ if (backoff > 0)
+ aclk_block_until = now_monotonic_sec() + backoff;
+
+ error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff);
+ rc = 0;
+exit:
+ json_object_put(json);
+ return rc;
+}
+#endif
+
+#define OTP_URL_PREFIX "/api/v1/auth/node/"
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) {
+ // TODO this fnc will be rewritten and simplified in following PRs
+ // still carries lot of baggage from ACLK Legacy
+ int rc = 1;
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
{
error("Agent was not claimed - cannot perform challenge/response");
- goto CLEANUP;
+ goto cleanup;
}
- char url[1024];
- sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
- info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
- if (https_request(HTTP_REQ_GET, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
- {
- error("Challenge failed: %s", data_buffer);
- goto CLEANUP;
+
+ // GET Challenge
+ req.host = target->host;
+ req.port = target->port;
+ buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id);
+ req.url = url->buffer;
+
+ if (aclk_https_request(&req, &resp)) {
+ error ("ACLK_OTP Challenge failed");
+ goto cleanup;
}
+ if (resp.http_code != 200) {
+ error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code);
+ if (resp.payload_size)
+ aclk_parse_otp_error(resp.payload);
+ goto cleanup_resp;
+ }
+ info ("ACLK_OTP Got Challenge from Cloud");
+
struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
- debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
- if (json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
+ if (json_parse(resp.payload, &challenge, json_extract_singleton) != JSON_OK)
{
freez(challenge.result);
- error("Could not parse the json response with the challenge: %s", data_buffer);
- goto CLEANUP;
+ error("Could not parse the the challenge");
+ goto cleanup_resp;
}
if (challenge.result == NULL) {
- error("Could not retrieve challenge from auth response: %s", data_buffer);
- goto CLEANUP;
+ error("Could not retrieve challenge JSON key from challenge response");
+ goto cleanup_resp;
}
-
+ // Decrypt the Challenge and Calculate Response
size_t challenge_len = strlen(challenge.result);
unsigned char decoded[512];
size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
+ freez(challenge.result);
unsigned char plaintext[4096]={};
int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext);
- freez(challenge.result);
char encoded[512];
size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
encoded[encoded_len] = 0;
@@ -223,39 +494,394 @@ void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_us
char response_json[4096]={};
sprintf(response_json, "{\"response\":\"%s\"}", encoded);
debug(D_ACLK, "Password phase: %s",response_json);
- // TODO - host
- sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
- if (https_request(HTTP_REQ_POST, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
- {
- error("Challenge-response failed: %s", data_buffer);
- goto CLEANUP;
+
+ https_req_response_free(&resp);
+ https_req_response_init(&resp);
+
+ // POST password
+ req.request_type = HTTP_REQ_POST;
+ buffer_flush(url);
+ buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id);
+ req.url = url->buffer;
+ req.payload = response_json;
+ req.payload_size = strlen(response_json);
+
+ if (aclk_https_request(&req, &resp)) {
+ error ("ACLK_OTP Password error trying to post result to password");
+ goto cleanup;
+ }
+ if (resp.http_code != 201) {
+ error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code);
+ if (resp.payload_size)
+ aclk_parse_otp_error(resp.payload);
+ goto cleanup_resp;
+ }
+ info ("ACLK_OTP Got Password from Cloud");
+
+ struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL };
+
+ if (parse_passwd_response(resp.payload, &data)){
+ error("Error parsing response of password endpoint");
+ goto cleanup_resp;
+ }
+
+ *mqtt_pass = data.passwd;
+ *mqtt_usr = data.username;
+ *mqtt_id = data.client_id;
+
+ rc = 0;
+cleanup_resp:
+ https_req_response_free(&resp);
+cleanup:
+ freez(agent_id);
+ buffer_free(url);
+ return rc;
+}
+
+#define JSON_KEY_ENC "encoding"
+#define JSON_KEY_AUTH_ENDPOINT "authEndpoint"
+#define JSON_KEY_TRP "transports"
+#define JSON_KEY_TRP_TYPE "type"
+#define JSON_KEY_TRP_ENDPOINT "endpoint"
+#define JSON_KEY_BACKOFF "backoff"
+#define JSON_KEY_BACKOFF_BASE "base"
+#define JSON_KEY_BACKOFF_MAX "maxSeconds"
+#define JSON_KEY_BACKOFF_MIN "minSeconds"
+#define JSON_KEY_CAPS "capabilities"
+
+static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *trp) {
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_TYPE)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_TYPE)
+ if (trp->type != ACLK_TRP_UNKNOWN) {
+ error(JSON_KEY_TRP_TYPE " set already");
+ goto exit;
+ }
+ trp->type = aclk_transport_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it)));
+ if (trp->type == ACLK_TRP_UNKNOWN) {
+ error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+ goto exit;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_ENDPOINT)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_ENDPOINT)
+ if (trp->endpoint) {
+ error(JSON_KEY_TRP_ENDPOINT " set already");
+ goto exit;
+ }
+ trp->endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
}
- debug(D_ACLK, "Password response from cloud: %s", data_buffer);
+ if (!trp->endpoint) {
+ error (JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary");
+ goto exit;
+ }
+
+ if (trp->type == ACLK_TRP_UNKNOWN) {
+ error ("transport type not set");
+ goto exit;
+ }
+
+ return 0;
+
+exit:
+ aclk_transport_desc_t_destroy(trp);
+ return 1;
+}
+
+static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) {
+ aclk_transport_desc_t *trp;
+ json_object *obj;
+
+ if (env->transports) {
+ error("transports have been set already");
+ return 1;
+ }
+
+ env->transport_count = json_object_array_length(json_array);
+
+ env->transports = callocz(env->transport_count , sizeof(aclk_transport_desc_t *));
+
+ for (size_t i = 0; i < env->transport_count; i++) {
+ trp = callocz(1, sizeof(aclk_transport_desc_t));
+ obj = json_object_array_get_idx(json_array, i);
+ if (parse_json_env_transport(obj, trp)) {
+ error("error parsing transport idx %d", (int)i);
+ freez(trp);
+ return 1;
+ }
+ env->transports[i] = trp;
+ }
+
+ return 0;
+}
+
+#define MATCHED_CORRECT 1
+#define MATCHED_ERROR -1
+#define NOT_MATCHED 0
+static int parse_json_backoff_int(struct json_object_iterator *it, int *out, const char* name, int min, int max) {
+ if (!strcmp(json_object_iter_peek_name(it), name)) {
+ if (json_object_get_type(json_object_iter_peek_value(it)) != json_type_int) {
+ error("Could not parse \"%s\". Not an integer as expected.", name);
+ return MATCHED_ERROR;
+ }
+
+ *out = json_object_get_int(json_object_iter_peek_value(it));
+
+ if (*out < min || *out > max) {
+ error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max);
+ return MATCHED_ERROR;
+ }
+
+ return MATCHED_CORRECT;
+ }
+ return NOT_MATCHED;
+}
+
+static int parse_json_backoff(json_object *json, aclk_backoff_t *backoff) {
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+ int ret;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if ( (ret = parse_json_backoff_int(&it, &backoff->base, JSON_KEY_BACKOFF_BASE, 1, 10)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if ( (ret = parse_json_backoff_int(&it, &backoff->max_s, JSON_KEY_BACKOFF_MAX, 500, INT_MAX)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if ( (ret = parse_json_backoff_int(&it, &backoff->min_s, JSON_KEY_BACKOFF_MIN, 0, INT_MAX)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ return 0;
+}
+
+static int parse_json_env_caps(json_object *json, aclk_env_t *env) {
+ json_object *obj;
+ const char *str;
+
+ if (env->capabilities) {
+ error("transports have been set already");
+ return 1;
+ }
+
+ env->capability_count = json_object_array_length(json);
+
+ // empty capabilities list is allowed
+ if (!env->capability_count)
+ return 0;
+
+ env->capabilities = callocz(env->capability_count , sizeof(char *));
+
+ for (size_t i = 0; i < env->capability_count; i++) {
+ obj = json_object_array_get_idx(json, i);
+ if (json_object_get_type(obj) != json_type_string) {
+ error("Capability at index %d not a string!", (int)i);
+ return 1;
+ }
+ str = json_object_get_string(obj);
+ if (!str) {
+ error("Error parsing capabilities");
+ return 1;
+ }
+ env->capabilities[i] = strdupz(str);
+ }
+
+ return 0;
+}
- struct dictionary_singleton password = { .key = "password", .result = NULL };
- if (json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
+static int parse_json_env(const char *json_str, aclk_env_t *env) {
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http respons of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_AUTH_ENDPOINT)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_AUTH_ENDPOINT)
+ if (env->auth_endpoint) {
+ error("authEndpoint set already");
+ goto exit;
+ }
+ env->auth_endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ENC)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_ENC)
+ if (env->encoding != ACLK_ENC_UNKNOWN) {
+ error(JSON_KEY_ENC " set already");
+ goto exit;
+ }
+ env->encoding = aclk_encoding_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TRP)
+
+ json_object *now = json_object_iter_peek_value(&it);
+ parse_json_env_transports(now, env);
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_BACKOFF)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_object, JSON_KEY_BACKOFF)
+
+ if (parse_json_backoff(json_object_iter_peek_value(&it), &env->backoff)) {
+ env->backoff.base = 0;
+ error("Error parsing Backoff parameters in env");
+ goto exit;
+ }
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CAPS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_CAPS)
+
+ if (parse_json_env_caps(json_object_iter_peek_value(&it), env)) {
+ error("Error parsing capabilities list");
+ goto exit;
+ }
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ // Check all compulsory keys have been set
+ if (env->transport_count < 1) {
+ error("env has to return at least one transport");
+ goto exit;
+ }
+ if (!env->auth_endpoint) {
+ error(JSON_KEY_AUTH_ENDPOINT " is compulsory");
+ goto exit;
+ }
+ if (env->encoding == ACLK_ENC_UNKNOWN) {
+ error(JSON_KEY_ENC " is compulsory");
+ goto exit;
+ }
+ if (!env->backoff.base) {
+ error(JSON_KEY_BACKOFF " is compulsory");
+ goto exit;
+ }
+
+ json_object_put(json);
+ return 0;
+
+exit:
+ aclk_env_t_destroy(env);
+ json_object_put(json);
+ return 1;
+}
+
+int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
+ BUFFER *buf = buffer_create(1024);
+
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
+ req.request_type = HTTP_REQ_GET;
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
{
- freez(password.result);
- error("Could not parse the json response with the password: %s", data_buffer);
- goto CLEANUP;
- }
-
- if (password.result == NULL ) {
- error("Could not retrieve password from auth response");
- goto CLEANUP;
- }
- if (*mqtt_pass != NULL )
- freez(*mqtt_pass);
- *mqtt_pass = password.result;
- if (*mqtt_usr != NULL)
- freez(*mqtt_usr);
- *mqtt_usr = agent_id;
- agent_id = NULL;
-
-CLEANUP:
- if (agent_id != NULL)
- freez(agent_id);
- freez(data_buffer);
- return;
+ error("Agent was not claimed - cannot perform challenge/response");
+ buffer_free(buf);
+ return 1;
+ }
+
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json$claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ freez(agent_id);
+
+ req.host = (char*)aclk_hostname;
+ req.port = aclk_port;
+ req.url = buf->buffer;
+ if (aclk_https_request(&req, &resp)) {
+ error("Error trying to contact env endpoint");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+ if (resp.http_code != 200) {
+ error("The HTTP code not 200 OK (Got %d)", resp.http_code);
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ if (!resp.payload || !resp.payload_size) {
+ error("Unexpected empty payload as response to /env call");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ if (parse_json_env(resp.payload, env)) {
+ error ("error parsing /env message");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ info("Getting Cloud /env successful");
+
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 0;
}
diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h
index 31e81c5a..d2044f6f 100644
--- a/aclk/aclk_otp.h
+++ b/aclk/aclk_otp.h
@@ -5,6 +5,9 @@
#include "../daemon/common.h"
-void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass);
+#include "https_client.h"
+
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
+int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port);
#endif /* ACLK_OTP_H */
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 71c63f64..3e2f88e4 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "aclk_query.h"
#include "aclk_stats.h"
#include "aclk_query_queue.h"
@@ -233,23 +235,6 @@ void *aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
while (!netdata_exit) {
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(!aclk_shared_state.version_neg)) {
- if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
- ACLK_SHARED_STATE_UNLOCK;
- info("Waiting for ACLK Version Negotiation message from Cloud");
- sleep(1);
- continue;
- }
- errno = 0;
- error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
- " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
- aclk_shared_state.version_neg = ACLK_VERSION_MIN;
-// When ACLK v3 is implemented you will need this
-// aclk_set_rx_handlers(aclk_shared_state.version_neg);
- }
- ACLK_SHARED_STATE_UNLOCK;
-
aclk_query_process_msgs(info);
QUERY_THREAD_LOCK;
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index fcb8d996..3d3ab5e2 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -166,81 +166,6 @@ error:
return 1;
}
-// This handles `version` message from cloud used to negotiate
-// protocol version we will use
-static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
-{
- UNUSED(raw_payload);
- int version = -1;
- errno = 0;
-
- if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
- error(
- "Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
- ACLK_VERSION_NEG_VERSION,
- cloud_to_agent->version);
- return 1;
- }
- if (unlikely(!cloud_to_agent->min_version)) {
- error("Min version missing or 0");
- return 1;
- }
- if (unlikely(!cloud_to_agent->max_version)) {
- error("Max version missing or 0");
- return 1;
- }
- if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
- error(
- "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
- cloud_to_agent->min_version);
- return 1;
- }
-
- if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
- error(
- "Agent too old for this cloud. Minimum version required by cloud %d."
- " Maximum version supported by this agent %d.",
- cloud_to_agent->min_version, ACLK_VERSION_MAX);
- aclk_kill_link = 1;
- aclk_disable_runtime = 1;
- return 1;
- }
- if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
- error(
- "Cloud version is too old for this agent. Maximum version supported by cloud %d."
- " Minimum (oldest) version supported by this agent %d.",
- cloud_to_agent->max_version, ACLK_VERSION_MIN);
- aclk_kill_link = 1;
- return 1;
- }
-
- version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
-
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
- errno = 0;
- error("The \"version\" message came too late ignoring.");
- goto err_cleanup;
- }
- if (unlikely(aclk_shared_state.version_neg)) {
- errno = 0;
- error("Version has already been set to %d", aclk_shared_state.version_neg);
- goto err_cleanup;
- }
- aclk_shared_state.version_neg = version;
- ACLK_SHARED_STATE_UNLOCK;
-
- info("Choosing version %d of ACLK", version);
-
- aclk_set_rx_handlers(version);
-
- return 0;
-
-err_cleanup:
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
-}
-
typedef struct aclk_incoming_msg_type{
char *name;
int(*fnc)(struct aclk_request *, char *);
@@ -248,20 +173,11 @@ typedef struct aclk_incoming_msg_type{
aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v2 },
- { .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-void aclk_set_rx_handlers(int version)
-{
-// ACLK_NG ACLK version support starts at 2
-// TODO ACLK v3
- UNUSED(version);
- aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-}
-
int aclk_handle_cloud_message(char *payload)
{
struct aclk_request cloud_to_agent;
@@ -295,10 +211,6 @@ int aclk_handle_cloud_message(char *payload)
goto err_cleanup;
}
- if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
- error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
- goto err_cleanup;
- }
for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index c9f0bd37..e24252be 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -9,6 +9,5 @@
#include "libnetdata/libnetdata.h"
int aclk_handle_cloud_message(char *payload);
-void aclk_set_rx_handlers(int version);
#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index b61ac05f..a599cfda 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "aclk_stats.h"
netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 158fc4e2..144008e4 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -13,8 +13,14 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ const char *topic = aclk_get_topic(subtopic);
- mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting mesage send");
+ return;
+ }
+
+ mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -30,8 +36,14 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ const char *topic = aclk_get_topic(subtopic);
- mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting mesage send");
+ return 0;
+ }
+
+ mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -199,9 +211,9 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRD
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
- msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
else
- msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
@@ -241,9 +253,9 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
// session.
if (metadata_submitted)
- msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
else
- msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
@@ -265,39 +277,6 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
buffer_free(local_buffer);
}
-void aclk_hello_msg(mqtt_wss_client client)
-{
- json_object *tmp, *msg;
-
- char *msg_id = create_uuid();
-
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
- ACLK_SHARED_STATE_UNLOCK;
-
- //Hello message is versioned separatelly from the rest of the protocol
- msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
-
- tmp = json_object_new_int(ACLK_VERSION_MIN);
- json_object_object_add(msg, "min-version", tmp);
-
- tmp = json_object_new_int(ACLK_VERSION_MAX);
- json_object_object_add(msg, "max-version", tmp);
-
-#ifdef ACLK_NG
- tmp = json_object_new_string("Next Generation");
-#else
- tmp = json_object_new_string("Legacy");
-#endif
- json_object_object_add(msg, "aclk-implementation", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
-
- json_object_put(msg);
- freez(msg_id);
-}
-
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -340,7 +319,7 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
return;
}
- msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(msg, "payload", payload);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
@@ -352,11 +331,10 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
{
// we create header here on purpose (and not send message with it already as `msg` param)
- // one is version_neg is guaranteed to be done here
- // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
+ // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
// send message with timestamps already to Query Queue they would be incorrect at time
// when query queue would get to send them)
- json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg);
+ json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(obj, "payload", msg);
aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index cb4d44c9..50c98169 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -10,8 +10,6 @@
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
-void aclk_hello_msg(mqtt_wss_client client);
-
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index a5347c46..b8ac6675 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "aclk_util.h"
#include <stdio.h>
@@ -10,6 +12,48 @@
#define UUID_STR_LEN 37
#endif
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
+ if (!strcmp(str, "json")) {
+ return ACLK_ENC_JSON;
+ }
+ if (!strcmp(str, "proto")) {
+ return ACLK_ENC_PROTO;
+ }
+ return ACLK_ENC_UNKNOWN;
+}
+
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) {
+ if (!strcmp(str, "MQTTv3")) {
+ return ACLK_TRP_MQTT_3_1_1;
+ }
+ if (!strcmp(str, "MQTTv5")) {
+ return ACLK_TRP_MQTT_5;
+ }
+ return ACLK_TRP_UNKNOWN;
+}
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) {
+ freez(trp_desc->endpoint);
+}
+
+void aclk_env_t_destroy(aclk_env_t *env) {
+ freez(env->auth_endpoint);
+ if (env->transports) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ if(env->transports[i]) {
+ aclk_transport_desc_t_destroy(env->transports[i]);
+ env->transports[i] = NULL;
+ }
+ }
+ freez(env->transports);
+ }
+ if (env->capabilities) {
+ for (size_t i = 0; i < env->capability_count; i++)
+ freez(env->capabilities[i]);
+ freez(env->capabilities);
+ }
+}
+
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
@@ -28,137 +72,246 @@ int aclk_get_conv_log_next()
#define ACLK_TOPIC_PREFIX "/agent/"
struct aclk_topic {
- const char *topic_suffix;
+ enum aclk_topics topic_id;
+ // as received from cloud - we keep this for
+ // eventual topic list update when claim_id changes
+ char *topic_recvd;
+ // constructed topic
char *topic;
};
// This helps to cache finalized topics (assembled with claim_id)
// to not have to alloc or create buffer and construct topic every
// time message is sent as in old ACLK
-static struct aclk_topic aclk_topic_cache[] = {
- { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART
- { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS
- { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA
- { .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND
- { .topic_suffix = NULL, .topic = NULL }
-};
+static struct aclk_topic **aclk_topic_cache = NULL;
+static size_t aclk_topic_cache_items = 0;
void free_topic_cache(void)
{
- struct aclk_topic *tc = aclk_topic_cache;
- while (tc->topic_suffix) {
- if (tc->topic) {
- freez(tc->topic);
- tc->topic = NULL;
+ if (aclk_topic_cache) {
+ for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+ freez(aclk_topic_cache[i]->topic);
+ freez(aclk_topic_cache[i]->topic_recvd);
+ freez(aclk_topic_cache[i]);
}
- tc++;
+ freez(aclk_topic_cache);
+ aclk_topic_cache = NULL;
+ aclk_topic_cache_items = 0;
}
}
-static inline void generate_topic_cache(void)
-{
- struct aclk_topic *tc = aclk_topic_cache;
- char *ptr;
- if (unlikely(!tc->topic)) {
- rrdhost_aclk_state_lock(localhost);
- while(tc->topic_suffix) {
- tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix));
- ptr = tc->topic;
- strcpy(ptr, ACLK_TOPIC_PREFIX);
- ptr += strlen(ACLK_TOPIC_PREFIX);
- strcpy(ptr, localhost->aclk_state.claimed_id);
- ptr += (UUID_STR_LEN - 1);
- *ptr++ = '/';
- strcpy(ptr, tc->topic_suffix);
- tc++;
+#define JSON_TOPIC_KEY_TOPIC "topic"
+#define JSON_TOPIC_KEY_NAME "name"
+
+struct topic_name {
+ enum aclk_topics id;
+ // cloud name - how is it called
+ // in answer to /password endpoint
+ const char *name;
+} topic_names[] = {
+ { .id = ACLK_TOPICID_CHART, .name = "chart" },
+ { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
+ { .id = ACLK_TOPICID_METADATA, .name = "meta" },
+ { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
+ { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
+};
+
+enum aclk_topics compulsory_topics[] = {
+ ACLK_TOPICID_CHART,
+ ACLK_TOPICID_ALARMS,
+ ACLK_TOPICID_METADATA,
+ ACLK_TOPICID_COMMAND,
+ ACLK_TOPICID_UNKNOWN
+};
+
+static enum aclk_topics topic_name_to_id(const char *name) {
+ struct topic_name *topic = topic_names;
+ while (topic->name) {
+ if (!strcmp(topic->name, name)) {
+ return topic->id;
}
+ topic++;
+ }
+ return ACLK_TOPICID_UNKNOWN;
+}
+
+static const char *topic_id_to_name(enum aclk_topics tid) {
+ struct topic_name *topic = topic_names;
+ while (topic->name) {
+ if (topic->id == tid)
+ return topic->name;
+ topic++;
+ }
+ return "unknown";
+}
+
+#define CLAIM_ID_REPLACE_TAG "#{claim_id}"
+static void topic_generate_final(struct aclk_topic *t) {
+ char *dest;
+ char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG);
+ if (!replace_tag)
+ return;
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("This should never be called if agent not claimed");
rrdhost_aclk_state_unlock(localhost);
+ return;
}
+
+ t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id));
+ memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd);
+ dest = t->topic + (replace_tag - t->topic_recvd);
+
+ memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id));
+ dest += strlen(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ replace_tag += strlen(CLAIM_ID_REPLACE_TAG);
+ strcpy(dest, replace_tag);
+ dest += strlen(replace_tag);
+ *dest = 0;
}
-/*
- * Build a topic based on sub_topic and final_topic
- * if the sub topic starts with / assume that is an absolute topic
- *
- */
-const char *aclk_get_topic(enum aclk_topics topic)
+static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic)
{
- generate_topic_cache();
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+ error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string");
+ return 1;
+ }
+ topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it)));
+ if (topic->topic_id == ACLK_TOPICID_UNKNOWN) {
+ info("topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+ error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string");
+ return 1;
+ }
+ topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
- return aclk_topic_cache[topic].topic;
+ if (!topic->topic_recvd) {
+ error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC);
+ return 1;
+ }
+
+ topic_generate_final(topic);
+ aclk_topic_cache_items++;
+
+ return 0;
}
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
+int aclk_generate_topic_cache(struct json_object *json)
{
- int pos = 0;
- if (!strncmp("https://", url, 8)) {
- pos = 8;
- } else if (!strncmp("http://", url, 7)) {
- error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
+ json_object *obj;
+
+ size_t array_size = json_object_array_length(json);
+ if (!array_size) {
+ error("Empty topic list!");
return 1;
}
- int host_end = pos;
- while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
- host_end++;
- if (url[host_end] == 0) {
- *aclk_hostname = strdupz(url + pos);
- *aclk_port = 443;
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
- return 0;
- }
- if (url[host_end] == ':') {
- *aclk_hostname = callocz(host_end - pos + 1, 1);
- strncpy(*aclk_hostname, url + pos, host_end - pos);
- int port_end = host_end + 1;
- while (url[port_end] >= '0' && url[port_end] <= '9')
- port_end++;
- if (port_end - host_end > 6) {
- error("Port specified in %s is invalid", url);
- freez(*aclk_hostname);
- *aclk_hostname = NULL;
+
+ if (aclk_topic_cache)
+ free_topic_cache();
+
+ aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *));
+
+ for (size_t i = 0; i < array_size; i++) {
+ obj = json_object_array_get_idx(json, i);
+ if (json_object_get_type(obj) != json_type_object) {
+ error("expected json_type_object");
+ return 1;
+ }
+ aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic));
+ if (topic_cache_add_topic(obj, aclk_topic_cache[i])) {
+ error("failed to parse topic @idx=%d", (int)i);
return 1;
}
- *aclk_port = atoi(&url[host_end+1]);
}
- if (url[host_end] == '/') {
- *aclk_port = 443;
- *aclk_hostname = callocz(1, host_end - pos + 1);
- strncpy(*aclk_hostname, url+pos, host_end - pos);
+
+ for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
+ if (!aclk_get_topic(compulsory_topics[i])) {
+ error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));
+ return 1;
+ }
}
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
+
return 0;
}
/*
+ * Build a topic based on sub_topic and final_topic
+ * if the sub topic starts with / assume that is an absolute topic
+ *
+ */
+const char *aclk_get_topic(enum aclk_topics topic)
+{
+ if (!aclk_topic_cache) {
+ error("Topic cache not initialized");
+ return NULL;
+ }
+
+ for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+ if (aclk_topic_cache[i]->topic_id == topic)
+ return aclk_topic_cache[i]->topic;
+ }
+ error("Unknown topic");
+ return NULL;
+}
+
+/*
* TBEB with randomness
*
- * @param mode 0 - to reset the delay,
- * 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ * @param reset 1 - to reset the delay,
+ * 0 - to advance a step and calculate sleep time in ms
+ * @param min, max in seconds
* @returns delay in ms
*
*/
-#define ACLK_MAX_BACKOFF_DELAY 1024
-unsigned long int aclk_reconnect_delay(int mode)
-{
- static int fail = -1;
- unsigned long int delay;
- if (!mode || fail == -1) {
- srandom(time(NULL));
- fail = mode - 1;
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) {
+ static int attempt = -1;
+
+ if (reset) {
+ attempt = -1;
return 0;
}
- delay = (1 << fail);
+ attempt++;
- if (delay >= ACLK_MAX_BACKOFF_DELAY) {
- delay = ACLK_MAX_BACKOFF_DELAY * 1000;
- } else {
- fail++;
- delay *= 1000;
- delay += (random() % (MAX(1000, delay/2)));
+ if (attempt == 0) {
+ srandom(time(NULL));
+ return 0;
}
+ unsigned long int delay = pow(base, attempt - 1);
+ delay *= MSEC_PER_SEC;
+
+ delay += (random() % (MAX(1000, delay/2)));
+
+ if (delay <= min * MSEC_PER_SEC)
+ return min;
+
+ if (delay >= max * MSEC_PER_SEC)
+ return max;
+
return delay;
}
@@ -345,3 +498,43 @@ const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
*type = proxy_type;
return proxy;
}
+
+#define HTTP_PROXY_PREFIX "http://"
+void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type)
+{
+ ACLK_PROXY_TYPE pt;
+ const char *ptr = aclk_get_proxy(&pt);
+ char *tmp;
+ char *host;
+ if (pt != PROXY_TYPE_HTTP)
+ return;
+
+ *port = 0;
+
+ if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
+ ptr += strlen(HTTP_PROXY_PREFIX);
+
+ if ((tmp = strchr(ptr, '@')))
+ ptr = tmp;
+
+ if ((tmp = strchr(ptr, '/'))) {
+ host = mallocz((tmp - ptr) + 1);
+ memcpy(host, ptr, (tmp - ptr));
+ host[tmp - ptr] = 0;
+ } else
+ host = strdupz(ptr);
+
+ if ((tmp = strchr(host, ':'))) {
+ *tmp = 0;
+ tmp++;
+ *port = atoi(tmp);
+ }
+
+ if (*port <= 0 || *port > 65535)
+ *port = 8080;
+
+ *ohost = host;
+
+ if (type)
+ *type = MQTT_WSS_PROXY_HTTP;
+}
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index c7232979..03b22e40 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -3,20 +3,63 @@
#define ACLK_UTIL_H
#include "libnetdata/libnetdata.h"
+#include "mqtt_wss_client.h"
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
+typedef enum {
+ ACLK_ENC_UNKNOWN = 0,
+ ACLK_ENC_JSON,
+ ACLK_ENC_PROTO
+} aclk_encoding_type_t;
+
+typedef enum {
+ ACLK_TRP_UNKNOWN = 0,
+ ACLK_TRP_MQTT_3_1_1,
+ ACLK_TRP_MQTT_5
+} aclk_transport_type_t;
+
+typedef struct {
+ char *endpoint;
+ aclk_transport_type_t type;
+} aclk_transport_desc_t;
+
+typedef struct {
+ int base;
+ int max_s;
+ int min_s;
+} aclk_backoff_t;
+
+typedef struct {
+ char *auth_endpoint;
+ aclk_encoding_type_t encoding;
+
+ aclk_transport_desc_t **transports;
+ size_t transport_count;
+
+ char **capabilities;
+ size_t capability_count;
+
+ aclk_backoff_t backoff;
+} aclk_env_t;
+
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str);
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str);
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
+void aclk_env_t_destroy(aclk_env_t *env);
enum aclk_topics {
- ACLK_TOPICID_CHART = 0,
- ACLK_TOPICID_ALARMS = 1,
- ACLK_TOPICID_METADATA = 2,
- ACLK_TOPICID_COMMAND = 3,
+ ACLK_TOPICID_UNKNOWN = 0,
+ ACLK_TOPICID_CHART = 1,
+ ACLK_TOPICID_ALARMS = 2,
+ ACLK_TOPICID_METADATA = 3,
+ ACLK_TOPICID_COMMAND = 4
};
const char *aclk_get_topic(enum aclk_topics topic);
+int aclk_generate_topic_cache(struct json_object *json);
void free_topic_cache(void);
// TODO
// aclk_topics_reload //when claim id changes
@@ -32,7 +75,8 @@ int aclk_get_conv_log_next();
#endif
#endif
-unsigned long int aclk_reconnect_delay(int mode);
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
+#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0)
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
@@ -46,7 +90,8 @@ const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
+void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type);
+
#endif /* ACLK_UTIL_H */
diff --git a/aclk/https_client.c b/aclk/https_client.c
index 1b9546d7..907f512b 100644
--- a/aclk/https_client.c
+++ b/aclk/https_client.c
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#include "libnetdata/libnetdata.h"
#include "https_client.h"
@@ -10,6 +12,19 @@ enum http_parse_state {
HTTP_PARSE_CONTENT
};
+static const char *http_req_type_to_str(http_req_type_t req) {
+ switch (req) {
+ case HTTP_REQ_GET:
+ return "GET";
+ case HTTP_REQ_POST:
+ return "POST";
+ case HTTP_REQ_CONNECT:
+ return "CONNECT";
+ default:
+ return "unknown";
+ }
+}
+
typedef struct {
enum http_parse_state state;
int content_length;
@@ -17,6 +32,13 @@ typedef struct {
} http_parse_ctx;
#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 }
+static inline void http_parse_ctx_clear(http_parse_ctx *ctx) {
+ ctx->state = HTTP_PARSE_INITIAL;
+ ctx->content_length = -1;
+ ctx->http_code = 0;
+}
+
+#define POLL_TO_MS 100
#define NEED_MORE_DATA 0
#define PARSE_SUCCESS 1
@@ -71,8 +93,6 @@ static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx)
rbuf_pop(buf, buf_val, idx_end);
buf_val[idx_end] = 0;
- rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR));
-
for (ptr = buf_key; *ptr; ptr++)
*ptr = tolower(*ptr);
@@ -129,10 +149,10 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
break;
case HTTP_PARSE_CONTENT:
- if (parse_ctx->content_length < 0) {
- error("content-length missing and http headers ended");
- return PARSE_ERROR;
- }
+ // replies like CONNECT etc. do not have content
+ if (parse_ctx->content_length < 0)
+ return PARSE_SUCCESS;
+
if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length)
return PARSE_SUCCESS;
return NEED_MORE_DATA;
@@ -140,107 +160,493 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
} while(1);
}
-int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload)
+typedef struct https_req_ctx {
+ https_req_t *request;
+
+ int sock;
+ rbuf_t buf_rx;
+
+ struct pollfd poll_fd;
+
+ SSL_CTX *ssl_ctx;
+ SSL *ssl;
+
+ size_t written;
+
+ int self_signed_allowed;
+
+ http_parse_ctx parse_ctx;
+
+ time_t req_start_time;
+} https_req_ctx_t;
+
+static int https_req_check_timedout(https_req_ctx_t *ctx) {
+ if (now_realtime_sec() > ctx->req_start_time + ctx->request->timeout_s) {
+ error("request timed out");
+ return 1;
+ }
+ return 0;
+}
+
+static char *_ssl_err_tos(int err)
{
- struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
- char sport[PORT_STR_MAX_BYTES];
- size_t len = 0;
- int rc = 1;
+ switch(err){
+ case SSL_ERROR_SSL:
+ return "SSL_ERROR_SSL";
+ case SSL_ERROR_WANT_READ:
+ return "SSL_ERROR_WANT_READ";
+ case SSL_ERROR_WANT_WRITE:
+ return "SSL_ERROR_WANT_WRITE";
+ case SSL_ERROR_NONE:
+ return "SSL_ERROR_NONE";
+ case SSL_ERROR_ZERO_RETURN:
+ return "SSL_ERROR_ZERO_RETURN";
+ case SSL_ERROR_WANT_CONNECT:
+ return "SSL_ERROR_WANT_CONNECT";
+ case SSL_ERROR_WANT_ACCEPT:
+ return "SSL_ERROR_WANT_ACCEPT";
+ }
+ return "Unknown!!!";
+}
+
+static int socket_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) {
+ ctx->written = 0;
+ ctx->poll_fd.events = POLLOUT;
+
+ do {
+ int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS);
+ if (ret < 0) {
+ error("poll error");
+ return 1;
+ }
+ if (ret == 0) {
+ if (https_req_check_timedout(ctx)) {
+ error("Poll timed out");
+ return 2;
+ }
+ continue;
+ }
+
+ ret = write(ctx->sock, &data[ctx->written], data_len - ctx->written);
+ if (ret > 0) {
+ ctx->written += ret;
+ } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("Error writing to socket");
+ return 3;
+ }
+ } while (ctx->written < data_len);
+
+ return 0;
+}
+
+static int ssl_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) {
+ ctx->written = 0;
+ ctx->poll_fd.events |= POLLOUT;
+
+ do {
+ int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS);
+ if (ret < 0) {
+ error("poll error");
+ return 1;
+ }
+ if (ret == 0) {
+ if (https_req_check_timedout(ctx)) {
+ error("Poll timed out");
+ return 2;
+ }
+ continue;
+ }
+ ctx->poll_fd.events = 0;
+
+ ret = SSL_write(ctx->ssl, &data[ctx->written], data_len - ctx->written);
+ if (ret > 0) {
+ ctx->written += ret;
+ } else {
+ ret = SSL_get_error(ctx->ssl, ret);
+ switch (ret) {
+ case SSL_ERROR_WANT_READ:
+ ctx->poll_fd.events |= POLLIN;
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ ctx->poll_fd.events |= POLLOUT;
+ break;
+ default:
+ error("SSL_write Err: %s", _ssl_err_tos(ret));
+ return 3;
+ }
+ }
+ } while (ctx->written < data_len);
+
+ return 0;
+}
+
+static inline int https_client_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) {
+ if (ctx->ssl_ctx)
+ return ssl_write_all(ctx, data, data_len);
+ return socket_write_all(ctx, data, data_len);
+}
+
+static int read_parse_response(https_req_ctx_t *ctx) {
int ret;
char *ptr;
- http_parse_ctx parse_ctx = HTTP_PARSE_CTX_INITIALIZER;
+ size_t size;
- rbuf_t buffer = rbuf_create(b_size);
- if (!buffer)
+ ctx->poll_fd.events = POLLIN;
+ do {
+ ret = poll(&ctx->poll_fd, 1, POLL_TO_MS);
+ if (ret < 0) {
+ error("poll error");
+ return 1;
+ }
+ if (ret == 0) {
+ if (https_req_check_timedout(ctx)) {
+ error("Poll timed out");
+ return 2;
+ }
+ continue;
+ }
+ ctx->poll_fd.events = 0;
+
+ ptr = rbuf_get_linear_insert_range(ctx->buf_rx, &size);
+
+ if (ctx->ssl_ctx)
+ ret = SSL_read(ctx->ssl, ptr, size);
+ else
+ ret = read(ctx->sock, ptr, size);
+
+ if (ret > 0) {
+ rbuf_bump_head(ctx->buf_rx, ret);
+ } else {
+ if (ctx->ssl_ctx) {
+ ret = SSL_get_error(ctx->ssl, ret);
+ switch (ret) {
+ case SSL_ERROR_WANT_READ:
+ ctx->poll_fd.events |= POLLIN;
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ ctx->poll_fd.events |= POLLOUT;
+ break;
+ default:
+ error("SSL_read Err: %s", _ssl_err_tos(ret));
+ return 3;
+ }
+ } else {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("write error");
+ return 3;
+ }
+ ctx->poll_fd.events |= POLLIN;
+ }
+ }
+ } while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx)));
+
+ if (ret != PARSE_SUCCESS) {
+ error("Error parsing HTTP response");
return 1;
+ }
- snprintf(sport, PORT_STR_MAX_BYTES, "%d", port);
+ return 0;
+}
- if (payload != NULL)
- len = strlen(payload);
+#define TX_BUFFER_SIZE 8192
+#define RX_BUFFER_SIZE (TX_BUFFER_SIZE*2)
+static int handle_http_request(https_req_ctx_t *ctx) {
+ BUFFER *hdr = buffer_create(TX_BUFFER_SIZE);
+ int rc = 0;
+
+ http_parse_ctx_clear(&ctx->parse_ctx);
+
+ // Prepare data to send
+ switch (ctx->request->request_type) {
+ case HTTP_REQ_CONNECT:
+ buffer_strcat(hdr, "CONNECT ");
+ break;
+ case HTTP_REQ_GET:
+ buffer_strcat(hdr, "GET ");
+ break;
+ case HTTP_REQ_POST:
+ buffer_strcat(hdr, "POST ");
+ break;
+ default:
+ error("Unknown HTTPS request type!");
+ rc = 1;
+ goto err_exit;
+ }
+
+ if (ctx->request->request_type == HTTP_REQ_CONNECT) {
+ buffer_strcat(hdr, ctx->request->host);
+ buffer_sprintf(hdr, ":%d", ctx->request->port);
+ } else {
+ buffer_strcat(hdr, ctx->request->url);
+ }
+
+ buffer_strcat(hdr, " HTTP/1.1\x0D\x0A");
- snprintf(
- b,
- b_size,
- "%s %s HTTP/1.1\r\nHost: %s\r\nAccept: application/json\r\nContent-length: %zu\r\nAccept-Language: en-us\r\n"
- "User-Agent: Netdata/rocks\r\n\r\n",
- (method == HTTP_REQ_GET ? "GET" : "POST"), url, host, len);
+ //TODO Headers!
+ if (ctx->request->request_type != HTTP_REQ_CONNECT) {
+ buffer_sprintf(hdr, "Host: %s\x0D\x0A", ctx->request->host);
+ }
+ buffer_strcat(hdr, "User-Agent: Netdata/rocks newhttpclient\x0D\x0A");
- if (payload != NULL)
- strncat(b, payload, b_size - len);
+ if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
+ buffer_sprintf(hdr, "Content-Length: %zu\x0D\x0A", ctx->request->payload_size);
+ }
- len = strlen(b);
+ buffer_strcat(hdr, "\x0D\x0A");
- debug(D_ACLK, "Sending HTTPS req (%zu bytes): '%s'", len, b);
- int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, sport, &timeout);
+ // Send the request
+ if (https_client_write_all(ctx, hdr->buffer, hdr->len)) {
+ error("Couldn't write HTTP request header into SSL connection");
+ rc = 2;
+ goto err_exit;
+ }
- if (unlikely(sock == -1)) {
- error("Handshake failed");
- goto exit_buf;
+ if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
+ if (https_client_write_all(ctx, ctx->request->payload, ctx->request->payload_size)) {
+ error("Couldn't write payload into SSL connection");
+ rc = 3;
+ goto err_exit;
+ }
}
- SSL_CTX *ctx = security_initialize_openssl_client();
- if (ctx==NULL) {
+ // Read The Response
+ if (read_parse_response(ctx)) {
+ error("Error reading or parsing response from server");
+ rc = 4;
+ goto err_exit;
+ }
+
+err_exit:
+ buffer_free(hdr);
+ return rc;
+}
+
+int https_request(https_req_t *request, https_req_response_t *response) {
+ int rc = 1, ret;
+ char connect_port_str[PORT_STR_MAX_BYTES];
+
+ const char *connect_host = request->proxy_host ? request->proxy_host : request->host;
+ int connect_port = request->proxy_host ? request->proxy_port : request->port;
+ struct timeval timeout = { .tv_sec = request->timeout_s, .tv_usec = 0 };
+
+ https_req_ctx_t *ctx = callocz(1, sizeof(https_req_ctx_t));
+ ctx->req_start_time = now_realtime_sec();
+
+ ctx->buf_rx = rbuf_create(RX_BUFFER_SIZE);
+ if (!ctx->buf_rx) {
+ error("Couldn't allocate buffer for RX data");
+ goto exit_req_ctx;
+ }
+
+ snprintf(connect_port_str, PORT_STR_MAX_BYTES, "%d", connect_port);
+
+ ctx->sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, connect_host, 0, connect_port_str, &timeout);
+ if (ctx->sock < 0) {
+ error("Error connecting TCP socket to \"%s\"", connect_host);
+ goto exit_buf_rx;
+ }
+
+ if (fcntl(ctx->sock, F_SETFL, fcntl(ctx->sock, F_GETFL, 0) | O_NONBLOCK) == -1) {
+ error("Error setting O_NONBLOCK to TCP socket.");
+ goto exit_sock;
+ }
+
+ ctx->poll_fd.fd = ctx->sock;
+
+ // Do the CONNECT if proxy is used
+ if (request->proxy_host) {
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ req.request_type = HTTP_REQ_CONNECT;
+ req.timeout_s = request->timeout_s;
+ req.host = request->host;
+ req.port = request->port;
+ req.url = request->url;
+ ctx->request = &req;
+ if (handle_http_request(ctx)) {
+ error("Failed to CONNECT with proxy");
+ goto exit_sock;
+ }
+ if (ctx->parse_ctx.http_code != 200) {
+ error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code);
+ goto exit_sock;
+ }
+ info("Proxy accepted CONNECT upgrade");
+ }
+ ctx->request = request;
+
+ ctx->ssl_ctx = security_initialize_openssl_client();
+ if (ctx->ssl_ctx==NULL) {
error("Cannot allocate SSL context");
goto exit_sock;
}
- // Certificate chain: not updating the stores - do we need private CA roots?
- // Calls to SSL_CTX_load_verify_locations would go here.
- SSL *ssl = SSL_new(ctx);
- if (ssl==NULL) {
+
+ ctx->ssl = SSL_new(ctx->ssl_ctx);
+ if (ctx->ssl==NULL) {
error("Cannot allocate SSL");
goto exit_CTX;
}
- SSL_set_fd(ssl, sock);
- ret = SSL_connect(ssl);
- if (ret != 1) {
- error("SSL_connect() failed with err=%d", ret);
+
+ SSL_set_fd(ctx->ssl, ctx->sock);
+ ret = SSL_connect(ctx->ssl);
+ if (ret != -1 && ret != 1) {
+ error("SSL could not connect");
goto exit_SSL;
}
+ if (ret == -1) {
+ // expected as underlying socket is non blocking!
+ // consult SSL_connect documentation for details
+ int ec = SSL_get_error(ctx->ssl, ret);
+ if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) {
+ error("Failed to start SSL connection");
+ goto exit_SSL;
+ }
+ }
- ret = SSL_write(ssl, b, len);
- if (ret <= 0)
- {
- error("SSL_write() failed with err=%d", ret);
+ // The actual request here
+ if (handle_http_request(ctx)) {
+ error("Couldn't process request");
goto exit_SSL;
}
+ response->http_code = ctx->parse_ctx.http_code;
+ if (ctx->parse_ctx.content_length > 0) {
+ response->payload_size = ctx->parse_ctx.content_length;
+ response->payload = mallocz(response->payload_size + 1);
+ ret = rbuf_pop(ctx->buf_rx, response->payload, response->payload_size);
+ if (ret != (int)response->payload_size) {
+ error("Payload size doesn't match remaining data on the buffer!");
+ response->payload_size = ret;
+ }
+ // normally we take payload as it is and copy it
+ // but for convenience in cases where payload is sth. like
+ // json we add terminating zero so that user of the data
+ // doesn't have to convert to C string (0 terminated)
+ // other uses still have correct payload_size and can copy
+ // only exact data without affixed 0x00
+ ((char*)response->payload)[response->payload_size] = 0; // mallocz(response->payload_size + 1);
+ }
+ info("HTTPS \"%s\" request to \"%s\" finished with HTTP code: %d", http_req_type_to_str(ctx->request->request_type), ctx->request->host, response->http_code);
- b[0] = 0;
+ rc = 0;
- do {
- ptr = rbuf_get_linear_insert_range(buffer, &len);
- ret = SSL_read(ssl, ptr, len - 1);
- if (ret)
- rbuf_bump_head(buffer, ret);
- if (ret <= 0)
- {
- error("No response available - SSL_read()=%d", ret);
- goto exit_FULL;
+exit_SSL:
+ SSL_free(ctx->ssl);
+exit_CTX:
+ SSL_CTX_free(ctx->ssl_ctx);
+exit_sock:
+ close(ctx->sock);
+exit_buf_rx:
+ rbuf_free(ctx->buf_rx);
+exit_req_ctx:
+ freez(ctx);
+ return rc;
+}
+
+void https_req_response_free(https_req_response_t *res) {
+ freez(res->payload);
+}
+
+void https_req_response_init(https_req_response_t *res) {
+ res->http_code = 0;
+ res->payload = NULL;
+ res->payload_size = 0;
+}
+
+static inline char *min_non_null(char *a, char *b) {
+ if (!a)
+ return b;
+ if (!b)
+ return a;
+ return (a < b ? a : b);
+}
+
+#define URI_PROTO_SEPARATOR "://"
+#define URL_PARSER_LOG_PREFIX "url_parser "
+
+static int parse_host_port(url_t *url) {
+ char *ptr = strrchr(url->host, ':');
+ if (ptr) {
+ size_t port_len = strlen(ptr + 1);
+ if (!port_len) {
+ error(URL_PARSER_LOG_PREFIX ": specified but no port number");
+ return 1;
+ }
+ if (port_len > 5 /* MAX port lenght is 5digit long in decimal */) {
+ error(URL_PARSER_LOG_PREFIX "port # is too long");
+ return 1;
}
- } while (!(ret = parse_http_response(buffer, &parse_ctx)));
+ *ptr = 0;
+ if (!strlen(url->host)) {
+ error(URL_PARSER_LOG_PREFIX "host empty after removing port");
+ return 1;
+ }
+ url->port = atoi (ptr + 1);
+ }
+ return 0;
+}
- if (ret != PARSE_SUCCESS) {
- error("Error parsing HTTP response");
- goto exit_FULL;
+static inline void port_by_proto(url_t *url) {
+ if (url->port)
+ return;
+ if (!url->proto)
+ return;
+ if (!strcmp(url->proto, "http")) {
+ url->port = 80;
+ return;
+ }
+ if (!strcmp(url->proto, "https")) {
+ url->port = 443;
+ return;
}
+}
- if (parse_ctx.http_code < 200 || parse_ctx.http_code >= 300) {
- error("HTTP Response not Success (got %d)", parse_ctx.http_code);
- goto exit_FULL;
+#define STRDUPZ_2PTR(dest, start, end) \
+ { \
+ dest = mallocz(1 + end - start); \
+ memcpy(dest, start, end - start); \
+ dest[end - start] = 0; \
}
- len = rbuf_pop(buffer, b, b_size);
- b[MIN(len, b_size-1)] = 0;
+int url_parse(const char *url, url_t *parsed) {
+ const char *start = url;
+ const char *end = strstr(url, URI_PROTO_SEPARATOR);
- rc = 0;
-exit_FULL:
-exit_SSL:
- SSL_free(ssl);
-exit_CTX:
- SSL_CTX_free(ctx);
-exit_sock:
- close(sock);
-exit_buf:
- rbuf_free(buffer);
- return rc;
+ if (end) {
+ if (end == start) {
+ error (URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified");
+ return 1;
+ }
+
+ STRDUPZ_2PTR(parsed->proto, start, end)
+ start = end + strlen(URI_PROTO_SEPARATOR);
+ }
+
+ end = strchr(start, '/');
+ if (!end)
+ end = start + strlen(start);
+
+ if (start == end) {
+ error(URL_PARSER_LOG_PREFIX "Host empty");
+ return 1;
+ }
+
+ STRDUPZ_2PTR(parsed->host, start, end);
+
+ if (parse_host_port(parsed))
+ return 1;
+
+ if (!*end) {
+ parsed->path = strdupz("/");
+ port_by_proto(parsed);
+ return 0;
+ }
+
+ parsed->path = strdupz(end);
+ port_by_proto(parsed);
+ return 0;
+}
+
+void url_t_destroy(url_t *url) {
+ freez(url->host);
+ freez(url->path);
+ freez(url->proto);
}
diff --git a/aclk/https_client.h b/aclk/https_client.h
index 0d2e0dba..f7bc3d43 100644
--- a/aclk/https_client.h
+++ b/aclk/https_client.h
@@ -1,11 +1,78 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#ifndef NETDATA_HTTPS_CLIENT_H
#define NETDATA_HTTPS_CLIENT_H
+#include "libnetdata/libnetdata.h"
+
typedef enum http_req_type {
- HTTP_REQ_GET,
- HTTP_REQ_POST
+ HTTP_REQ_GET = 0,
+ HTTP_REQ_POST,
+ HTTP_REQ_CONNECT
} http_req_type_t;
-int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload);
+typedef struct {
+ http_req_type_t request_type;
+
+ char *host;
+ int port;
+ char *url;
+
+ time_t timeout_s; //timeout in seconds for the network operation (send/recv)
+
+ void *payload;
+ size_t payload_size;
+
+ char *proxy_host;
+ int proxy_port;
+} https_req_t;
+
+typedef struct {
+ int http_code;
+
+ void *payload;
+ size_t payload_size;
+} https_req_response_t;
+
+
+// Non feature complete URL parser
+// feel free to extend when needed
+// currently implements only what ACLK
+// needs
+// proto://host[:port]/path
+typedef struct {
+ char *proto;
+ char *host;
+ int port;
+ char* path;
+} url_t;
+
+int url_parse(const char *url, url_t *parsed);
+void url_t_destroy(url_t *url);
+
+void https_req_response_free(https_req_response_t *res);
+void https_req_response_init(https_req_response_t *res);
+
+#define HTTPS_REQ_RESPONSE_T_INITIALIZER \
+ { \
+ .http_code = 0, \
+ .payload = NULL, \
+ .payload_size = 0 \
+ }
+
+#define HTTPS_REQ_T_INITIALIZER \
+ { \
+ .request_type = HTTP_REQ_GET, \
+ .host = NULL, \
+ .port = 443, \
+ .url = NULL, \
+ .timeout_s = 30, \
+ .payload = NULL, \
+ .payload_size = 0, \
+ .proxy_host = NULL, \
+ .proxy_port = 8080 \
+ }
+
+int https_request(https_req_t *request, https_req_response_t *response);
#endif /* NETDATA_HTTPS_CLIENT_H */
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
index 43455393..96f95545 100644
--- a/aclk/legacy/aclk_common.c
+++ b/aclk/legacy/aclk_common.c
@@ -100,7 +100,7 @@ static inline void safe_log_proxy_error(char *str, const char *proxy)
freez(log);
}
-static inline int check_socks_enviroment(const char **proxy)
+static inline int check_socks_environment(const char **proxy)
{
char *tmp = getenv("socks_proxy");
@@ -118,7 +118,7 @@ static inline int check_socks_enviroment(const char **proxy)
return 1;
}
-static inline int check_http_enviroment(const char **proxy)
+static inline int check_http_environment(const char **proxy)
{
char *tmp = getenv("http_proxy");
@@ -145,7 +145,7 @@ const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
return proxy;
if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
- if (check_socks_enviroment(&proxy) == 0) {
+ if (check_socks_environment(&proxy) == 0) {
#ifdef LWS_WITH_SOCKS5
*type = PROXY_TYPE_SOCKS5;
return proxy;
@@ -156,7 +156,7 @@ const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
proxy);
#endif
}
- if (check_http_enviroment(&proxy) == 0)
+ if (check_http_environment(&proxy) == 0)
*type = PROXY_TYPE_HTTP;
return proxy;
}
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
index df221dd6..f73902b3 100644
--- a/aclk/legacy/aclk_lws_wss_client.c
+++ b/aclk/legacy/aclk_lws_wss_client.c
@@ -428,7 +428,7 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
// Callback servicing is forced when we are closed from above.
if (engine_instance->upstream_reconnect_request) {
- error("Closing lws connectino due to libmosquitto error.");
+ error("Closing lws connection due to libmosquitto error.");
char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
lws_close_reason(
wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error,
@@ -609,7 +609,7 @@ void aclk_lws_wss_service_loop()
// in case the MQTT connection disconnect while lws transport is still operational
// we should drop connection and reconnect
// this function should be called when that happens to notify lws of that situation
-void aclk_lws_wss_mqtt_layer_disconect_notif()
+void aclk_lws_wss_mqtt_layer_disconnect_notif()
{
if (!engine_instance)
return;
diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h
index 584a3cf4..eb99ee02 100644
--- a/aclk/legacy/aclk_lws_wss_client.h
+++ b/aclk/legacy/aclk_lws_wss_client.h
@@ -8,9 +8,9 @@
#include "libnetdata/libnetdata.h"
// This is as define because ideally the ACLK at high level
-// can do mosqitto writes and reads only from one thread
+// can do mosquitto writes and reads only from one thread
// which is cleaner implementation IMHO
-// in such case this mutexes are not necessarry and life
+// in such case this mutexes are not necessary and life
// is simpler
#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1
@@ -78,7 +78,7 @@ int aclk_lws_wss_client_write(void *buf, size_t count);
int aclk_lws_wss_client_read(void *buf, size_t count);
void aclk_lws_wss_service_loop();
-void aclk_lws_wss_mqtt_layer_disconect_notif();
+void aclk_lws_wss_mqtt_layer_disconnect_notif();
// Notifications inside the layer above
void aclk_lws_connection_established();
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
index 27ad9ac1..040068e8 100644
--- a/aclk/legacy/aclk_query.c
+++ b/aclk/legacy/aclk_query.c
@@ -498,7 +498,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
z_buffer->len += bytes_to_cpy;
} while(z_ret != Z_STREAM_END);
// so that web_client_build_http_header
- // puts correct content lenght into header
+ // puts correct content length into header
buffer_free(w->response.data);
w->response.data = z_buffer;
z_buffer = NULL;
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
index 2681445b..68dad81e 100644
--- a/aclk/legacy/aclk_rx_msgs.c
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -218,7 +218,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
error(
- "Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
+ "Unsupported version of \"version\" message from cloud. Expected %d, Got %d",
ACLK_VERSION_NEG_VERSION,
cloud_to_agent->version);
return 1;
@@ -353,7 +353,7 @@ int aclk_handle_cloud_message(char *payload)
// see what `aclk_queue_query` parameter `internal` does
// NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
- // msg handlers (namely aclk_handle_version_responce)
+ // msg handlers (namely aclk_handle_version_response)
// can freely change what aclk_incoming_msg_types points to
// so either exit or restart this for loop
freez(cloud_to_agent.type_id);
diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c
index 7124380a..88679cb3 100644
--- a/aclk/legacy/aclk_stats.c
+++ b/aclk/legacy/aclk_stats.c
@@ -123,7 +123,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s",
+ "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "KiB/s",
"netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA);
rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -145,7 +145,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s",
+ "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "KiB/s",
"netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA);
rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
index 5767df3a..5ed7e66a 100644
--- a/aclk/legacy/agent_cloud_link.c
+++ b/aclk/legacy/agent_cloud_link.c
@@ -653,7 +653,7 @@ static void aclk_graceful_disconnect()
aclk_shutting_down = 1;
_link_shutdown();
- aclk_lws_wss_mqtt_layer_disconect_notif();
+ aclk_lws_wss_mqtt_layer_disconnect_notif();
write_q = 1;
event_loop_timeout = now_realtime_sec() + 5;
@@ -937,7 +937,7 @@ static void aclk_try_to_connect(char *hostname, int port)
{
int rc;
-// this is usefull for developers working on ACLK
+// this is useful for developers working on ACLK
// allows connecting agent to any MQTT broker
// for debugging, development and testing purposes
#ifndef ACLK_DISABLE_CHALLENGE
@@ -986,7 +986,7 @@ static inline void aclk_hello_msg()
aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
ACLK_SHARED_STATE_UNLOCK;
- //Hello message is versioned separatelly from the rest of the protocol
+ //Hello message is versioned separately from the rest of the protocol
aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
@@ -1211,7 +1211,7 @@ exited:
/*
* this must be last -> if all static threads signal
* THREAD_EXITED rrdengine will dealloc the RRDSETs
- * and RRDDIMs that are used by still runing stat thread.
+ * and RRDDIMs that are used by still running stat thread.
* see netdata_cleanup_and_exit() for reference
*/
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
@@ -1555,7 +1555,7 @@ void aclk_single_update_enable()
aclk_disable_single_updates = 0;
}
-// Trigged by a health reload, sends the alarm metadata
+// Triggered by a health reload, sends the alarm metadata
void aclk_alarm_reload()
{
if (unlikely(aclk_host_initializing(localhost)))
diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h
index e777e0b1..bfcfef8e 100644
--- a/aclk/legacy/agent_cloud_link.h
+++ b/aclk/legacy/agent_cloud_link.h
@@ -24,7 +24,7 @@
#define ACLK_MAX_TOPIC 255
-#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now
+#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff strategy for now
#define ACLK_DEFAULT_PORT 9002
#define ACLK_DEFAULT_HOST "localhost"
@@ -57,7 +57,7 @@ extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id);
extern char *is_agent_claimed(void);
-extern void aclk_lws_wss_mqtt_layer_disconect_notif();
+extern void aclk_lws_wss_mqtt_layer_disconnect_notif();
char *create_uuid();
// callbacks for agent cloud link
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c
index 6f38a20d..74f77455 100644
--- a/aclk/legacy/mqtt.c
+++ b/aclk/legacy/mqtt.c
@@ -55,7 +55,7 @@ void connect_callback(struct mosquitto *mosq, void *obj, int rc)
UNUSED(obj);
UNUSED(rc);
- info("Connection to cloud estabilished");
+ info("Connection to cloud established");
aclk_connect();
return;
@@ -75,7 +75,7 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
}
aclk_disconnect();
- aclk_lws_wss_mqtt_layer_disconect_notif();
+ aclk_lws_wss_mqtt_layer_disconnect_notif();
return;
}
@@ -170,7 +170,7 @@ static int _mqtt_create_connection(char *username, char *password)
int rc = mosquitto_threaded_set(mosq, 1);
if (unlikely(rc != MOSQ_ERR_SUCCESS))
- error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
+ error("Failed to tune the thread model for libmosquitto (%s)", mosquitto_strerror(rc));
#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py
index 20ab523d..14e99b65 100644
--- a/aclk/legacy/tests/paho-inspection.py
+++ b/aclk/legacy/tests/paho-inspection.py
@@ -55,5 +55,5 @@ mqttc.connect(sys.argv[1], 8443, 60)
#mqttc.publish("/agent/mine","Test1")
#mqttc.subscribe("$SYS/#", 0)
-print("Connected succesfully, monitoring /agent/#", flush=True)
+print("Connected successfully, monitoring /agent/#", flush=True)
mqttc.loop_forever()