summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-06-09 04:52:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-06-09 04:52:39 +0000
commit89f3604407aff8f4cb2ed958252c61e23c767e24 (patch)
tree7fbf408102cab051557d38193524d8c6e991d070 /aclk
parentAdding upstream version 1.34.1. (diff)
downloadnetdata-89f3604407aff8f4cb2ed958252c61e23c767e24.tar.xz
netdata-89f3604407aff8f4cb2ed958252c61e23c767e24.zip
Adding upstream version 1.35.0.upstream/1.35.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk')
-rw-r--r--aclk/README.md2
-rw-r--r--aclk/aclk.c100
-rw-r--r--aclk/aclk.h3
-rw-r--r--aclk/aclk_alarm_api.c3
-rw-r--r--aclk/aclk_api.c3
-rw-r--r--aclk/aclk_api.h3
-rw-r--r--aclk/aclk_otp.c404
-rw-r--r--aclk/aclk_query.c64
-rw-r--r--aclk/aclk_query_queue.c12
-rw-r--r--aclk/aclk_query_queue.h4
-rw-r--r--aclk/aclk_rx_msgs.c39
-rw-r--r--aclk/aclk_stats.c2
-rw-r--r--aclk/aclk_tx_msgs.c83
-rw-r--r--aclk/aclk_tx_msgs.h3
-rw-r--r--aclk/aclk_util.c11
-rw-r--r--aclk/aclk_util.h6
-rw-r--r--aclk/schema-wrappers/alarm_stream.cc4
-rw-r--r--aclk/schema-wrappers/capability.cc11
-rw-r--r--aclk/schema-wrappers/capability.h24
-rw-r--r--aclk/schema-wrappers/connection.cc9
-rw-r--r--aclk/schema-wrappers/connection.h4
-rw-r--r--aclk/schema-wrappers/node_creation.h6
-rw-r--r--aclk/schema-wrappers/node_info.cc18
-rw-r--r--aclk/schema-wrappers/node_info.h4
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h1
25 files changed, 456 insertions, 367 deletions
diff --git a/aclk/README.md b/aclk/README.md
index 09c0d2920..f595726e3 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -50,10 +50,12 @@ You can configure following keys in the `netdata.conf` section `[cloud]`:
[cloud]
statistics = yes
query thread count = 2
+ mqtt5 = no
```
- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
+- `mqtt5` enables the new MQTT5 protocol implementation in the Agent. Currently a technical preview.
## Disable the ACLK
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 599b9a093..6426c5b5e 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -12,6 +12,7 @@
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
#include "https_client.h"
+#include "schema-wrappers/schema_wrappers.h"
#include "aclk_proxy.h"
@@ -34,7 +35,7 @@ time_t last_disconnect_time = 0;
time_t next_connection_attempt = 0;
float last_backoff_value = 0;
-int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
+int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
time_t aclk_block_until = 0;
@@ -172,7 +173,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
case MQTT_WSS_LOG_WARN:
- error("%s", str);
+ error_report("%s", str);
return;
case MQTT_WSS_LOG_INFO:
info("%s", str);
@@ -391,7 +392,7 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+ char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
error("Unable to fetch topic for COMMAND (to subscribe)");
@@ -400,7 +401,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
- topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
else
@@ -800,10 +801,12 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
+ use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
+
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
#else
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
@@ -1041,6 +1044,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
aclk_queue_query(query);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
@@ -1060,28 +1064,40 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
rrdhost_aclk_state_lock(localhost);
- create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = host->hostname,
+ .machine_guid = host->machine_guid
+ };
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
- create_query->data.node_creation.hostname = strdupz(host->hostname);
- create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
aclk_queue_query(create_query);
return;
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = (uint32_t) host->system_info->hops;
+ node_instance_connection_t node_state_update = {
+ .hops = host->system_info->hops,
+ .live = cmd,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.node_update.live = cmd;
- query->data.node_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd,
+
+ info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_queue_query(query);
}
@@ -1096,39 +1112,52 @@ void aclk_send_node_instances()
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .live = list->live,
+ .hops = list->hops,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.node_update.live = list->live;
- query->data.node_update.hops = list->hops;
- query->data.node_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- freez(list->hostname);
- info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
+ info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_queue_query(query);
} else {
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
+ node_instance_creation_t node_instance_creation = {
+ .hops = list->hops,
+ .hostname = list->hostname,
+ };
+ node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
rrdhost_aclk_state_lock(localhost);
- create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = list->hops;
- create_query->data.node_creation.hostname = list->hostname;
- create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
- info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid,
+ info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
+ freez(node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
+ freez(list->hostname);
list++;
}
freez(list_head);
}
+#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
@@ -1208,7 +1237,7 @@ char *ng_aclk_state(void)
"Protocols Supported: Legacy\n"
#endif
);
- buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
+ buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
@@ -1408,6 +1437,9 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
json_object_object_add(msg, "used-cloud-protocol", tmp);
+ tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+ json_object_object_add(msg, "mqtt-version", tmp);
+
tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
json_object_object_add(msg, "received-app-layer-msgs", tmp);
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 4d8546314..41c4e05e4 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -43,9 +43,10 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd);
-
void aclk_send_node_instances(void);
+#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
index 7df51a7b5..a181eb291 100644
--- a/aclk/aclk_alarm_api.c
+++ b/aclk/aclk_alarm_api.c
@@ -23,6 +23,9 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
char *payload = generate_alarm_log_entry(&payload_size, log_entry);
aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
+
+ if (!use_mqtt_5)
+ freez(payload);
}
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index 766f78053..a2e738ab1 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -16,6 +16,7 @@ int aclk_disable_runtime = 0;
int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
+int use_mqtt_5 = 0;
#define ACLK_IMPL_KEY_NAME "aclk implementation"
@@ -68,6 +69,8 @@ struct label *add_aclk_host_labels(struct label *label) {
break;
}
+ int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
+ label = add_label_to_list(label, "_mqtt_version", mqtt5 ? "5" : "3", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index 9958b0e11..557b70d70 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -21,6 +21,7 @@ extern int aclk_stats_enabled;
extern int aclk_alert_reloaded;
extern int aclk_ng;
+extern int use_mqtt_5;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
@@ -36,7 +37,9 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int connect);
+#endif
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index 658e04f9b..c99c65637 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -9,164 +9,6 @@
#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
-struct dictionary_singleton {
- char *key;
- char *result;
-};
-
-static int json_extract_singleton(JSON_ENTRY *e)
-{
- struct dictionary_singleton *data = e->callback_data;
-
- switch (e->type) {
- case JSON_OBJECT:
- case JSON_ARRAY:
- break;
- case JSON_STRING:
- if (!strcmp(e->name, data->key)) {
- data->result = strdupz(e->data.string);
- break;
- }
- break;
- case JSON_NUMBER:
- case JSON_BOOLEAN:
- case JSON_NULL:
- break;
- }
- return 0;
-}
-
-// Base-64 decoder.
-// Note: This is non-validating, invalid input will be decoded without an error.
-// Challenges are packed into json strings so we don't skip newlines.
-// Size errors (i.e. invalid input size or insufficient output space) are caught.
-static size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
-{
- static char lookup[256];
- static int first_time=1;
- if (first_time)
- {
- first_time = 0;
- for(int i=0; i<256; i++)
- lookup[i] = -1;
- for(int i='A'; i<='Z'; i++)
- lookup[i] = i-'A';
- for(int i='a'; i<='z'; i++)
- lookup[i] = i-'a' + 26;
- for(int i='0'; i<='9'; i++)
- lookup[i] = i-'0' + 52;
- lookup['+'] = 62;
- lookup['/'] = 63;
- }
- if ((input_size & 3) != 0)
- {
- error("Can't decode base-64 input length %zu", input_size);
- return 0;
- }
- size_t unpadded_size = (input_size/4) * 3;
- if ( unpadded_size > output_size )
- {
- error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
- return 0;
- }
- // Don't check padding within full quantums
- for (size_t i = 0 ; i < input_size-4 ; i+=4 )
- {
- uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
- output[0] = value >> 16;
- output[1] = value >> 8;
- output[2] = value;
- //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
- output += 3;
- input += 4;
- }
- // Handle padding only in last quantum
- if (input[2] == '=') {
- uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
- output[0] = value >> 4;
- //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
- return unpadded_size-2;
- }
- else if (input[3] == '=') {
- uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
- output[0] = value >> 10;
- output[1] = value >> 2;
- //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
- return unpadded_size-1;
- }
- else
- {
- uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
- output[0] = value >> 16;
- output[1] = value >> 8;
- output[2] = value;
- //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
- return unpadded_size;
- }
-}
-
-static size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
-{
- uint32_t value;
- static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz"
- "0123456789+/";
- if ((input_size/3+1)*4 >= output_size)
- {
- error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
- return 0;
- }
- size_t count = 0;
- while (input_size>3)
- {
- value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
- output[0] = lookup[value >> 18];
- output[1] = lookup[(value >> 12) & 0x3f];
- output[2] = lookup[(value >> 6) & 0x3f];
- output[3] = lookup[value & 0x3f];
- //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
- output += 4;
- input += 3;
- input_size -= 3;
- count += 4;
- }
- switch (input_size)
- {
- case 2:
- value = (input[0] << 10) + (input[1] << 2);
- output[0] = lookup[(value >> 12) & 0x3f];
- output[1] = lookup[(value >> 6) & 0x3f];
- output[2] = lookup[value & 0x3f];
- output[3] = '=';
- //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
- count += 4;
- break;
- case 1:
- value = input[0] << 4;
- output[0] = lookup[(value >> 6) & 0x3f];
- output[1] = lookup[value & 0x3f];
- output[2] = '=';
- output[3] = '=';
- //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
- count += 4;
- break;
- case 0:
- break;
- }
- return count;
-}
-
-static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char *decrypted)
-{
- int result = RSA_private_decrypt( data_len, enc_data, decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
- if (result == -1) {
- char err[512];
- ERR_error_string_n(ERR_get_error(), err, sizeof(err));
- error("Decryption of the challenge failed: %s", err);
- }
- return result;
-}
-
static int aclk_https_request(https_req_t *request, https_req_response_t *response) {
int rc;
// wrapper for ACLK only which loads ACLK specific proxy settings
@@ -426,112 +268,242 @@ exit:
}
#endif
+#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
+static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
+{
+ EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
+
+ if (ctx != NULL) {
+ memset(ctx, 0, sizeof(*ctx));
+ }
+ return ctx;
+}
+static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
+{
+ OPENSSL_free(ctx);
+ return;
+}
+#endif
+
+#define CHALLENGE_LEN 256
+#define CHALLENGE_LEN_BASE64 344
+inline static int base64_decode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
+{
+ unsigned char remaining_data[CHALLENGE_LEN];
+ EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
+ EVP_DecodeInit(ctx);
+ EVP_DecodeUpdate(ctx, out, outl, in, in_len);
+ int remainder = 0;
+ EVP_DecodeFinal(ctx, remaining_data, &remainder);
+ EVP_ENCODE_CTX_free(ctx);
+ if (remainder) {
+ error("Unexpected data at EVP_DecodeFinal");
+ return 1;
+ }
+ return 0;
+}
+
+inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
+{
+ int len;
+ unsigned char *str = out;
+ EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
+ EVP_EncodeInit(ctx);
+ EVP_EncodeUpdate(ctx, str, outl, in, in_len);
+ str += *outl;
+ EVP_EncodeFinal(ctx, str, &len);
+ *outl += len;
+ // if we ever expect longer output than what OpenSSL would pack into single line
+ // we would have to skip the endlines, until then we can just cut the string short
+ str = (unsigned char*)strchr((char*)out, '\n');
+ if (str)
+ *str = 0;
+ EVP_ENCODE_CTX_free(ctx);
+ return 0;
+}
+
#define OTP_URL_PREFIX "/api/v1/auth/node/"
-int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) {
- // TODO this fnc will be rewritten and simplified in following PRs
- // still carries lot of baggage from ACLK Legacy
+int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes)
+{
int rc = 1;
- BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
-
https_req_t req = HTTPS_REQ_T_INITIALIZER;
https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
- char *agent_id = is_agent_claimed();
- if (agent_id == NULL)
- {
- error("Agent was not claimed - cannot perform challenge/response");
- goto cleanup;
- }
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
- // GET Challenge
req.host = target->host;
req.port = target->port;
buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id);
- req.url = url->buffer;
+ req.url = (char *)buffer_tostring(url);
if (aclk_https_request(&req, &resp)) {
error ("ACLK_OTP Challenge failed");
- goto cleanup;
+ buffer_free(url);
+ return 1;
}
if (resp.http_code != 200) {
error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code);
+ buffer_free(url);
if (resp.payload_size)
aclk_parse_otp_error(resp.payload);
goto cleanup_resp;
}
- info ("ACLK_OTP Got Challenge from Cloud");
+ buffer_free(url);
- struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
+ info ("ACLK_OTP Got Challenge from Cloud");
- if (json_parse(resp.payload, &challenge, json_extract_singleton) != JSON_OK)
- {
- freez(challenge.result);
- error("Could not parse the the challenge");
+ json_object *json = json_tokener_parse(resp.payload);
+ if (!json) {
+ error ("Couldn't parse HTTP GET challenge payload");
goto cleanup_resp;
}
- if (challenge.result == NULL) {
- error("Could not retrieve challenge JSON key from challenge response");
- goto cleanup_resp;
+ json_object *challenge_json;
+ if (!json_object_object_get_ex(json, "challenge", &challenge_json)) {
+ error ("No key named \"challenge\" in the returned JSON");
+ goto cleanup_json;
+ }
+ if (!json_object_is_type(challenge_json, json_type_string)) {
+ error ("\"challenge\" is not a string JSON type");
+ goto cleanup_json;
+ }
+ const char *challenge_base64;
+ if (!(challenge_base64 = json_object_get_string(challenge_json))) {
+ error("Failed to extract challenge from JSON object");
+ goto cleanup_json;
+ }
+ if (strlen(challenge_base64) != CHALLENGE_LEN_BASE64) {
+ error("Received Challenge has unexpected length of %zu (expected %d)", strlen(challenge_base64), CHALLENGE_LEN_BASE64);
+ goto cleanup_json;
}
- // Decrypt the Challenge and Calculate Response
- size_t challenge_len = strlen(challenge.result);
- unsigned char decoded[512];
- size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
- freez(challenge.result);
-
- unsigned char plaintext[4096]={};
- int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext);
- char encoded[512];
- size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
- encoded[encoded_len] = 0;
- debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
-
- char response_json[4096]={};
- sprintf(response_json, "{\"response\":\"%s\"}", encoded);
- debug(D_ACLK, "Password phase: %s",response_json);
+ *challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3);
+ base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64));
+ if (*challenge_bytes != CHALLENGE_LEN) {
+ error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN);
+ freez(challenge);
+ *challenge = NULL;
+ goto cleanup_json;
+ }
+ rc = 0;
+cleanup_json:
+ json_object_put(json);
+cleanup_resp:
https_req_response_free(&resp);
- https_req_response_init(&resp);
+ return rc;
+}
- // POST password
+int aclk_send_otp_response(const char *agent_id, const unsigned char *response, int response_bytes, url_t *target, struct auth_data *mqtt_auth)
+{
+ int len;
+ int rc = 1;
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
+ req.host = target->host;
+ req.port = target->port;
req.request_type = HTTP_REQ_POST;
- buffer_flush(url);
+
+ unsigned char base64[CHALLENGE_LEN_BASE64 + 1];
+ memset(base64, 0, CHALLENGE_LEN_BASE64 + 1);
+
+ base64_encode_helper(base64, &len, response, response_bytes);
+
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+ BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+
buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id);
- req.url = url->buffer;
- req.payload = response_json;
- req.payload_size = strlen(response_json);
+ buffer_sprintf(resp_json, "{\"response\":\"%s\"}", base64);
+
+ req.url = (char *)buffer_tostring(url);
+ req.payload = (char *)buffer_tostring(resp_json);
+ req.payload_size = strlen(req.payload);
if (aclk_https_request(&req, &resp)) {
error ("ACLK_OTP Password error trying to post result to password");
- goto cleanup;
+ goto cleanup_buffers;
}
if (resp.http_code != 201) {
error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code);
if (resp.payload_size)
aclk_parse_otp_error(resp.payload);
- goto cleanup_resp;
+ goto cleanup_response;
}
info ("ACLK_OTP Got Password from Cloud");
- struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL };
-
- if (parse_passwd_response(resp.payload, &data)){
+ if (parse_passwd_response(resp.payload, mqtt_auth)){
error("Error parsing response of password endpoint");
- goto cleanup_resp;
+ goto cleanup_response;
+ }
+
+ rc = 0;
+
+cleanup_response:
+ https_req_response_free(&resp);
+cleanup_buffers:
+ buffer_free(resp_json);
+ buffer_free(url);
+ return rc;
+}
+
+static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted)
+{
+ *decrypted = mallocz(RSA_size(p_key));
+ int result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
+ if (result == -1) {
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Decryption of the challenge failed: %s", err);
+ }
+ return result;
+}
+
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target)
+{
+ unsigned char *challenge;
+ int challenge_bytes;
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL) {
+ error("Agent was not claimed - cannot perform challenge/response");
+ return 1;
+ }
+
+ // Get Challenge
+ if (aclk_get_otp_challenge(target, agent_id, &challenge, &challenge_bytes)) {
+ error("Error getting challenge");
+ freez(agent_id);
+ return 1;
+ }
+
+ // Decrypt Challenge / Get response
+ unsigned char *response_plaintext;
+ int response_plaintext_bytes = private_decrypt(p_key, challenge, challenge_bytes, &response_plaintext);
+ if (response_plaintext_bytes < 0) {
+ error ("Couldn't decrypt the challenge received");
+ freez(response_plaintext);
+ freez(challenge);
+ freez(agent_id);
+ return 1;
+ }
+ freez(challenge);
+
+ // Encode and Send Challenge
+ struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL };
+ if (aclk_send_otp_response(agent_id, response_plaintext, response_plaintext_bytes, target, &data)) {
+ error("Error getting response");
+ freez(response_plaintext);
+ freez(agent_id);
+ return 1;
}
*mqtt_pass = data.passwd;
*mqtt_usr = data.username;
*mqtt_id = data.client_id;
- rc = 0;
-cleanup_resp:
- https_req_response_free(&resp);
-cleanup:
+ freez(response_plaintext);
freez(agent_id);
- buffer_free(url);
- return rc;
+ return 0;
}
#define JSON_KEY_ENC "encoding"
@@ -860,6 +832,8 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
}
if (resp.http_code != 200) {
error("The HTTP code not 200 OK (Got %d)", resp.http_code);
+ if (resp.payload_size)
+ aclk_parse_otp_error(resp.payload);
https_req_response_free(&resp);
buffer_free(buf);
return 1;
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index ae5659310..de970fc3d 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -111,6 +111,18 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->tv_in = query->created_tv;
now_realtime_timeval(&w->tv_ready);
+ if (query->timeout) {
+ int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000);
+ if (in_queue > query->timeout) {
+ log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout);
+ retval = 1;
+ w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
+ goto cleanup;
+ }
+ }
+
+ RRDHOST *temp_host = NULL;
if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
@@ -125,11 +137,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
query_host = node_id_2_rrdhost(nodeid);
if (!query_host) {
- error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
- retval = 1;
- w->response.code = 404;
- aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
- goto cleanup;
+ temp_host = sql_create_host_by_uuid(nodeid);
+ if (!temp_host) {
+ error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
+ retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
+ goto cleanup;
+ }
}
}
@@ -150,7 +165,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// execute the query
- t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
+ t = aclk_web_api_v1_request(query_host ? query_host : temp_host, w, mysep ? mysep + 1 : "noop");
+ free_temporary_host(temp_host);
size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
sent = size;
@@ -276,22 +292,6 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) {
- // TODO create a pending registrations list
- // with some timeouts to detect registration requests that
- // go unanswered from the cloud
- aclk_generate_node_registration(query_thr->client, &query->data.node_creation);
- return 0;
-}
-
-static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) {
- // TODO create a pending registrations list
- // with some timeouts to detect registration requests that
- // go unanswered from the cloud
- aclk_generate_node_state_update(query_thr->client, &query->data.node_update);
- return 0;
-}
-
static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
// this will be simplified when legacy support is removed
@@ -308,8 +308,8 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node },
- { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update },
+ { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg },
+ { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg },
{ .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
{ .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
{ .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
@@ -337,6 +337,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
{
for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
if (aclk_query_handlers[i].type == query->type) {
+ worker_is_busy(i);
+
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(query_thr, query);
if (aclk_stats_enabled) {
@@ -347,6 +349,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
ACLK_STATS_UNLOCK;
}
aclk_query_free(query);
+
+ worker_is_idle();
return;
}
}
@@ -364,21 +368,33 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
return 0;
}
+static void worker_aclk_register(void) {
+ worker_register("ACLKQUERY");
+ for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
+ worker_register_job_name(i, aclk_query_handlers[i].name);
+ }
+}
+
/**
* Main query processing thread
*/
void *aclk_query_main_thread(void *ptr)
{
+ worker_aclk_register();
+
struct aclk_query_thread *query_thr = ptr;
while (!netdata_exit) {
aclk_query_process_msgs(query_thr);
+ worker_is_idle();
QUERY_THREAD_LOCK;
if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
sleep_usec(USEC_PER_SEC * 1);
QUERY_THREAD_UNLOCK;
}
+
+ worker_unregister();
return NULL;
}
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 74a899226..2422b01e1 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -121,16 +121,7 @@ void aclk_query_free(aclk_query_t query)
break;
case NODE_STATE_UPDATE:
- freez((void*)query->data.node_update.claim_id);
- freez((void*)query->data.node_update.node_id);
- break;
-
case REGISTER_NODE:
- freez((void*)query->data.node_creation.claim_id);
- freez((void*)query->data.node_creation.hostname);
- freez((void*)query->data.node_creation.machine_guid);
- break;
-
case CHART_DIMS_UPDATE:
case CHART_CONFIG_UPDATED:
case CHART_RESET:
@@ -139,7 +130,8 @@ void aclk_query_free(aclk_query_t query)
case ALARM_LOG_HEALTH:
case ALARM_PROVIDE_CFG:
case ALARM_SNAPSHOT:
- freez(query->data.bin_payload.payload);
+ if (!use_mqtt_5)
+ freez(query->data.bin_payload.payload);
break;
default:
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index 88976f9eb..0b5ef8faa 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -67,7 +67,7 @@ struct aclk_query {
struct timeval created_tv;
usec_t created;
-
+ int timeout;
aclk_query_t next;
// TODO maybe remove?
@@ -77,8 +77,6 @@ struct aclk_query {
struct aclk_query_metadata metadata_alarms;
struct aclk_query_http_api_v2 http_api_v2;
struct aclk_query_chart_add_del chart_add_del;
- node_instance_creation_t node_creation;
- node_instance_connection_t node_update;
struct aclk_bin_payload bin_payload;
json_object *alarm_update;
} data;
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 1f2cb27ef..27f1bf2dc 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -17,6 +17,7 @@ struct aclk_request {
char *callback_topic;
char *payload;
int version;
+ int timeout;
int min_version;
int max_version;
};
@@ -57,6 +58,10 @@ static int cloud_to_agent_parse(JSON_ENTRY *e)
data->version = e->data.number;
break;
}
+ if (!strcmp(e->name, "timeout")) {
+ data->timeout = e->data.number;
+ break;
+ }
if (!strcmp(e->name, "min-version")) {
data->min_version = e->data.number;
break;
@@ -160,6 +165,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
// aclk_queue_query takes ownership of data pointer
query->callback_topic = cloud_to_agent->callback_topic;
+ query->timeout = cloud_to_agent->timeout;
// for clarity and code readability as when we process the request
// it would be strange to get URL from `dedup_id`
query->data.http_api_v2.query = query->dedup_id;
@@ -271,32 +277,39 @@ int create_node_instance_result(const char *msg, size_t msg_len)
update_node_id(&host_id, &node_id);
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
- rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
- rrdhost_aclk_state_unlock(localhost);
+ node_instance_connection_t node_state_update = {
+ .hops = 1,
+ .live = 0,
+ .queryable = 1,
+ .session_id = aclk_session_newarch,
+ .node_id = res.node_id
+ };
RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
- query->data.node_update.live = 0;
-
if (host) {
// not all host must have RRDHOST struct created for them
// if they never connected during runtime of agent
if (host == localhost) {
- query->data.node_update.live = 1;
- query->data.node_update.hops = 0;
+ node_state_update.live = 1;
+ node_state_update.hops = 0;
} else {
netdata_mutex_lock(&host->receiver_lock);
- query->data.node_update.live = (host->receiver != NULL);
+ node_state_update.live = (host->receiver != NULL);
netdata_mutex_unlock(&host->receiver_lock);
- query->data.node_update.hops = host->system_info->hops;
+ node_state_update.hops = host->system_info->hops;
}
}
- query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+
aclk_queue_query(query);
+ freez(res.node_id);
freez(res.machine_guid);
return 0;
}
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index a9f0a923c..ca0532638 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -364,7 +364,7 @@ void aclk_stats_upd_online(int online) {
}
#ifdef NETDATA_INTERNAL_CHECKS
-static usec_t pub_time[UINT16_MAX];
+static usec_t pub_time[UINT16_MAX + 1] = {0};
void aclk_stats_msg_published(uint16_t id)
{
ACLK_STATS_LOCK;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 185f5d796..3530dccff 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -49,7 +49,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return 0;
}
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ if (use_mqtt_5)
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ else
+ mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -125,7 +129,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
- return 500;
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
@@ -149,17 +153,22 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- freez(full_msg);
- return 503;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- freez(full_msg);
- return 403;
+ if (use_mqtt_5)
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
+ else {
+ rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
+ error("Timeout sending binpacked message");
+ freez(full_msg);
+ return HTTP_RESP_BACKEND_FETCH_FAILED;
+ }
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
+ error("Message is bigger than allowed maximum");
+ freez(full_msg);
+ return HTTP_RESP_FORBIDDEN;
+ }
}
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -363,13 +372,13 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
json_object_put(msg);
switch (rc) {
- case 403:
+ case HTTP_RESP_FORBIDDEN:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
break;
- case 500:
+ case HTTP_RESP_INTERNAL_SERVER_ERROR:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
break;
- case 503:
+ case HTTP_RESP_BACKEND_FETCH_FAILED:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
break;
}
@@ -452,10 +461,22 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
size_t len;
uint16_t pid;
+
+ struct capability agent_capabilities[] = {
+ { .name = "json", .version = 2, .enabled = 0 },
+ { .name = "proto", .version = 1, .enabled = 1 },
+#ifdef ENABLE_ML
+ { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
+#endif
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+
update_agent_connection_t conn = {
.reachable = (reachable ? 1 : 0),
.lwt = 0,
- .session_id = aclk_session_newarch
+ .session_id = aclk_session_newarch,
+ .capabilities = agent_capabilities
};
rrdhost_aclk_state_lock(localhost);
@@ -478,7 +499,8 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
}
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- freez(msg);
+ if (!use_mqtt_5)
+ freez(msg);
if (localhost->aclk_state.prev_claimed_id) {
freez(localhost->aclk_state.prev_claimed_id);
localhost->aclk_state.prev_claimed_id = NULL;
@@ -490,7 +512,8 @@ char *aclk_generate_lwt(size_t *size) {
update_agent_connection_t conn = {
.reachable = 0,
.lwt = 1,
- .session_id = aclk_session_newarch
+ .session_id = aclk_session_newarch,
+ .capabilities = NULL
};
rrdhost_aclk_state_lock(localhost);
@@ -509,30 +532,6 @@ char *aclk_generate_lwt(size_t *size) {
return msg;
}
-
-void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
- size_t len;
- char *msg = generate_node_instance_creation(&len, node_creation);
- if (!msg) {
- error("Error generating nodeinstance::create::v1::CreateNodeInstance");
- return;
- }
-
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
- freez(msg);
-}
-
-void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
- size_t len;
- char *msg = generate_node_instance_connection(&len, node_connection);
- if (!msg) {
- error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
- return;
- }
-
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
- freez(msg);
-}
#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index 402f13fb6..44281eb68 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -28,9 +28,6 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);
-
-void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation);
-void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection);
#endif
#endif
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 5576a865a..430925460 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -65,17 +65,6 @@ int aclk_env_has_capa(const char *capa)
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
-#if !defined(HAVE_C___ATOMIC)
-netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER;
-int aclk_get_conv_log_next()
-{
- int ret;
- netdata_mutex_lock(&aclk_conversation_log_mutex);
- ret = aclk_conversation_log_counter++;
- netdata_mutex_unlock(&aclk_conversation_log_mutex);
- return ret;
-}
-#endif
#endif
#define ACLK_TOPIC_PREFIX "/agent/"
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 7a7202076..fb0492ac8 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -99,13 +99,7 @@ void free_topic_cache(void);
#ifdef ACLK_LOG_CONVERSATION_DIR
extern volatile int aclk_conversation_log_counter;
-#if defined(HAVE_C___ATOMIC)
#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST)
-#else
-extern netdata_mutex_t aclk_conversation_log_mutex;
-int aclk_get_conv_log_next();
-#define ACLK_GET_CONV_LOG_NEXT() aclk_get_conv_log_next()
-#endif
#endif
unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
index 5868e5d67..338e512d8 100644
--- a/aclk/schema-wrappers/alarm_stream.cc
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -176,8 +176,10 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
*len = PROTO_COMPAT_MSG_SIZE(le);
char *bin = (char*)mallocz(*len);
- if (!le.SerializeToArray(bin, *len))
+ if (!le.SerializeToArray(bin, *len)) {
+ freez(bin);
return NULL;
+ }
return bin;
}
diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc
new file mode 100644
index 000000000..769806f90
--- /dev/null
+++ b/aclk/schema-wrappers/capability.cc
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/aclk/v1/lib.pb.h"
+
+#include "capability.h"
+
+void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) {
+ proto_capa->set_name(c_capa->name);
+ proto_capa->set_enabled(c_capa->enabled);
+ proto_capa->set_version(c_capa->version);
+}
diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h
new file mode 100644
index 000000000..9517a8716
--- /dev/null
+++ b/aclk/schema-wrappers/capability.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_CAPABILITY_H
+#define ACLK_SCHEMA_CAPABILITY_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct capability {
+ const char *name;
+ uint32_t version;
+ int enabled;
+};
+
+#ifdef __cplusplus
+}
+
+#include "proto/aclk/v1/lib.pb.h"
+
+void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa);
+#endif
+
+#endif /* ACLK_SCHEMA_CAPABILITY_H */
diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc
index e3bbfe31f..7520a4600 100644
--- a/aclk/schema-wrappers/connection.cc
+++ b/aclk/schema-wrappers/connection.cc
@@ -28,6 +28,15 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
timestamp->set_seconds(tv.tv_sec);
timestamp->set_nanos(tv.tv_usec * 1000);
+ if (data->capabilities) {
+ struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
*len = PROTO_COMPAT_MSG_SIZE(connupd);
char *msg = (char*)malloc(*len);
if (msg)
diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h
index 8c223869a..fcbe6bd59 100644
--- a/aclk/schema-wrappers/connection.h
+++ b/aclk/schema-wrappers/connection.h
@@ -3,6 +3,8 @@
#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H
#define ACLK_SCHEMA_WRAPPER_CONNECTION_H
+#include "capability.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -15,6 +17,8 @@ typedef struct {
unsigned int lwt:1;
+ struct capability *capabilities;
+
// TODO in future optional fields
// > 15 optional fields:
// How long the system was running until connection (only applicable when reachable=true)
diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h
index 71e45ef55..190ccb4d6 100644
--- a/aclk/schema-wrappers/node_creation.h
+++ b/aclk/schema-wrappers/node_creation.h
@@ -8,9 +8,9 @@ extern "C" {
#endif
typedef struct {
- const char* claim_id;
- const char* machine_guid;
- const char* hostname;
+ char* claim_id;
+ char* machine_guid;
+ char* hostname;
int32_t hops;
} node_instance_creation_t;
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
index f6f15ffb2..f66985246 100644
--- a/aclk/schema-wrappers/node_info.cc
+++ b/aclk/schema-wrappers/node_info.cc
@@ -94,6 +94,24 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in
ml_info->set_ml_capable(info->ml_info.ml_capable);
ml_info->set_ml_enabled(info->ml_info.ml_enabled);
+ struct capability *capa;
+ if (info->node_capabilities) {
+ capa = info->node_capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.mutable_node_info()->add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+ if (info->node_instance_capabilities) {
+ capa = info->node_instance_capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.mutable_node_instance_info()->add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
index 41daf94c8..e67f3e1da 100644
--- a/aclk/schema-wrappers/node_info.h
+++ b/aclk/schema-wrappers/node_info.h
@@ -6,6 +6,7 @@
#include <stdlib.h>
#include "database/rrd.h"
+#include "capability.h"
#ifdef __cplusplus
extern "C" {
@@ -67,6 +68,9 @@ struct update_node_info {
int child;
struct machine_learning_info ml_info;
+
+ struct capability *node_capabilities;
+ struct capability *node_instance_capabilities;
};
char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index a3975fca3..a3248a69b 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -13,5 +13,6 @@
#include "alarm_config.h"
#include "alarm_stream.h"
#include "node_info.h"
+#include "capability.h"
#endif /* SCHEMA_WRAPPERS_H */