summary refs log tree commit diff stats
path: root/database/sqlite/sqlite_aclk_alert.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk_alert.c')
-rw-r--r-- database/sqlite/sqlite_aclk_alert.c 117
1 files changed, 58 insertions, 59 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index ea1cc9fea..47663a8d1 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -24,7 +24,7 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after
return 0;
}
- rc = sqlite3_step(res);
+ rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW)) {
when = (time_t) sqlite3_column_int64(res, 0);
}
@@ -36,7 +36,14 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after
return when;
}
-#define MAX_REMOVED_PERIOD 900
+// Move the "last filtered event" marker of an alert forward.
+//
+// Runs an UPDATE on aclk_alert_<uuid_str> that sets filtered_alert_unique_id
+// to ae->unique_id on every row where it currently equals unique_id, then
+// sets HEALTH_ENTRY_FLAG_ACLK_QUEUED on ae.
+//
+//   ae        - alarm entry being filtered out; its unique_id becomes the new marker
+//   unique_id - marker value currently stored in the table
+//   uuid_str  - host UUID string, used as the table-name suffix
+//
+// The update is best effort: a failure is reported, but the entry is still
+// flagged so the caller's flow is unchanged.
+void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) {
+    char sql[ACLK_SYNC_QUERY_SIZE];
+    char *err_msg = NULL;
+
+    snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id);
+
+    int rc = sqlite3_exec_monitored(db_meta, sql, 0, 0, &err_msg);
+    if (rc != SQLITE_OK) {
+        // err_msg may be left NULL on some failures; never hand NULL to %s
+        error_report("Failed to update filtered_alert_unique_id in aclk_alert_%s, error message \"%s\"",
+                     uuid_str, err_msg ? err_msg : "unknown");
+        sqlite3_free(err_msg);
+    }
+
+    ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+}
+
+#define MAX_REMOVED_PERIOD 86400
//decide if some events should be sent or not
int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
{
@@ -56,12 +63,13 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
uuid_t config_hash_id;
RRDCALC_STATUS status;
uint32_t unique_id;
-
+
//get the previous sent event of this alarm_id
+ //base the search on the last filtered event
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \
- where hl.unique_id = aa.alert_unique_id \
- and hl.alarm_id = %u and hl.unique_id <> %u \
- order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id);
+ where hl.unique_id = aa.filtered_alert_unique_id \
+ and hl.alarm_id = %u \
+ order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id);
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -70,7 +78,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
return send;
}
- rc = sqlite3_step(res);
+ rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW)) {
status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
if (sqlite3_column_type(res, 1) != SQLITE_NULL)
@@ -93,8 +101,9 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
}
//same status, same config
- if (ae->new_status == RRDCALC_STATUS_CLEAR) {
+ if (ae->new_status == RRDCALC_STATUS_CLEAR || ae->new_status == RRDCALC_STATUS_UNDEFINED) {
send = 0;
+ update_filtered(ae, unique_id, uuid_str);
goto done;
}
@@ -107,6 +116,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
goto done;
} else {
send = 0;
+ update_filtered(ae, unique_id, uuid_str);
goto done;
}
}
@@ -130,6 +140,8 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
return 0;
}
+ CHECK_SQLITE_CONNECTION(db_meta);
+
if (!skip_filter) {
if (!should_send_to_cloud(host, ae)) {
return 0;
@@ -137,9 +149,6 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
}
int rc = 0;
-
- CHECK_SQLITE_CONNECTION(db_meta);
-
sqlite3_stmt *res_alert = NULL;
char uuid_str[GUID_LEN + 1];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
@@ -148,8 +157,8 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
buffer_sprintf(
sql,
- "INSERT INTO aclk_alert_%s (alert_unique_id, date_created) "
- "VALUES (@alert_unique_id, unixepoch()) on conflict (alert_unique_id) do nothing; ",
+ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
+ "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ",
uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
@@ -220,7 +229,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
int rc;
if (unlikely(!wc->alert_updates)) {
- log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
return;
}
@@ -280,7 +289,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
static __thread uint64_t log_first_sequence_id = 0;
static __thread uint64_t log_last_sequence_id = 0;
- while (sqlite3_step(res) == SQLITE_ROW) {
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
struct alarm_log_entry alarm_log;
char old_value_string[100 + 1];
char new_value_string[100 + 1];
@@ -300,9 +309,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.config_hash = strdupz((char *)uuid_str);
alarm_log.utc_offset = wc->host->utc_offset;
- alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone);
+ alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host));
alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) :
- strdupz((char *)wc->host->health_default_exec);
+ strdupz((char *)string2str(wc->host->health_default_exec));
alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
@@ -374,7 +383,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
log_access(
"ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
+ wc->host ? rrdhost_hostname(wc->host) : "N/A",
log_first_sequence_id,
log_last_sequence_id,
wc->alerts_batch_id);
@@ -401,8 +410,8 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql,"delete from aclk_alert_%s; " \
- "insert into aclk_alert_%s (alert_unique_id, date_created) " \
- "select unique_id alert_unique_id, unixepoch() from health_log_%s " \
+ "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
+ "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \
"where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
"order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
@@ -424,9 +433,7 @@ void aclk_send_alarm_health_log(char *node_id)
struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
if (likely(!wc)) {
- rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
- rrd_unlock();
if (likely(host))
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
}
@@ -460,9 +467,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
RRDHOST *host = wc->host;
if (unlikely(!host)) {
- rrd_rdlock();
host = find_host_by_node_id(wc->node_id);
- rrd_unlock();
if (unlikely(!host)) {
log_access(
@@ -500,7 +505,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
last_timestamp.tv_sec = 0;
last_timestamp.tv_usec = 0;
- while (sqlite3_step(res) == SQLITE_ROW) {
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
if (sqlite3_column_bytes(res, 1) > 0) {
first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
@@ -536,8 +541,6 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
freez(claim_id);
buffer_free(sql);
-
- aclk_alert_reloaded = 1;
#endif
return;
@@ -554,7 +557,7 @@ void aclk_send_alarm_configuration(char *config_hash)
return;
}
- log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -603,7 +606,7 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
struct provide_alarm_configuration p_alarm_config;
p_alarm_config.cfg_hash = NULL;
- if (sqlite3_step(res) == SQLITE_ROW) {
+ if (sqlite3_step_monitored(res) == SQLITE_ROW) {
alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
@@ -664,14 +667,14 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
}
if (likely(p_alarm_config.cfg_hash)) {
- log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
aclk_send_provide_alarm_cfg(&p_alarm_config);
freez((char *) cmd.data_param);
freez(p_alarm_config.cfg_hash);
destroy_aclk_alarm_configuration(&alarm_config);
}
else
- log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
bind_fail:
rc = sqlite3_finalize(res);
@@ -697,9 +700,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
return;
struct aclk_database_worker_config *wc = NULL;
- rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
- rrd_unlock();
if (likely(host)) {
wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
(struct aclk_database_worker_config *)host->dbsync_worker :
@@ -716,7 +717,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
if (likely(wc)) {
- log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id);
+ log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id);
__sync_synchronize();
wc->alerts_batch_id = batch_id;
wc->alerts_start_seq_id = start_seq_id;
@@ -736,15 +737,15 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
BUFFER *sql = buffer_create(1024);
- buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
- "select unique_id alert_unique_id, unixepoch() from health_log_%s " \
+ buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
+ "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \
"where new_status = -2 and updated_by_id = 0 and unique_id not in " \
"(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \
"on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
- log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
buffer_free(sql);
@@ -780,17 +781,15 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
return;
struct aclk_database_worker_config *wc = NULL;
- rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
if (likely(host))
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
- rrd_unlock();
if (likely(wc)) {
log_access(
"IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64,
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
+ wc->host ? rrdhost_hostname(wc->host) : "N/A",
snapshot_id,
sequence_id);
if (wc->alerts_snapshot_id == snapshot_id)
@@ -831,13 +830,13 @@ void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
#ifdef ENABLE_ACLK
void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
{
- char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN");
+ char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN");
char config_hash_id[GUID_LEN + 1];
uuid_unparse_lower(ae->config_hash_id, config_hash_id);
- alarm_log->chart = strdupz((char *)ae->chart);
- alarm_log->name = strdupz((char *)ae->name);
- alarm_log->family = strdupz((char *)ae->family);
+ alarm_log->chart = strdupz(ae_chart_name(ae));
+ alarm_log->name = strdupz(ae_name(ae));
+ alarm_log->family = strdupz(ae_family(ae));
alarm_log->batch_id = 0;
alarm_log->sequence_id = 0;
@@ -846,9 +845,9 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->config_hash = strdupz((char *)config_hash_id);
alarm_log->utc_offset = host->utc_offset;
- alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
- alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
- alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)"");
+ alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
+ alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health_default_exec));
+ alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)"");
alarm_log->command = strdupz((char *)edit_command);
@@ -861,31 +860,31 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->last_repeat = (time_t)ae->last_repeat;
alarm_log->silenced =
- ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ?
+ ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ?
1 :
0;
- alarm_log->value_string = strdupz(ae->new_value_string);
- alarm_log->old_value_string = strdupz(ae->old_value_string);
+ alarm_log->value_string = strdupz(ae_new_value_string(ae));
+ alarm_log->old_value_string = strdupz(ae_old_value_string(ae));
alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)"");
- alarm_log->chart_context = ae->chart_context ? strdupz(ae->chart_context) : strdupz((char *)"");
+ alarm_log->rendered_info = strdupz(ae_info(ae));
+ alarm_log->chart_context = strdupz(ae_chart_context(ae));
freez(edit_command);
}
#endif
#ifdef ENABLE_ACLK
-static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
+static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
{
ALARM_ENTRY *ae = host->health_log.alarms;
while (ae) {
- if (ae->alarm_id == alarm_id && ae->unique_id > mark &&
+ if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
(ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
return 1;
ae = ae->next;
@@ -905,7 +904,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
UNUSED(cmd);
// we perhaps we don't need this for snapshots
if (unlikely(!wc->alert_updates)) {
- log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
return;
}
@@ -921,7 +920,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
if (unlikely(!claim_id))
return;
- log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
+ log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id);
aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
@@ -1025,11 +1024,11 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
BUFFER *sql = buffer_create(1024);
- buffer_sprintf(sql,"delete from aclk_alert_%s where alert_unique_id not in "
+ buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in "
" (select unique_id from health_log_%s); ", uuid_str, uuid_str);
char *err_msg = NULL;
- int rc = sqlite3_exec(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg);
+ int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""",
uuid_str, err_msg);
@@ -1064,7 +1063,7 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert
return 1;
}
- while (sqlite3_step(res) == SQLITE_ROW) {
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;