summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk_alert.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk_alert.c')
-rw-r--r--database/sqlite/sqlite_aclk_alert.c172
1 files changed, 158 insertions, 14 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 54e8be4a..53c6c2a6 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -8,9 +8,120 @@
#include "../../aclk/aclk.h"
#endif
+time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) {
+ sqlite3_stmt *res = NULL;
+ int rc = 0;
+ time_t when = 0;
+ char sql[ACLK_SYNC_QUERY_SIZE];
+
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \
+ "and unique_id > %u and unique_id < %u " \
+ "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id);
+
+ rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to find removed gap.");
+ return 0;
+ }
+
+ rc = sqlite3_step(res);
+ if (likely(rc == SQLITE_ROW)) {
+ when = (time_t) sqlite3_column_int64(res, 0);
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc);
+
+ return when;
+}
+
+#define MAX_REMOVED_PERIOD 900
+//decide if some events should be sent or not
+int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
+{
+ sqlite3_stmt *res = NULL;
+ char uuid_str[GUID_LEN + 1];
+ uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
+ int send = 1, rc = 0;
+
+ if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
+ return 0;
+ }
+
+ if (unlikely(uuid_is_null(ae->config_hash_id)))
+ return 0;
+
+ char sql[ACLK_SYNC_QUERY_SIZE];
+ uuid_t config_hash_id;
+ RRDCALC_STATUS status;
+ uint32_t unique_id;
+
+ //get the previous sent event of this alarm_id
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \
+ where hl.unique_id = aa.alert_unique_id \
+ and hl.alarm_id = %u and hl.unique_id <> %u \
+ order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id);
+
+ rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to filter alert events.");
+ send = 1;
+ return send;
+ }
+
+ rc = sqlite3_step(res);
+ if (likely(rc == SQLITE_ROW)) {
+ status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
+ if (sqlite3_column_type(res, 1) != SQLITE_NULL)
+ uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1)));
+ unique_id = (uint32_t) sqlite3_column_int64(res, 2);
+
+ } else {
+ send = 1;
+ goto done;
+ }
+
+ if (ae->new_status != (RRDCALC_STATUS)status) {
+ send = 1;
+ goto done;
+ }
+
+ if (uuid_compare(ae->config_hash_id, config_hash_id)) {
+ send = 1;
+ goto done;
+ }
+
+ //same status, same config
+ if (ae->new_status == RRDCALC_STATUS_CLEAR) {
+ send = 0;
+ goto done;
+ }
+
+ //detect a long off period of the agent, TODO make global
+ if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) {
+ time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str);
+
+ if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) {
+ send = 1;
+ goto done;
+ } else {
+ send = 0;
+ goto done;
+ }
+ }
+
+done:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc);
+
+ return send;
+}
+
// will replace call to aclk_update_alarm in health/health_log.c
// and handle both cases
-int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
+int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
{
//check aclk architecture and handle old json alarm update to cloud
//include also the valid statuses for this case
@@ -30,17 +141,15 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
if (!claimed())
return 0;
- if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
+ if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) {
return 0;
+ }
- if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
- return 0;
-
- if (unlikely(!host->dbsync_worker))
- return 1;
-
- if (unlikely(uuid_is_null(ae->config_hash_id)))
- return 0;
+ if (!skip_filter) {
+ if (!should_send_to_cloud(host, ae)) {
+ return 0;
+ }
+ }
int rc = 0;
@@ -76,6 +185,10 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
}
ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (wc) {
+ wc->pause_alert_updates = 0;
+ }
bind_fail:
if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
@@ -86,6 +199,7 @@ bind_fail:
#else
UNUSED(host);
UNUSED(ae);
+ UNUSED(skip_filter);
#endif
return 0;
}
@@ -283,6 +397,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
wc->alerts_batch_id);
log_first_sequence_id = 0;
log_last_sequence_id = 0;
+ wc->pause_alert_updates = 1;
}
rc = sqlite3_finalize(res);
@@ -296,6 +411,27 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
return;
}
+void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
+{
+ char uuid_str[GUID_LEN + 1];
+ uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
+ BUFFER *sql = buffer_create(1024);
+
+ buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
+ "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \
+ "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
+ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str);
+
+ db_execute(buffer_tostring(sql));
+
+ buffer_free(sql);
+
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (wc) {
+ wc->pause_alert_updates = 0;
+ }
+}
+
void aclk_send_alarm_health_log(char *node_id)
{
if (unlikely(!node_id))
@@ -421,6 +557,8 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
freez(claim_id);
buffer_free(sql);
+
+ aclk_alert_reloaded = 1;
#endif
return;
@@ -593,6 +731,9 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
return;
}
+
+ if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1))
+ sql_queue_existing_alerts_to_aclk(host);
} else
wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
@@ -602,6 +743,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
wc->alerts_batch_id = batch_id;
wc->alerts_start_seq_id = start_seq_id;
wc->alert_updates = 1;
+ wc->pause_alert_updates = 0;
__sync_synchronize();
}
else
@@ -631,9 +773,11 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
db_execute(buffer_tostring(sql));
- log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_free(sql);
+
+ wc->pause_alert_updates = 0;
#endif
return;
}
@@ -644,6 +788,9 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
if (unlikely(!host->dbsync_worker))
return;
+ if (!claimed())
+ return;
+
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS;
@@ -912,9 +1059,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
if (!claimed())
return;
- if (unlikely(!host->dbsync_worker))
- return;
-
char uuid_str[GUID_LEN + 1];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);