summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk_alert.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk_alert.c')
-rw-r--r--database/sqlite/sqlite_aclk_alert.c128
1 files changed, 99 insertions, 29 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 238b500a8..54e8be4a7 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -127,7 +127,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
int rc;
if (unlikely(!wc->alert_updates)) {
- log_access("AC [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@@ -135,6 +135,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
if (unlikely(!claim_id))
return;
+ if (unlikely(!wc->host)) {
+ freez(claim_id);
+ return;
+ }
+
BUFFER *sql = buffer_create(1024);
if (wc->alerts_start_seq_id != 0) {
@@ -242,7 +247,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24);
alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log.rendered_info = strdupz((char *)sqlite3_column_text(res, 18));
+ alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
+ strdupz((char *)"") :
+ strdupz((char *)sqlite3_column_text(res, 18));
aclk_send_alarm_log_entry(&alarm_log);
@@ -267,7 +274,13 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
db_execute(buffer_tostring(sql));
} else {
if (log_first_sequence_id)
- log_access("OG [%s (%s)]: Sent alert events, first sequence_id %"PRIu64", last sequence_id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", log_first_sequence_id, log_last_sequence_id);
+ log_access(
+ "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ log_first_sequence_id,
+ log_last_sequence_id,
+ wc->alerts_batch_id);
log_first_sequence_id = 0;
log_last_sequence_id = 0;
}
@@ -288,23 +301,32 @@ void aclk_send_alarm_health_log(char *node_id)
if (unlikely(!node_id))
return;
- log_access("IN [%s (N/A)]: Request to send alarm health log.", node_id);
+ char *hostname= NULL;
struct aclk_database_worker_config *wc = NULL;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host))
+ if (likely(host)) {
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ hostname = host->hostname;
+ }
+ else
+ hostname = get_hostname_by_node_id(node_id);
rrd_unlock();
+
+ log_access("ACLK REQ [%s (%s)]: HEALTH LOG request received", node_id, hostname ? hostname : "N/A");
+ if (unlikely(!host))
+ freez(hostname);
+
if (wc)
aclk_database_enq_cmd(wc, &cmd);
else {
if (aclk_worker_enq_cmd(node_id, &cmd))
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
}
return;
}
@@ -323,7 +345,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
RRDHOST *host = wc->host;
if (unlikely(!host)) {
- rrd_wrlock();
+ rrd_rdlock();
host = find_host_by_node_id(wc->node_id);
rrd_unlock();
@@ -391,7 +413,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
wc->alert_sequence_id = last_sequence;
aclk_send_alarm_log_health(&alarm_log);
- log_access("OG [%s (%s)]: Alarm health log sent, first sequence id %"PRIu64", last sequence id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
+ log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
@@ -415,7 +437,7 @@ void aclk_send_alarm_configuration(char *config_hash)
return;
}
- log_access("IN [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -525,14 +547,14 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
}
if (likely(p_alarm_config.cfg_hash)) {
- log_access("OG [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
aclk_send_provide_alarm_cfg(&p_alarm_config);
freez((char *) cmd.data_param);
freez(p_alarm_config.cfg_hash);
destroy_aclk_alarm_configuration(&alarm_config);
}
else
- log_access("AC [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
bind_fail:
rc = sqlite3_finalize(res);
@@ -552,28 +574,30 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
if (unlikely(!node_id))
return;
- log_access("IN [%s (N/A)]: Start streaming alerts with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, batch_id, start_seq_id);
+ //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id);
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
struct aclk_database_worker_config *wc = NULL;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host))
+ rrd_unlock();
+ if (likely(host)) {
wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
(struct aclk_database_worker_config *)host->dbsync_worker :
(struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
- rrd_unlock();
- if (unlikely(!host->health_enabled)) {
- log_access("AC [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
- return;
- }
+ if (unlikely(!host->health_enabled)) {
+ log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
+ return;
+ }
+ } else
+ wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
if (likely(wc)) {
- log_access("AC [%s (%s)]: Start streaming alerts enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, wc->host ? wc->host->hostname : "N/A", batch_id, start_seq_id);
+ log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id);
__sync_synchronize();
wc->alerts_batch_id = batch_id;
wc->alerts_start_seq_id = start_seq_id;
@@ -581,7 +605,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
__sync_synchronize();
}
else
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
#else
UNUSED(node_id);
@@ -607,7 +631,7 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
db_execute(buffer_tostring(sql));
- log_access("AC [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_free(sql);
#endif
@@ -644,7 +668,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
return;
struct aclk_database_worker_config *wc = NULL;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
if (likely(host))
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
@@ -657,6 +681,8 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
wc->host ? wc->host->hostname : "N/A",
snapshot_id,
sequence_id);
+ if (wc->alerts_snapshot_id == snapshot_id)
+ return;
__sync_synchronize();
wc->alerts_snapshot_id = snapshot_id;
wc->alerts_ack_sequence_id = sequence_id;
@@ -669,7 +695,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
} else
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
#else
UNUSED(node_id);
UNUSED(snapshot_id);
@@ -714,7 +740,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->utc_offset = host->utc_offset;
alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
- alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : "";
+ alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)"");
alarm_log->command = strdupz((char *)edit_command);
@@ -738,7 +764,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0;
alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log->rendered_info = strdupz(ae->info);
+ alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)"");
freez(edit_command);
}
@@ -770,7 +796,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
UNUSED(cmd);
// we perhaps we don't need this for snapshots
if (unlikely(!wc->alert_updates)) {
- log_access("AC [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@@ -779,11 +805,14 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
return;
}
+ if (unlikely(!wc->alerts_snapshot_id))
+ return;
+
char *claim_id = is_agent_claimed();
if (unlikely(!claim_id))
return;
- log_access("OG [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
+ log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
@@ -906,3 +935,44 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
UNUSED(host);
#endif
}
+
+int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
+{
+ int rc;
+ struct aclk_database_worker_config *wc = NULL;
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (!wc)
+ return 1;
+
+ proto_alert_status->alert_updates = wc->alert_updates;
+ proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
+
+ BUFFER *sql = buffer_create(1024);
+ sqlite3_stmt *res = NULL;
+
+ buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
+ "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
+ "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
+ "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to get alert log status from the database.");
+ buffer_free(sql);
+ return 1;
+ }
+
+ while (sqlite3_step(res) == SQLITE_ROW) {
+ proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
+ proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
+ proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
+ proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
+
+ buffer_free(sql);
+ return 0;
+}