diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk_alert.c')
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 452 |
1 files changed, 214 insertions, 238 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 60bf5dbdc..9bd060f96 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -13,16 +13,42 @@ sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \ }) - #define SQL_UPDATE_FILTERED_ALERT \ - "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u, date_created = unixepoch() where filtered_alert_unique_id = %u" + "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \ + "WHERE filtered_alert_unique_id = @old_alert" -static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) +static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str) { + sqlite3_stmt *res = NULL; + char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id); - sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); - ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + snprintfz(sql, sizeof(sql) - 1, SQL_UPDATE_FILTERED_ALERT, uuid_str); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to update_filtered"); + return; + } + + rc = sqlite3_bind_int64(res, 1, ae->unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind ae unique_id for update_filtered"); + goto done; + } + + rc = sqlite3_bind_int64(res, 2, unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind unique_id for update_filtered"); + goto done; + } + + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_DONE)) + ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + +done: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to update_filtered, rc = %d", rc); } #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \ @@ -30,35 +56,35 @@ static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \ "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL" -static inline bool is_event_from_alert_variable_config(uint32_t unique_id, uuid_t *host_id) +static inline bool is_event_from_alert_variable_config(int64_t unique_id, uuid_t *host_id) { sqlite3_stmt *res = NULL; - int rc = 0; - bool ret = false; - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to check for alert variables."); return false; } - rc = sqlite3_bind_int(res, 1, (int) unique_id); + bool ret = false; + + rc = sqlite3_bind_int64(res, 1, unique_id); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind unique_id for checking alert variable."); - goto fail; + goto done; } rc = sqlite3_bind_blob(res, 2, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for checking alert variable."); - goto fail; + goto done; } rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) ret = true; -fail: +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc); @@ -71,32 +97,25 @@ fail: //decide if some events should be sent or not #define SQL_SELECT_ALERT_BY_ID \ "SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND +hld.unique_id = aa.filtered_alert_unique_id " \ + "WHERE hl.host_id = @host_id AND hld.unique_id = aa.filtered_alert_unique_id " \ "AND hld.alarm_id = @alarm_id AND hl.health_log_id = hld.health_log_id " \ - "ORDER BY hld.rowid DESC LIMIT 1;" + "ORDER BY hld.rowid DESC LIMIT 1" static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { sqlite3_stmt *res = NULL; - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - - bool send = false; if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) return 0; - if (unlikely(uuid_is_null(ae->config_hash_id))) + if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config)) return 0; char sql[ACLK_SYNC_QUERY_SIZE]; - uuid_t config_hash_id; - RRDCALC_STATUS status; - uint32_t unique_id; //get the previous sent event of this alarm_id //base the search on the last filtered event - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, uuid_str); + snprintfz(sql, sizeof(sql) - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -104,6 +123,8 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) return true; } + bool send = false; + rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for checking should_send_to_cloud"); @@ -119,17 +140,18 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { - status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); + uuid_t config_hash_id; + RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); if (sqlite3_column_type(res, 1) != SQLITE_NULL) uuid_copy(config_hash_id, *((uuid_t *)sqlite3_column_blob(res, 1))); - unique_id = (uint32_t)sqlite3_column_int64(res, 2); + int64_t unique_id = sqlite3_column_int64(res, 2); if (ae->new_status != (RRDCALC_STATUS)status || uuid_memcmp(&ae->config_hash_id, &config_hash_id)) send = true; else - update_filtered(ae, unique_id, uuid_str); + update_filtered(ae, unique_id, host->aclk_config->uuid_str); } else send = true; @@ -143,13 +165,12 @@ done: #define SQL_QUEUE_ALERT_TO_CLOUD \ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ - "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING;" + "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING" void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) { sqlite3_stmt *res_alert = NULL; char sql[ACLK_SYNC_QUERY_SIZE]; - char uuid_str[UUID_STR_LEN]; if (!service_running(SERVICE_ACLK)) return; @@ -163,8 +184,7 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid)) return; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); + snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0); if (unlikely(rc != SQLITE_OK)) { @@ -172,18 +192,18 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) return; } - rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id); + rc = sqlite3_bind_int64(res_alert, 1, ae->unique_id); if (unlikely(rc != SQLITE_OK)) - goto bind_fail; + goto done; rc = execute_insert(res_alert); if (unlikely(rc == SQLITE_DONE)) { ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } else - error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc); + error_report("Failed to store alert event %"PRId64", rc = %d", ae->unique_id, rc); -bind_fail: +done: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) error_report("Failed to reset statement in store alert event, rc = %d", rc); } @@ -239,15 +259,13 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) { } -void aclk_push_alert_event(struct aclk_sync_host_config *wc) +static void aclk_push_alert_event(struct aclk_sync_cfg_t *wc __maybe_unused) { -#ifndef ENABLE_ACLK - UNUSED(wc); -#else +#ifdef ENABLE_ACLK int rc; if (unlikely(!wc->alert_updates)) { - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); @@ -265,23 +283,20 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - int limit = ACLK_MAX_ALERT_UPDATES; - sqlite3_stmt *res = NULL; buffer_sprintf( sql, - "select aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " + "SELECT aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " " hl.chart, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, " " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " " hld.alarm_event_id, hl.chart_name, hld.summary " - " from health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld " - " where hld.unique_id = aa.alert_unique_id and hl.config_hash_id = ha.hash_id and aa.date_submitted is null " - " and hl.host_id = @host_id and hl.health_log_id = hld.health_log_id " - " order by aa.sequence_id asc limit %d;", - wc->uuid_str, - limit); + " FROM health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld " + " WHERE hld.unique_id = aa.alert_unique_id AND hl.config_hash_id = ha.hash_id AND aa.date_submitted IS NULL " + " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " + " ORDER BY aa.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES, + wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { @@ -292,13 +307,6 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) rc = db_execute(db_meta, buffer_tostring(sql_fix)); if (unlikely(rc)) error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); - - else { - buffer_flush(sql_fix); - buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str); - if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix)))) - error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); - } buffer_free(sql_fix); // Try again @@ -315,10 +323,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for pushing alert event."); - sqlite3_finalize(res); - buffer_free(sql); - freez(claim_id); - return; + goto done; } uint64_t first_sequence_id = 0; @@ -395,9 +400,13 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) if (first_sequence_id) { buffer_flush(sql); - buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " - "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", - wc->uuid_str, first_sequence_id, last_sequence_id); + buffer_sprintf( + sql, + "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " + "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64, + wc->uuid_str, + first_sequence_id, + last_sequence_id); if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host)); @@ -407,7 +416,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) } else { if (wc->alerts_log_first_sequence_id) - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", @@ -417,6 +426,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) wc->alerts_log_last_sequence_id = 0; } +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc); @@ -437,7 +447,7 @@ void aclk_push_alert_events_for_all_hosts(void) rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (likely(wc)) aclk_push_alert_event(wc); } @@ -446,59 +456,54 @@ void aclk_push_alert_events_for_all_hosts(void) void sql_queue_existing_alerts_to_aclk(RRDHOST *host) { - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); sqlite3_stmt *res = NULL; int rc; + struct aclk_sync_cfg_t *wc = host->aclk_config; + + BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + rw_spinlock_write_lock(&host->health_log.spinlock); - buffer_sprintf(sql, "delete from aclk_alert_%s; ", uuid_str); - if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) { - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; - } + buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str); + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + goto skip; buffer_flush(sql); + buffer_sprintf( sql, - "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " - "select hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id from health_log_detail hld, health_log hl " - "where hld.new_status <> 0 and hld.new_status <> -2 and hl.health_log_id = hld.health_log_id and hl.config_hash_id is not null " - "and hld.updated_by_id = 0 and hl.host_id = @host_id order by hld.unique_id asc on conflict (alert_unique_id) do nothing;", - uuid_str); + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " + "SELECT hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id FROM health_log_detail hld, health_log hl " + "WHERE hld.new_status <> 0 AND hld.new_status <> -2 AND hl.health_log_id = hld.health_log_id AND hl.config_hash_id IS NOT NULL " + "AND hld.updated_by_id = 0 AND hl.host_id = @host_id ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING", + wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to queue existing alerts."); - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; + goto skip; } rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for when trying to queue existing alerts."); - sqlite3_finalize(res); - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; + goto done; } rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) error_report("Failed to queue existing alerts, rc = %d", rc); - + else + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc); +skip: rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } void aclk_send_alarm_configuration(char *config_hash) @@ -506,12 +511,12 @@ void aclk_send_alarm_configuration(char *config_hash) if (unlikely(!config_hash)) return; - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = localhost->aclk_config; if (unlikely(!wc)) return; - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", @@ -524,43 +529,33 @@ void aclk_send_alarm_configuration(char *config_hash) "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \ "module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \ "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \ - "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id;" + "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id" -int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) +void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { - int rc = 0; - #ifdef ENABLE_ACLK - - CHECK_SQLITE_CONNECTION(db_meta); + int rc; sqlite3_stmt *res = NULL; + struct aclk_sync_cfg_t *wc; - struct aclk_sync_host_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) { + if (unlikely(!host || !(wc = host->aclk_config))) { freez(config_hash); freez(node_id); - return 1; - } - - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (unlikely(!wc)) { - freez(config_hash); - freez(node_id); - return 1; + return; } rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to fetch an alarm hash configuration"); - return 1; + return; } uuid_t hash_uuid; if (uuid_parse(config_hash, hash_uuid)) - return 1; + return; rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) @@ -632,13 +627,13 @@ int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash } if (likely(p_alarm_config.cfg_hash)) { - netdata_log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } else - netdata_log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); bind_fail: rc = sqlite3_finalize(res); @@ -648,7 +643,6 @@ bind_fail: freez(config_hash); freez(node_id); #endif - return rc; } @@ -660,51 +654,50 @@ void aclk_start_alert_streaming(char *node_id, bool resets) if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) - return; - - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc; - if (unlikely(!wc)) + RRDHOST *host = find_host_by_node_id(node_id); + if (unlikely(!host || !(wc = host->aclk_config))) return; if (unlikely(!host->health.health_enabled)) { - netdata_log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } if (resets) { - netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); sql_queue_existing_alerts_to_aclk(host); } else - netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); wc->alert_updates = 1; wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; } -#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ +#define SQL_QUEUE_REMOVE_ALERTS \ + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ - "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ - "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ - "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \ - "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING;" + "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ + "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ + "AND hl.config_hash_id NOT IN (SELECT hash_id FROM alert_hash WHERE warn IS NULL AND crit IS NULL) " \ + "AND hl.name || hl.chart NOT IN (select name || chart FROM health_log WHERE name = hl.name AND " \ + "chart = hl.chart AND alarm_id > hl.alarm_id AND host_id = hl.host_id) " \ + "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING" + void sql_process_queue_removed_alerts_to_aclk(char *node_id) { - struct aclk_sync_host_config *wc; + struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); freez(node_id); - if (unlikely(!host || !(wc = host->aclk_sync_host_config))) + if (unlikely(!host || !(wc = host->aclk_config))) return; char sql[ACLK_SYNC_QUERY_SIZE * 2]; sqlite3_stmt *res = NULL; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str); + snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -715,33 +708,25 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id) rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for when trying to queue remvoed alerts."); - sqlite3_finalize(res); - return; + goto skip; } rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) { - sqlite3_finalize(res); - error_report("Failed to queue removed alerts, rc = %d", rc); - return; + if (likely(rc == SQLITE_DONE)) { + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + wc->alert_queue_removed = 0; } +skip: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc); - - netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); - - rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - wc->alert_queue_removed = 0; } void sql_queue_removed_alerts_to_aclk(RRDHOST *host) { - if (unlikely(!host->aclk_sync_host_config)) - return; - - if (!claimed() || !host->node_id) + if (unlikely(!host->aclk_config || !claimed() || !host->node_id)) return; char node_id[UUID_STR_LEN]; @@ -753,32 +738,28 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { uuid_t node_uuid; + if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; + struct aclk_sync_cfg_t *wc; + RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) { - netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); + if (unlikely(!host || !(wc = host->aclk_config))) { + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); return; } - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - - if (unlikely(!wc)) { - netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); - return; - } - - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_DEBUG, "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", snapshot_uuid); + if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid)) return; - __sync_synchronize(); + wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); - __sync_synchronize(); aclk_push_node_alert_snapshot(node_id); } @@ -795,9 +776,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->chart = strdupz(ae_chart_id(ae)); alarm_log->name = strdupz(ae_name(ae)); - alarm_log->batch_id = 0; - alarm_log->sequence_id = 0; - alarm_log->when = (time_t)ae->when; + alarm_log->when = ae->when; alarm_log->config_hash = strdupz((char *)config_hash_id); @@ -812,7 +791,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->non_clear_duration = (time_t)ae->non_clear_duration; alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status); alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status); - alarm_log->delay = (int)ae->delay; + alarm_log->delay = ae->delay; alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp; alarm_log->last_repeat = (time_t)ae->last_repeat; @@ -842,18 +821,18 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN #endif #ifdef ENABLE_ACLK -static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) +static bool have_recent_alarm(RRDHOST *host, int64_t alarm_id, int64_t mark) { ALARM_ENTRY *ae = host->health_log.alarms; while (ae) { if (ae->alarm_id == alarm_id && ae->unique_id >mark && (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)) - return 1; + return true; ae = ae->next; } - return 0; + return false; } #endif @@ -864,17 +843,17 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) RRDHOST *host = find_host_by_node_id(node_id); if (unlikely(!host)) { - netdata_log_access("AC [%s (N/A)]: Node id not found", node_id); + nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", node_id); freez(node_id); return; } freez(node_id); - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; // we perhaps we don't need this for snapshots if (unlikely(!wc->alert_updates)) { - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); @@ -888,11 +867,9 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) if (unlikely(!claim_id)) return; - netdata_log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); uint32_t cnt = 0; - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); rw_spinlock_read_lock(&host->health_log.spinlock); @@ -915,7 +892,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) } if (cnt) { - uint32_t chunk = 1, chunks = 0; + uint32_t chunks; chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0); ae = host->health_log.alarms; @@ -926,15 +903,12 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) alarm_snap.claim_id = claim_id; alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; - alarm_snap.chunk = chunk; + alarm_snap.chunk = 1; alarm_snapshot_proto_ptr_t snapshot_proto = NULL; for (; ae; ae = ae->next) { - if (likely(ae->updated_by_id)) - continue; - - if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) + if (likely(ae->updated_by_id) || unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) continue; if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) @@ -957,19 +931,9 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) if (cnt == ALARM_EVENTS_PER_CHUNK) { aclk_send_alarm_snapshot(snapshot_proto); - cnt = 0; - - if (chunk < chunks) { - chunk++; - - struct alarm_snapshot alarm_snap; - alarm_snap.node_id = wc->node_id; - alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; - alarm_snap.chunks = chunks; - alarm_snap.chunk = chunk; - + if (alarm_snap.chunk < chunks) { + alarm_snap.chunk++; snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); } } @@ -986,51 +950,70 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) #endif } -#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created + %d < UNIXEPOCH();" +#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created < UNIXEPOCH() - @period" + void sql_aclk_alert_clean_dead_entries(RRDHOST *host) { - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (unlikely(!wc)) + return; char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_DELETE_ALERT_ENTRIES, uuid_str, MAX_REMOVED_PERIOD); + snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_ALERT_ENTRIES, wc->uuid_str); - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg); + sqlite3_stmt *res = NULL; + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { - error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg); - sqlite3_free(err_msg); + error_report("Failed to prepare statement for cleaning stale ACLK alert entries."); + return; } + + rc = sqlite3_bind_int64(res, 1, MAX_REMOVED_PERIOD); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind MAX_REMOVED_PERIOD parameter."); + goto skip; + } + + rc = sqlite3_step_monitored(res); + if (rc != SQLITE_DONE) + error_report("Failed to execute DELETE query for cleaning stale ACLK alert entries."); + +skip: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement for cleaning stale ACLK alert entries."); } #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ - "FROM aclk_alert_%s WHERE date_submitted IS NULL;" + "FROM aclk_alert_%s WHERE date_submitted IS NULL" int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status) { - int rc; - struct aclk_sync_host_config *wc = NULL; - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + + struct aclk_sync_cfg_t *wc = host->aclk_config; if (!wc) return 1; proto_alert_status->alert_updates = wc->alert_updates; char sql[ACLK_SYNC_QUERY_SIZE]; - sqlite3_stmt *res = NULL; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); + sqlite3_stmt *res = NULL; + snprintfz(sql, sizeof(sql) - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to get alert log status from the database."); return 1; } while (sqlite3_step_monitored(res) == SQLITE_ROW) { - proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; - proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; - proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; + proto_alert_status->pending_min_sequence_id = + sqlite3_column_bytes(res, 0) > 0 ? (uint64_t)sqlite3_column_int64(res, 0) : 0; + proto_alert_status->pending_max_sequence_id = + sqlite3_column_bytes(res, 1) > 0 ? (uint64_t)sqlite3_column_int64(res, 1) : 0; + proto_alert_status->last_submitted_sequence_id = + sqlite3_column_bytes(res, 2) > 0 ? (uint64_t)sqlite3_column_int64(res, 2) : 0; } rc = sqlite3_finalize(res); @@ -1045,21 +1028,15 @@ void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused) if (unlikely(!node_id)) return; - struct aclk_sync_host_config *wc = NULL; + struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) - return; - - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (unlikely(!wc)) { - netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); - return; + if (unlikely(!host || !(wc = host->aclk_config))) + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); + else { + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); + wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; } - - netdata_log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); - - wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; } typedef struct active_alerts { @@ -1068,15 +1045,14 @@ typedef struct active_alerts { RRDCALC_STATUS status; } active_alerts_t; -static inline int compare_active_alerts(const void * a, const void * b) { +static inline int compare_active_alerts(const void *a, const void *b) +{ active_alerts_t *active_alerts_a = (active_alerts_t *)a; active_alerts_t *active_alerts_b = (active_alerts_t *)b; - if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) ) - { - return strcmp(active_alerts_a->chart, active_alerts_b->chart); - } - else + if (!(strcmp(active_alerts_a->name, active_alerts_b->name))) { + return strcmp(active_alerts_a->chart, active_alerts_b->chart); + } else return strcmp(active_alerts_a->name, active_alerts_b->name); } @@ -1084,16 +1060,16 @@ static inline int compare_active_alerts(const void * a, const void * b) { void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) { #ifdef ENABLE_ACLK - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!wc)) { - netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); return; } if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) { //postpone checkpoint send - wc->alert_checkpoint_req+=3; - netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); + wc->alert_checkpoint_req += 3; + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); return; } @@ -1126,16 +1102,16 @@ void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) BUFFER *alarms_to_hash; if (cnt) { - qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); + qsort(active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); alarms_to_hash = buffer_create(len, NULL); - for (uint32_t i=0;i<cnt;i++) { + for (uint32_t i = 0; i < cnt; i++) { buffer_strcat(alarms_to_hash, active_alerts[i].name); buffer_strcat(alarms_to_hash, active_alerts[i].chart); if (active_alerts[i].status == RRDCALC_STATUS_WARNING) - buffer_strcat(alarms_to_hash, "W"); + buffer_fast_strcat(alarms_to_hash, "W", 1); else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL) - buffer_strcat(alarms_to_hash, "C"); + buffer_fast_strcat(alarms_to_hash, "C", 1); } } else { alarms_to_hash = buffer_create(1, NULL); @@ -1156,10 +1132,10 @@ void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) aclk_send_provide_alarm_checkpoint(&alarm_checkpoint); freez(claim_id); - netdata_log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host)); - } else { - netdata_log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); - } + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host)); + } else + nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); + wc->alert_checkpoint_req = 0; buffer_free(alarms_to_hash); #endif |