diff options
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 94 |
1 files changed, 26 insertions, 68 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 822a90fa2..532b964ad 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_util.h" #include "aclk_stats.h" #include "aclk.h" +#include "aclk_capas.h" #include "schema-wrappers/proto_2_json.h" @@ -15,6 +16,13 @@ // version for aclk legacy (old cloud arch) #define ACLK_VERSION 2 +static void freez_aclk_publish5a(void *ptr) { + freez(ptr); +} +static void freez_aclk_publish5b(void *ptr) { + freez(ptr); +} + uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { #ifndef ACLK_LOG_CONVERSATION_DIR @@ -28,43 +36,27 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return 0; } - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); - else - mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); - char *json = protomsg_to_json(msg, msg_len, msgname); - log_aclk_message_bin(json, strlen(json), 1, topic, msgname); - freez(json); #endif + if (aclklog_enabled) { + char *json = protomsg_to_json(msg, msg_len, msgname); + log_aclk_message_bin(json, strlen(json), 1, topic, msgname); + freez(json); + } + return packet_id; } -/* UNUSED now but can be used soon MVP1? -static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic) +// json_object_put returns int unfortunately :D +// we need void(*fnc)(void *); +static void json_object_put_wrapper(void *jsonobj) { - if (unlikely(!topic || topic[0] != '/')) { - error ("Full topic required!"); - return; - } - - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - - mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif + json_object_put(jsonobj); } -*/ #define TOPIC_MAX_LEN 512 #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" @@ -73,10 +65,11 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec uint16_t packet_id; const char *str; char *full_msg = NULL; - int len, rc; + int len; if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); + json_object_put(msg); return HTTP_RESP_INTERNAL_SERVER_ERROR; } @@ -87,40 +80,20 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); memcpy(full_msg, str, len); + json_object_put(msg); + msg = NULL; memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); len += strlen(V2_BIN_PAYLOAD_SEPARATOR); memcpy(&full_msg[len], payload, payload_len); len += payload_len; } -/* TODO -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif */ - - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id); - else { - rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { - error("Timeout sending binpacked message"); - freez(full_msg); - return HTTP_RESP_BACKEND_FETCH_FAILED; - } - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { - error("Message is bigger than allowed maximum"); - freez(full_msg); - return HTTP_RESP_FORBIDDEN; - } - } + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif - freez(full_msg); + return 0; } @@ -203,7 +176,6 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { error("Failed to send cancelation message for http reply"); } - json_object_put(msg); } void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) @@ -222,7 +194,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg json_object_object_add(msg, "http-code", tmp); int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); - json_object_put(msg); switch (rc) { case HTTP_RESP_FORBIDDEN: @@ -241,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable size_t len; uint16_t pid; - struct capability agent_capabilities[] = { - { .name = "json", .version = 2, .enabled = 0 }, - { .name = "proto", .version = 1, .enabled = 1 }, -#ifdef ENABLE_ML - { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, -#endif - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - update_agent_connection_t conn = { .reachable = (reachable ? 1 : 0), .lwt = 0, .session_id = aclk_session_newarch, - .capabilities = agent_capabilities + .capabilities = aclk_get_agent_capas() }; rrdhost_aclk_state_lock(localhost); @@ -279,8 +239,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable } pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); - if (!use_mqtt_5) - freez(msg); if (localhost->aclk_state.prev_claimed_id) { freez(localhost->aclk_state.prev_claimed_id); localhost->aclk_state.prev_claimed_id = NULL; |