summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-01-26 18:05:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-01-26 18:05:10 +0000
commit34a0b66bc2d48223748ed1cf5bc1b305c396bd74 (patch)
treefbd36be86cc6bc4288fe627f2b5beada569848bb /aclk/aclk_tx_msgs.c
parentAdding upstream version 1.32.1. (diff)
downloadnetdata-34a0b66bc2d48223748ed1cf5bc1b305c396bd74.tar.xz
netdata-34a0b66bc2d48223748ed1cf5bc1b305c396bd74.zip
Adding upstream version 1.33.0.upstream/1.33.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c17
1 files changed, 14 insertions, 3 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 237c1bdd2..74fc19c72 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -147,7 +147,11 @@ static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obje
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- mqtt_wss_publish_pid(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id);
+ int rc = mqtt_wss_publish_pid_block(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT)
+ error("Timeout sending binpacked message");
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL)
+ error("Message is bigger than allowed maximum");
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -188,7 +192,7 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time
// TODO handle this somehow on older json-c
// tmp = json_object_new_uint64(ts_us);
-// probably jso->_to_json_strinf -> custom function
+// probably jso->_to_json_string -> custom function
// jso->o.c_uint64 -> map this with pointer to signed int
// commit that implements json_object_new_uint64 is 3c3b592
// between 0.14 and 0.15
@@ -420,7 +424,10 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
rrdhost_aclk_state_unlock(localhost);
return 0;
}
- conn.claim_id = localhost->aclk_state.claimed_id;
+ if (localhost->aclk_state.prev_claimed_id)
+ conn.claim_id = localhost->aclk_state.prev_claimed_id;
+ else
+ conn.claim_id = localhost->aclk_state.claimed_id;
char *msg = generate_update_agent_connection(&len, &conn);
rrdhost_aclk_state_unlock(localhost);
@@ -432,6 +439,10 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
freez(msg);
+ if (localhost->aclk_state.prev_claimed_id) {
+ freez(localhost->aclk_state.prev_claimed_id);
+ localhost->aclk_state.prev_claimed_id = NULL;
+ }
return pid;
}