summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c276
1 files changed, 276 insertions, 0 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
new file mode 100644
index 0000000..532b964
--- /dev/null
+++ b/aclk/aclk_tx_msgs.c
@@ -0,0 +1,276 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_tx_msgs.h"
+#include "daemon/common.h"
+#include "aclk_util.h"
+#include "aclk_stats.h"
+#include "aclk.h"
+#include "aclk_capas.h"
+
+#include "schema-wrappers/proto_2_json.h"
+
+#ifndef __GNUC__
+#pragma region aclk_tx_msgs helper functions
+#endif
+
+// version for aclk legacy (old cloud arch)
+#define ACLK_VERSION 2
+
+static void freez_aclk_publish5a(void *ptr) {
+ freez(ptr);
+}
+static void freez_aclk_publish5b(void *ptr) {
+ freez(ptr);
+}
+
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+#ifndef ACLK_LOG_CONVERSATION_DIR
+ UNUSED(msgname);
+#endif
+ uint16_t packet_id;
+ const char *topic = aclk_get_topic(subtopic);
+
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting message send.");
+ return 0;
+ }
+
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+
+ if (aclklog_enabled) {
+ char *json = protomsg_to_json(msg, msg_len, msgname);
+ log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
+ freez(json);
+ }
+
+ return packet_id;
+}
+
+// json_object_put returns int unfortunately :D
+// we need void(*fnc)(void *);
+static void json_object_put_wrapper(void *jsonobj)
+{
+ json_object_put(jsonobj);
+}
+
+#define TOPIC_MAX_LEN 512
+#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
+static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
+{
+ uint16_t packet_id;
+ const char *str;
+ char *full_msg = NULL;
+ int len;
+
+ if (unlikely(!topic || topic[0] != '/')) {
+ error ("Full topic required!");
+ json_object_put(msg);
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ }
+
+ str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ len = strlen(str);
+
+ if (payload_len) {
+ full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
+
+ memcpy(full_msg, str, len);
+ json_object_put(msg);
+ msg = NULL;
+ memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
+ len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
+ memcpy(&full_msg[len], payload, payload_len);
+ len += payload_len;
+ }
+
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+
+ return 0;
+}
+
+/*
+ * Creates universal header common for all ACLK messages. User gets ownership of json object created.
+ * Usually this is freed by send function after message has been sent.
+ */
+static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
+{
+ uuid_t uuid;
+ char uuid_str[36 + 1];
+ json_object *tmp;
+ json_object *obj = json_object_new_object();
+
+ tmp = json_object_new_string(type);
+ json_object_object_add(obj, "type", tmp);
+
+ if (unlikely(!msg_id)) {
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+ msg_id = uuid_str;
+ }
+
+ if (ts_secs == 0) {
+ ts_us = now_realtime_usec();
+ ts_secs = ts_us / USEC_PER_SEC;
+ ts_us = ts_us % USEC_PER_SEC;
+ }
+
+ tmp = json_object_new_string(msg_id);
+ json_object_object_add(obj, "msg-id", tmp);
+
+ tmp = json_object_new_int64(ts_secs);
+ json_object_object_add(obj, "timestamp", tmp);
+
+// TODO handle this somehow on older json-c
+// tmp = json_object_new_uint64(ts_us);
+// probably jso->_to_json_string -> custom function
+// jso->o.c_uint64 -> map this with pointer to signed int
+// commit that implements json_object_new_uint64 is 3c3b592
+// between 0.14 and 0.15
+ tmp = json_object_new_int64(ts_us);
+ json_object_object_add(obj, "timestamp-offset-usec", tmp);
+
+ tmp = json_object_new_int64(aclk_session_sec);
+ json_object_object_add(obj, "connect", tmp);
+
+// TODO handle this somehow see above
+// tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
+ tmp = json_object_new_int64(aclk_session_us);
+ json_object_object_add(obj, "connect-offset-usec", tmp);
+
+ tmp = json_object_new_int(version);
+ json_object_object_add(obj, "version", tmp);
+
+ return obj;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region aclk_tx_msgs message generators
+#endif
+
+void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ tmp = json_object_new_int(ec);
+ json_object_object_add(msg, "error-code", tmp);
+
+ tmp = json_object_new_string(emsg);
+ json_object_object_add(msg, "error-description", tmp);
+
+ if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
+ error("Failed to send cancelation message for http reply");
+ }
+}
+
+void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+
+ tmp = json_object_new_int64(t_exec);
+ json_object_object_add(msg, "t-exec", tmp);
+
+ tmp = json_object_new_int64(created);
+ json_object_object_add(msg, "t-rx", tmp);
+
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
+
+ switch (rc) {
+ case HTTP_RESP_FORBIDDEN:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
+ break;
+ case HTTP_RESP_INTERNAL_SERVER_ERROR:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
+ break;
+ case HTTP_RESP_BACKEND_FETCH_FAILED:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
+ break;
+ }
+}
+
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
+ size_t len;
+ uint16_t pid;
+
+ update_agent_connection_t conn = {
+ .reachable = (reachable ? 1 : 0),
+ .lwt = 0,
+ .session_id = aclk_session_newarch,
+ .capabilities = aclk_get_agent_capas()
+ };
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("Internal error. Should not come here if not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return 0;
+ }
+ if (localhost->aclk_state.prev_claimed_id)
+ conn.claim_id = localhost->aclk_state.prev_claimed_id;
+ else
+ conn.claim_id = localhost->aclk_state.claimed_id;
+
+ char *msg = generate_update_agent_connection(&len, &conn);
+ rrdhost_aclk_state_unlock(localhost);
+
+ if (!msg) {
+ error("Error generating agent::v1::UpdateAgentConnection payload");
+ return 0;
+ }
+
+ pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
+ if (localhost->aclk_state.prev_claimed_id) {
+ freez(localhost->aclk_state.prev_claimed_id);
+ localhost->aclk_state.prev_claimed_id = NULL;
+ }
+ return pid;
+}
+
+char *aclk_generate_lwt(size_t *size) {
+ update_agent_connection_t conn = {
+ .reachable = 0,
+ .lwt = 1,
+ .session_id = aclk_session_newarch,
+ .capabilities = NULL
+ };
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("Internal error. Should not come here if not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return NULL;
+ }
+ conn.claim_id = localhost->aclk_state.claimed_id;
+
+ char *msg = generate_update_agent_connection(size, &conn);
+ rrdhost_aclk_state_unlock(localhost);
+
+ if (!msg)
+ error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
+
+ return msg;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif