diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-04-14 18:12:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-04-14 18:12:10 +0000 |
commit | b5321aff06d6ea8d730d62aec2ffd8e9271c1ffc (patch) | |
tree | 36c41e35994786456154f9d3bf88c324763aeea4 /database/sqlite/sqlite_aclk_alert.c | |
parent | Adding upstream version 1.33.1. (diff) | |
download | netdata-upstream/1.34.0.tar.xz netdata-upstream/1.34.0.zip |
Adding upstream version 1.34.0.upstream/1.34.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/sqlite/sqlite_aclk_alert.c')
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 128 |
1 files changed, 99 insertions, 29 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 238b500a8..54e8be4a7 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -127,7 +127,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d int rc; if (unlikely(!wc->alert_updates)) { - log_access("AC [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); return; } @@ -135,6 +135,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d if (unlikely(!claim_id)) return; + if (unlikely(!wc->host)) { + freez(claim_id); + return; + } + BUFFER *sql = buffer_create(1024); if (wc->alerts_start_seq_id != 0) { @@ -242,7 +247,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24); alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; - alarm_log.rendered_info = strdupz((char *)sqlite3_column_text(res, 18)); + alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ? + strdupz((char *)"") : + strdupz((char *)sqlite3_column_text(res, 18)); aclk_send_alarm_log_entry(&alarm_log); @@ -267,7 +274,13 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d db_execute(buffer_tostring(sql)); } else { if (log_first_sequence_id) - log_access("OG [%s (%s)]: Sent alert events, first sequence_id %"PRIu64", last sequence_id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", log_first_sequence_id, log_last_sequence_id); + log_access( + "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, + wc->node_id, + wc->host ? wc->host->hostname : "N/A", + log_first_sequence_id, + log_last_sequence_id, + wc->alerts_batch_id); log_first_sequence_id = 0; log_last_sequence_id = 0; } @@ -288,23 +301,32 @@ void aclk_send_alarm_health_log(char *node_id) if (unlikely(!node_id)) return; - log_access("IN [%s (N/A)]: Request to send alarm health log.", node_id); + char *hostname= NULL; struct aclk_database_worker_config *wc = NULL; struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG; - rrd_wrlock(); + rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) + if (likely(host)) { wc = (struct aclk_database_worker_config *)host->dbsync_worker; + hostname = host->hostname; + } + else + hostname = get_hostname_by_node_id(node_id); rrd_unlock(); + + log_access("ACLK REQ [%s (%s)]: HEALTH LOG request received", node_id, hostname ? hostname : "N/A"); + if (unlikely(!host)) + freez(hostname); + if (wc) aclk_database_enq_cmd(wc, &cmd); else { if (aclk_worker_enq_cmd(node_id, &cmd)) - log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id); + log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); } return; } @@ -323,7 +345,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a RRDHOST *host = wc->host; if (unlikely(!host)) { - rrd_wrlock(); + rrd_rdlock(); host = find_host_by_node_id(wc->node_id); rrd_unlock(); @@ -391,7 +413,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a wc->alert_sequence_id = last_sequence; aclk_send_alarm_log_health(&alarm_log); - log_access("OG [%s (%s)]: Alarm health log sent, first sequence id %"PRIu64", last sequence id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence); + log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence); rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) @@ -415,7 +437,7 @@ void aclk_send_alarm_configuration(char *config_hash) return; } - log_access("IN [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -525,14 +547,14 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct } if (likely(p_alarm_config.cfg_hash)) { - log_access("OG [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez((char *) cmd.data_param); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } else - log_access("AC [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); bind_fail: rc = sqlite3_finalize(res); @@ -552,28 +574,30 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start if (unlikely(!node_id)) return; - log_access("IN [%s (N/A)]: Start streaming alerts with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, batch_id, start_seq_id); + //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id); uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) return; struct aclk_database_worker_config *wc = NULL; - rrd_wrlock(); + rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) + rrd_unlock(); + if (likely(host)) { wc = (struct aclk_database_worker_config *)host->dbsync_worker ? (struct aclk_database_worker_config *)host->dbsync_worker : (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - rrd_unlock(); - if (unlikely(!host->health_enabled)) { - log_access("AC [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); - return; - } + if (unlikely(!host->health_enabled)) { + log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); + return; + } + } else + wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); if (likely(wc)) { - log_access("AC [%s (%s)]: Start streaming alerts enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, wc->host ? wc->host->hostname : "N/A", batch_id, start_seq_id); + log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id); __sync_synchronize(); wc->alerts_batch_id = batch_id; wc->alerts_start_seq_id = start_seq_id; @@ -581,7 +605,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start __sync_synchronize(); } else - log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id); + log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); #else UNUSED(node_id); @@ -607,7 +631,7 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config db_execute(buffer_tostring(sql)); - log_access("AC [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); buffer_free(sql); #endif @@ -644,7 +668,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn return; struct aclk_database_worker_config *wc = NULL; - rrd_wrlock(); + rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); if (likely(host)) wc = (struct aclk_database_worker_config *)host->dbsync_worker; @@ -657,6 +681,8 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn wc->host ? wc->host->hostname : "N/A", snapshot_id, sequence_id); + if (wc->alerts_snapshot_id == snapshot_id) + return; __sync_synchronize(); wc->alerts_snapshot_id = snapshot_id; wc->alerts_ack_sequence_id = sequence_id; @@ -669,7 +695,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn cmd.completion = NULL; aclk_database_enq_cmd(wc, &cmd); } else - log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id); + log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); #else UNUSED(node_id); UNUSED(snapshot_id); @@ -714,7 +740,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->utc_offset = host->utc_offset; alarm_log->timezone = strdupz((char *)host->abbrev_timezone); alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec); - alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : ""; + alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)""); alarm_log->command = strdupz((char *)edit_command); @@ -738,7 +764,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0; alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; - alarm_log->rendered_info = strdupz(ae->info); + alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)""); freez(edit_command); } @@ -770,7 +796,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru UNUSED(cmd); // we perhaps we don't need this for snapshots if (unlikely(!wc->alert_updates)) { - log_access("AC [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); return; } @@ -779,11 +805,14 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru return; } + if (unlikely(!wc->alerts_snapshot_id)) + return; + char *claim_id = is_agent_claimed(); if (unlikely(!claim_id)) return; - log_access("OG [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id); + log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id); aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); @@ -906,3 +935,44 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host) UNUSED(host); #endif } + +int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status) +{ + int rc; + struct aclk_database_worker_config *wc = NULL; + wc = (struct aclk_database_worker_config *)host->dbsync_worker; + if (!wc) + return 1; + + proto_alert_status->alert_updates = wc->alert_updates; + proto_alert_status->alerts_batch_id = wc->alerts_batch_id; + + BUFFER *sql = buffer_create(1024); + sqlite3_stmt *res = NULL; + + buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \ + "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \ + "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \ + "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str); + + rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to get alert log status from the database."); + buffer_free(sql); + return 1; + } + + while (sqlite3_step(res) == SQLITE_ROW) { + proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; + proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; + proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; + proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0; + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc); + + buffer_free(sql); + return 0; +} |