summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk_alert.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk_alert.c')
-rw-r--r--database/sqlite/sqlite_aclk_alert.c144
1 files changed, 51 insertions, 93 deletions
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 53c6c2a6..ea1cc9fe 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -3,7 +3,7 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_alert.h"
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
#include "../../aclk/aclk_alarm_api.h"
#include "../../aclk/aclk.h"
#endif
@@ -123,21 +123,6 @@ done:
// and handle both cases
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
{
- //check aclk architecture and handle old json alarm update to cloud
- //include also the valid statuses for this case
-#ifdef ENABLE_ACLK
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!aclk_use_new_cloud_arch && aclk_connected) {
-#endif
-
- if ((ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) ||
- ((ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL))) {
- aclk_update_alarm(host, ae);
- }
-#endif
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- }
-
if (!claimed())
return 0;
@@ -164,7 +149,7 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
buffer_sprintf(
sql,
"INSERT INTO aclk_alert_%s (alert_unique_id, date_created) "
- "VALUES (@alert_unique_id, strftime('%%s')) on conflict (alert_unique_id) do nothing; ",
+ "VALUES (@alert_unique_id, unixepoch()) on conflict (alert_unique_id) do nothing; ",
uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
@@ -196,17 +181,11 @@ bind_fail:
buffer_free(sql);
return 0;
-#else
- UNUSED(host);
- UNUSED(ae);
- UNUSED(skip_filter);
-#endif
- return 0;
}
int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
switch(status) {
case RRDCALC_STATUS_REMOVED:
return ALARM_STATUS_REMOVED;
@@ -234,7 +213,7 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+#ifndef ENABLE_ACLK
UNUSED(wc);
UNUSED(cmd);
#else
@@ -245,7 +224,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
return;
}
- char *claim_id = is_agent_claimed();
+ char *claim_id = get_agent_claimid();
if (unlikely(!claim_id))
return;
@@ -260,9 +239,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
buffer_sprintf(
sql,
"UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
- "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64
+ "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64
" and date_cloud_ack is null "
- "; UPDATE aclk_alert_%s SET date_submitted = strftime('%%s','now') WHERE sequence_id < %"PRIu64
+ "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64
" and date_submitted is null",
wc->uuid_str,
wc->alerts_start_seq_id,
@@ -282,7 +261,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
buffer_sprintf(sql, "select aa.sequence_id, hl.unique_id, hl.alarm_id, hl.config_hash_id, hl.updated_by_id, hl.when_key, \
hl.duration, hl.non_clear_duration, hl.flags, hl.exec_run_timestamp, hl.delay_up_to_timestamp, hl.name, \
hl.chart, hl.family, hl.exec, hl.recipient, hl.source, hl.units, hl.info, hl.exec_code, hl.new_status, \
- hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat \
+ hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat, hl.chart_context \
from health_log_%s hl, aclk_alert_%s aa \
where hl.unique_id = aa.alert_unique_id and aa.date_submitted is null \
order by aa.sequence_id asc limit %d;", wc->uuid_str, wc->uuid_str, limit);
@@ -357,14 +336,18 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
strdupz((char *)format_value_and_unit(
old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
- alarm_log.value = (calculated_number) sqlite3_column_double(res, 23);
- alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24);
+ alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
+ alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 24);
alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
strdupz((char *)"") :
strdupz((char *)sqlite3_column_text(res, 18));
+ alarm_log.chart_context = sqlite3_column_type(res, 26) == SQLITE_NULL ?
+ strdupz((char *)"") :
+ strdupz((char *)sqlite3_column_text(res, 26));
+
aclk_send_alarm_log_entry(&alarm_log);
if (first_sequence_id == 0)
@@ -382,7 +365,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
if (first_sequence_id) {
buffer_flush(sql);
- buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') "
+ buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
"WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
wc->uuid_str, first_sequence_id, last_sequence_id);
db_execute(buffer_tostring(sql));
@@ -417,10 +400,11 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
BUFFER *sql = buffer_create(1024);
- buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
- "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \
+ buffer_sprintf(sql,"delete from aclk_alert_%s; " \
+ "insert into aclk_alert_%s (alert_unique_id, date_created) " \
+ "select unique_id alert_unique_id, unixepoch() from health_log_%s " \
"where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
- "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str);
+ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
db_execute(buffer_tostring(sql));
@@ -437,45 +421,40 @@ void aclk_send_alarm_health_log(char *node_id)
if (unlikely(!node_id))
return;
- char *hostname= NULL;
+ struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
- struct aclk_database_worker_config *wc = NULL;
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
+ if (likely(!wc)) {
+ rrd_rdlock();
+ RRDHOST *host = find_host_by_node_id(node_id);
+ rrd_unlock();
+ if (likely(host))
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ }
- rrd_rdlock();
- RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host)) {
- wc = (struct aclk_database_worker_config *)host->dbsync_worker;
- hostname = host->hostname;
+ if (!wc) {
+ log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id);
+ return;
}
- else
- hostname = get_hostname_by_node_id(node_id);
- rrd_unlock();
- log_access("ACLK REQ [%s (%s)]: HEALTH LOG request received", node_id, hostname ? hostname : "N/A");
- if (unlikely(!host))
- freez(hostname);
+ log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, wc->hostname ? wc->hostname : "N/A");
- if (wc)
- aclk_database_enq_cmd(wc, &cmd);
- else {
- if (aclk_worker_enq_cmd(node_id, &cmd))
- log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
- }
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
+
+ aclk_database_enq_cmd(wc, &cmd);
return;
}
void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+#ifndef ENABLE_ACLK
UNUSED(wc);
#else
int rc;
- char *claim_id = is_agent_claimed();
+ char *claim_id = get_agent_claimid();
if (unlikely(!claim_id))
return;
@@ -549,7 +528,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
wc->alert_sequence_id = last_sequence;
aclk_send_alarm_log_health(&alarm_log);
- log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
+ log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
@@ -595,7 +574,7 @@ void aclk_send_alarm_configuration(char *config_hash)
int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(wc);
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+#ifndef ENABLE_ACLK
UNUSED(cmd);
#else
int rc = 0;
@@ -708,7 +687,6 @@ bind_fail:
// Start streaming alerts
void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!node_id))
return;
@@ -749,24 +727,17 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
else
log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
-#else
- UNUSED(node_id);
- UNUSED(start_seq_id);
- UNUSED(batch_id);
-#endif
return;
}
void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
- UNUSED(wc);
-#else
+
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
- "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \
+ "select unique_id alert_unique_id, unixepoch() from health_log_%s " \
"where new_status = -2 and updated_by_id = 0 and unique_id not in " \
"(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \
"on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
@@ -778,13 +749,11 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
buffer_free(sql);
wc->pause_alert_updates = 0;
-#endif
return;
}
void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!host->dbsync_worker))
return;
@@ -798,15 +767,11 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
cmd.data_param = NULL;
cmd.completion = NULL;
aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
-#else
- UNUSED(host);
-#endif
}
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
{
UNUSED(claim_id);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!node_id))
return;
@@ -843,11 +808,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
aclk_database_enq_cmd(wc, &cmd);
} else
log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
-#else
- UNUSED(node_id);
- UNUSED(snapshot_id);
- UNUSED(sequence_id);
-#endif
+
return;
}
@@ -858,7 +819,7 @@ void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
if (alerts_ack_sequence_id != 0) {
buffer_sprintf(
sql,
- "UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id <= %" PRIu64 "",
+ "UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id <= %" PRIu64 "",
uuid_str,
alerts_ack_sequence_id);
db_execute(buffer_tostring(sql));
@@ -867,7 +828,7 @@ void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
buffer_free(sql);
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
{
char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN");
@@ -907,17 +868,18 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->value_string = strdupz(ae->new_value_string);
alarm_log->old_value_string = strdupz(ae->old_value_string);
- alarm_log->value = (!isnan(ae->new_value)) ? (calculated_number)ae->new_value : 0;
- alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0;
+ alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
+ alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)"");
+ alarm_log->chart_context = ae->chart_context ? strdupz(ae->chart_context) : strdupz((char *)"");
freez(edit_command);
}
#endif
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
{
ALARM_ENTRY *ae = host->health_log.alarms;
@@ -936,7 +898,7 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
#define ALARM_EVENTS_PER_CHUNK 10
void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+#ifndef ENABLE_ACLK
UNUSED(wc);
UNUSED(cmd);
#else
@@ -955,7 +917,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
if (unlikely(!wc->alerts_snapshot_id))
return;
- char *claim_id = is_agent_claimed();
+ char *claim_id = get_agent_claimid();
if (unlikely(!claim_id))
return;
@@ -1055,7 +1017,6 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!claimed())
return;
@@ -1075,9 +1036,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
sqlite3_free(err_msg);
}
buffer_free(sql);
-#else
- UNUSED(host);
-#endif
}
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)