summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c64
1 files changed, 21 insertions, 43 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 158fc4e26..144008e4d 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -13,8 +13,14 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ const char *topic = aclk_get_topic(subtopic);
- mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting mesage send");
+ return;
+ }
+
+ mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -30,8 +36,14 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ const char *topic = aclk_get_topic(subtopic);
- mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting mesage send");
+ return 0;
+ }
+
+ mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -199,9 +211,9 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRD
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
- msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
else
- msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
@@ -241,9 +253,9 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
// session.
if (metadata_submitted)
- msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
else
- msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
@@ -265,39 +277,6 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
buffer_free(local_buffer);
}
-void aclk_hello_msg(mqtt_wss_client client)
-{
- json_object *tmp, *msg;
-
- char *msg_id = create_uuid();
-
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
- ACLK_SHARED_STATE_UNLOCK;
-
- //Hello message is versioned separatelly from the rest of the protocol
- msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
-
- tmp = json_object_new_int(ACLK_VERSION_MIN);
- json_object_object_add(msg, "min-version", tmp);
-
- tmp = json_object_new_int(ACLK_VERSION_MAX);
- json_object_object_add(msg, "max-version", tmp);
-
-#ifdef ACLK_NG
- tmp = json_object_new_string("Next Generation");
-#else
- tmp = json_object_new_string("Legacy");
-#endif
- json_object_object_add(msg, "aclk-implementation", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
-
- json_object_put(msg);
- freez(msg_id);
-}
-
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -340,7 +319,7 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
return;
}
- msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(msg, "payload", payload);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
@@ -352,11 +331,10 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
{
// we create header here on purpose (and not send message with it already as `msg` param)
- // one is version_neg is guaranteed to be done here
- // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
+ // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
// send message with timestamps already to Query Queue they would be incorrect at time
// when query queue would get to send them)
- json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg);
+ json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(obj, "payload", msg);
aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);