diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-08-12 07:26:11 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-08-12 07:26:11 +0000 |
commit | 3c315f0fff93aa072472abc10815963ac0035268 (patch) | |
tree | a95f6a96e0e7bd139c010f8dc60b40e5b3062a99 /aclk/aclk_tx_msgs.c | |
parent | Adding upstream version 1.35.1. (diff) | |
download | netdata-3c315f0fff93aa072472abc10815963ac0035268.tar.xz netdata-3c315f0fff93aa072472abc10815963ac0035268.zip |
Adding upstream version 1.36.0.upstream/1.36.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 233 |
1 files changed, 6 insertions, 227 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 3530dccff..822a90fa2 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -6,6 +6,8 @@ #include "aclk_stats.h" #include "aclk.h" +#include "schema-wrappers/proto_2_json.h" + #ifndef __GNUC__ #pragma region aclk_tx_msgs helper functions #endif @@ -13,29 +15,6 @@ // version for aclk legacy (old cloud arch) #define ACLK_VERSION 2 -static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) -{ - uint16_t packet_id; - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - const char *topic = aclk_get_topic(subtopic); - - if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send"); - return; - } - - mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(packet_id); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif -} - uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { #ifndef ACLK_LOG_CONVERSATION_DIR @@ -56,42 +35,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); + char *json = protomsg_to_json(msg, msg_len, msgname); + log_aclk_message_bin(json, strlen(json), 1, topic, msgname); + freez(json); #endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname); - FILE *fptr; - if (fptr = fopen(filename,"w")) { - fwrite(msg, msg_len, 1, fptr); - fclose(fptr); - } -#endif - - return packet_id; -} - -static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) -{ - uint16_t packet_id; - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - const char *topic = aclk_get_topic(subtopic); - - if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send"); - return 0; - } - mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(packet_id); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif return packet_id; } @@ -231,17 +179,6 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time return obj; } -static char *create_uuid() -{ - uuid_t uuid; - char *uuid_str = mallocz(36 + 1); - - uuid_generate(uuid); - uuid_unparse(uuid, uuid_str); - - return uuid_str; -} - #ifndef __GNUC__ #pragma endregion #endif @@ -250,90 +187,6 @@ static char *create_uuid() #pragma region aclk_tx_msgs message generators #endif -/* - * This will send the /api/v1/info - */ -#define BUFFER_INITIAL_SIZE (1024 * 16) -void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host) -{ - BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); - json_object *msg, *payload, *tmp; - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - if (metadata_submitted) - msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION); - else - msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); - - payload = json_object_new_object(); - json_object_object_add(msg, "payload", payload); - - web_client_api_request_v1_info_fill_buffer(host, local_buffer); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "info", tmp); - - buffer_flush(local_buffer); - - charts2json(host, local_buffer, 1, 0); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "charts", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); - - json_object_put(msg); - freez(msg_id); - buffer_free(local_buffer); -} - -// TODO should include header instead -void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); - -void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) -{ - BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); - json_object *msg, *payload, *tmp; - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - - if (metadata_submitted) - msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION); - else - msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); - - payload = json_object_new_object(); - json_object_object_add(msg, "payload", payload); - - health_alarms2json(localhost, local_buffer, 1); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "configured-alarms", tmp); - - buffer_flush(local_buffer); - - health_active_log_alarms_2json(localhost, local_buffer); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "alarms-active", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS); - - json_object_put(msg); - freez(msg_id); - buffer_free(local_buffer); -} - void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -384,80 +237,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg } } -void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) -{ - json_object *msg, *payload; - BUFFER *tmp_buffer; - RRDSET *st; - - st = rrdset_find(host, chart); - if (!st) - st = rrdset_find_byname(host, chart); - if (!st) { - info("FAILED to find chart %s", chart); - return; - } - - tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE); - rrdset2json(st, tmp_buffer, NULL, NULL, 1); - payload = json_tokener_parse(tmp_buffer->buffer); - if (!payload) { - error("Failed to parse JSON from rrdset2json"); - buffer_free(tmp_buffer); - return; - } - - msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION); - json_object_object_add(msg, "payload", payload); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); - - buffer_free(tmp_buffer); - json_object_put(msg); -} - -void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) -{ - // we create header here on purpose (and not send message with it already as `msg` param) - // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy - // send message with timestamps already to Query Queue they would be incorrect at time - // when query queue would get to send them) - json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION); - json_object_object_add(obj, "payload", msg); - - aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); - json_object_put(obj); -} - -/* - * Will generate disconnect message. - * @param message if NULL it will generate LWT message (unexpected). - * Otherwise string pointed to by this parameter will be used as - * reason. - */ -json_object *aclk_generate_disconnect(const char *message) -{ - json_object *tmp, *msg; - - msg = create_hdr("disconnect", NULL, 0, 0, 2); - - tmp = json_object_new_string(message ? message : "unexpected"); - json_object_object_add(msg, "payload", tmp); - - return msg; -} - -int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) -{ - int pid; - json_object *msg = aclk_generate_disconnect(message); - pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA); - json_object_put(msg); - return pid; -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -// new protobuf msgs uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { size_t len; uint16_t pid; @@ -469,6 +248,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, #endif { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, { .name = NULL, .version = 0, .enabled = 0 } }; @@ -532,7 +312,6 @@ char *aclk_generate_lwt(size_t *size) { return msg; } -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ #ifndef __GNUC__ #pragma endregion |