// SPDX-License-Identifier: GPL-3.0-or-later #include "sqlite_functions.h" #include "sqlite_aclk_alert.h" #ifdef ENABLE_ACLK #include "../../aclk/aclk_alarm_api.h" #define SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param) \ ({ \ int _param = (param); \ sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \ }) #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \ "SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld " \ "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \ "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL" static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uuid_t *host_id) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res)) return false; bool ret = false; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC)); param = 0; ret = (sqlite3_step_monitored(res) == SQLITE_ROW); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); return ret; } #define MAX_REMOVED_PERIOD 604800 //a week #define SQL_UPDATE_ALERT_VERSION_TRANSITION \ "UPDATE alert_version SET unique_id = @unique_id WHERE health_log_id = @health_log_id" static void update_alert_version_transition(int64_t health_log_id, int64_t unique_id) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION_TRANSITION, &res)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id)); param = 0; int rc = sqlite3_step_monitored(res); if (rc != SQLITE_DONE) error_report("Failed to update alert_version to latest transition"); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); } //decide if some events should be sent or not #define SQL_SELECT_LAST_ALERT_STATUS "SELECT status FROM alert_version WHERE health_log_id = @health_log_id " static bool cloud_status_matches(int64_t health_log_id, RRDCALC_STATUS status) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_LAST_ALERT_STATUS, &res)) return true; bool send = false; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, health_log_id)); param = 0; int rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); send = (current_status == status); } done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); return send; } #define SQL_QUEUE_ALERT_TO_CLOUD \ "INSERT INTO aclk_queue (host_id, health_log_id, unique_id, date_created)" \ " VALUES (@host_id, @health_log_id, @unique_id, UNIXEPOCH())" \ " ON CONFLICT(host_id, health_log_id) DO UPDATE SET unique_id=excluded.unique_id, " \ " date_created=excluded.date_created" // // Attempt to insert an alert to the submit queue to reach the cloud // // The alert will NOT be added in the submit queue if // - Cloud is already aware of the alert status // - The transition refers to a variable // static int insert_alert_to_submit_queue(RRDHOST *host, int64_t health_log_id, uint32_t unique_id, RRDCALC_STATUS status) { static __thread sqlite3_stmt *res = NULL; if (cloud_status_matches(health_log_id, status)) { update_alert_version_transition(health_log_id, unique_id); return 1; } if (is_event_from_alert_variable_config(unique_id, &host->host_uuid)) return 2; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_QUEUE_ALERT_TO_CLOUD, &res)) return -1; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (int64_t) unique_id)); param = 0; int rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) error_report("Failed to insert alert in the submit queue %"PRIu32", rc = %d", unique_id, rc); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); return 0; } #define SQL_DELETE_QUEUE_ALERT_TO_CLOUD \ "DELETE FROM aclk_queue WHERE host_id = @host_id AND sequence_id BETWEEN @seq1 AND @seq2" // // Delete a range of alerts from the submit queue (after being sent to the the cloud) // static int delete_alert_from_submit_queue(RRDHOST *host, int64_t first_seq_id, int64_t last_seq_id) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_QUEUE_ALERT_TO_CLOUD, &res)) return -1; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, first_seq_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, last_seq_id)); param = 0; int rc = sqlite3_step_monitored(res); if (rc != SQLITE_DONE) error_report("Failed to delete submitted to ACLK"); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); return 0; } int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) { switch(status) { case RRDCALC_STATUS_REMOVED: return ALARM_STATUS_REMOVED; case RRDCALC_STATUS_UNDEFINED: return ALARM_STATUS_NOT_A_NUMBER; case RRDCALC_STATUS_CLEAR: return ALARM_STATUS_CLEAR; case RRDCALC_STATUS_WARNING: return ALARM_STATUS_WARNING; case RRDCALC_STATUS_CRITICAL: return ALARM_STATUS_CRITICAL; default: return ALARM_STATUS_UNKNOWN; } } static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) { char uuid_str[UUID_STR_LEN]; if(sqlite3_column_type(res, iCol) == SQLITE_NULL) uuid_str[0] = '\0'; else uuid_unparse_lower(*((nd_uuid_t *) sqlite3_column_blob(res, iCol)), uuid_str); return strdupz(uuid_str); } static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) { char *ret; if(sqlite3_column_type(res, iCol) == SQLITE_NULL) ret = ""; else ret = (char *)sqlite3_column_text(res, iCol); return strdupz(ret); } #define SQL_UPDATE_ALERT_VERSION \ "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted)" \ " VALUES (@health_log_id, @unique_id, @status, @version, UNIXEPOCH())" \ " ON CONFLICT(health_log_id) DO UPDATE SET status = excluded.status, version = excluded.version, " \ " unique_id=excluded.unique_id, date_submitted=excluded.date_submitted" // // Store a new alert transition along with the version after sending to the cloud // - Update an existing alert with the updated version, status, transition and date submitted // static void sql_update_alert_version(int64_t health_log_id, int64_t unique_id, RRDCALC_STATUS status, uint64_t version) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION, &res)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, status)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, version)); param = 0; int rc = sqlite3_step_monitored(res); if (rc != SQLITE_DONE) error_report("Failed to execute sql_update_alert_version"); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); } #define SQL_SELECT_ALERT_TO_DUMMY \ "SELECT aq.sequence_id, hld.unique_id, hld.when_key, hld.new_status, hld.health_log_id" \ " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \ " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \ " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \ " ORDER BY aq.sequence_id ASC" // // Check all queued alerts for a host and commit them as if they have been send to the cloud // this will produce new versions as needed. We need this because we are about to send a // a snapshot so we can include the latest transition. // static void commit_alert_events(RRDHOST *host) { sqlite3_stmt *res = NULL; if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_DUMMY, &res)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); int64_t first_sequence_id = 0; int64_t last_sequence_id = 0; param = 0; while (sqlite3_step_monitored(res) == SQLITE_ROW) { last_sequence_id = sqlite3_column_int64(res, 0); if (first_sequence_id == 0) first_sequence_id = last_sequence_id; int64_t unique_id = sqlite3_column_int(res, 1); int64_t version = sqlite3_column_int64(res, 2); RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 3); int64_t health_log_id = sqlite3_column_int64(res, 4); sql_update_alert_version(health_log_id, unique_id, status, version); } if (first_sequence_id) delete_alert_from_submit_queue(host, first_sequence_id, last_sequence_id); done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); } typedef enum { SEQUENCE_ID, UNIQUE_ID, ALARM_ID, CONFIG_HASH_ID, UPDATED_BY_ID, WHEN_KEY, DURATION, NON_CLEAR_DURATION, FLAGS, EXEC_RUN_TIMESTAMP, DELAY_UP_TO_TIMESTAMP, NAME, CHART, EXEC, RECIPIENT, SOURCE, UNITS, INFO, EXEC_CODE, NEW_STATUS, OLD_STATUS, DELAY, NEW_VALUE, OLD_VALUE, LAST_REPEAT, CHART_CONTEXT, TRANSITION_ID, ALARM_EVENT_ID, CHART_NAME, SUMMARY, HEALTH_LOG_ID, VERSION } HealthLogDetails; void health_alarm_log_populate( struct alarm_log_entry *alarm_log, sqlite3_stmt *res, RRDHOST *host, RRDCALC_STATUS *status) { char old_value_string[100 + 1]; char new_value_string[100 + 1]; RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, NEW_STATUS); if (status) *status = current_status; char *source = (char *) sqlite3_column_text(res, SOURCE); alarm_log->command = source ? health_edit_command_from_source(source) : strdupz("UNKNOWN=0=UNKNOWN"); alarm_log->chart = strdupz((char *) sqlite3_column_text(res, CHART)); alarm_log->name = strdupz((char *) sqlite3_column_text(res, NAME)); alarm_log->when = sqlite3_column_int64(res, WHEN_KEY); alarm_log->config_hash = sqlite3_uuid_unparse_strdupz(res, CONFIG_HASH_ID); alarm_log->utc_offset = host->utc_offset; alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host)); alarm_log->exec_path = sqlite3_column_bytes(res, EXEC) ? strdupz((char *)sqlite3_column_text(res, EXEC)) : strdupz((char *)string2str(host->health.health_default_exec)); alarm_log->conf_source = source ? strdupz(source) : strdupz(""); time_t duration = sqlite3_column_int64(res, DURATION); alarm_log->duration = (duration > 0) ? duration : 0; alarm_log->non_clear_duration = sqlite3_column_int64(res, NON_CLEAR_DURATION); alarm_log->status = rrdcalc_status_to_proto_enum(current_status); alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)sqlite3_column_int64(res, OLD_STATUS)); alarm_log->delay = sqlite3_column_int64(res, DELAY); alarm_log->delay_up_to_timestamp = sqlite3_column_int64(res, DELAY_UP_TO_TIMESTAMP); alarm_log->last_repeat = sqlite3_column_int64(res, LAST_REPEAT); uint64_t flags = sqlite3_column_int64(res, FLAGS); char *recipient = (char *) sqlite3_column_text(res, RECIPIENT); alarm_log->silenced = ((flags & HEALTH_ENTRY_FLAG_SILENCED) || (recipient && !strncmp(recipient, "silent", 6))) ? 1 : 0; double value = sqlite3_column_double(res, NEW_VALUE); double old_value = sqlite3_column_double(res, OLD_VALUE); alarm_log->value_string = sqlite3_column_type(res, NEW_VALUE) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit( new_value_string, 100, value, (char *)sqlite3_column_text(res, UNITS), -1)); alarm_log->old_value_string = sqlite3_column_type(res, OLD_VALUE) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit( old_value_string, 100, old_value, (char *)sqlite3_column_text(res, UNITS), -1)); alarm_log->value = (!isnan(value)) ? (NETDATA_DOUBLE)value : 0; alarm_log->old_value = (!isnan(old_value)) ? (NETDATA_DOUBLE)old_value : 0; alarm_log->updated = (flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; alarm_log->rendered_info = sqlite3_text_strdupz_empty(res, INFO); alarm_log->chart_context = sqlite3_text_strdupz_empty(res, CHART_CONTEXT); alarm_log->chart_name = sqlite3_text_strdupz_empty(res, CHART_NAME); alarm_log->transition_id = sqlite3_uuid_unparse_strdupz(res, TRANSITION_ID); alarm_log->event_id = sqlite3_column_int64(res, ALARM_EVENT_ID); alarm_log->version = sqlite3_column_int64(res, VERSION); alarm_log->summary = sqlite3_text_strdupz_empty(res, SUMMARY); alarm_log->health_log_id = sqlite3_column_int64(res, HEALTH_LOG_ID); alarm_log->unique_id = sqlite3_column_int64(res, UNIQUE_ID); alarm_log->alarm_id = sqlite3_column_int64(res, ALARM_ID); alarm_log->sequence_id = sqlite3_column_int64(res, SEQUENCE_ID); } #define SQL_SELECT_ALERT_TO_PUSH \ "SELECT aq.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key," \ " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name," \ " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status," \ " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id," \ " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, hld.when_key" \ " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \ " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \ " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \ " ORDER BY aq.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES static void aclk_push_alert_event(RRDHOST *host __maybe_unused) { char *claim_id = get_agent_claimid(); if (!claim_id || !host->node_id) return; sqlite3_stmt *res = NULL; if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_PUSH, &res)) { freez(claim_id); return; } int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); char node_id_str[UUID_STR_LEN]; uuid_unparse_lower(*host->node_id, node_id_str); struct alarm_log_entry alarm_log; alarm_log.node_id = node_id_str; alarm_log.claim_id = claim_id; int64_t first_id = 0; int64_t last_id = 0; param = 0; RRDCALC_STATUS status; struct aclk_sync_cfg_t *wc = host->aclk_config; while (sqlite3_step_monitored(res) == SQLITE_ROW) { health_alarm_log_populate(&alarm_log, res, host, &status); aclk_send_alarm_log_entry(&alarm_log); wc->alert_count++; last_id = alarm_log.sequence_id; if (first_id == 0) first_id = last_id; sql_update_alert_version(alarm_log.health_log_id, alarm_log.unique_id, status, alarm_log.version); destroy_alarm_log_entry(&alarm_log); } if (first_id) { nd_log( NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS SENT from %ld - %ld", node_id_str, rrdhost_hostname(host), first_id, last_id); delete_alert_from_submit_queue(host, first_id, last_id); // Mark to do one more check rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); freez(claim_id); } #define SQL_DELETE_PROCESSED_ROWS \ "DELETE FROM alert_queue WHERE host_id = @host_id AND rowid between @row1 AND @row2" static void delete_alert_from_pending_queue(RRDHOST *host, int64_t row1, int64_t row2) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_PROCESSED_ROWS, &res)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row1)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row2)); param = 0; int rc = sqlite3_step_monitored(res); if (rc != SQLITE_DONE) error_report("Failed to delete processed rows, rc = %d", rc); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); } #define SQL_REBUILD_HOST_ALERT_VERSION_TABLE \ "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted) " \ " SELECT hl.health_log_id, hld.unique_id, hld.new_status, hld.when_key, UNIXEPOCH() " \ " FROM health_log hl, health_log_detail hld WHERE " \ " hl.host_id = @host_id AND hld.health_log_id = hl.health_log_id AND hld.transition_id = hl.last_transition_id" #define SQL_DELETE_HOST_ALERT_VERSION_TABLE \ "DELETE FROM alert_version WHERE health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id)" void rebuild_host_alert_version_table(RRDHOST *host) { sqlite3_stmt *res = NULL; if (!PREPARE_STATEMENT(db_meta, SQL_DELETE_HOST_ALERT_VERSION_TABLE, &res)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); param = 0; int rc = execute_insert(res); if (rc != SQLITE_DONE) { netdata_log_error("Failed to delete the host alert version table"); goto done; } SQLITE_FINALIZE(res); if (!PREPARE_STATEMENT(db_meta, SQL_REBUILD_HOST_ALERT_VERSION_TABLE, &res)) return; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); param = 0; rc = execute_insert(res); if (rc != SQLITE_DONE) netdata_log_error("Failed to rebuild the host alert version table"); done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); } #define SQL_PROCESS_ALERT_PENDING_QUEUE \ "SELECT health_log_id, unique_id, status, rowid" \ " FROM alert_queue WHERE host_id = @host_id AND date_scheduled <= UNIXEPOCH() ORDER BY rowid ASC" bool process_alert_pending_queue(RRDHOST *host) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_PROCESS_ALERT_PENDING_QUEUE, &res)) return false; int param = 0; int added =0, count = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); param = 0; int64_t start_row = 0; int64_t end_row = 0; while (sqlite3_step_monitored(res) == SQLITE_ROW) { int64_t health_log_id = sqlite3_column_int64(res, 0); uint32_t unique_id = sqlite3_column_int64(res, 1); RRDCALC_STATUS new_status = sqlite3_column_int(res, 2); int64_t row = sqlite3_column_int64(res, 3); if (host->aclk_config) { int ret = insert_alert_to_submit_queue(host, health_log_id, unique_id, new_status); if (ret == 0) added++; } if (!start_row) start_row = row; end_row = row; count++; } if (start_row) delete_alert_from_pending_queue(host, start_row, end_row); if(count) nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Processed %d entries, queued %d", rrdhost_hostname(host), count, added); done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); return added > 0; } void aclk_push_alert_events_for_all_hosts(void) { RRDHOST *host; // Checking if we shutting down if (!service_running(SERVICE_ACLK)) return; dfe_start_reentrant(rrdhost_root_index, host) { if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) || rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) continue; rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); struct aclk_sync_cfg_t *wc = host->aclk_config; if (!wc || false == wc->stream_alerts || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) { (void)process_alert_pending_queue(host); commit_alert_events(host); continue; } if (wc->send_snapshot) { rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); if (wc->send_snapshot == 1) continue; (void)process_alert_pending_queue(host); commit_alert_events(host); rebuild_host_alert_version_table(host); send_alert_snapshot_to_cloud(host); wc->snapshot_count++; wc->send_snapshot = 0; } else aclk_push_alert_event(host); } dfe_done(host); } void aclk_send_alert_configuration(char *config_hash) { if (unlikely(!config_hash)) return; struct aclk_sync_cfg_t *wc = localhost->aclk_config; if (unlikely(!wc)) return; nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_push_alert_config(wc->node_id, config_hash); } #define SQL_SELECT_ALERT_CONFIG \ "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \ "module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \ "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \ "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id" void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { sqlite3_stmt *res = NULL; struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); if (unlikely(!host || !(wc = host->aclk_config))) { freez(config_hash); freez(node_id); return; } if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_CONFIG, &res)) return; nd_uuid_t hash_uuid; if (uuid_parse(config_hash, hash_uuid)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC)); struct aclk_alarm_configuration alarm_config; struct provide_alarm_configuration p_alarm_config; p_alarm_config.cfg_hash = NULL; param = 0; if (sqlite3_step_monitored(res) == SQLITE_ROW) { alarm_config.alarm = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.tmpl = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.on_chart = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.classification = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.type = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.component = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.os = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.hosts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.plugin = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.module = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.charts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.lookup = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.every = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.units = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.green = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.red = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.calculation_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.warning_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.critical_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.recipient = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.exec = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.delay = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.repeat = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.info = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.options = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); alarm_config.host_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 25 alarm_config.p_db_lookup_dimensions = NULL; alarm_config.p_db_lookup_method = NULL; alarm_config.p_db_lookup_options = NULL; alarm_config.p_db_lookup_after = 0; alarm_config.p_db_lookup_before = 0; if (sqlite3_column_bytes(res, 29) > 0) { alarm_config.p_db_lookup_dimensions = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 26 alarm_config.p_db_lookup_method = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 27 if (param != 28) netdata_log_error("aclk_push_alert_config_event: Unexpected param number %d", param); BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); rrdr_options_to_buffer(tmp_buf, sqlite3_column_int(res, 28)); alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf)); buffer_free(tmp_buf); alarm_config.p_db_lookup_after = sqlite3_column_int(res, 29); alarm_config.p_db_lookup_before = sqlite3_column_int(res, 30); } alarm_config.p_update_every = sqlite3_column_int(res, 31); alarm_config.chart_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 32); alarm_config.summary = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 33); p_alarm_config.cfg_hash = strdupz((char *) config_hash); p_alarm_config.cfg = alarm_config; } param = 0; if (likely(p_alarm_config.cfg_hash)) { nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } else nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); freez(config_hash); freez(node_id); } #define SQL_ALERT_VERSION_CALC \ "SELECT SUM(version) FROM health_log hl, alert_version av" \ " WHERE hl.host_id = @host_uuid AND hl.health_log_id = av.health_log_id AND av.status <> -2" static uint64_t calculate_node_alert_version(RRDHOST *host) { static __thread sqlite3_stmt *res = NULL; if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_ALERT_VERSION_CALC, &res)) return 0; uint64_t version = 0; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); param = 0; while (sqlite3_step_monitored(res) == SQLITE_ROW) { version = (uint64_t)sqlite3_column_int64(res, 0); } done: REPORT_BIND_FAIL(res, param); SQLITE_RESET(res); return version; } static void schedule_alert_snapshot_if_needed(struct aclk_sync_cfg_t *wc, uint64_t cloud_version) { uint64_t local_version = calculate_node_alert_version(wc->host); if (local_version != cloud_version) { nd_log( NDLS_ACCESS, NDLP_NOTICE, "Scheduling alert snapshot for host \"%s\", node \"%s\" (version: cloud %zu, local %zu)", rrdhost_hostname(wc->host), wc->node_id, cloud_version, local_version); wc->send_snapshot = 1; rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } else nd_log( NDLS_ACCESS, NDLP_DEBUG, "Alert check on \"%s\", node \"%s\" (version: cloud %zu, local %zu)", rrdhost_hostname(wc->host), wc->node_id, cloud_version, local_version); wc->checkpoint_count++; } void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { nd_uuid_t node_uuid; if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); if (unlikely(!host || !(wc = host->aclk_config))) { nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); return; } nd_log(NDLS_ACCESS, NDLP_DEBUG, "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", snapshot_uuid); if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid)) return; wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); wc->send_snapshot = 1; rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } #define SQL_COUNT_SNAPSHOT_ENTRIES \ "SELECT COUNT(1) FROM alert_version av, health_log hl " \ "WHERE hl.host_id = @host_id AND hl.health_log_id = av.health_log_id AND av.status <> -2" static int calculate_alert_snapshot_entries(nd_uuid_t *host_uuid) { int count = 0; sqlite3_stmt *res = NULL; if (!PREPARE_STATEMENT(db_meta, SQL_COUNT_SNAPSHOT_ENTRIES, &res)) return 0; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC)); param = 0; int rc = sqlite3_step_monitored(res); if (rc == SQLITE_ROW) count = sqlite3_column_int(res, 0); else error_report("Failed to select snapshot count"); done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); return count; } #define SQL_GET_SNAPSHOT_ENTRIES \ " SELECT 0, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " \ " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " \ " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status, " \ " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " \ " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, av.version " \ " FROM health_log hl, alert_hash ah, health_log_detail hld, alert_version av " \ " WHERE hl.config_hash_id = ah.hash_id" \ " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " \ " AND hld.health_log_id = av.health_log_id AND av.unique_id = hld.unique_id AND av.status <> -2" #define ALARM_EVENTS_PER_CHUNK 1000 void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused) { struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!host)) { nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", wc->node_id); return; } char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; // Check database for this node to see how many alerts we will need to put in the snapshot int cnt = calculate_alert_snapshot_entries(&host->host_uuid); if (!cnt) { freez(claim_id); return; } sqlite3_stmt *res = NULL; if (!PREPARE_STATEMENT(db_meta, SQL_GET_SNAPSHOT_ENTRIES, &res)) return; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); nd_uuid_t local_snapshot_uuid; char snapshot_uuid_str[UUID_STR_LEN]; uuid_generate_random(local_snapshot_uuid); uuid_unparse_lower(local_snapshot_uuid, snapshot_uuid_str); char *snapshot_uuid = &snapshot_uuid_str[0]; nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending %d alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(host), cnt, snapshot_uuid); uint32_t chunks; chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0); alarm_snapshot_proto_ptr_t snapshot_proto = NULL; struct alarm_snapshot alarm_snap; struct alarm_log_entry alarm_log; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; alarm_snap.snapshot_uuid = snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = 1; alarm_log.node_id = wc->node_id; alarm_log.claim_id = claim_id; cnt = 0; param = 0; uint64_t version = 0; int total_count = 0; while (sqlite3_step_monitored(res) == SQLITE_ROW) { cnt++; total_count++; if (!snapshot_proto) snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); health_alarm_log_populate(&alarm_log, res, host, NULL); add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log); version += alarm_log.version; if (cnt == ALARM_EVENTS_PER_CHUNK) { if (aclk_connected) aclk_send_alarm_snapshot(snapshot_proto); cnt = 0; if (alarm_snap.chunk < chunks) { alarm_snap.chunk++; snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); } } destroy_alarm_log_entry(&alarm_log); } if (cnt) aclk_send_alarm_snapshot(snapshot_proto); nd_log( NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sent! %d alerts snapshot, snapshot_uuid %s (version = %zu)", wc->node_id, rrdhost_hostname(host), cnt, snapshot_uuid, version); done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); freez(claim_id); } // Start streaming alerts void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version) { nd_uuid_t node_uuid; if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); if (unlikely(!host || !(wc = host->aclk_config))) { nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, invalid node.", node_id); return; } if (unlikely(!host->health.health_enabled)) { nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); schedule_alert_snapshot_if_needed(wc, cloud_version); wc->stream_alerts = true; } // Do checkpoint alert version check void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version) { nd_uuid_t node_uuid; if (unlikely(!node_id || !claim_id || !claimed() || uuid_parse(node_id, node_uuid))) return; char *agent_claim_id = get_agent_claimid(); if (claim_id && agent_claim_id && strcmp(agent_claim_id, claim_id) != 0) { nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED WITH INVALID CLAIM ID", node_id); goto done; } struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); if ((!host || !(wc = host->aclk_config))) nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED FOR INVALID NODE", node_id); else schedule_alert_snapshot_if_needed(wc, cloud_version); done: freez(agent_claim_id); } #endif