summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c31
1 files changed, 10 insertions, 21 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index e80897221..399bc9876 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -49,8 +49,6 @@ float last_backoff_value = 0;
time_t aclk_block_until = 0;
-int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
-
#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
@@ -249,14 +247,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
}
}
-//TODO prevent big buffer on stack
-#define RX_MSGLEN_MAX 4096
static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
aclk_rcvd_cloud_msgs++;
- if (msglen > RX_MSGLEN_MAX)
- error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
@@ -870,6 +864,11 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->host_id, host_id);
RRDHOST *host = rrdhost_find_by_guid(host_id);
+ if (unlikely(!host)) {
+ freez((void*)node_state_update.node_id);
+ freez(query);
+ continue;
+ }
node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
@@ -927,14 +926,10 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
}
buffer_sprintf(wb,
"\n\t\tUpdates: %d"
- "\n\t\tBatch ID: %"PRIu64
- "\n\t\tLast Acked Seq ID: %"PRIu64
"\n\t\tPending Min Seq ID: %"PRIu64
"\n\t\tPending Max Seq ID: %"PRIu64
"\n\t\tLast Submitted Seq ID: %"PRIu64,
status.alert_updates,
- status.alerts_batch_id,
- status.last_acked_sequence_id,
status.pending_min_sequence_id,
status.pending_max_sequence_id,
status.last_submitted_sequence_id
@@ -1042,12 +1037,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
json_object *tmp = json_object_new_int(status.alert_updates);
json_object_object_add(obj, "updates", tmp);
- tmp = json_object_new_int(status.alerts_batch_id);
- json_object_object_add(obj, "batch-id", tmp);
-
- tmp = json_object_new_int(status.last_acked_sequence_id);
- json_object_object_add(obj, "last-acked-seq-id", tmp);
-
tmp = json_object_new_int(status.pending_min_sequence_id);
json_object_object_add(obj, "pending-min-seq-id", tmp);
@@ -1216,9 +1205,9 @@ void add_aclk_host_labels(void) {
#endif
}
-void aclk_queue_node_info(RRDHOST *host) {
- struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
- if (likely(wc)) {
- wc->node_info_send = 1;
- }
+void aclk_queue_node_info(RRDHOST *host, bool immediate)
+{
+ struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
+ if (likely(wc))
+ wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
}