summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c233
1 files changed, 6 insertions, 227 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 3530dccff..822a90fa2 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -6,6 +6,8 @@
#include "aclk_stats.h"
#include "aclk.h"
+#include "schema-wrappers/proto_2_json.h"
+
#ifndef __GNUC__
#pragma region aclk_tx_msgs helper functions
#endif
@@ -13,29 +15,6 @@
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
-static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
-{
- uint16_t packet_id;
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- const char *topic = aclk_get_topic(subtopic);
-
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send");
- return;
- }
-
- mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
-}
-
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
#ifndef ACLK_LOG_CONVERSATION_DIR
@@ -56,42 +35,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
+ char *json = protomsg_to_json(msg, msg_len, msgname);
+ log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
+ freez(json);
#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
- FILE *fptr;
- if (fptr = fopen(filename,"w")) {
- fwrite(msg, msg_len, 1, fptr);
- fclose(fptr);
- }
-#endif
-
- return packet_id;
-}
-
-static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
-{
- uint16_t packet_id;
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- const char *topic = aclk_get_topic(subtopic);
-
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send");
- return 0;
- }
- mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
return packet_id;
}
@@ -231,17 +179,6 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time
return obj;
}
-static char *create_uuid()
-{
- uuid_t uuid;
- char *uuid_str = mallocz(36 + 1);
-
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
-
- return uuid_str;
-}
-
#ifndef __GNUC__
#pragma endregion
#endif
@@ -250,90 +187,6 @@ static char *create_uuid()
#pragma region aclk_tx_msgs message generators
#endif
-/*
- * This will send the /api/v1/info
- */
-#define BUFFER_INITIAL_SIZE (1024 * 16)
-void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
-{
- BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- json_object *msg, *payload, *tmp;
-
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
- if (metadata_submitted)
- msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
- else
- msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
-
- payload = json_object_new_object();
- json_object_object_add(msg, "payload", payload);
-
- web_client_api_request_v1_info_fill_buffer(host, local_buffer);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "info", tmp);
-
- buffer_flush(local_buffer);
-
- charts2json(host, local_buffer, 1, 0);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "charts", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
-
- json_object_put(msg);
- freez(msg_id);
- buffer_free(local_buffer);
-}
-
-// TODO should include header instead
-void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
-
-void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
-{
- BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- json_object *msg, *payload, *tmp;
-
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
-
- if (metadata_submitted)
- msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
- else
- msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
-
- payload = json_object_new_object();
- json_object_object_add(msg, "payload", payload);
-
- health_alarms2json(localhost, local_buffer, 1);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "configured-alarms", tmp);
-
- buffer_flush(local_buffer);
-
- health_active_log_alarms_2json(localhost, local_buffer);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "alarms-active", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
-
- json_object_put(msg);
- freez(msg_id);
- buffer_free(local_buffer);
-}
-
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -384,80 +237,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
}
}
-void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
-{
- json_object *msg, *payload;
- BUFFER *tmp_buffer;
- RRDSET *st;
-
- st = rrdset_find(host, chart);
- if (!st)
- st = rrdset_find_byname(host, chart);
- if (!st) {
- info("FAILED to find chart %s", chart);
- return;
- }
-
- tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- rrdset2json(st, tmp_buffer, NULL, NULL, 1);
- payload = json_tokener_parse(tmp_buffer->buffer);
- if (!payload) {
- error("Failed to parse JSON from rrdset2json");
- buffer_free(tmp_buffer);
- return;
- }
-
- msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
- json_object_object_add(msg, "payload", payload);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
-
- buffer_free(tmp_buffer);
- json_object_put(msg);
-}
-
-void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
-{
- // we create header here on purpose (and not send message with it already as `msg` param)
- // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
- // send message with timestamps already to Query Queue they would be incorrect at time
- // when query queue would get to send them)
- json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
- json_object_object_add(obj, "payload", msg);
-
- aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
- json_object_put(obj);
-}
-
-/*
- * Will generate disconnect message.
- * @param message if NULL it will generate LWT message (unexpected).
- * Otherwise string pointed to by this parameter will be used as
- * reason.
- */
-json_object *aclk_generate_disconnect(const char *message)
-{
- json_object *tmp, *msg;
-
- msg = create_hdr("disconnect", NULL, 0, 0, 2);
-
- tmp = json_object_new_string(message ? message : "unexpected");
- json_object_object_add(msg, "payload", tmp);
-
- return msg;
-}
-
-int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
-{
- int pid;
- json_object *msg = aclk_generate_disconnect(message);
- pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
- json_object_put(msg);
- return pid;
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
size_t len;
uint16_t pid;
@@ -469,6 +248,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
{ .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
#endif
{ .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
{ .name = NULL, .version = 0, .enabled = 0 }
};
@@ -532,7 +312,6 @@ char *aclk_generate_lwt(size_t *size) {
return msg;
}
-#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__
#pragma endregion