diff options
Diffstat (limited to '')
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 117 |
1 files changed, 58 insertions, 59 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index ea1cc9fea..47663a8d1 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -24,7 +24,7 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after return 0; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { when = (time_t) sqlite3_column_int64(res, 0); } @@ -36,7 +36,14 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after return when; } -#define MAX_REMOVED_PERIOD 900 +void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) { + char sql[ACLK_SYNC_QUERY_SIZE]; + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id); + sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); + ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; +} + +#define MAX_REMOVED_PERIOD 86400 //decide if some events should be sent or not int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { @@ -56,12 +63,13 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) uuid_t config_hash_id; RRDCALC_STATUS status; uint32_t unique_id; - + //get the previous sent event of this alarm_id + //base the search on the last filtered event snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ - where hl.unique_id = aa.alert_unique_id \ - and hl.alarm_id = %u and hl.unique_id <> %u \ - order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id); + where hl.unique_id = aa.filtered_alert_unique_id \ + and hl.alarm_id = %u \ + order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -70,7 +78,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) return send; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); if (sqlite3_column_type(res, 1) != SQLITE_NULL) @@ -93,8 +101,9 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) } //same status, same config - if (ae->new_status == RRDCALC_STATUS_CLEAR) { + if (ae->new_status == RRDCALC_STATUS_CLEAR || ae->new_status == RRDCALC_STATUS_UNDEFINED) { send = 0; + update_filtered(ae, unique_id, uuid_str); goto done; } @@ -107,6 +116,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) goto done; } else { send = 0; + update_filtered(ae, unique_id, uuid_str); goto done; } } @@ -130,6 +140,8 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) return 0; } + CHECK_SQLITE_CONNECTION(db_meta); + if (!skip_filter) { if (!should_send_to_cloud(host, ae)) { return 0; @@ -137,9 +149,6 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } int rc = 0; - - CHECK_SQLITE_CONNECTION(db_meta); - sqlite3_stmt *res_alert = NULL; char uuid_str[GUID_LEN + 1]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); @@ -148,8 +157,8 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) buffer_sprintf( sql, - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created) " - "VALUES (@alert_unique_id, unixepoch()) on conflict (alert_unique_id) do nothing; ", + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " + "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ", uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0); @@ -220,7 +229,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d int rc; if (unlikely(!wc->alert_updates)) { - log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); return; } @@ -280,7 +289,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d static __thread uint64_t log_first_sequence_id = 0; static __thread uint64_t log_last_sequence_id = 0; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { struct alarm_log_entry alarm_log; char old_value_string[100 + 1]; char new_value_string[100 + 1]; @@ -300,9 +309,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.config_hash = strdupz((char *)uuid_str); alarm_log.utc_offset = wc->host->utc_offset; - alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone); + alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host)); alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : - strdupz((char *)wc->host->health_default_exec); + strdupz((char *)string2str(wc->host->health_default_exec)); alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16)); char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? @@ -374,7 +383,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + wc->host ? rrdhost_hostname(wc->host) : "N/A", log_first_sequence_id, log_last_sequence_id, wc->alerts_batch_id); @@ -401,8 +410,8 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host) BUFFER *sql = buffer_create(1024); buffer_sprintf(sql,"delete from aclk_alert_%s; " \ - "insert into aclk_alert_%s (alert_unique_id, date_created) " \ - "select unique_id alert_unique_id, unixepoch() from health_log_%s " \ + "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \ "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str); @@ -424,9 +433,7 @@ void aclk_send_alarm_health_log(char *node_id) struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); if (likely(!wc)) { - rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); - rrd_unlock(); if (likely(host)) wc = (struct aclk_database_worker_config *)host->dbsync_worker; } @@ -460,9 +467,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a RRDHOST *host = wc->host; if (unlikely(!host)) { - rrd_rdlock(); host = find_host_by_node_id(wc->node_id); - rrd_unlock(); if (unlikely(!host)) { log_access( @@ -500,7 +505,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a last_timestamp.tv_sec = 0; last_timestamp.tv_usec = 0; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; if (sqlite3_column_bytes(res, 1) > 0) { first_timestamp.tv_sec = sqlite3_column_int64(res, 1); @@ -536,8 +541,6 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a freez(claim_id); buffer_free(sql); - - aclk_alert_reloaded = 1; #endif return; @@ -554,7 +557,7 @@ void aclk_send_alarm_configuration(char *config_hash) return; } - log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -603,7 +606,7 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct struct provide_alarm_configuration p_alarm_config; p_alarm_config.cfg_hash = NULL; - if (sqlite3_step(res) == SQLITE_ROW) { + if (sqlite3_step_monitored(res) == SQLITE_ROW) { alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL; alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL; @@ -664,14 +667,14 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct } if (likely(p_alarm_config.cfg_hash)) { - log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez((char *) cmd.data_param); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } else - log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); bind_fail: rc = sqlite3_finalize(res); @@ -697,9 +700,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start return; struct aclk_database_worker_config *wc = NULL; - rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); - rrd_unlock(); if (likely(host)) { wc = (struct aclk_database_worker_config *)host->dbsync_worker ? (struct aclk_database_worker_config *)host->dbsync_worker : @@ -716,7 +717,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); if (likely(wc)) { - log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id); + log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id); __sync_synchronize(); wc->alerts_batch_id = batch_id; wc->alerts_start_seq_id = start_seq_id; @@ -736,15 +737,15 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config BUFFER *sql = buffer_create(1024); - buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \ - "select unique_id alert_unique_id, unixepoch() from health_log_%s " \ + buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \ "where new_status = -2 and updated_by_id = 0 and unique_id not in " \ "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \ "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str); db_execute(buffer_tostring(sql)); - log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); buffer_free(sql); @@ -780,17 +781,15 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn return; struct aclk_database_worker_config *wc = NULL; - rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); if (likely(host)) wc = (struct aclk_database_worker_config *)host->dbsync_worker; - rrd_unlock(); if (likely(wc)) { log_access( "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + wc->host ? rrdhost_hostname(wc->host) : "N/A", snapshot_id, sequence_id); if (wc->alerts_snapshot_id == snapshot_id) @@ -831,13 +830,13 @@ void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id) #ifdef ENABLE_ACLK void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host) { - char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN"); + char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN"); char config_hash_id[GUID_LEN + 1]; uuid_unparse_lower(ae->config_hash_id, config_hash_id); - alarm_log->chart = strdupz((char *)ae->chart); - alarm_log->name = strdupz((char *)ae->name); - alarm_log->family = strdupz((char *)ae->family); + alarm_log->chart = strdupz(ae_chart_name(ae)); + alarm_log->name = strdupz(ae_name(ae)); + alarm_log->family = strdupz(ae_family(ae)); alarm_log->batch_id = 0; alarm_log->sequence_id = 0; @@ -846,9 +845,9 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->config_hash = strdupz((char *)config_hash_id); alarm_log->utc_offset = host->utc_offset; - alarm_log->timezone = strdupz((char *)host->abbrev_timezone); - alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec); - alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)""); + alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host)); + alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health_default_exec)); + alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)""); alarm_log->command = strdupz((char *)edit_command); @@ -861,31 +860,31 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->last_repeat = (time_t)ae->last_repeat; alarm_log->silenced = - ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ? + ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ? 1 : 0; - alarm_log->value_string = strdupz(ae->new_value_string); - alarm_log->old_value_string = strdupz(ae->old_value_string); + alarm_log->value_string = strdupz(ae_new_value_string(ae)); + alarm_log->old_value_string = strdupz(ae_old_value_string(ae)); alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0; alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0; alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; - alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)""); - alarm_log->chart_context = ae->chart_context ? strdupz(ae->chart_context) : strdupz((char *)""); + alarm_log->rendered_info = strdupz(ae_info(ae)); + alarm_log->chart_context = strdupz(ae_chart_context(ae)); freez(edit_command); } #endif #ifdef ENABLE_ACLK -static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark) +static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) { ALARM_ENTRY *ae = host->health_log.alarms; while (ae) { - if (ae->alarm_id == alarm_id && ae->unique_id > mark && + if (ae->alarm_id == alarm_id && ae->unique_id >mark && (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)) return 1; ae = ae->next; @@ -905,7 +904,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru UNUSED(cmd); // we perhaps we don't need this for snapshots if (unlikely(!wc->alert_updates)) { - log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); return; } @@ -921,7 +920,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (unlikely(!claim_id)) return; - log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id); + log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id); aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); @@ -1025,11 +1024,11 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host) BUFFER *sql = buffer_create(1024); - buffer_sprintf(sql,"delete from aclk_alert_%s where alert_unique_id not in " + buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in " " (select unique_id from health_log_%s); ", uuid_str, uuid_str); char *err_msg = NULL; - int rc = sqlite3_exec(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""", uuid_str, err_msg); @@ -1064,7 +1063,7 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert return 1; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; |