summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/aclk.c100
1 files changed, 66 insertions, 34 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 599b9a093..6426c5b5e 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -12,6 +12,7 @@
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
#include "https_client.h"
+#include "schema-wrappers/schema_wrappers.h"
#include "aclk_proxy.h"
@@ -34,7 +35,7 @@ time_t last_disconnect_time = 0;
time_t next_connection_attempt = 0;
float last_backoff_value = 0;
-int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
+int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
time_t aclk_block_until = 0;
@@ -172,7 +173,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
case MQTT_WSS_LOG_WARN:
- error("%s", str);
+ error_report("%s", str);
return;
case MQTT_WSS_LOG_INFO:
info("%s", str);
@@ -391,7 +392,7 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+ char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
error("Unable to fetch topic for COMMAND (to subscribe)");
@@ -400,7 +401,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
- topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
else
@@ -800,10 +801,12 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
+ use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
+
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
#else
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
@@ -1041,6 +1044,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
aclk_queue_query(query);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
@@ -1060,28 +1064,40 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
rrdhost_aclk_state_lock(localhost);
- create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = host->hostname,
+ .machine_guid = host->machine_guid
+ };
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
- create_query->data.node_creation.hostname = strdupz(host->hostname);
- create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
aclk_queue_query(create_query);
return;
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = (uint32_t) host->system_info->hops;
+ node_instance_connection_t node_state_update = {
+ .hops = host->system_info->hops,
+ .live = cmd,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.node_update.live = cmd;
- query->data.node_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd,
+
+ info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_queue_query(query);
}
@@ -1096,39 +1112,52 @@ void aclk_send_node_instances()
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .live = list->live,
+ .hops = list->hops,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.node_update.live = list->live;
- query->data.node_update.hops = list->hops;
- query->data.node_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- freez(list->hostname);
- info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
+ info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_queue_query(query);
} else {
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
+ node_instance_creation_t node_instance_creation = {
+ .hops = list->hops,
+ .hostname = list->hostname,
+ };
+ node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
rrdhost_aclk_state_lock(localhost);
- create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = list->hops;
- create_query->data.node_creation.hostname = list->hostname;
- create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
- info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid,
+ info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
+ freez(node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
+ freez(list->hostname);
list++;
}
freez(list_head);
}
+#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
@@ -1208,7 +1237,7 @@ char *ng_aclk_state(void)
"Protocols Supported: Legacy\n"
#endif
);
- buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
+ buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
@@ -1408,6 +1437,9 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
json_object_object_add(msg, "used-cloud-protocol", tmp);
+ tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+ json_object_object_add(msg, "mqtt-version", tmp);
+
tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
json_object_object_add(msg, "received-app-layer-msgs", tmp);