summaryrefslogtreecommitdiffstats
path: root/src/database/sqlite/sqlite_aclk_alert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/database/sqlite/sqlite_aclk_alert.c')
-rw-r--r--src/database/sqlite/sqlite_aclk_alert.c1045
1 files changed, 1045 insertions, 0 deletions
diff --git a/src/database/sqlite/sqlite_aclk_alert.c b/src/database/sqlite/sqlite_aclk_alert.c
new file mode 100644
index 000000000..dbe5a5045
--- /dev/null
+++ b/src/database/sqlite/sqlite_aclk_alert.c
@@ -0,0 +1,1045 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "sqlite_functions.h"
+#include "sqlite_aclk_alert.h"
+
+#ifdef ENABLE_ACLK
+#include "../../aclk/aclk_alarm_api.h"
+
+#define SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param) \
+ ({ \
+ int _param = (param); \
+ sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \
+ })
+
+#define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \
+ "SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld " \
+ "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \
+ "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL"
+
+static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uuid_t *host_id)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res))
+ return false;
+
+ bool ret = false;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC));
+
+ param = 0;
+ ret = (sqlite3_step_monitored(res) == SQLITE_ROW);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return ret;
+}
+
+#define MAX_REMOVED_PERIOD 604800 //a week
+
+#define SQL_UPDATE_ALERT_VERSION_TRANSITION \
+ "UPDATE alert_version SET unique_id = @unique_id WHERE health_log_id = @health_log_id"
+
+static void update_alert_version_transition(int64_t health_log_id, int64_t unique_id)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION_TRANSITION, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to update alert_version to latest transition");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+}
+
+//decide if some events should be sent or not
+
+#define SQL_SELECT_LAST_ALERT_STATUS "SELECT status FROM alert_version WHERE health_log_id = @health_log_id "
+
+static bool cloud_status_matches(int64_t health_log_id, RRDCALC_STATUS status)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_LAST_ALERT_STATUS, &res))
+ return true;
+
+ bool send = false;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, health_log_id));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (likely(rc == SQLITE_ROW)) {
+ RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
+ send = (current_status == status);
+ }
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return send;
+}
+
+#define SQL_QUEUE_ALERT_TO_CLOUD \
+ "INSERT INTO aclk_queue (host_id, health_log_id, unique_id, date_created)" \
+ " VALUES (@host_id, @health_log_id, @unique_id, UNIXEPOCH())" \
+ " ON CONFLICT(host_id, health_log_id) DO UPDATE SET unique_id=excluded.unique_id, " \
+ " date_created=excluded.date_created"
+
+//
+// Attempt to insert an alert to the submit queue to reach the cloud
+//
+// The alert will NOT be added in the submit queue if
+// - Cloud is already aware of the alert status
+// - The transition refers to a variable
+//
+static int insert_alert_to_submit_queue(RRDHOST *host, int64_t health_log_id, uint32_t unique_id, RRDCALC_STATUS status)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (cloud_status_matches(health_log_id, status)) {
+ update_alert_version_transition(health_log_id, unique_id);
+ return 1;
+ }
+
+ if (is_event_from_alert_variable_config(unique_id, &host->host_uuid))
+ return 2;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_QUEUE_ALERT_TO_CLOUD, &res))
+ return -1;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (int64_t) unique_id));
+
+ param = 0;
+ int rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to insert alert in the submit queue %"PRIu32", rc = %d", unique_id, rc);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return 0;
+}
+
+#define SQL_DELETE_QUEUE_ALERT_TO_CLOUD \
+ "DELETE FROM aclk_queue WHERE host_id = @host_id AND sequence_id BETWEEN @seq1 AND @seq2"
+
+//
+// Delete a range of alerts from the submit queue (after being sent to the the cloud)
+//
+static int delete_alert_from_submit_queue(RRDHOST *host, int64_t first_seq_id, int64_t last_seq_id)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_QUEUE_ALERT_TO_CLOUD, &res))
+ return -1;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, first_seq_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, last_seq_id));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to delete submitted to ACLK");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return 0;
+}
+
+int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
+{
+
+ switch(status) {
+ case RRDCALC_STATUS_REMOVED:
+ return ALARM_STATUS_REMOVED;
+
+ case RRDCALC_STATUS_UNDEFINED:
+ return ALARM_STATUS_NOT_A_NUMBER;
+
+ case RRDCALC_STATUS_CLEAR:
+ return ALARM_STATUS_CLEAR;
+
+ case RRDCALC_STATUS_WARNING:
+ return ALARM_STATUS_WARNING;
+
+ case RRDCALC_STATUS_CRITICAL:
+ return ALARM_STATUS_CRITICAL;
+
+ default:
+ return ALARM_STATUS_UNKNOWN;
+ }
+}
+
+static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) {
+ char uuid_str[UUID_STR_LEN];
+
+ if(sqlite3_column_type(res, iCol) == SQLITE_NULL)
+ uuid_str[0] = '\0';
+ else
+ uuid_unparse_lower(*((nd_uuid_t *) sqlite3_column_blob(res, iCol)), uuid_str);
+
+ return strdupz(uuid_str);
+}
+
+static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
+ char *ret;
+
+ if(sqlite3_column_type(res, iCol) == SQLITE_NULL)
+ ret = "";
+ else
+ ret = (char *)sqlite3_column_text(res, iCol);
+
+ return strdupz(ret);
+}
+
+#define SQL_UPDATE_ALERT_VERSION \
+ "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted)" \
+ " VALUES (@health_log_id, @unique_id, @status, @version, UNIXEPOCH())" \
+ " ON CONFLICT(health_log_id) DO UPDATE SET status = excluded.status, version = excluded.version, " \
+ " unique_id=excluded.unique_id, date_submitted=excluded.date_submitted"
+
+//
+// Store a new alert transition along with the version after sending to the cloud
+// - Update an existing alert with the updated version, status, transition and date submitted
+//
+static void sql_update_alert_version(int64_t health_log_id, int64_t unique_id, RRDCALC_STATUS status, uint64_t version)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, status));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, version));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to execute sql_update_alert_version");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+}
+
+#define SQL_SELECT_ALERT_TO_DUMMY \
+ "SELECT aq.sequence_id, hld.unique_id, hld.when_key, hld.new_status, hld.health_log_id" \
+ " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \
+ " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \
+ " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \
+ " ORDER BY aq.sequence_id ASC"
+
+//
+// Check all queued alerts for a host and commit them as if they have been send to the cloud
+// this will produce new versions as needed. We need this because we are about to send a
+// a snapshot so we can include the latest transition.
+//
+static void commit_alert_events(RRDHOST *host)
+{
+ sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_DUMMY, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ int64_t first_sequence_id = 0;
+ int64_t last_sequence_id = 0;
+
+ param = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+
+ last_sequence_id = sqlite3_column_int64(res, 0);
+ if (first_sequence_id == 0)
+ first_sequence_id = last_sequence_id;
+
+ int64_t unique_id = sqlite3_column_int(res, 1);
+ int64_t version = sqlite3_column_int64(res, 2);
+ RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 3);
+ int64_t health_log_id = sqlite3_column_int64(res, 4);
+
+ sql_update_alert_version(health_log_id, unique_id, status, version);
+ }
+
+ if (first_sequence_id)
+ delete_alert_from_submit_queue(host, first_sequence_id, last_sequence_id);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+}
+
+typedef enum {
+ SEQUENCE_ID,
+ UNIQUE_ID,
+ ALARM_ID,
+ CONFIG_HASH_ID,
+ UPDATED_BY_ID,
+ WHEN_KEY,
+ DURATION,
+ NON_CLEAR_DURATION,
+ FLAGS,
+ EXEC_RUN_TIMESTAMP,
+ DELAY_UP_TO_TIMESTAMP,
+ NAME,
+ CHART,
+ EXEC,
+ RECIPIENT,
+ SOURCE,
+ UNITS,
+ INFO,
+ EXEC_CODE,
+ NEW_STATUS,
+ OLD_STATUS,
+ DELAY,
+ NEW_VALUE,
+ OLD_VALUE,
+ LAST_REPEAT,
+ CHART_CONTEXT,
+ TRANSITION_ID,
+ ALARM_EVENT_ID,
+ CHART_NAME,
+ SUMMARY,
+ HEALTH_LOG_ID,
+ VERSION
+} HealthLogDetails;
+
+void health_alarm_log_populate(
+ struct alarm_log_entry *alarm_log,
+ sqlite3_stmt *res,
+ RRDHOST *host,
+ RRDCALC_STATUS *status)
+{
+ char old_value_string[100 + 1];
+ char new_value_string[100 + 1];
+
+ RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, NEW_STATUS);
+ if (status)
+ *status = current_status;
+
+ char *source = (char *) sqlite3_column_text(res, SOURCE);
+ alarm_log->command = source ? health_edit_command_from_source(source) : strdupz("UNKNOWN=0=UNKNOWN");
+
+ alarm_log->chart = strdupz((char *) sqlite3_column_text(res, CHART));
+ alarm_log->name = strdupz((char *) sqlite3_column_text(res, NAME));
+
+ alarm_log->when = sqlite3_column_int64(res, WHEN_KEY);
+
+ alarm_log->config_hash = sqlite3_uuid_unparse_strdupz(res, CONFIG_HASH_ID);
+
+ alarm_log->utc_offset = host->utc_offset;
+ alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
+ alarm_log->exec_path = sqlite3_column_bytes(res, EXEC) ?
+ strdupz((char *)sqlite3_column_text(res, EXEC)) :
+ strdupz((char *)string2str(host->health.health_default_exec));
+
+ alarm_log->conf_source = source ? strdupz(source) : strdupz("");
+
+ time_t duration = sqlite3_column_int64(res, DURATION);
+ alarm_log->duration = (duration > 0) ? duration : 0;
+
+ alarm_log->non_clear_duration = sqlite3_column_int64(res, NON_CLEAR_DURATION);
+
+ alarm_log->status = rrdcalc_status_to_proto_enum(current_status);
+ alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)sqlite3_column_int64(res, OLD_STATUS));
+ alarm_log->delay = sqlite3_column_int64(res, DELAY);
+ alarm_log->delay_up_to_timestamp = sqlite3_column_int64(res, DELAY_UP_TO_TIMESTAMP);
+ alarm_log->last_repeat = sqlite3_column_int64(res, LAST_REPEAT);
+
+ uint64_t flags = sqlite3_column_int64(res, FLAGS);
+ char *recipient = (char *) sqlite3_column_text(res, RECIPIENT);
+ alarm_log->silenced =
+ ((flags & HEALTH_ENTRY_FLAG_SILENCED) || (recipient && !strncmp(recipient, "silent", 6))) ? 1 : 0;
+
+ double value = sqlite3_column_double(res, NEW_VALUE);
+ double old_value = sqlite3_column_double(res, OLD_VALUE);
+
+ alarm_log->value_string =
+ sqlite3_column_type(res, NEW_VALUE) == SQLITE_NULL ?
+ strdupz((char *)"-") :
+ strdupz((char *)format_value_and_unit(
+ new_value_string, 100, value, (char *)sqlite3_column_text(res, UNITS), -1));
+
+ alarm_log->old_value_string =
+ sqlite3_column_type(res, OLD_VALUE) == SQLITE_NULL ?
+ strdupz((char *)"-") :
+ strdupz((char *)format_value_and_unit(
+ old_value_string, 100, old_value, (char *)sqlite3_column_text(res, UNITS), -1));
+
+ alarm_log->value = (!isnan(value)) ? (NETDATA_DOUBLE)value : 0;
+ alarm_log->old_value = (!isnan(old_value)) ? (NETDATA_DOUBLE)old_value : 0;
+
+ alarm_log->updated = (flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
+ alarm_log->rendered_info = sqlite3_text_strdupz_empty(res, INFO);
+ alarm_log->chart_context = sqlite3_text_strdupz_empty(res, CHART_CONTEXT);
+ alarm_log->chart_name = sqlite3_text_strdupz_empty(res, CHART_NAME);
+
+ alarm_log->transition_id = sqlite3_uuid_unparse_strdupz(res, TRANSITION_ID);
+ alarm_log->event_id = sqlite3_column_int64(res, ALARM_EVENT_ID);
+ alarm_log->version = sqlite3_column_int64(res, VERSION);
+
+ alarm_log->summary = sqlite3_text_strdupz_empty(res, SUMMARY);
+
+ alarm_log->health_log_id = sqlite3_column_int64(res, HEALTH_LOG_ID);
+ alarm_log->unique_id = sqlite3_column_int64(res, UNIQUE_ID);
+ alarm_log->alarm_id = sqlite3_column_int64(res, ALARM_ID);
+ alarm_log->sequence_id = sqlite3_column_int64(res, SEQUENCE_ID);
+}
+
+#define SQL_SELECT_ALERT_TO_PUSH \
+ "SELECT aq.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key," \
+ " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name," \
+ " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status," \
+ " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id," \
+ " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, hld.when_key" \
+ " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \
+ " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \
+ " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \
+ " ORDER BY aq.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES
+
+static void aclk_push_alert_event(RRDHOST *host __maybe_unused)
+{
+
+ char *claim_id = get_agent_claimid();
+ if (!claim_id || !host->node_id)
+ return;
+
+ sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_PUSH, &res)) {
+ freez(claim_id);
+ return;
+ }
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ char node_id_str[UUID_STR_LEN];
+ uuid_unparse_lower(*host->node_id, node_id_str);
+
+ struct alarm_log_entry alarm_log;
+ alarm_log.node_id = node_id_str;
+ alarm_log.claim_id = claim_id;
+
+ int64_t first_id = 0;
+ int64_t last_id = 0;
+
+ param = 0;
+ RRDCALC_STATUS status;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ health_alarm_log_populate(&alarm_log, res, host, &status);
+ aclk_send_alarm_log_entry(&alarm_log);
+ wc->alert_count++;
+
+ last_id = alarm_log.sequence_id;
+ if (first_id == 0)
+ first_id = last_id;
+
+ sql_update_alert_version(alarm_log.health_log_id, alarm_log.unique_id, status, alarm_log.version);
+
+ destroy_alarm_log_entry(&alarm_log);
+ }
+
+ if (first_id) {
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "ACLK RES [%s (%s)]: ALERTS SENT from %ld - %ld",
+ node_id_str,
+ rrdhost_hostname(host),
+ first_id,
+ last_id);
+
+ delete_alert_from_submit_queue(host, first_id, last_id);
+ // Mark to do one more check
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ }
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+
+ freez(claim_id);
+}
+
+#define SQL_DELETE_PROCESSED_ROWS "DELETE FROM alert_queue WHERE host_id = @host_id AND rowid = @row"
+
+static void delete_alert_from_pending_queue(RRDHOST *host, int64_t row)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_PROCESSED_ROWS, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to delete processed rows, rc = %d", rc);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+}
+
+#define SQL_REBUILD_HOST_ALERT_VERSION_TABLE \
+ "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted) " \
+ " SELECT hl.health_log_id, hld.unique_id, hld.new_status, hld.when_key, UNIXEPOCH() " \
+ " FROM health_log hl, health_log_detail hld WHERE " \
+ " hl.host_id = @host_id AND hld.health_log_id = hl.health_log_id AND hld.transition_id = hl.last_transition_id"
+
+#define SQL_DELETE_HOST_ALERT_VERSION_TABLE \
+ "DELETE FROM alert_version WHERE health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id)"
+
+void rebuild_host_alert_version_table(RRDHOST *host)
+{
+ sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_STATEMENT(db_meta, SQL_DELETE_HOST_ALERT_VERSION_TABLE, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ param = 0;
+ int rc = execute_insert(res);
+ if (rc != SQLITE_DONE) {
+ netdata_log_error("Failed to delete the host alert version table");
+ goto done;
+ }
+
+ SQLITE_FINALIZE(res);
+ if (!PREPARE_STATEMENT(db_meta, SQL_REBUILD_HOST_ALERT_VERSION_TABLE, &res))
+ return;
+
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ param = 0;
+ rc = execute_insert(res);
+ if (rc != SQLITE_DONE)
+ netdata_log_error("Failed to rebuild the host alert version table");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+}
+
+#define SQL_PROCESS_ALERT_PENDING_QUEUE \
+ "SELECT health_log_id, unique_id, status, rowid" \
+ " FROM alert_queue WHERE host_id = @host_id AND date_scheduled <= UNIXEPOCH() ORDER BY rowid ASC"
+
+bool process_alert_pending_queue(RRDHOST *host)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_PROCESS_ALERT_PENDING_QUEUE, &res))
+ return false;
+
+ int param = 0;
+ int added =0, count = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ param = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+
+ int64_t health_log_id = sqlite3_column_int64(res, 0);
+ uint32_t unique_id = sqlite3_column_int64(res, 1);
+ RRDCALC_STATUS new_status = sqlite3_column_int(res, 2);
+ int64_t row = sqlite3_column_int64(res, 3);
+
+ if (host->aclk_config) {
+ int ret = insert_alert_to_submit_queue(host, health_log_id, unique_id, new_status);
+ if (ret == 0)
+ added++;
+ }
+
+ delete_alert_from_pending_queue(host, row);
+
+ count++;
+ }
+
+ if(count)
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Processed %d entries, queued %d", rrdhost_hostname(host), count, added);
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return added > 0;
+}
+
+void aclk_push_alert_events_for_all_hosts(void)
+{
+ RRDHOST *host;
+
+ // Checking if we shutting down
+ if (!service_running(SERVICE_ACLK))
+ return;
+
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) ||
+ rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
+ continue;
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!wc || false == wc->stream_alerts || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
+ (void)process_alert_pending_queue(host);
+ commit_alert_events(host);
+ continue;
+ }
+
+ if (wc->send_snapshot) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ if (wc->send_snapshot == 1)
+ continue;
+ (void)process_alert_pending_queue(host);
+ commit_alert_events(host);
+ rebuild_host_alert_version_table(host);
+ send_alert_snapshot_to_cloud(host);
+ wc->snapshot_count++;
+ wc->send_snapshot = 0;
+ }
+ else
+ aclk_push_alert_event(host);
+ }
+ dfe_done(host);
+}
+
+void aclk_send_alert_configuration(char *config_hash)
+{
+ if (unlikely(!config_hash))
+ return;
+
+ struct aclk_sync_cfg_t *wc = localhost->aclk_config;
+
+ if (unlikely(!wc))
+ return;
+
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
+ "ACLK REQ [%s (%s)]: Request to send alert config %s.",
+ wc->node_id,
+ wc->host ? rrdhost_hostname(wc->host) : "N/A",
+ config_hash);
+
+ aclk_push_alert_config(wc->node_id, config_hash);
+}
+
+#define SQL_SELECT_ALERT_CONFIG \
+ "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
+ "module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
+ "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
+ "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id"
+
+void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
+{
+ sqlite3_stmt *res = NULL;
+ struct aclk_sync_cfg_t *wc;
+
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely(!host || !(wc = host->aclk_config))) {
+ freez(config_hash);
+ freez(node_id);
+ return;
+ }
+
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_CONFIG, &res))
+ return;
+
+ nd_uuid_t hash_uuid;
+ if (uuid_parse(config_hash, hash_uuid))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC));
+
+ struct aclk_alarm_configuration alarm_config;
+ struct provide_alarm_configuration p_alarm_config;
+ p_alarm_config.cfg_hash = NULL;
+
+ param = 0;
+ if (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ alarm_config.alarm = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.tmpl = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.on_chart = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.classification = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.type = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.component = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.os = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.hosts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.plugin = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.module = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.charts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.lookup = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.every = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.units = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.green = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.red = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.calculation_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.warning_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.critical_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.recipient = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.exec = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.delay = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.repeat = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.info = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.options = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
+ alarm_config.host_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 25
+
+ alarm_config.p_db_lookup_dimensions = NULL;
+ alarm_config.p_db_lookup_method = NULL;
+ alarm_config.p_db_lookup_options = NULL;
+ alarm_config.p_db_lookup_after = 0;
+ alarm_config.p_db_lookup_before = 0;
+
+ if (sqlite3_column_bytes(res, 29) > 0) {
+
+ alarm_config.p_db_lookup_dimensions = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 26
+ alarm_config.p_db_lookup_method = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 27
+ if (param != 28)
+ netdata_log_error("aclk_push_alert_config_event: Unexpected param number %d", param);
+
+ BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ rrdr_options_to_buffer(tmp_buf, sqlite3_column_int(res, 28));
+ alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
+ buffer_free(tmp_buf);
+
+ alarm_config.p_db_lookup_after = sqlite3_column_int(res, 29);
+ alarm_config.p_db_lookup_before = sqlite3_column_int(res, 30);
+ }
+
+ alarm_config.p_update_every = sqlite3_column_int(res, 31);
+
+ alarm_config.chart_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 32);
+ alarm_config.summary = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 33);
+
+ p_alarm_config.cfg_hash = strdupz((char *) config_hash);
+ p_alarm_config.cfg = alarm_config;
+ }
+
+ param = 0;
+
+ if (likely(p_alarm_config.cfg_hash)) {
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
+ aclk_send_provide_alarm_cfg(&p_alarm_config);
+ freez(p_alarm_config.cfg_hash);
+ destroy_aclk_alarm_configuration(&alarm_config);
+ }
+ else
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+ freez(config_hash);
+ freez(node_id);
+}
+
+#define SQL_ALERT_VERSION_CALC \
+ "SELECT SUM(version) FROM health_log hl, alert_version av" \
+ " WHERE hl.host_id = @host_uuid AND hl.health_log_id = av.health_log_id AND av.status <> -2"
+
+static uint64_t calculate_node_alert_version(RRDHOST *host)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_ALERT_VERSION_CALC, &res))
+ return 0;
+
+ uint64_t version = 0;
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ param = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ version = (uint64_t)sqlite3_column_int64(res, 0);
+ }
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return version;
+}
+
+static void schedule_alert_snapshot_if_needed(struct aclk_sync_cfg_t *wc, uint64_t cloud_version)
+{
+ uint64_t local_version = calculate_node_alert_version(wc->host);
+ if (local_version != cloud_version) {
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_NOTICE,
+ "Scheduling alert snapshot for host \"%s\", node \"%s\" (version: cloud %zu, local %zu)",
+ rrdhost_hostname(wc->host),
+ wc->node_id,
+ cloud_version,
+ local_version);
+
+ wc->send_snapshot = 1;
+ rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ }
+ else
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "Alert check on \"%s\", node \"%s\" (version: cloud %zu, local %zu)",
+ rrdhost_hostname(wc->host),
+ wc->node_id,
+ cloud_version,
+ local_version);
+ wc->checkpoint_count++;
+}
+
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
+{
+ nd_uuid_t node_uuid;
+
+ if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
+ return;
+
+ struct aclk_sync_cfg_t *wc;
+
+ RRDHOST *host = find_host_by_node_id(node_id);
+ if (unlikely(!host || !(wc = host->aclk_config))) {
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
+ return;
+ }
+
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
+ "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
+ node_id,
+ wc->host ? rrdhost_hostname(wc->host) : "N/A",
+ snapshot_uuid);
+
+ if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
+ return;
+
+ wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
+
+ wc->send_snapshot = 1;
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+}
+
+#define SQL_COUNT_SNAPSHOT_ENTRIES \
+ "SELECT COUNT(1) FROM alert_version av, health_log hl " \
+ "WHERE hl.host_id = @host_id AND hl.health_log_id = av.health_log_id AND av.status <> -2"
+
+static int calculate_alert_snapshot_entries(nd_uuid_t *host_uuid)
+{
+ int count = 0;
+
+ sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_STATEMENT(db_meta, SQL_COUNT_SNAPSHOT_ENTRIES, &res))
+ return 0;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc == SQLITE_ROW)
+ count = sqlite3_column_int(res, 0);
+ else
+ error_report("Failed to select snapshot count");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+
+ return count;
+}
+
+#define SQL_GET_SNAPSHOT_ENTRIES \
+ " SELECT 0, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " \
+ " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " \
+ " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status, " \
+ " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " \
+ " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, av.version " \
+ " FROM health_log hl, alert_hash ah, health_log_detail hld, alert_version av " \
+ " WHERE hl.config_hash_id = ah.hash_id" \
+ " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " \
+ " AND hld.health_log_id = av.health_log_id AND av.unique_id = hld.unique_id AND av.status <> -2"
+
+#define ALARM_EVENTS_PER_CHUNK 1000
+void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused)
+{
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+
+ if (unlikely(!host)) {
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", wc->node_id);
+ return;
+ }
+
+ char *claim_id = get_agent_claimid();
+ if (unlikely(!claim_id))
+ return;
+
+ // Check database for this node to see how many alerts we will need to put in the snapshot
+ int cnt = calculate_alert_snapshot_entries(&host->host_uuid);
+ if (!cnt) {
+ freez(claim_id);
+ return;
+ }
+
+ sqlite3_stmt *res = NULL;
+ if (!PREPARE_STATEMENT(db_meta, SQL_GET_SNAPSHOT_ENTRIES, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ nd_uuid_t local_snapshot_uuid;
+ char snapshot_uuid_str[UUID_STR_LEN];
+ uuid_generate_random(local_snapshot_uuid);
+ uuid_unparse_lower(local_snapshot_uuid, snapshot_uuid_str);
+ char *snapshot_uuid = &snapshot_uuid_str[0];
+
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
+ "ACLK REQ [%s (%s)]: Sending %d alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(host),
+ cnt, snapshot_uuid);
+
+ uint32_t chunks;
+ chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
+
+ alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
+ struct alarm_snapshot alarm_snap;
+ struct alarm_log_entry alarm_log;
+
+ alarm_snap.node_id = wc->node_id;
+ alarm_snap.claim_id = claim_id;
+ alarm_snap.snapshot_uuid = snapshot_uuid;
+ alarm_snap.chunks = chunks;
+ alarm_snap.chunk = 1;
+
+ alarm_log.node_id = wc->node_id;
+ alarm_log.claim_id = claim_id;
+
+ cnt = 0;
+ param = 0;
+ uint64_t version = 0;
+ int total_count = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ cnt++;
+ total_count++;
+
+ if (!snapshot_proto)
+ snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+
+ health_alarm_log_populate(&alarm_log, res, host, NULL);
+
+ add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
+ version += alarm_log.version;
+
+ if (cnt == ALARM_EVENTS_PER_CHUNK) {
+ if (aclk_connected)
+ aclk_send_alarm_snapshot(snapshot_proto);
+ cnt = 0;
+ if (alarm_snap.chunk < chunks) {
+ alarm_snap.chunk++;
+ snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+ }
+ }
+ destroy_alarm_log_entry(&alarm_log);
+ }
+ if (cnt)
+ aclk_send_alarm_snapshot(snapshot_proto);
+
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "ACLK REQ [%s (%s)]: Sent! %d alerts snapshot, snapshot_uuid %s (version = %zu)",
+ wc->node_id,
+ rrdhost_hostname(host),
+ cnt,
+ snapshot_uuid,
+ version);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+
+ freez(claim_id);
+}
+
+// Start streaming alerts
+void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version)
+{
+ nd_uuid_t node_uuid;
+
+ if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
+ return;
+
+ struct aclk_sync_cfg_t *wc;
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely(!host || !(wc = host->aclk_config))) {
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, invalid node.", node_id);
+ return;
+ }
+
+ if (unlikely(!host->health.health_enabled)) {
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
+ return;
+ }
+
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ schedule_alert_snapshot_if_needed(wc, cloud_version);
+ wc->stream_alerts = true;
+}
+
+// Do checkpoint alert version check
+void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version)
+{
+ nd_uuid_t node_uuid;
+
+ if (unlikely(!node_id || !claim_id || !claimed() || uuid_parse(node_id, node_uuid)))
+ return;
+
+ char *agent_claim_id = get_agent_claimid();
+ if (claim_id && agent_claim_id && strcmp(agent_claim_id, claim_id) != 0) {
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED WITH INVALID CLAIM ID", node_id);
+ goto done;
+ }
+
+ struct aclk_sync_cfg_t *wc;
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if ((!host || !(wc = host->aclk_config)))
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED FOR INVALID NODE", node_id);
+ else
+ schedule_alert_snapshot_if_needed(wc, cloud_version);
+
+done:
+ freez(agent_claim_id);
+}
+
+#endif