summaryrefslogtreecommitdiffstats
path: root/src/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/aclk.c')
-rw-r--r--src/aclk/aclk.c63
1 files changed, 25 insertions, 38 deletions
diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c
index 991745491..627edfc91 100644
--- a/src/aclk/aclk.c
+++ b/src/aclk/aclk.c
@@ -52,9 +52,9 @@ time_t aclk_block_until = 0;
#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
-netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
-#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
-#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+//netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+//#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+//#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_id = -1,
@@ -1058,30 +1058,24 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con
static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
{
- struct proto_alert_status status;
- memset(&status, 0, sizeof(status));
- if (get_proto_alert_status(host, &status)) {
- buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!wc)
return;
- }
+
buffer_sprintf(wb,
"\n\t\tUpdates: %d"
- "\n\t\tPending Min Seq ID: %"PRIu64
- "\n\t\tPending Max Seq ID: %"PRIu64
- "\n\t\tLast Submitted Seq ID: %"PRIu64,
- status.alert_updates,
- status.pending_min_sequence_id,
- status.pending_max_sequence_id,
- status.last_submitted_sequence_id
+ "\n\t\tCheckpoints: %d"
+ "\n\t\tAlert count: %d"
+ "\n\t\tAlert snapshot count: %d",
+ wc->stream_alerts,
+ wc->checkpoint_count,
+ wc->alert_count,
+ wc->snapshot_count
);
}
-#endif /* ENABLE_ACLK */
char *aclk_state(void)
{
-#ifndef ENABLE_ACLK
- return strdupz("ACLK Available: No");
-#else
BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
struct tm *tmptr, tmbuf;
char *ret;
@@ -1163,28 +1157,25 @@ char *aclk_state(void)
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
-#endif /* ENABLE_ACLK */
}
-#ifdef ENABLE_ACLK
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
- struct proto_alert_status status;
- memset(&status, 0, sizeof(status));
- if (get_proto_alert_status(host, &status))
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!wc)
return;
- json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object *tmp = json_object_new_int(wc->stream_alerts);
json_object_object_add(obj, "updates", tmp);
- tmp = json_object_new_int(status.pending_min_sequence_id);
- json_object_object_add(obj, "pending-min-seq-id", tmp);
+ tmp = json_object_new_int(wc->checkpoint_count);
+ json_object_object_add(obj, "checkpoint-count", tmp);
- tmp = json_object_new_int(status.pending_max_sequence_id);
- json_object_object_add(obj, "pending-max-seq-id", tmp);
+ tmp = json_object_new_int(wc->alert_count);
+ json_object_object_add(obj, "alert-count", tmp);
- tmp = json_object_new_int(status.last_submitted_sequence_id);
- json_object_object_add(obj, "last-submitted-seq-id", tmp);
+ tmp = json_object_new_int(wc->snapshot_count);
+ json_object_object_add(obj, "alert-snapshot-count", tmp);
}
static json_object *timestamp_to_json(const time_t *t)
@@ -1197,13 +1188,9 @@ static json_object *timestamp_to_json(const time_t *t)
}
return NULL;
}
-#endif /* ENABLE_ACLK */
char *aclk_state_json(void)
{
-#ifndef ENABLE_ACLK
- return strdupz("{\"aclk-available\":false}");
-#else
json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
@@ -1313,8 +1300,8 @@ char *aclk_state_json(void)
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
-#endif /* ENABLE_ACLK */
}
+#endif /* ENABLE_ACLK */
void add_aclk_host_labels(void) {
RRDLABELS *labels = localhost->rrdlabels;
@@ -1347,7 +1334,7 @@ void add_aclk_host_labels(void) {
void aclk_queue_node_info(RRDHOST *host, bool immediate)
{
- struct aclk_sync_cfg_t *wc = host->aclk_config;
- if (likely(wc))
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (wc)
wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
}