diff options
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 122 |
1 files changed, 119 insertions, 3 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 144008e4d..237c1bdd2 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -1,14 +1,18 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "aclk_tx_msgs.h" -#include "../daemon/common.h" +#include "daemon/common.h" #include "aclk_util.h" #include "aclk_stats.h" +#include "aclk.h" #ifndef __GNUC__ #pragma region aclk_tx_msgs helper functions #endif +// version for aclk legacy (old cloud arch) +#define ACLK_VERSION 2 + static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) { uint16_t packet_id; @@ -16,7 +20,7 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, const char *topic = aclk_get_topic(subtopic); if (unlikely(!topic)) { - error("Couldn't get topic. Aborting mesage send"); + error("Couldn't get topic. Aborting message send"); return; } @@ -32,6 +36,37 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, #endif } +uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +{ +#ifndef ACLK_LOG_CONVERSATION_DIR + UNUSED(msgname); +#endif + uint16_t packet_id; + const char *topic = aclk_get_topic(subtopic); + + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting message send."); + return 0; + } + + mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 1024 + char filename[FN_MAX_LEN]; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname); + FILE *fptr; + if (fptr = fopen(filename,"w")) { + fwrite(msg, msg_len, 1, fptr); + fclose(fptr); + } +#endif + + return packet_id; +} + static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) { uint16_t packet_id; @@ -39,7 +74,7 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje const char *topic = aclk_get_topic(subtopic); if (unlikely(!topic)) { - error("Couldn't get topic. Aborting mesage send"); + error("Couldn't get topic. Aborting message send"); return 0; } @@ -368,6 +403,87 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) return pid; } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +// new protobuf msgs +uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { + size_t len; + uint16_t pid; + update_agent_connection_t conn = { + .reachable = (reachable ? 1 : 0), + .lwt = 0, + .session_id = aclk_session_newarch + }; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("Internal error. Should not come here if not claimed"); + rrdhost_aclk_state_unlock(localhost); + return 0; + } + conn.claim_id = localhost->aclk_state.claimed_id; + + char *msg = generate_update_agent_connection(&len, &conn); + rrdhost_aclk_state_unlock(localhost); + + if (!msg) { + error("Error generating agent::v1::UpdateAgentConnection payload"); + return 0; + } + + pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); + freez(msg); + return pid; +} + +char *aclk_generate_lwt(size_t *size) { + update_agent_connection_t conn = { + .reachable = 0, + .lwt = 1, + .session_id = aclk_session_newarch + }; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("Internal error. Should not come here if not claimed"); + rrdhost_aclk_state_unlock(localhost); + return NULL; + } + conn.claim_id = localhost->aclk_state.claimed_id; + + char *msg = generate_update_agent_connection(size, &conn); + rrdhost_aclk_state_unlock(localhost); + + if (!msg) + error("Error generating agent::v1::UpdateAgentConnection payload for LWT"); + + return msg; +} + +void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) { + size_t len; + char *msg = generate_node_instance_creation(&len, node_creation); + if (!msg) { + error("Error generating nodeinstance::create::v1::CreateNodeInstance"); + return; + } + + aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); + freez(msg); +} + +void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) { + size_t len; + char *msg = generate_node_instance_connection(&len, node_connection); + if (!msg) { + error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection"); + return; + } + + aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + freez(msg); +} +#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ + #ifndef __GNUC__ #pragma endregion #endif |