diff options
Diffstat (limited to '')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 64 |
1 files changed, 21 insertions, 43 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 158fc4e26..144008e4d 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -13,8 +13,14 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, { uint16_t packet_id; const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + const char *topic = aclk_get_topic(subtopic); - mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting mesage send"); + return; + } + + mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -30,8 +36,14 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje { uint16_t packet_id; const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + const char *topic = aclk_get_topic(subtopic); - mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting mesage send"); + return 0; + } + + mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -199,9 +211,9 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRD // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (metadata_submitted) - msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION); else - msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); payload = json_object_new_object(); json_object_object_add(msg, "payload", payload); @@ -241,9 +253,9 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) // session. if (metadata_submitted) - msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION); else - msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); payload = json_object_new_object(); json_object_object_add(msg, "payload", payload); @@ -265,39 +277,6 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) buffer_free(local_buffer); } -void aclk_hello_msg(mqtt_wss_client client) -{ - json_object *tmp, *msg; - - char *msg_id = create_uuid(); - - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - ACLK_SHARED_STATE_UNLOCK; - - //Hello message is versioned separatelly from the rest of the protocol - msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); - - tmp = json_object_new_int(ACLK_VERSION_MIN); - json_object_object_add(msg, "min-version", tmp); - - tmp = json_object_new_int(ACLK_VERSION_MAX); - json_object_object_add(msg, "max-version", tmp); - -#ifdef ACLK_NG - tmp = json_object_new_string("Next Generation"); -#else - tmp = json_object_new_string("Legacy"); -#endif - json_object_object_add(msg, "aclk-implementation", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); - - json_object_put(msg); - freez(msg_id); -} - void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -340,7 +319,7 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) return; } - msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION); json_object_object_add(msg, "payload", payload); aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); @@ -352,11 +331,10 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) { // we create header here on purpose (and not send message with it already as `msg` param) - // one is version_neg is guaranteed to be done here - // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy + // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy // send message with timestamps already to Query Queue they would be incorrect at time // when query queue would get to send them) - json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg); + json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION); json_object_object_add(obj, "payload", msg); aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); |