summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-01 06:15:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-01 06:15:04 +0000
commite970e0b37b8bd7f246feb3f70c4136418225e434 (patch)
tree0b67c0ca45f56f2f9d9c5c2e725279ecdf52d2eb /aclk/aclk_tx_msgs.c
parentAdding upstream version 1.31.0. (diff)
downloadnetdata-e970e0b37b8bd7f246feb3f70c4136418225e434.tar.xz
netdata-e970e0b37b8bd7f246feb3f70c4136418225e434.zip
Adding upstream version 1.32.0.upstream/1.32.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c122
1 files changed, 119 insertions, 3 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 144008e4d..237c1bdd2 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -1,14 +1,18 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_tx_msgs.h"
-#include "../daemon/common.h"
+#include "daemon/common.h"
#include "aclk_util.h"
#include "aclk_stats.h"
+#include "aclk.h"
#ifndef __GNUC__
#pragma region aclk_tx_msgs helper functions
#endif
+// version for aclk legacy (old cloud arch)
+#define ACLK_VERSION 2
+
static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
@@ -16,7 +20,7 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
const char *topic = aclk_get_topic(subtopic);
if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting mesage send");
+ error("Couldn't get topic. Aborting message send");
return;
}
@@ -32,6 +36,37 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
#endif
}
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+#ifndef ACLK_LOG_CONVERSATION_DIR
+ UNUSED(msgname);
+#endif
+ uint16_t packet_id;
+ const char *topic = aclk_get_topic(subtopic);
+
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting message send.");
+ return 0;
+ }
+
+ mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+ char filename[FN_MAX_LEN];
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
+ FILE *fptr;
+ if (fptr = fopen(filename,"w")) {
+ fwrite(msg, msg_len, 1, fptr);
+ fclose(fptr);
+ }
+#endif
+
+ return packet_id;
+}
+
static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
@@ -39,7 +74,7 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje
const char *topic = aclk_get_topic(subtopic);
if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting mesage send");
+ error("Couldn't get topic. Aborting message send");
return 0;
}
@@ -368,6 +403,87 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
return pid;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+// new protobuf msgs
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
+ size_t len;
+ uint16_t pid;
+ update_agent_connection_t conn = {
+ .reachable = (reachable ? 1 : 0),
+ .lwt = 0,
+ .session_id = aclk_session_newarch
+ };
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("Internal error. Should not come here if not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return 0;
+ }
+ conn.claim_id = localhost->aclk_state.claimed_id;
+
+ char *msg = generate_update_agent_connection(&len, &conn);
+ rrdhost_aclk_state_unlock(localhost);
+
+ if (!msg) {
+ error("Error generating agent::v1::UpdateAgentConnection payload");
+ return 0;
+ }
+
+ pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
+ freez(msg);
+ return pid;
+}
+
+char *aclk_generate_lwt(size_t *size) {
+ update_agent_connection_t conn = {
+ .reachable = 0,
+ .lwt = 1,
+ .session_id = aclk_session_newarch
+ };
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("Internal error. Should not come here if not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return NULL;
+ }
+ conn.claim_id = localhost->aclk_state.claimed_id;
+
+ char *msg = generate_update_agent_connection(size, &conn);
+ rrdhost_aclk_state_unlock(localhost);
+
+ if (!msg)
+ error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
+
+ return msg;
+}
+
+void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
+ size_t len;
+ char *msg = generate_node_instance_creation(&len, node_creation);
+ if (!msg) {
+ error("Error generating nodeinstance::create::v1::CreateNodeInstance");
+ return;
+ }
+
+ aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
+ freez(msg);
+}
+
+void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
+ size_t len;
+ char *msg = generate_node_instance_connection(&len, node_connection);
+ if (!msg) {
+ error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
+ return;
+ }
+
+ aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+ freez(msg);
+}
+#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
+
#ifndef __GNUC__
#pragma endregion
#endif