diff options
Diffstat (limited to '')
-rw-r--r-- | aclk/README.md | 2 | ||||
-rw-r--r-- | aclk/aclk.c | 100 | ||||
-rw-r--r-- | aclk/aclk.h | 3 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.c | 3 | ||||
-rw-r--r-- | aclk/aclk_api.c | 3 | ||||
-rw-r--r-- | aclk/aclk_api.h | 3 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 404 | ||||
-rw-r--r-- | aclk/aclk_query.c | 64 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 12 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 4 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 39 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 2 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 83 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 3 | ||||
-rw-r--r-- | aclk/aclk_util.c | 11 | ||||
-rw-r--r-- | aclk/aclk_util.h | 6 | ||||
-rw-r--r-- | aclk/schema-wrappers/alarm_stream.cc | 4 | ||||
-rw-r--r-- | aclk/schema-wrappers/capability.cc | 11 | ||||
-rw-r--r-- | aclk/schema-wrappers/capability.h | 24 | ||||
-rw-r--r-- | aclk/schema-wrappers/connection.cc | 9 | ||||
-rw-r--r-- | aclk/schema-wrappers/connection.h | 4 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_creation.h | 6 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_info.cc | 18 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_info.h | 4 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 1 |
25 files changed, 456 insertions, 367 deletions
diff --git a/aclk/README.md b/aclk/README.md index 09c0d292..f595726e 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -50,10 +50,12 @@ You can configure following keys in the `netdata.conf` section `[cloud]`: [cloud] statistics = yes query thread count = 2 + mqtt5 = no ``` - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. - `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). +- `mqtt5` enables the new MQTT5 protocol implementation in the Agent. Currently a technical preview. ## Disable the ACLK diff --git a/aclk/aclk.c b/aclk/aclk.c index 599b9a09..6426c5b5 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -12,6 +12,7 @@ #include "aclk_rx_msgs.h" #include "aclk_collector_list.h" #include "https_client.h" +#include "schema-wrappers/schema_wrappers.h" #include "aclk_proxy.h" @@ -34,7 +35,7 @@ time_t last_disconnect_time = 0; time_t next_connection_attempt = 0; float last_backoff_value = 0; -int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload +int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload time_t aclk_block_until = 0; @@ -172,7 +173,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) case MQTT_WSS_LOG_ERROR: case MQTT_WSS_LOG_FATAL: case MQTT_WSS_LOG_WARN: - error("%s", str); + error_report("%s", str); return; case MQTT_WSS_LOG_INFO: info("%s", str); @@ -391,7 +392,7 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) error("Unable to fetch topic for COMMAND (to subscribe)"); @@ -400,7 +401,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) #ifdef ENABLE_NEW_CLOUD_PROTOCOL if (aclk_use_new_cloud_arch) { - topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); if (!topic) error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); else @@ -800,10 +801,12 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; + use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + #ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { #else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { #endif error("Couldn't initialize MQTT_WSS network library"); goto exit; @@ -1041,6 +1044,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu aclk_queue_query(query); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; @@ -1060,28 +1064,40 @@ void aclk_host_state_update(RRDHOST *host, int cmd) aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); rrdhost_aclk_state_lock(localhost); - create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_instance_creation_t node_instance_creation = { + .claim_id = localhost->aclk_state.claimed_id, + .hops = host->system_info->hops, + .hostname = host->hostname, + .machine_guid = host->machine_guid + }; + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.node_creation.hops = (uint32_t) host->system_info->hops; - create_query->data.node_creation.hostname = strdupz(host->hostname); - create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); aclk_queue_query(create_query); return; } aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); - query->data.node_update.hops = (uint32_t) host->system_info->hops; + node_instance_connection_t node_state_update = { + .hops = host->system_info->hops, + .live = cmd, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)node_state_update.node_id); rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.node_update.live = cmd; - query->data.node_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd, + + info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; aclk_queue_query(query); } @@ -1096,39 +1112,52 @@ void aclk_send_node_instances() while (!uuid_is_null(list->host_id)) { if (!uuid_is_null(list->node_id)) { aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + node_instance_connection_t node_state_update = { + .live = list->live, + .hops = list->hops, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.node_update.live = list->live; - query->data.node_update.hops = list->hops; - query->data.node_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - freez(list->hostname); - info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, + info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; aclk_queue_query(query); } else { aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); + node_instance_creation_t node_instance_creation = { + .hops = list->hops, + .hostname = list->hostname, + }; + node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; rrdhost_aclk_state_lock(localhost); - create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.node_creation.hops = list->hops; - create_query->data.node_creation.hostname = list->hostname; - create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); - uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); - info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid, + info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); + freez(node_instance_creation.machine_guid); aclk_queue_query(create_query); } + freez(list->hostname); list++; } freez(list_head); } +#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { @@ -1208,7 +1237,7 @@ char *ng_aclk_state(void) "Protocols Supported: Legacy\n" #endif ); - buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); char *agent_id = is_agent_claimed(); if (agent_id == NULL) @@ -1408,6 +1437,9 @@ char *ng_aclk_state_json(void) tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); json_object_object_add(msg, "used-cloud-protocol", tmp); + tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); + json_object_object_add(msg, "mqtt-version", tmp); + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); json_object_object_add(msg, "received-app-layer-msgs", tmp); diff --git a/aclk/aclk.h b/aclk/aclk.h index 4d854631..41c4e05e 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -43,9 +43,10 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create); void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd); - void aclk_send_node_instances(void); +#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index 7df51a7b..a181eb29 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -23,6 +23,9 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) char *payload = generate_alarm_log_entry(&payload_size, log_entry); aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); + + if (!use_mqtt_5) + freez(payload); } void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index 766f7805..a2e738ab 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -16,6 +16,7 @@ int aclk_disable_runtime = 0; int aclk_disable_single_updates = 0; int aclk_stats_enabled; +int use_mqtt_5 = 0; #define ACLK_IMPL_KEY_NAME "aclk implementation" @@ -68,6 +69,8 @@ struct label *add_aclk_host_labels(struct label *label) { break; } + int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + label = add_label_to_list(label, "_mqtt_version", mqtt5 ? "5" : "3", LABEL_SOURCE_AUTO); label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); #ifdef ENABLE_NEW_CLOUD_PROTOCOL diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index 9958b0e1..557b70d7 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -21,6 +21,7 @@ extern int aclk_stats_enabled; extern int aclk_alert_reloaded; extern int aclk_ng; +extern int use_mqtt_5; #ifdef ENABLE_ACLK void *aclk_starter(void *ptr); @@ -36,7 +37,9 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int connect); +#endif #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index 658e04f9..c99c6563 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -9,164 +9,6 @@ #include "mqtt_websockets/c-rbuf/include/ringbuffer.h" -struct dictionary_singleton { - char *key; - char *result; -}; - -static int json_extract_singleton(JSON_ENTRY *e) -{ - struct dictionary_singleton *data = e->callback_data; - - switch (e->type) { - case JSON_OBJECT: - case JSON_ARRAY: - break; - case JSON_STRING: - if (!strcmp(e->name, data->key)) { - data->result = strdupz(e->data.string); - break; - } - break; - case JSON_NUMBER: - case JSON_BOOLEAN: - case JSON_NULL: - break; - } - return 0; -} - -// Base-64 decoder. -// Note: This is non-validating, invalid input will be decoded without an error. -// Challenges are packed into json strings so we don't skip newlines. -// Size errors (i.e. invalid input size or insufficient output space) are caught. -static size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size) -{ - static char lookup[256]; - static int first_time=1; - if (first_time) - { - first_time = 0; - for(int i=0; i<256; i++) - lookup[i] = -1; - for(int i='A'; i<='Z'; i++) - lookup[i] = i-'A'; - for(int i='a'; i<='z'; i++) - lookup[i] = i-'a' + 26; - for(int i='0'; i<='9'; i++) - lookup[i] = i-'0' + 52; - lookup['+'] = 62; - lookup['/'] = 63; - } - if ((input_size & 3) != 0) - { - error("Can't decode base-64 input length %zu", input_size); - return 0; - } - size_t unpadded_size = (input_size/4) * 3; - if ( unpadded_size > output_size ) - { - error("Output buffer size %zu is too small to decode %zu into", output_size, input_size); - return 0; - } - // Don't check padding within full quantums - for (size_t i = 0 ; i < input_size-4 ; i+=4 ) - { - uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]]; - output[0] = value >> 16; - output[1] = value >> 8; - output[2] = value; - //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); - output += 3; - input += 4; - } - // Handle padding only in last quantum - if (input[2] == '=') { - uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]]; - output[0] = value >> 4; - //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]); - return unpadded_size-2; - } - else if (input[3] == '=') { - uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]]; - output[0] = value >> 10; - output[1] = value >> 2; - //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]); - return unpadded_size-1; - } - else - { - uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3]; - output[0] = value >> 16; - output[1] = value >> 8; - output[2] = value; - //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); - return unpadded_size; - } -} - -static size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) -{ - uint32_t value; - static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz" - "0123456789+/"; - if ((input_size/3+1)*4 >= output_size) - { - error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); - return 0; - } - size_t count = 0; - while (input_size>3) - { - value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; - output[0] = lookup[value >> 18]; - output[1] = lookup[(value >> 12) & 0x3f]; - output[2] = lookup[(value >> 6) & 0x3f]; - output[3] = lookup[value & 0x3f]; - //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); - output += 4; - input += 3; - input_size -= 3; - count += 4; - } - switch (input_size) - { - case 2: - value = (input[0] << 10) + (input[1] << 2); - output[0] = lookup[(value >> 12) & 0x3f]; - output[1] = lookup[(value >> 6) & 0x3f]; - output[2] = lookup[value & 0x3f]; - output[3] = '='; - //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); - count += 4; - break; - case 1: - value = input[0] << 4; - output[0] = lookup[(value >> 6) & 0x3f]; - output[1] = lookup[value & 0x3f]; - output[2] = '='; - output[3] = '='; - //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); - count += 4; - break; - case 0: - break; - } - return count; -} - -static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char *decrypted) -{ - int result = RSA_private_decrypt( data_len, enc_data, decrypted, p_key, RSA_PKCS1_OAEP_PADDING); - if (result == -1) { - char err[512]; - ERR_error_string_n(ERR_get_error(), err, sizeof(err)); - error("Decryption of the challenge failed: %s", err); - } - return result; -} - static int aclk_https_request(https_req_t *request, https_req_response_t *response) { int rc; // wrapper for ACLK only which loads ACLK specific proxy settings @@ -426,112 +268,242 @@ exit: } #endif +#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 +static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) +{ + EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); + + if (ctx != NULL) { + memset(ctx, 0, sizeof(*ctx)); + } + return ctx; +} +static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) +{ + OPENSSL_free(ctx); + return; +} +#endif + +#define CHALLENGE_LEN 256 +#define CHALLENGE_LEN_BASE64 344 +inline static int base64_decode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) +{ + unsigned char remaining_data[CHALLENGE_LEN]; + EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); + EVP_DecodeInit(ctx); + EVP_DecodeUpdate(ctx, out, outl, in, in_len); + int remainder = 0; + EVP_DecodeFinal(ctx, remaining_data, &remainder); + EVP_ENCODE_CTX_free(ctx); + if (remainder) { + error("Unexpected data at EVP_DecodeFinal"); + return 1; + } + return 0; +} + +inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) +{ + int len; + unsigned char *str = out; + EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); + EVP_EncodeInit(ctx); + EVP_EncodeUpdate(ctx, str, outl, in, in_len); + str += *outl; + EVP_EncodeFinal(ctx, str, &len); + *outl += len; + // if we ever expect longer output than what OpenSSL would pack into single line + // we would have to skip the endlines, until then we can just cut the string short + str = (unsigned char*)strchr((char*)out, '\n'); + if (str) + *str = 0; + EVP_ENCODE_CTX_free(ctx); + return 0; +} + #define OTP_URL_PREFIX "/api/v1/auth/node/" -int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) { - // TODO this fnc will be rewritten and simplified in following PRs - // still carries lot of baggage from ACLK Legacy +int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes) +{ int rc = 1; - BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); - https_req_t req = HTTPS_REQ_T_INITIALIZER; https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; - char *agent_id = is_agent_claimed(); - if (agent_id == NULL) - { - error("Agent was not claimed - cannot perform challenge/response"); - goto cleanup; - } + BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); - // GET Challenge req.host = target->host; req.port = target->port; buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id); - req.url = url->buffer; + req.url = (char *)buffer_tostring(url); if (aclk_https_request(&req, &resp)) { error ("ACLK_OTP Challenge failed"); - goto cleanup; + buffer_free(url); + return 1; } if (resp.http_code != 200) { error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code); + buffer_free(url); if (resp.payload_size) aclk_parse_otp_error(resp.payload); goto cleanup_resp; } - info ("ACLK_OTP Got Challenge from Cloud"); + buffer_free(url); - struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; + info ("ACLK_OTP Got Challenge from Cloud"); - if (json_parse(resp.payload, &challenge, json_extract_singleton) != JSON_OK) - { - freez(challenge.result); - error("Could not parse the the challenge"); + json_object *json = json_tokener_parse(resp.payload); + if (!json) { + error ("Couldn't parse HTTP GET challenge payload"); goto cleanup_resp; } - if (challenge.result == NULL) { - error("Could not retrieve challenge JSON key from challenge response"); - goto cleanup_resp; + json_object *challenge_json; + if (!json_object_object_get_ex(json, "challenge", &challenge_json)) { + error ("No key named \"challenge\" in the returned JSON"); + goto cleanup_json; + } + if (!json_object_is_type(challenge_json, json_type_string)) { + error ("\"challenge\" is not a string JSON type"); + goto cleanup_json; + } + const char *challenge_base64; + if (!(challenge_base64 = json_object_get_string(challenge_json))) { + error("Failed to extract challenge from JSON object"); + goto cleanup_json; + } + if (strlen(challenge_base64) != CHALLENGE_LEN_BASE64) { + error("Received Challenge has unexpected length of %zu (expected %d)", strlen(challenge_base64), CHALLENGE_LEN_BASE64); + goto cleanup_json; } - // Decrypt the Challenge and Calculate Response - size_t challenge_len = strlen(challenge.result); - unsigned char decoded[512]; - size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); - freez(challenge.result); - - unsigned char plaintext[4096]={}; - int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext); - char encoded[512]; - size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); - encoded[encoded_len] = 0; - debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); - - char response_json[4096]={}; - sprintf(response_json, "{\"response\":\"%s\"}", encoded); - debug(D_ACLK, "Password phase: %s",response_json); + *challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3); + base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64)); + if (*challenge_bytes != CHALLENGE_LEN) { + error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN); + freez(challenge); + *challenge = NULL; + goto cleanup_json; + } + rc = 0; +cleanup_json: + json_object_put(json); +cleanup_resp: https_req_response_free(&resp); - https_req_response_init(&resp); + return rc; +} - // POST password +int aclk_send_otp_response(const char *agent_id, const unsigned char *response, int response_bytes, url_t *target, struct auth_data *mqtt_auth) +{ + int len; + int rc = 1; + https_req_t req = HTTPS_REQ_T_INITIALIZER; + https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; + + req.host = target->host; + req.port = target->port; req.request_type = HTTP_REQ_POST; - buffer_flush(url); + + unsigned char base64[CHALLENGE_LEN_BASE64 + 1]; + memset(base64, 0, CHALLENGE_LEN_BASE64 + 1); + + base64_encode_helper(base64, &len, response, response_bytes); + + BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); + BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); + buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id); - req.url = url->buffer; - req.payload = response_json; - req.payload_size = strlen(response_json); + buffer_sprintf(resp_json, "{\"response\":\"%s\"}", base64); + + req.url = (char *)buffer_tostring(url); + req.payload = (char *)buffer_tostring(resp_json); + req.payload_size = strlen(req.payload); if (aclk_https_request(&req, &resp)) { error ("ACLK_OTP Password error trying to post result to password"); - goto cleanup; + goto cleanup_buffers; } if (resp.http_code != 201) { error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code); if (resp.payload_size) aclk_parse_otp_error(resp.payload); - goto cleanup_resp; + goto cleanup_response; } info ("ACLK_OTP Got Password from Cloud"); - struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL }; - - if (parse_passwd_response(resp.payload, &data)){ + if (parse_passwd_response(resp.payload, mqtt_auth)){ error("Error parsing response of password endpoint"); - goto cleanup_resp; + goto cleanup_response; + } + + rc = 0; + +cleanup_response: + https_req_response_free(&resp); +cleanup_buffers: + buffer_free(resp_json); + buffer_free(url); + return rc; +} + +static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted) +{ + *decrypted = mallocz(RSA_size(p_key)); + int result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING); + if (result == -1) { + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Decryption of the challenge failed: %s", err); + } + return result; +} + +int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) +{ + unsigned char *challenge; + int challenge_bytes; + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) { + error("Agent was not claimed - cannot perform challenge/response"); + return 1; + } + + // Get Challenge + if (aclk_get_otp_challenge(target, agent_id, &challenge, &challenge_bytes)) { + error("Error getting challenge"); + freez(agent_id); + return 1; + } + + // Decrypt Challenge / Get response + unsigned char *response_plaintext; + int response_plaintext_bytes = private_decrypt(p_key, challenge, challenge_bytes, &response_plaintext); + if (response_plaintext_bytes < 0) { + error ("Couldn't decrypt the challenge received"); + freez(response_plaintext); + freez(challenge); + freez(agent_id); + return 1; + } + freez(challenge); + + // Encode and Send Challenge + struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL }; + if (aclk_send_otp_response(agent_id, response_plaintext, response_plaintext_bytes, target, &data)) { + error("Error getting response"); + freez(response_plaintext); + freez(agent_id); + return 1; } *mqtt_pass = data.passwd; *mqtt_usr = data.username; *mqtt_id = data.client_id; - rc = 0; -cleanup_resp: - https_req_response_free(&resp); -cleanup: + freez(response_plaintext); freez(agent_id); - buffer_free(url); - return rc; + return 0; } #define JSON_KEY_ENC "encoding" @@ -860,6 +832,8 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { } if (resp.http_code != 200) { error("The HTTP code not 200 OK (Got %d)", resp.http_code); + if (resp.payload_size) + aclk_parse_otp_error(resp.payload); https_req_response_free(&resp); buffer_free(buf); return 1; diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index ae565931..de970fc3 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -111,6 +111,18 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->tv_in = query->created_tv; now_realtime_timeval(&w->tv_ready); + if (query->timeout) { + int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000); + if (in_queue > query->timeout) { + log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout); + retval = 1; + w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); + goto cleanup; + } + } + + RRDHOST *temp_host = NULL; if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; @@ -125,11 +137,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) query_host = node_id_2_rrdhost(nodeid); if (!query_host) { - error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); - retval = 1; - w->response.code = 404; - aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); - goto cleanup; + temp_host = sql_create_host_by_uuid(nodeid); + if (!temp_host) { + error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); + retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); + goto cleanup; + } } } @@ -150,7 +165,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } // execute the query - t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); + t = aclk_web_api_v1_request(query_host ? query_host : temp_host, w, mysep ? mysep + 1 : "noop"); + free_temporary_host(temp_host); size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; sent = size; @@ -276,22 +292,6 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu } #ifdef ENABLE_NEW_CLOUD_PROTOCOL -static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) { - // TODO create a pending registrations list - // with some timeouts to detect registration requests that - // go unanswered from the cloud - aclk_generate_node_registration(query_thr->client, &query->data.node_creation); - return 0; -} - -static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) { - // TODO create a pending registrations list - // with some timeouts to detect registration requests that - // go unanswered from the cloud - aclk_generate_node_state_update(query_thr->client, &query->data.node_update); - return 0; -} - static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { // this will be simplified when legacy support is removed @@ -308,8 +308,8 @@ aclk_query_handler aclk_query_handlers[] = { { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, #ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node }, - { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update }, + { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg }, + { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg }, { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, @@ -337,6 +337,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que { for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { if (aclk_query_handlers[i].type == query->type) { + worker_is_busy(i); + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); aclk_query_handlers[i].fnc(query_thr, query); if (aclk_stats_enabled) { @@ -347,6 +349,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que ACLK_STATS_UNLOCK; } aclk_query_free(query); + + worker_is_idle(); return; } } @@ -364,21 +368,33 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) return 0; } +static void worker_aclk_register(void) { + worker_register("ACLKQUERY"); + for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { + worker_register_job_name(i, aclk_query_handlers[i].name); + } +} + /** * Main query processing thread */ void *aclk_query_main_thread(void *ptr) { + worker_aclk_register(); + struct aclk_query_thread *query_thr = ptr; while (!netdata_exit) { aclk_query_process_msgs(query_thr); + worker_is_idle(); QUERY_THREAD_LOCK; if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); QUERY_THREAD_UNLOCK; } + + worker_unregister(); return NULL; } diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 74a89922..2422b01e 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -121,16 +121,7 @@ void aclk_query_free(aclk_query_t query) break; case NODE_STATE_UPDATE: - freez((void*)query->data.node_update.claim_id); - freez((void*)query->data.node_update.node_id); - break; - case REGISTER_NODE: - freez((void*)query->data.node_creation.claim_id); - freez((void*)query->data.node_creation.hostname); - freez((void*)query->data.node_creation.machine_guid); - break; - case CHART_DIMS_UPDATE: case CHART_CONFIG_UPDATED: case CHART_RESET: @@ -139,7 +130,8 @@ void aclk_query_free(aclk_query_t query) case ALARM_LOG_HEALTH: case ALARM_PROVIDE_CFG: case ALARM_SNAPSHOT: - freez(query->data.bin_payload.payload); + if (!use_mqtt_5) + freez(query->data.bin_payload.payload); break; default: diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 88976f9e..0b5ef8fa 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -67,7 +67,7 @@ struct aclk_query { struct timeval created_tv; usec_t created; - + int timeout; aclk_query_t next; // TODO maybe remove? @@ -77,8 +77,6 @@ struct aclk_query { struct aclk_query_metadata metadata_alarms; struct aclk_query_http_api_v2 http_api_v2; struct aclk_query_chart_add_del chart_add_del; - node_instance_creation_t node_creation; - node_instance_connection_t node_update; struct aclk_bin_payload bin_payload; json_object *alarm_update; } data; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 1f2cb27e..27f1bf2d 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -17,6 +17,7 @@ struct aclk_request { char *callback_topic; char *payload; int version; + int timeout; int min_version; int max_version; }; @@ -57,6 +58,10 @@ static int cloud_to_agent_parse(JSON_ENTRY *e) data->version = e->data.number; break; } + if (!strcmp(e->name, "timeout")) { + data->timeout = e->data.number; + break; + } if (!strcmp(e->name, "min-version")) { data->min_version = e->data.number; break; @@ -160,6 +165,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent // aclk_queue_query takes ownership of data pointer query->callback_topic = cloud_to_agent->callback_topic; + query->timeout = cloud_to_agent->timeout; // for clarity and code readability as when we process the request // it would be strange to get URL from `dedup_id` query->data.http_api_v2.query = query->dedup_id; @@ -271,32 +277,39 @@ int create_node_instance_result(const char *msg, size_t msg_len) update_node_id(&host_id, &node_id); aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); - query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded - rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); - rrdhost_aclk_state_unlock(localhost); + node_instance_connection_t node_state_update = { + .hops = 1, + .live = 0, + .queryable = 1, + .session_id = aclk_session_newarch, + .node_id = res.node_id + }; RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); - query->data.node_update.live = 0; - if (host) { // not all host must have RRDHOST struct created for them // if they never connected during runtime of agent if (host == localhost) { - query->data.node_update.live = 1; - query->data.node_update.hops = 0; + node_state_update.live = 1; + node_state_update.hops = 0; } else { netdata_mutex_lock(&host->receiver_lock); - query->data.node_update.live = (host->receiver != NULL); + node_state_update.live = (host->receiver != NULL); netdata_mutex_unlock(&host->receiver_lock); - query->data.node_update.hops = host->system_info->hops; + node_state_update.hops = host->system_info->hops; } } - query->data.node_update.node_id = res.node_id; // aclk_query_free will free it - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; + rrdhost_aclk_state_lock(localhost); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + rrdhost_aclk_state_unlock(localhost); + + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); + freez(res.node_id); freez(res.machine_guid); return 0; } diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index a9f0a923..ca053263 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -364,7 +364,7 @@ void aclk_stats_upd_online(int online) { } #ifdef NETDATA_INTERNAL_CHECKS -static usec_t pub_time[UINT16_MAX]; +static usec_t pub_time[UINT16_MAX + 1] = {0}; void aclk_stats_msg_published(uint16_t id) { ACLK_STATS_LOCK; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 185f5d79..3530dccf 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -49,7 +49,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return 0; } - mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + if (use_mqtt_5) + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + else + mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -125,7 +129,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); - return 500; + return HTTP_RESP_INTERNAL_SERVER_ERROR; } str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); @@ -149,17 +153,22 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); #endif */ - rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { - error("Timeout sending binpacked message"); - freez(full_msg); - return 503; - } - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { - error("Message is bigger than allowed maximum"); - freez(full_msg); - return 403; + if (use_mqtt_5) + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id); + else { + rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); + if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { + error("Timeout sending binpacked message"); + freez(full_msg); + return HTTP_RESP_BACKEND_FETCH_FAILED; + } + if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { + error("Message is bigger than allowed maximum"); + freez(full_msg); + return HTTP_RESP_FORBIDDEN; + } } + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -363,13 +372,13 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg json_object_put(msg); switch (rc) { - case 403: + case HTTP_RESP_FORBIDDEN: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); break; - case 500: + case HTTP_RESP_INTERNAL_SERVER_ERROR: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); break; - case 503: + case HTTP_RESP_BACKEND_FETCH_FAILED: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); break; } @@ -452,10 +461,22 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { size_t len; uint16_t pid; + + struct capability agent_capabilities[] = { + { .name = "json", .version = 2, .enabled = 0 }, + { .name = "proto", .version = 1, .enabled = 1 }, +#ifdef ENABLE_ML + { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, +#endif + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + update_agent_connection_t conn = { .reachable = (reachable ? 1 : 0), .lwt = 0, - .session_id = aclk_session_newarch + .session_id = aclk_session_newarch, + .capabilities = agent_capabilities }; rrdhost_aclk_state_lock(localhost); @@ -478,7 +499,8 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable } pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); - freez(msg); + if (!use_mqtt_5) + freez(msg); if (localhost->aclk_state.prev_claimed_id) { freez(localhost->aclk_state.prev_claimed_id); localhost->aclk_state.prev_claimed_id = NULL; @@ -490,7 +512,8 @@ char *aclk_generate_lwt(size_t *size) { update_agent_connection_t conn = { .reachable = 0, .lwt = 1, - .session_id = aclk_session_newarch + .session_id = aclk_session_newarch, + .capabilities = NULL }; rrdhost_aclk_state_lock(localhost); @@ -509,30 +532,6 @@ char *aclk_generate_lwt(size_t *size) { return msg; } - -void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) { - size_t len; - char *msg = generate_node_instance_creation(&len, node_creation); - if (!msg) { - error("Error generating nodeinstance::create::v1::CreateNodeInstance"); - return; - } - - aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); - freez(msg); -} - -void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) { - size_t len; - char *msg = generate_node_instance_connection(&len, node_connection); - if (!msg) { - error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection"); - return; - } - - aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); - freez(msg); -} #endif /* ENABLE_NEW_CLOUD_PROTOCOL */ #ifndef __GNUC__ diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index 402f13fb..44281eb6 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -28,9 +28,6 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message); // new protobuf msgs uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable); char *aclk_generate_lwt(size_t *size); - -void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation); -void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection); #endif #endif diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 5576a865..43092546 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -65,17 +65,6 @@ int aclk_env_has_capa(const char *capa) #ifdef ACLK_LOG_CONVERSATION_DIR volatile int aclk_conversation_log_counter = 0; -#if !defined(HAVE_C___ATOMIC) -netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER; -int aclk_get_conv_log_next() -{ - int ret; - netdata_mutex_lock(&aclk_conversation_log_mutex); - ret = aclk_conversation_log_counter++; - netdata_mutex_unlock(&aclk_conversation_log_mutex); - return ret; -} -#endif #endif #define ACLK_TOPIC_PREFIX "/agent/" diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 7a720207..fb0492ac 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -99,13 +99,7 @@ void free_topic_cache(void); #ifdef ACLK_LOG_CONVERSATION_DIR extern volatile int aclk_conversation_log_counter; -#if defined(HAVE_C___ATOMIC) #define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST) -#else -extern netdata_mutex_t aclk_conversation_log_mutex; -int aclk_get_conv_log_next(); -#define ACLK_GET_CONV_LOG_NEXT() aclk_get_conv_log_next() -#endif #endif unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max); diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc index 5868e5d6..338e512d 100644 --- a/aclk/schema-wrappers/alarm_stream.cc +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -176,8 +176,10 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) *len = PROTO_COMPAT_MSG_SIZE(le); char *bin = (char*)mallocz(*len); - if (!le.SerializeToArray(bin, *len)) + if (!le.SerializeToArray(bin, *len)) { + freez(bin); return NULL; + } return bin; } diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc new file mode 100644 index 00000000..769806f9 --- /dev/null +++ b/aclk/schema-wrappers/capability.cc @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/aclk/v1/lib.pb.h" + +#include "capability.h" + +void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) { + proto_capa->set_name(c_capa->name); + proto_capa->set_enabled(c_capa->enabled); + proto_capa->set_version(c_capa->version); +} diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h new file mode 100644 index 00000000..9517a871 --- /dev/null +++ b/aclk/schema-wrappers/capability.h @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_CAPABILITY_H +#define ACLK_SCHEMA_CAPABILITY_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct capability { + const char *name; + uint32_t version; + int enabled; +}; + +#ifdef __cplusplus +} + +#include "proto/aclk/v1/lib.pb.h" + +void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa); +#endif + +#endif /* ACLK_SCHEMA_CAPABILITY_H */ diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc index e3bbfe31..7520a460 100644 --- a/aclk/schema-wrappers/connection.cc +++ b/aclk/schema-wrappers/connection.cc @@ -28,6 +28,15 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio timestamp->set_seconds(tv.tv_sec); timestamp->set_nanos(tv.tv_usec * 1000); + if (data->capabilities) { + struct capability *capa = data->capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + *len = PROTO_COMPAT_MSG_SIZE(connupd); char *msg = (char*)malloc(*len); if (msg) diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h index 8c223869..fcbe6bd5 100644 --- a/aclk/schema-wrappers/connection.h +++ b/aclk/schema-wrappers/connection.h @@ -3,6 +3,8 @@ #ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H #define ACLK_SCHEMA_WRAPPER_CONNECTION_H +#include "capability.h" + #ifdef __cplusplus extern "C" { #endif @@ -15,6 +17,8 @@ typedef struct { unsigned int lwt:1; + struct capability *capabilities; + // TODO in future optional fields // > 15 optional fields: // How long the system was running until connection (only applicable when reachable=true) diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h index 71e45ef5..190ccb4d 100644 --- a/aclk/schema-wrappers/node_creation.h +++ b/aclk/schema-wrappers/node_creation.h @@ -8,9 +8,9 @@ extern "C" { #endif typedef struct { - const char* claim_id; - const char* machine_guid; - const char* hostname; + char* claim_id; + char* machine_guid; + char* hostname; int32_t hops; } node_instance_creation_t; diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc index f6f15ffb..f6698524 100644 --- a/aclk/schema-wrappers/node_info.cc +++ b/aclk/schema-wrappers/node_info.cc @@ -94,6 +94,24 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in ml_info->set_ml_capable(info->ml_info.ml_capable); ml_info->set_ml_enabled(info->ml_info.ml_enabled); + struct capability *capa; + if (info->node_capabilities) { + capa = info->node_capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.mutable_node_info()->add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + if (info->node_instance_capabilities) { + capa = info->node_instance_capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.mutable_node_instance_info()->add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + *len = PROTO_COMPAT_MSG_SIZE(msg); char *bin = (char*)malloc(*len); if (bin) diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h index 41daf94c..e67f3e1d 100644 --- a/aclk/schema-wrappers/node_info.h +++ b/aclk/schema-wrappers/node_info.h @@ -6,6 +6,7 @@ #include <stdlib.h> #include "database/rrd.h" +#include "capability.h" #ifdef __cplusplus extern "C" { @@ -67,6 +68,9 @@ struct update_node_info { int child; struct machine_learning_info ml_info; + + struct capability *node_capabilities; + struct capability *node_instance_capabilities; }; char *generate_update_node_info_message(size_t *len, struct update_node_info *info); diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a3975fca..a3248a69 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -13,5 +13,6 @@ #include "alarm_config.h" #include "alarm_stream.h" #include "node_info.h" +#include "capability.h" #endif /* SCHEMA_WRAPPERS_H */ |