summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c94
1 files changed, 26 insertions, 68 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 822a90fa2..532b964ad 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_util.h"
#include "aclk_stats.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -15,6 +16,13 @@
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
+static void freez_aclk_publish5a(void *ptr) {
+ freez(ptr);
+}
+static void freez_aclk_publish5b(void *ptr) {
+ freez(ptr);
+}
+
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
#ifndef ACLK_LOG_CONVERSATION_DIR
@@ -28,43 +36,27 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return 0;
}
- if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
- else
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
- char *json = protomsg_to_json(msg, msg_len, msgname);
- log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
- freez(json);
#endif
+ if (aclklog_enabled) {
+ char *json = protomsg_to_json(msg, msg_len, msgname);
+ log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
+ freez(json);
+ }
+
return packet_id;
}
-/* UNUSED now but can be used soon MVP1?
-static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
+// json_object_put returns int unfortunately :D
+// we need void(*fnc)(void *);
+static void json_object_put_wrapper(void *jsonobj)
{
- if (unlikely(!topic || topic[0] != '/')) {
- error ("Full topic required!");
- return;
- }
-
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
-
- mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published();
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
+ json_object_put(jsonobj);
}
-*/
#define TOPIC_MAX_LEN 512
#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
@@ -73,10 +65,11 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
uint16_t packet_id;
const char *str;
char *full_msg = NULL;
- int len, rc;
+ int len;
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
+ json_object_put(msg);
return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
@@ -87,40 +80,20 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
memcpy(full_msg, str, len);
+ json_object_put(msg);
+ msg = NULL;
memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
memcpy(&full_msg[len], payload, payload_len);
len += payload_len;
}
-/* TODO
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif */
-
- if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
- else {
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- freez(full_msg);
- return HTTP_RESP_BACKEND_FETCH_FAILED;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- freez(full_msg);
- return HTTP_RESP_FORBIDDEN;
- }
- }
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
- freez(full_msg);
+
return 0;
}
@@ -203,7 +176,6 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char
if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
error("Failed to send cancelation message for http reply");
}
- json_object_put(msg);
}
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
@@ -222,7 +194,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
json_object_object_add(msg, "http-code", tmp);
int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
- json_object_put(msg);
switch (rc) {
case HTTP_RESP_FORBIDDEN:
@@ -241,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
size_t len;
uint16_t pid;
- struct capability agent_capabilities[] = {
- { .name = "json", .version = 2, .enabled = 0 },
- { .name = "proto", .version = 1, .enabled = 1 },
-#ifdef ENABLE_ML
- { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
-#endif
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
-
update_agent_connection_t conn = {
.reachable = (reachable ? 1 : 0),
.lwt = 0,
.session_id = aclk_session_newarch,
- .capabilities = agent_capabilities
+ .capabilities = aclk_get_agent_capas()
};
rrdhost_aclk_state_lock(localhost);
@@ -279,8 +239,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
}
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- if (!use_mqtt_5)
- freez(msg);
if (localhost->aclk_state.prev_claimed_id) {
freez(localhost->aclk_state.prev_claimed_id);
localhost->aclk_state.prev_claimed_id = NULL;