diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 100 |
1 files changed, 66 insertions, 34 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 599b9a093..6426c5b5e 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -12,6 +12,7 @@ #include "aclk_rx_msgs.h" #include "aclk_collector_list.h" #include "https_client.h" +#include "schema-wrappers/schema_wrappers.h" #include "aclk_proxy.h" @@ -34,7 +35,7 @@ time_t last_disconnect_time = 0; time_t next_connection_attempt = 0; float last_backoff_value = 0; -int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload +int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload time_t aclk_block_until = 0; @@ -172,7 +173,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) case MQTT_WSS_LOG_ERROR: case MQTT_WSS_LOG_FATAL: case MQTT_WSS_LOG_WARN: - error("%s", str); + error_report("%s", str); return; case MQTT_WSS_LOG_INFO: info("%s", str); @@ -391,7 +392,7 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) error("Unable to fetch topic for COMMAND (to subscribe)"); @@ -400,7 +401,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) #ifdef ENABLE_NEW_CLOUD_PROTOCOL if (aclk_use_new_cloud_arch) { - topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); if (!topic) error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); else @@ -800,10 +801,12 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; + use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + #ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { #else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { #endif error("Couldn't initialize MQTT_WSS network library"); goto exit; @@ -1041,6 +1044,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu aclk_queue_query(query); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; @@ -1060,28 +1064,40 @@ void aclk_host_state_update(RRDHOST *host, int cmd) aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); rrdhost_aclk_state_lock(localhost); - create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_instance_creation_t node_instance_creation = { + .claim_id = localhost->aclk_state.claimed_id, + .hops = host->system_info->hops, + .hostname = host->hostname, + .machine_guid = host->machine_guid + }; + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.node_creation.hops = (uint32_t) host->system_info->hops; - create_query->data.node_creation.hostname = strdupz(host->hostname); - create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); aclk_queue_query(create_query); return; } aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); - query->data.node_update.hops = (uint32_t) host->system_info->hops; + node_instance_connection_t node_state_update = { + .hops = host->system_info->hops, + .live = cmd, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)node_state_update.node_id); rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.node_update.live = cmd; - query->data.node_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd, + + info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; aclk_queue_query(query); } @@ -1096,39 +1112,52 @@ void aclk_send_node_instances() while (!uuid_is_null(list->host_id)) { if (!uuid_is_null(list->node_id)) { aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + node_instance_connection_t node_state_update = { + .live = list->live, + .hops = list->hops, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.node_update.live = list->live; - query->data.node_update.hops = list->hops; - query->data.node_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - freez(list->hostname); - info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, + info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; aclk_queue_query(query); } else { aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); + node_instance_creation_t node_instance_creation = { + .hops = list->hops, + .hostname = list->hostname, + }; + node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; rrdhost_aclk_state_lock(localhost); - create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.node_creation.hops = list->hops; - create_query->data.node_creation.hostname = list->hostname; - create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); - uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); - info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid, + info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); + freez(node_instance_creation.machine_guid); aclk_queue_query(create_query); } + freez(list->hostname); list++; } freez(list_head); } +#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { @@ -1208,7 +1237,7 @@ char *ng_aclk_state(void) "Protocols Supported: Legacy\n" #endif ); - buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); char *agent_id = is_agent_claimed(); if (agent_id == NULL) @@ -1408,6 +1437,9 @@ char *ng_aclk_state_json(void) tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); json_object_object_add(msg, "used-cloud-protocol", tmp); + tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); + json_object_object_add(msg, "mqtt-version", tmp); + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); json_object_object_add(msg, "received-app-layer-msgs", tmp); |