diff options
Diffstat (limited to 'src/aclk/aclk.c')
-rw-r--r-- | src/aclk/aclk.c | 63 |
1 files changed, 25 insertions, 38 deletions
diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c index 991745491..627edfc91 100644 --- a/src/aclk/aclk.c +++ b/src/aclk/aclk.c @@ -52,9 +52,9 @@ time_t aclk_block_until = 0; #ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; -netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; -#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) -#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) +//netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +//#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +//#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_id = -1, @@ -1058,30 +1058,24 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { - struct proto_alert_status status; - memset(&status, 0, sizeof(status)); - if (get_proto_alert_status(host, &status)) { - buffer_strcat(wb, "\nFailed to get alert streaming status for this host"); + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (!wc) return; - } + buffer_sprintf(wb, "\n\t\tUpdates: %d" - "\n\t\tPending Min Seq ID: %"PRIu64 - "\n\t\tPending Max Seq ID: %"PRIu64 - "\n\t\tLast Submitted Seq ID: %"PRIu64, - status.alert_updates, - status.pending_min_sequence_id, - status.pending_max_sequence_id, - status.last_submitted_sequence_id + "\n\t\tCheckpoints: %d" + "\n\t\tAlert count: %d" + "\n\t\tAlert snapshot count: %d", + wc->stream_alerts, + wc->checkpoint_count, + wc->alert_count, + wc->snapshot_count ); } -#endif /* ENABLE_ACLK */ char *aclk_state(void) { -#ifndef ENABLE_ACLK - return strdupz("ACLK Available: No"); -#else BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk); struct tm *tmptr, tmbuf; char *ret; @@ -1163,28 +1157,25 @@ char *aclk_state(void) ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; -#endif /* ENABLE_ACLK */ } -#ifdef ENABLE_ACLK static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { - struct proto_alert_status status; - memset(&status, 0, sizeof(status)); - if (get_proto_alert_status(host, &status)) + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (!wc) return; - json_object *tmp = json_object_new_int(status.alert_updates); + json_object *tmp = json_object_new_int(wc->stream_alerts); json_object_object_add(obj, "updates", tmp); - tmp = json_object_new_int(status.pending_min_sequence_id); - json_object_object_add(obj, "pending-min-seq-id", tmp); + tmp = json_object_new_int(wc->checkpoint_count); + json_object_object_add(obj, "checkpoint-count", tmp); - tmp = json_object_new_int(status.pending_max_sequence_id); - json_object_object_add(obj, "pending-max-seq-id", tmp); + tmp = json_object_new_int(wc->alert_count); + json_object_object_add(obj, "alert-count", tmp); - tmp = json_object_new_int(status.last_submitted_sequence_id); - json_object_object_add(obj, "last-submitted-seq-id", tmp); + tmp = json_object_new_int(wc->snapshot_count); + json_object_object_add(obj, "alert-snapshot-count", tmp); } static json_object *timestamp_to_json(const time_t *t) @@ -1197,13 +1188,9 @@ static json_object *timestamp_to_json(const time_t *t) } return NULL; } -#endif /* ENABLE_ACLK */ char *aclk_state_json(void) { -#ifndef ENABLE_ACLK - return strdupz("{\"aclk-available\":false}"); -#else json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); @@ -1313,8 +1300,8 @@ char *aclk_state_json(void) char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; -#endif /* ENABLE_ACLK */ } +#endif /* ENABLE_ACLK */ void add_aclk_host_labels(void) { RRDLABELS *labels = localhost->rrdlabels; @@ -1347,7 +1334,7 @@ void add_aclk_host_labels(void) { void aclk_queue_node_info(RRDHOST *host, bool immediate) { - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (likely(wc)) + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (wc) wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec(); } |