From 00151562145df50cc65e9902d52d5fa77f89fe50 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 9 Jun 2022 06:52:47 +0200 Subject: Merging upstream version 1.35.0. Signed-off-by: Daniel Baumann --- database/sqlite/sqlite_aclk_alert.c | 172 +++++++++++++++++++++++++++++++++--- 1 file changed, 158 insertions(+), 14 deletions(-) (limited to 'database/sqlite/sqlite_aclk_alert.c') diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 54e8be4a7..53c6c2a65 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -8,9 +8,120 @@ #include "../../aclk/aclk.h" #endif +time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) { + sqlite3_stmt *res = NULL; + int rc = 0; + time_t when = 0; + char sql[ACLK_SYNC_QUERY_SIZE]; + + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \ + "and unique_id > %u and unique_id < %u " \ + "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id); + + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to find removed gap."); + return 0; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW)) { + when = (time_t) sqlite3_column_int64(res, 0); + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc); + + return when; +} + +#define MAX_REMOVED_PERIOD 900 +//decide if some events should be sent or not +int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) +{ + sqlite3_stmt *res = NULL; + char uuid_str[GUID_LEN + 1]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + int send = 1, rc = 0; + + if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) { + return 0; + } + + if (unlikely(uuid_is_null(ae->config_hash_id))) + return 0; + + char sql[ACLK_SYNC_QUERY_SIZE]; + uuid_t config_hash_id; + RRDCALC_STATUS status; + uint32_t unique_id; + + //get the previous sent event of this alarm_id + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ + where hl.unique_id = aa.alert_unique_id \ + and hl.alarm_id = %u and hl.unique_id <> %u \ + order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id); + + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to filter alert events."); + send = 1; + return send; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW)) { + status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); + if (sqlite3_column_type(res, 1) != SQLITE_NULL) + uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1))); + unique_id = (uint32_t) sqlite3_column_int64(res, 2); + + } else { + send = 1; + goto done; + } + + if (ae->new_status != (RRDCALC_STATUS)status) { + send = 1; + goto done; + } + + if (uuid_compare(ae->config_hash_id, config_hash_id)) { + send = 1; + goto done; + } + + //same status, same config + if (ae->new_status == RRDCALC_STATUS_CLEAR) { + send = 0; + goto done; + } + + //detect a long off period of the agent, TODO make global + if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) { + time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str); + + if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) { + send = 1; + goto done; + } else { + send = 0; + goto done; + } + } + +done: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc); + + return send; +} + // will replace call to aclk_update_alarm in health/health_log.c // and handle both cases -int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) +int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) { //check aclk architecture and handle old json alarm update to cloud //include also the valid statuses for this case @@ -30,17 +141,15 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) if (!claimed()) return 0; - if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) + if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) { return 0; + } - if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) - return 0; - - if (unlikely(!host->dbsync_worker)) - return 1; - - if (unlikely(uuid_is_null(ae->config_hash_id))) - return 0; + if (!skip_filter) { + if (!should_send_to_cloud(host, ae)) { + return 0; + } + } int rc = 0; @@ -76,6 +185,10 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) } ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; + if (wc) { + wc->pause_alert_updates = 0; + } bind_fail: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) @@ -86,6 +199,7 @@ bind_fail: #else UNUSED(host); UNUSED(ae); + UNUSED(skip_filter); #endif return 0; } @@ -283,6 +397,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d wc->alerts_batch_id); log_first_sequence_id = 0; log_last_sequence_id = 0; + wc->pause_alert_updates = 1; } rc = sqlite3_finalize(res); @@ -296,6 +411,27 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d return; } +void sql_queue_existing_alerts_to_aclk(RRDHOST *host) +{ + char uuid_str[GUID_LEN + 1]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + BUFFER *sql = buffer_create(1024); + + buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \ + "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \ + "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ + "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str); + + db_execute(buffer_tostring(sql)); + + buffer_free(sql); + + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; + if (wc) { + wc->pause_alert_updates = 0; + } +} + void aclk_send_alarm_health_log(char *node_id) { if (unlikely(!node_id)) @@ -421,6 +557,8 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a freez(claim_id); buffer_free(sql); + + aclk_alert_reloaded = 1; #endif return; @@ -593,6 +731,9 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } + + if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) + sql_queue_existing_alerts_to_aclk(host); } else wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); @@ -602,6 +743,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start wc->alerts_batch_id = batch_id; wc->alerts_start_seq_id = start_seq_id; wc->alert_updates = 1; + wc->pause_alert_updates = 0; __sync_synchronize(); } else @@ -631,9 +773,11 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config db_execute(buffer_tostring(sql)); - log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A"); buffer_free(sql); + + wc->pause_alert_updates = 0; #endif return; } @@ -644,6 +788,9 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) if (unlikely(!host->dbsync_worker)) return; + if (!claimed()) + return; + struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS; @@ -912,9 +1059,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host) if (!claimed()) return; - if (unlikely(!host->dbsync_worker)) - return; - char uuid_str[GUID_LEN + 1]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); -- cgit v1.2.3