From a836a244a3d2bdd4da1ee2641e3e957850668cea Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 8 May 2023 18:27:04 +0200 Subject: Adding upstream version 1.39.0. Signed-off-by: Daniel Baumann --- database/sqlite/sqlite_aclk_alert.c | 719 ++++++++++++++++++------------------ 1 file changed, 361 insertions(+), 358 deletions(-) (limited to 'database/sqlite/sqlite_aclk_alert.c') diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index ce284ebc3..62f1df29d 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -5,20 +5,20 @@ #ifdef ENABLE_ACLK #include "../../aclk/aclk_alarm_api.h" -#include "../../aclk/aclk.h" #endif +#define SQL_GET_ALERT_REMOVE_TIME "SELECT when_key FROM health_log_%s WHERE alarm_id = %u " \ + "AND unique_id > %u AND unique_id < %u " \ + "AND new_status = -2;" + time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) { sqlite3_stmt *res = NULL; - int rc = 0; time_t when = 0; char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \ - "and unique_id > %u and unique_id < %u " \ - "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_GET_ALERT_REMOVE_TIME, uuid_str, alarm_id, after_unique_id, before_unique_id); - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to find removed gap."); return 0; @@ -36,22 +36,26 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after return when; } +#define SQL_UPDATE_FILTERED_ALERT "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u" + void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) { char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id); sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; } +#define SQL_SELECT_ALERT_BY_UNIQUE_ID "SELECT hl.unique_id FROM health_log_%s hl, alert_hash ah WHERE hl.unique_id = %u " \ + "AND hl.config_hash_id = ah.hash_id " \ + "AND ah.warn IS NULL AND ah.crit IS NULL;" + static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char *uuid_str) { sqlite3_stmt *res = NULL; int rc = 0; bool ret = false; char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.unique_id from health_log_%s hl, alert_hash ah where hl.unique_id = %u " \ - "and hl.config_hash_id = ah.hash_id " \ - "and ah.warn is null and ah.crit is null;", uuid_str, unique_id); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_UNIQUE_ID, uuid_str, unique_id); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -73,12 +77,18 @@ static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char #define MAX_REMOVED_PERIOD 86400 //decide if some events should be sent or not + +#define SQL_SELECT_ALERT_BY_ID "SELECT hl.new_status, hl.config_hash_id, hl.unique_id FROM health_log_%s hl, aclk_alert_%s aa " \ + "WHERE hl.unique_id = aa.filtered_alert_unique_id " \ + "AND hl.alarm_id = %u " \ + "ORDER BY alarm_event_id DESC LIMIT 1;" + int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { sqlite3_stmt *res = NULL; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - int send = 1, rc = 0; + int send = 1; if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) { return 0; @@ -87,9 +97,6 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) if (unlikely(uuid_is_null(ae->config_hash_id))) return 0; - if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) - return 0; - char sql[ACLK_SYNC_QUERY_SIZE]; uuid_t config_hash_id; RRDCALC_STATUS status; @@ -97,12 +104,9 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) //get the previous sent event of this alarm_id //base the search on the last filtered event - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ - where hl.unique_id = aa.filtered_alert_unique_id \ - and hl.alarm_id = %u \ - order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_ID, uuid_str, uuid_str, ae->alarm_id); - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to filter alert events."); send = 1; @@ -126,7 +130,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) goto done; } - if (uuid_compare(ae->config_hash_id, config_hash_id)) { + if (uuid_memcmp(&ae->config_hash_id, &config_hash_id)) { send = 1; goto done; } @@ -162,6 +166,10 @@ done: // will replace call to aclk_update_alarm in health/health_log.c // and handle both cases + +#define SQL_QUEUE_ALERT_TO_CLOUD "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) ON CONFLICT (alert_unique_id) do nothing;" + int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) { if(!service_running(SERVICE_ACLK)) @@ -182,27 +190,24 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } } - int rc = 0; - sqlite3_stmt *res_alert = NULL; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + return 0; + + sqlite3_stmt *res_alert = NULL; + char sql[ACLK_SYNC_QUERY_SIZE]; - buffer_sprintf( - sql, - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " - "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ", - uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to prepare statement to store alert event"); - buffer_free(sql); return 1; } - rc = sqlite3_bind_int(res_alert, 1, ae->unique_id); + rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id); if (unlikely(rc != SQLITE_OK)) goto bind_fail; @@ -213,16 +218,12 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; - if (wc) { - wc->pause_alert_updates = 0; - } + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); bind_fail: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) error_report("Failed to reset statement in store alert event, rc = %d", rc); - buffer_free(sql); return 0; } @@ -254,11 +255,10 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) #endif } -void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_push_alert_event(struct aclk_sync_host_config *wc) { #ifndef ENABLE_ACLK UNUSED(wc); - UNUSED(cmd); #else int rc; @@ -278,26 +278,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - if (wc->alerts_start_seq_id != 0) { - buffer_sprintf( - sql, - "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64 - "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64 - " and date_cloud_ack is null " - "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64 - " and date_submitted is null", - wc->uuid_str, - wc->alerts_start_seq_id, - wc->uuid_str, - wc->alerts_start_seq_id, - wc->uuid_str, - wc->alerts_start_seq_id); - db_execute(buffer_tostring(sql)); - buffer_reset(sql); - wc->alerts_start_seq_id = 0; - } - - int limit = cmd.count > 0 ? cmd.count : 1; + int limit = ACLK_MAX_ALERT_UPDATES; sqlite3_stmt *res = NULL; @@ -318,10 +299,15 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str); - db_execute(buffer_tostring(sql_fix)); - buffer_flush(sql_fix); - buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str); - db_execute(buffer_tostring(sql_fix)); + rc = db_execute(db_meta, buffer_tostring(sql_fix)); + if (unlikely(rc)) + error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); + else { + buffer_flush(sql_fix); + buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str); + if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix)))) + error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); + } buffer_free(sql_fix); // Try again @@ -353,8 +339,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11)); alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL; - alarm_log.batch_id = wc->alerts_batch_id; - alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + //alarm_log.batch_id = wc->alerts_batch_id; + //alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); alarm_log.when = (time_t) sqlite3_column_int64(res, 5); uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str); @@ -429,19 +415,23 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", wc->uuid_str, first_sequence_id, last_sequence_id); - db_execute(buffer_tostring(sql)); + + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host)); + + // Mark to do one more check + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + } else { if (log_first_sequence_id) log_access( - "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, + "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", log_first_sequence_id, - log_last_sequence_id, - wc->alerts_batch_id); + log_last_sequence_id); log_first_sequence_id = 0; log_last_sequence_id = 0; - wc->pause_alert_updates = 1; } rc = sqlite3_finalize(res); @@ -451,8 +441,24 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d freez(claim_id); buffer_free(sql); #endif +} + +void aclk_push_alert_events_for_all_hosts(void) +{ + RRDHOST *host; + + dfe_start_reentrant(rrdhost_root_index, host) { + if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) + continue; - return; + internal_error(true, "ACLK SYNC: Scanning host %s", rrdhost_hostname(host)); + rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + if (likely(wc)) + aclk_push_alert_event(wc); + } + dfe_done(host); } void sql_queue_existing_alerts_to_aclk(RRDHOST *host) @@ -467,137 +473,15 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host) "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - - buffer_free(sql); - - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; - if (wc) { - wc->pause_alert_updates = 0; - } -} - -void aclk_send_alarm_health_log(char *node_id) -{ - if (unlikely(!node_id)) - return; - - struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); - - if (likely(!wc)) { - RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) - wc = (struct aclk_database_worker_config *)host->dbsync_worker; - } - - if (!wc) { - log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id); - return; - } - - log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, wc->hostname ? wc->hostname : "N/A"); - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG; - - aclk_database_enq_cmd(wc, &cmd); - return; -} - -void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(cmd); -#ifndef ENABLE_ACLK - UNUSED(wc); -#else - int rc; - - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) - return; - - RRDHOST *host = wc->host; - if (unlikely(!host)) { - host = find_host_by_node_id(wc->node_id); - - if (unlikely(!host)) { - log_access( - "AC [%s (N/A)]: ACLK synchronization thread for %s is not yet linked to HOST.", - wc->node_id, - wc->host_guid); - freez(claim_id); - return; - } - } - - uint64_t first_sequence = 0; - uint64_t last_sequence = 0; - struct timeval first_timestamp; - struct timeval last_timestamp; - - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - - sqlite3_stmt *res = NULL; - - //TODO: make this better: include info from health log too - buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \ - "FROM aclk_alert_%s;", wc->uuid_str); - - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to get health log statistics from the database"); - buffer_free(sql); - freez(claim_id); - return; - } - - first_timestamp.tv_sec = 0; - first_timestamp.tv_usec = 0; - last_timestamp.tv_sec = 0; - last_timestamp.tv_usec = 0; - - while (sqlite3_step_monitored(res) == SQLITE_ROW) { - first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; - if (sqlite3_column_bytes(res, 1) > 0) { - first_timestamp.tv_sec = sqlite3_column_int64(res, 1); - } - - last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; - if (sqlite3_column_bytes(res, 3) > 0) { - last_timestamp.tv_sec = sqlite3_column_int64(res, 3); - } - } - - struct alarm_log_entries log_entries; - log_entries.first_seq_id = first_sequence; - log_entries.first_when = first_timestamp; - log_entries.last_seq_id = last_sequence; - log_entries.last_when = last_timestamp; - - struct alarm_log_health alarm_log; - alarm_log.claim_id = claim_id; - alarm_log.node_id = wc->node_id; - alarm_log.log_entries = log_entries; - alarm_log.status = wc->alert_updates == 0 ? 2 : 1; - alarm_log.enabled = (int)host->health.health_enabled; - - wc->alert_sequence_id = last_sequence; + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - aclk_send_alarm_log_health(&alarm_log); - log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence); + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host)); - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - freez(claim_id); buffer_free(sql); - - aclk_alert_reloaded = 1; -#endif - - return; + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } void aclk_send_alarm_configuration(char *config_hash) @@ -605,22 +489,14 @@ void aclk_send_alarm_configuration(char *config_hash) if (unlikely(!config_hash)) return; - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker; + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config; - if (unlikely(!wc)) { + if (unlikely(!wc)) return; - } log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG; - cmd.data_param = (void *) strdupz(config_hash); - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); - - return; + aclk_push_alert_config(wc->node_id, config_hash); } #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \ @@ -628,19 +504,31 @@ void aclk_send_alarm_configuration(char *config_hash) "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \ "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;" -int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { - UNUSED(wc); -#ifndef ENABLE_ACLK - UNUSED(cmd); -#else int rc = 0; +#ifdef ENABLE_ACLK + CHECK_SQLITE_CONNECTION(db_meta); sqlite3_stmt *res = NULL; - char *config_hash = (char *) cmd.data_param; + struct aclk_sync_host_config *wc = NULL; + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) { + freez(config_hash); + freez(node_id); + return 1; + } + + wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + if (unlikely(!wc)) { + freez(config_hash); + freez(node_id); + return 1; + } rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0); if (rc != SQLITE_OK) { @@ -723,7 +611,6 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct if (likely(p_alarm_config.cfg_hash)) { log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); - freez((char *) cmd.data_param); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } @@ -735,150 +622,125 @@ bind_fail: if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc); - return rc; + freez(config_hash); + freez(node_id); #endif - return 0; + return rc; } // Start streaming alerts -void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id) +void aclk_start_alert_streaming(char *node_id, bool resets) { if (unlikely(!node_id)) return; - //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id); - uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) return; - struct aclk_database_worker_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) { - wc = (struct aclk_database_worker_config *)host->dbsync_worker ? - (struct aclk_database_worker_config *)host->dbsync_worker : - (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - if (unlikely(!host->health.health_enabled)) { - log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); - return; - } + if (unlikely(!host)) + return; - if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) - sql_queue_existing_alerts_to_aclk(host); - } else - wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - - if (likely(wc)) { - log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id); - __sync_synchronize(); - wc->alerts_batch_id = batch_id; - wc->alerts_start_seq_id = start_seq_id; - wc->alert_updates = 1; - wc->pause_alert_updates = 0; - __sync_synchronize(); + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + + if (unlikely(!wc)) + return; + + if (unlikely(!host->health.health_enabled)) { + log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); + return; } - else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); - return; + if (resets) { + log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + sql_queue_existing_alerts_to_aclk(host); + } else + log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + + wc->alert_updates = 1; + wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; } -void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(cmd); +#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \ + "WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \ + "(SELECT alert_unique_id FROM aclk_alert_%s) " \ + "AND config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ + "ORDER BY unique_id ASC " \ + "ON CONFLICT (alert_unique_id) DO NOTHING;" - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); +void sql_process_queue_removed_alerts_to_aclk(char *node_id) +{ + struct aclk_sync_host_config *wc; + RRDHOST *host = find_host_by_node_id(node_id); + freez(node_id); - buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ - "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \ - "where new_status = -2 and updated_by_id = 0 and unique_id not in " \ - "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \ - "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str); + if (unlikely(!host || !(wc = host->aclk_sync_host_config))) + return; - db_execute(buffer_tostring(sql)); + char sql[ACLK_SYNC_QUERY_SIZE * 2]; - log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str, wc->uuid_str); - buffer_free(sql); + if (unlikely(db_execute(db_meta, sql))) { + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS FAILED", wc->node_id, rrdhost_hostname(wc->host)); + error_report("Failed to queue ACLK alert removed entries for host %s", rrdhost_hostname(wc->host)); + } + else + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); - wc->pause_alert_updates = 0; - return; + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + wc->alert_queue_removed = 0; } void sql_queue_removed_alerts_to_aclk(RRDHOST *host) { - if (unlikely(!host->dbsync_worker)) + if (unlikely(!host->aclk_sync_host_config)) return; - if (!claimed()) + if (!claimed() || !host->node_id) return; - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS; - cmd.data = NULL; - cmd.data_param = NULL; - cmd.completion = NULL; - aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd); + char node_id[UUID_STR_LEN]; + uuid_unparse_lower(*host->node_id, node_id); + + aclk_push_node_removed_alerts(node_id); } -void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id) +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { - UNUSED(claim_id); - if (unlikely(!node_id)) - return; - uuid_t node_uuid; - if (uuid_parse(node_id, node_uuid)) + if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; - struct aclk_database_worker_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) - wc = (struct aclk_database_worker_config *)host->dbsync_worker; - - if (likely(wc)) { - log_access( - "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, - wc->node_id, - wc->host ? rrdhost_hostname(wc->host) : "N/A", - snapshot_id, - sequence_id); - if (wc->alerts_snapshot_id == snapshot_id) - return; - __sync_synchronize(); - wc->alerts_snapshot_id = snapshot_id; - wc->alerts_ack_sequence_id = sequence_id; - __sync_synchronize(); - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT; - cmd.data_param = NULL; - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); - } else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); - - return; -} + if (unlikely(!host)) { + log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); + return; + } -void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id) -{ - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (alerts_ack_sequence_id != 0) { - buffer_sprintf( - sql, - "UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id <= %" PRIu64 "", - uuid_str, - alerts_ack_sequence_id); - db_execute(buffer_tostring(sql)); + if (unlikely(!wc)) { + log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); + return; } - buffer_free(sql); + log_access( + "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", + node_id, + wc->host ? rrdhost_hostname(wc->host) : "N/A", + snapshot_uuid); + if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid)) + return; + __sync_synchronize(); + wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); + __sync_synchronize(); + + aclk_push_node_alert_snapshot(node_id); } #ifdef ENABLE_ACLK @@ -949,37 +811,41 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) #endif #define ALARM_EVENTS_PER_CHUNK 10 -void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) { -#ifndef ENABLE_ACLK - UNUSED(wc); - UNUSED(cmd); -#else - UNUSED(cmd); - // we perhaps we don't need this for snapshots - if (unlikely(!wc->alert_updates)) { - log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); +#ifdef ENABLE_ACLK + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) { + log_access("AC [%s (N/A)]: Node id not found", node_id); + freez(node_id); return; } + freez(node_id); - if (unlikely(!wc->host)) { - error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + + // we perhaps we don't need this for snapshots + if (unlikely(!wc->alert_updates)) { + log_access( + "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", + wc->node_id, + wc->host ? rrdhost_hostname(wc->host) : "N/A"); return; } - if (unlikely(!wc->alerts_snapshot_id)) + if (unlikely(!wc->alerts_snapshot_uuid)) return; char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; - log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id); + log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); - aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); - - RRDHOST *host = wc->host; uint32_t cnt = 0; + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); @@ -995,6 +861,9 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) continue; + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + continue; + cnt++; } @@ -1008,7 +877,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru struct alarm_snapshot alarm_snap; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = chunk; @@ -1024,6 +893,9 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) continue; + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + continue; + cnt++; struct alarm_log_entry alarm_log; @@ -1047,7 +919,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru struct alarm_snapshot alarm_snap; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = chunk; @@ -1061,73 +933,204 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru } netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - wc->alerts_snapshot_id = 0; + wc->alerts_snapshot_uuid = NULL; freez(claim_id); #endif - return; } +#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE filtered_alert_unique_id NOT IN (SELECT unique_id FROM health_log_%s);" + void sql_aclk_alert_clean_dead_entries(RRDHOST *host) { if (!claimed()) return; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + char sql[512]; + snprintfz(sql,511,SQL_DELETE_ALERT_ENTRIES, uuid_str, uuid_str); - buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in " - " (select unique_id from health_log_%s); ", uuid_str, uuid_str); - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg); if (rc != SQLITE_OK) { - error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""", - uuid_str, err_msg); + error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg); sqlite3_free(err_msg); } - buffer_free(sql); } +#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ + "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ + "FROM aclk_alert_%s WHERE date_submitted IS NULL;" + int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status) { int rc; - struct aclk_database_worker_config *wc = NULL; - wc = (struct aclk_database_worker_config *)host->dbsync_worker; + struct aclk_sync_host_config *wc = NULL; + wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; if (!wc) return 1; proto_alert_status->alert_updates = wc->alert_updates; - proto_alert_status->alerts_batch_id = wc->alerts_batch_id; - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + char sql[ACLK_SYNC_QUERY_SIZE]; sqlite3_stmt *res = NULL; - buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \ - "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \ - "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \ - "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to get alert log status from the database."); - buffer_free(sql); return 1; } while (sqlite3_step_monitored(res) == SQLITE_ROW) { proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; - proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; - proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0; + proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; } rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc); - buffer_free(sql); return 0; } + +void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused) +{ + if (unlikely(!node_id)) + return; + + struct aclk_sync_host_config *wc = NULL; + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) + return; + + wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + if (unlikely(!wc)) { + log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); + return; + } + + log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); + + wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; +} + +typedef struct active_alerts { + char *name; + char *chart; + RRDCALC_STATUS status; +} active_alerts_t; + +static inline int compare_active_alerts(const void * a, const void * b) { + active_alerts_t *active_alerts_a = (active_alerts_t *)a; + active_alerts_t *active_alerts_b = (active_alerts_t *)b; + + if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) ) + { + return strcmp(active_alerts_a->chart, active_alerts_b->chart); + } + else + return strcmp(active_alerts_a->name, active_alerts_b->name); +} + +void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) +{ +#ifdef ENABLE_ACLK + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + if (unlikely(!wc)) { + log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); + return; + } + + //TODO: make sure all pending events are sent. + if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) { + //postpone checkpoint send + wc->alert_checkpoint_req++; + log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); + return; + } + + //TODO: lock rc here, or make sure it's called when health decides + //count them + RRDCALC *rc; + uint32_t cnt = 0; + size_t len = 0; + active_alerts_t *active_alerts = NULL; + + foreach_rrdcalc_in_rrdhost_read(host, rc) { + if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec)) + continue; + + if (rc->status == RRDCALC_STATUS_WARNING || + rc->status == RRDCALC_STATUS_CRITICAL) { + + cnt++; + } + } + foreach_rrdcalc_in_rrdhost_done(rc); + + if (cnt) { + active_alerts = callocz(cnt, sizeof(active_alerts_t)); + cnt = 0; + foreach_rrdcalc_in_rrdhost_read(host, rc) { + if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec)) + continue; + + if (rc->status == RRDCALC_STATUS_WARNING || + rc->status == RRDCALC_STATUS_CRITICAL) { + + active_alerts[cnt].name = (char *)rrdcalc_name(rc); + len += string_strlen(rc->name); + active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc); + len += string_strlen(rc->chart); + active_alerts[cnt].status = rc->status; + len++; + cnt++; + } + } + foreach_rrdcalc_in_rrdhost_done(rc); + } + + BUFFER *alarms_to_hash; + if (cnt) { + qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); + + alarms_to_hash = buffer_create(len, NULL); + for (uint32_t i=0;inode_id; + alarm_checkpoint.checksum = (char *)hash; + + aclk_send_provide_alarm_checkpoint(&alarm_checkpoint); + log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host)); + } else { + log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); + } + wc->alert_checkpoint_req = 0; + buffer_free(alarms_to_hash); +#endif +} -- cgit v1.2.3