summaryrefslogtreecommitdiffstats
path: root/src/database
diff options
context:
space:
mode:
Diffstat (limited to 'src/database')
-rw-r--r--src/database/engine/rrdengine.c2
-rw-r--r--src/database/rrd.h3
-rw-r--r--src/database/rrdhost.c16
-rw-r--r--src/database/rrdlabels.c38
-rw-r--r--src/database/sqlite/sqlite_aclk.c228
-rw-r--r--src/database/sqlite/sqlite_aclk.h44
-rw-r--r--src/database/sqlite/sqlite_aclk_alert.c1269
-rw-r--r--src/database/sqlite/sqlite_aclk_alert.h24
-rw-r--r--src/database/sqlite/sqlite_aclk_node.c8
-rw-r--r--src/database/sqlite/sqlite_context.c2
-rw-r--r--src/database/sqlite/sqlite_db_migration.c2
-rw-r--r--src/database/sqlite/sqlite_functions.c12
-rw-r--r--src/database/sqlite/sqlite_health.c263
-rw-r--r--src/database/sqlite/sqlite_health.h7
-rw-r--r--src/database/sqlite/sqlite_metadata.c47
15 files changed, 915 insertions, 1050 deletions
diff --git a/src/database/engine/rrdengine.c b/src/database/engine/rrdengine.c
index 2d6583ead..a989877fc 100644
--- a/src/database/engine/rrdengine.c
+++ b/src/database/engine/rrdengine.c
@@ -1517,7 +1517,7 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
break;
}
- errno = 0;
+ errno_clear();
if(count)
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"DBENGINE: journal indexing done; %u files processed",
diff --git a/src/database/rrd.h b/src/database/rrd.h
index 097e25025..bd31e21e1 100644
--- a/src/database/rrd.h
+++ b/src/database/rrd.h
@@ -1043,7 +1043,6 @@ struct alarm_entry {
STRING *recipient;
time_t exec_run_timestamp;
int exec_code;
- uint64_t exec_spawn_serial;
STRING *source;
STRING *units;
@@ -1069,6 +1068,8 @@ struct alarm_entry {
time_t last_repeat;
+ POPEN_INSTANCE *popen_instance;
+
struct alarm_entry *next;
struct alarm_entry *next_in_progress;
struct alarm_entry *prev_in_progress;
diff --git a/src/database/rrdhost.c b/src/database/rrdhost.c
index 6bf2c2551..b3d786cff 100644
--- a/src/database/rrdhost.c
+++ b/src/database/rrdhost.c
@@ -935,7 +935,13 @@ void dbengine_init(char *hostname) {
config_set_number(CONFIG_SECTION_DB, "dbengine tier 0 disk space MB", default_multidb_disk_quota_mb);
}
+#ifdef OS_WINDOWS
+ // FIXME: for whatever reason joining the initialization threads
+ // fails on Windows.
+ bool parallel_initialization = false;
+#else
bool parallel_initialization = (storage_tiers <= (size_t)get_netdata_cpus()) ? true : false;
+#endif
struct dbengine_initialization tiers_init[RRD_STORAGE_TIERS] = {};
@@ -1488,18 +1494,16 @@ static void rrdhost_load_kubernetes_labels(void) {
return;
}
- pid_t pid;
- FILE *fp_child_input;
- FILE *fp_child_output = netdata_popen(label_script, &pid, &fp_child_input);
- if(!fp_child_output) return;
+ POPEN_INSTANCE *instance = spawn_popen_run(label_script);
+ if(!instance) return;
char buffer[1000 + 1];
- while (fgets(buffer, 1000, fp_child_output) != NULL)
+ while (fgets(buffer, 1000, instance->child_stdout_fp) != NULL)
rrdlabels_add_pair(localhost->rrdlabels, buffer, RRDLABEL_SRC_AUTO|RRDLABEL_SRC_K8S);
// Non-zero exit code means that all the script output is error messages. We've shown already any message that didn't include a ':'
// Here we'll inform with an ERROR that the script failed, show whatever (if anything) was added to the list of labels, free the memory and set the return to null
- int rc = netdata_pclose(fp_child_input, fp_child_output, pid);
+ int rc = spawn_popen_wait(instance);
if(rc)
nd_log(NDLS_DAEMON, NDLP_ERR,
"%s exited abnormally. Failed to get kubernetes labels.",
diff --git a/src/database/rrdlabels.c b/src/database/rrdlabels.c
index b82fa76d2..65e2dc9e4 100644
--- a/src/database/rrdlabels.c
+++ b/src/database/rrdlabels.c
@@ -412,32 +412,6 @@ __attribute__((constructor)) void initialize_labels_keys_char_map(void) {
label_names_char_map[i] = label_values_char_map[i];
// apply overrides to the label names map
- label_names_char_map['A'] = 'a';
- label_names_char_map['B'] = 'b';
- label_names_char_map['C'] = 'c';
- label_names_char_map['D'] = 'd';
- label_names_char_map['E'] = 'e';
- label_names_char_map['F'] = 'f';
- label_names_char_map['G'] = 'g';
- label_names_char_map['H'] = 'h';
- label_names_char_map['I'] = 'i';
- label_names_char_map['J'] = 'j';
- label_names_char_map['K'] = 'k';
- label_names_char_map['L'] = 'l';
- label_names_char_map['M'] = 'm';
- label_names_char_map['N'] = 'n';
- label_names_char_map['O'] = 'o';
- label_names_char_map['P'] = 'p';
- label_names_char_map['Q'] = 'q';
- label_names_char_map['R'] = 'r';
- label_names_char_map['S'] = 's';
- label_names_char_map['T'] = 't';
- label_names_char_map['U'] = 'u';
- label_names_char_map['V'] = 'v';
- label_names_char_map['W'] = 'w';
- label_names_char_map['X'] = 'x';
- label_names_char_map['Y'] = 'y';
- label_names_char_map['Z'] = 'z';
label_names_char_map['='] = '_';
label_names_char_map[':'] = '_';
label_names_char_map['+'] = '_';
@@ -1652,13 +1626,13 @@ static int rrdlabels_unittest_add_pairs() {
errors += rrdlabels_unittest_add_a_pair("\"tag=1\": country:\"Gre\\\"ece\"", "tag_1", "country:Gre_ece");
errors += rrdlabels_unittest_add_a_pair("\"tag=1\" = country:\"Gre\\\"ece\"", "tag_1", "country:Gre_ece");
- errors += rrdlabels_unittest_add_a_pair("\t'LABE=L'\t=\t\"World\" peace", "labe_l", "World peace");
- errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry:\"World\":\"Europe\":\"Greece\"", "la_b_el", "country:World:Europe:Greece");
- errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry\\\"World\"\\\"Europe\"\\\"Greece\"", "la_b_el", "country/World/Europe/Greece");
+ errors += rrdlabels_unittest_add_a_pair("\t'LABE=L'\t=\t\"World\" peace", "LABE_L", "World peace");
+ errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry:\"World\":\"Europe\":\"Greece\"", "LA_B_EL", "country:World:Europe:Greece");
+ errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry\\\"World\"\\\"Europe\"\\\"Greece\"", "LA_B_EL", "country/World/Europe/Greece");
- errors += rrdlabels_unittest_add_a_pair("NAME=\"VALUE\"", "name", "VALUE");
- errors += rrdlabels_unittest_add_a_pair("\"NAME\" : \"VALUE\"", "name", "VALUE");
- errors += rrdlabels_unittest_add_a_pair("NAME: \"VALUE\"", "name", "VALUE");
+ errors += rrdlabels_unittest_add_a_pair("NAME=\"VALUE\"", "NAME", "VALUE");
+ errors += rrdlabels_unittest_add_a_pair("\"NAME\" : \"VALUE\"", "NAME", "VALUE");
+ errors += rrdlabels_unittest_add_a_pair("NAME: \"VALUE\"", "NAME", "VALUE");
return errors;
}
diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c
index 8dc2231b4..027ee8f93 100644
--- a/src/database/sqlite/sqlite_aclk.c
+++ b/src/database/sqlite/sqlite_aclk.c
@@ -9,7 +9,6 @@ struct aclk_sync_config_s {
uv_thread_t thread;
uv_loop_t loop;
uv_timer_t timer_req;
- time_t cleanup_after; // Start a cleanup after this timestamp
uv_async_t async;
bool initialized;
SPINLOCK cmd_queue_lock;
@@ -24,7 +23,7 @@ void sanity_check(void) {
#ifdef ENABLE_ACLK
static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
- struct aclk_database_cmd ret;
+ struct aclk_database_cmd ret = { 0 };
spinlock_lock(&aclk_sync_config.cmd_queue_lock);
if(aclk_sync_config.cmd_base) {
@@ -35,7 +34,6 @@ static struct aclk_database_cmd aclk_database_deq_cmd(void)
}
else {
ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
}
spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
@@ -176,70 +174,24 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
#ifdef ENABLE_ACLK
-#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id"
-static int is_host_available(nd_uuid_t *host_id)
-{
- sqlite3_stmt *res = NULL;
- int rc = 0;
-
- if (!REQUIRE_DB(db_meta))
- return 1;
-
- if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_HOST_BY_UUID, &res))
- return 1;
-
- int param = 0;
- SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC));
-
- param = 0;
- rc = sqlite3_step_monitored(res);
-
-done:
- REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
- return (rc == SQLITE_ROW);
-}
+#define SQL_SELECT_ACLK_ALERT_TABLES \
+ "SELECT 'DROP '||type||' IF EXISTS '||name||';' FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table', 'trigger', 'index')"
-// OPCODE: ACLK_DATABASE_DELETE_HOST
-static void sql_delete_aclk_table_list(char *host_guid)
+static void sql_delete_aclk_table_list(void)
{
- char uuid_str[UUID_STR_LEN];
- char host_str[UUID_STR_LEN];
-
- int rc;
- nd_uuid_t host_uuid;
-
- if (unlikely(!host_guid))
- return;
-
- rc = uuid_parse(host_guid, host_uuid);
- freez(host_guid);
- if (rc)
- return;
-
- uuid_unparse_lower(host_uuid, host_str);
- uuid_unparse_lower_fix(&host_uuid, uuid_str);
-
- if (is_host_available(&host_uuid))
- return;
-
sqlite3_stmt *res = NULL;
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
- buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index')", uuid_str);
+ BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, NULL);
- if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res))
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ACLK_ALERT_TABLES, &res))
goto fail;
- buffer_flush(sql);
-
while (sqlite3_step_monitored(res) == SQLITE_ROW)
buffer_strcat(sql, (char *) sqlite3_column_text(res, 0));
SQLITE_FINALIZE(res);
- rc = db_execute(db_meta, buffer_tostring(sql));
+ int rc = db_execute(db_meta, buffer_tostring(sql));
if (unlikely(rc))
netdata_log_error("Failed to drop unused ACLK tables");
@@ -285,53 +237,6 @@ skip:
freez(machine_guid);
}
-
-static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused)
-{
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_DELETE_HOST;
- cmd.param[0] = strdupz((char *) argv[0]);
- aclk_database_enq_cmd(&cmd);
- return 0;
-}
-
-#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table')"
-
-static void sql_check_aclk_table_list(void)
-{
- char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
- }
-}
-
-#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d"
-
-static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
-{
- char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql,sizeof(sql) - 1, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL);
- if (unlikely(db_execute(db_meta, sql)))
- error_report("Failed to clean stale ACLK alert entries");
- return 0;
-}
-
-#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table')"
-
-static void sql_maint_aclk_sync_database_all(void)
-{
- char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
- }
-}
-
static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
char uuid_str[UUID_STR_LEN];
@@ -339,7 +244,7 @@ static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_un
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
if (host != localhost)
- sql_create_aclk_table(host, (nd_uuid_t *) argv[0], (nd_uuid_t *) argv[1]);
+ create_aclk_config(host, (nd_uuid_t *)argv[0], (nd_uuid_t *)argv[1]);
return 0;
}
@@ -356,16 +261,7 @@ static void timer_cb(uv_timer_t *handle)
uv_stop(handle->loop);
uv_update_time(handle->loop);
- struct aclk_sync_config_s *config = handle->data;
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
-
- if (config->cleanup_after < now_realtime_sec()) {
- cmd.opcode = ACLK_DATABASE_CLEANUP;
- aclk_database_enq_cmd(&cmd);
- config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
- }
-
+ struct aclk_database_cmd cmd = { 0 };
if (aclk_connected) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
aclk_database_enq_cmd(&cmd);
@@ -373,7 +269,7 @@ static void timer_cb(uv_timer_t *handle)
}
}
-static void aclk_synchronization(void *arg __maybe_unused)
+static void aclk_synchronization(void *arg)
{
struct aclk_sync_config_s *config = arg;
uv_thread_set_name_np("ACLKSYNC");
@@ -381,14 +277,9 @@ static void aclk_synchronization(void *arg __maybe_unused)
service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
- worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
- worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
- worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint");
- worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
- worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check");
worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
uv_loop_t *loop = &config->loop;
@@ -401,9 +292,10 @@ static void aclk_synchronization(void *arg __maybe_unused)
netdata_log_info("Starting ACLK synchronization thread");
- config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST;
config->initialized = true;
+ sql_delete_aclk_table_list();
+
while (likely(service_running(SERVICE_ACLKSYNC))) {
enum aclk_database_opcode opcode;
worker_is_idle();
@@ -422,26 +314,17 @@ static void aclk_synchronization(void *arg __maybe_unused)
worker_is_busy(opcode);
switch (opcode) {
+ default:
case ACLK_DATABASE_NOOP:
/* the command queue was empty, do nothing */
break;
-// MAINTENANCE
- case ACLK_DATABASE_CLEANUP:
- // Scan all aclk_alert_ tables and cleanup as needed
- sql_maint_aclk_sync_database_all();
- sql_check_aclk_table_list();
- break;
-
- case ACLK_DATABASE_DELETE_HOST:
- sql_delete_aclk_table_list(cmd.param[0]);
- break;
// NODE STATE
case ACLK_DATABASE_NODE_STATE:;
RRDHOST *host = cmd.param[0];
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
struct aclk_sync_cfg_t *ahc = host->aclk_config;
if (unlikely(!ahc))
- sql_create_aclk_table(host, &host->host_uuid, host->node_id);
+ create_aclk_config(host, &host->host_uuid, host->node_id);
aclk_host_state_update(host, live, 1);
break;
case ACLK_DATABASE_NODE_UNREGISTER:
@@ -455,17 +338,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
case ACLK_DATABASE_PUSH_ALERT:
aclk_push_alert_events_for_all_hosts();
break;
- case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:;
- aclk_push_alert_snapshot_event(cmd.param[0]);
- break;
- case ACLK_DATABASE_QUEUE_REMOVED_ALERTS:
- sql_process_queue_removed_alerts_to_aclk(cmd.param[0]);
- break;
- default:
- break;
}
- if (cmd.completion)
- completion_mark_complete(cmd.completion);
} while (opcode != ACLK_DATABASE_NOOP);
}
@@ -489,39 +362,11 @@ static void aclk_synchronization_init(void)
// -------------------------------------------------------------
-void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused)
+void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
- char uuid_str[UUID_STR_LEN];
- char host_guid[UUID_STR_LEN];
- int rc;
-
- uuid_unparse_lower_fix(host_uuid, uuid_str);
- uuid_unparse_lower(*host_uuid, host_guid);
-
- char sql[ACLK_SYNC_QUERY_SIZE];
-
- snprintfz(sql, sizeof(sql) - 1, TABLE_ACLK_ALERT, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid);
- else {
- snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT1, uuid_str, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report(
- "Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid);
-
- snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT2, uuid_str, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report(
- "Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid);
- }
- if (likely(host) && unlikely(host->aclk_config))
- return;
- if (unlikely(!host))
+ if (!host || host->aclk_config)
return;
struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t));
@@ -535,8 +380,7 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __
}
wc->host = host;
- strcpy(wc->uuid_str, uuid_str);
- wc->alert_updates = 0;
+ wc->stream_alerts = false;
time_t now = now_realtime_sec();
wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now;
#endif
@@ -579,7 +423,7 @@ void sql_aclk_sync_init(void)
if (!number_of_children)
aclk_queue_node_info(localhost, true);
- rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg);
+ rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES, aclk_config_parameters, NULL, &err_msg);
if (rc != SQLITE_OK) {
error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg);
@@ -591,15 +435,12 @@ void sql_aclk_sync_init(void)
#endif
}
-// Public
-
static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1)
{
struct aclk_database_cmd cmd;
cmd.opcode = opcode;
cmd.param[0] = (void *) param0;
cmd.param[1] = (void *) param1;
- cmd.completion = NULL;
aclk_database_enq_cmd(&cmd);
}
@@ -612,35 +453,12 @@ void aclk_push_alert_config(const char *node_id, const char *config_hash)
queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash));
}
-void aclk_push_node_alert_snapshot(const char *node_id)
-{
- if (unlikely(!aclk_sync_config.initialized))
- return;
-
- queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL);
-}
-
-
-void aclk_push_node_removed_alerts(const char *node_id)
-{
- if (unlikely(!aclk_sync_config.initialized))
- return;
-
- queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL);
-}
-
void schedule_node_info_update(RRDHOST *host __maybe_unused)
{
#ifdef ENABLE_ACLK
if (unlikely(!host))
return;
-
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_NODE_STATE;
- cmd.param[0] = host;
- cmd.completion = NULL;
- aclk_database_enq_cmd(&cmd);
+ queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL);
#endif
}
@@ -649,12 +467,6 @@ void unregister_node(const char *machine_guid)
{
if (unlikely(!machine_guid))
return;
-
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_NODE_UNREGISTER;
- cmd.param[0] = strdupz(machine_guid);
- cmd.completion = NULL;
- aclk_database_enq_cmd(&cmd);
+ queue_aclk_sync_cmd(ACLK_DATABASE_NODE_UNREGISTER, strdupz(machine_guid), NULL);
}
#endif
diff --git a/src/database/sqlite/sqlite_aclk.h b/src/database/sqlite/sqlite_aclk.h
index ce9fed840..ec8cfa9dd 100644
--- a/src/database/sqlite/sqlite_aclk.h
+++ b/src/database/sqlite/sqlite_aclk.h
@@ -3,21 +3,9 @@
#ifndef NETDATA_SQLITE_ACLK_H
#define NETDATA_SQLITE_ACLK_H
-#define ACLK_MAX_ALERT_UPDATES "5"
-#define ACLK_DATABASE_CLEANUP_FIRST (1200)
-#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
-#define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
+#define ACLK_MAX_ALERT_UPDATES "50"
#define ACLK_SYNC_QUERY_SIZE 512
-static inline void uuid_unparse_lower_fix(nd_uuid_t *uuid, char *out)
-{
- uuid_unparse_lower(*uuid, out);
- out[8] = '_';
- out[13] = '_';
- out[18] = '_';
- out[23] = '_';
-}
-
static inline int uuid_parse_fix(char *in, nd_uuid_t uuid)
{
in[8] = '-';
@@ -32,25 +20,11 @@ static inline int claimed()
return localhost->aclk_state.claimed_id != NULL;
}
-#define TABLE_ACLK_ALERT \
- "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
- "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
- "UNIQUE(alert_unique_id))"
-
-#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id)"
-#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted)"
-
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
-
- ACLK_DATABASE_CLEANUP,
- ACLK_DATABASE_DELETE_HOST,
ACLK_DATABASE_NODE_STATE,
ACLK_DATABASE_PUSH_ALERT,
ACLK_DATABASE_PUSH_ALERT_CONFIG,
- ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
- ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,
- ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
ACLK_DATABASE_NODE_UNREGISTER,
ACLK_DATABASE_TIMER,
@@ -62,29 +36,25 @@ enum aclk_database_opcode {
struct aclk_database_cmd {
enum aclk_database_opcode opcode;
void *param[2];
- struct completion *completion;
struct aclk_database_cmd *prev, *next;
};
typedef struct aclk_sync_cfg_t {
RRDHOST *host;
- int alert_updates;
- int alert_checkpoint_req;
- int alert_queue_removed;
+ int8_t send_snapshot;
+ bool stream_alerts;
+ int alert_count;
+ int snapshot_count;
+ int checkpoint_count;
time_t node_info_send_time;
time_t node_collectors_send;
- char uuid_str[UUID_STR_LEN];
char node_id[UUID_STR_LEN];
char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested
- uint64_t alerts_log_first_sequence_id;
- uint64_t alerts_log_last_sequence_id;
} aclk_sync_cfg_t;
-void sql_create_aclk_table(RRDHOST *host, nd_uuid_t *host_uuid, nd_uuid_t *node_id);
+void create_aclk_config(RRDHOST *host, nd_uuid_t *host_uuid, nd_uuid_t *node_id);
void sql_aclk_sync_init(void);
void aclk_push_alert_config(const char *node_id, const char *config_hash);
-void aclk_push_node_alert_snapshot(const char *node_id);
-void aclk_push_node_removed_alerts(const char *node_id);
void schedule_node_info_update(RRDHOST *host);
#ifdef ENABLE_ACLK
void unregister_node(const char *machine_guid);
diff --git a/src/database/sqlite/sqlite_aclk_alert.c b/src/database/sqlite/sqlite_aclk_alert.c
index 0982d32bd..3e7076169 100644
--- a/src/database/sqlite/sqlite_aclk_alert.c
+++ b/src/database/sqlite/sqlite_aclk_alert.c
@@ -5,7 +5,6 @@
#ifdef ENABLE_ACLK
#include "../../aclk/aclk_alarm_api.h"
-#endif
#define SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param) \
({ \
@@ -13,33 +12,6 @@
sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \
})
-#define SQL_UPDATE_FILTERED_ALERT \
- "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \
- "WHERE filtered_alert_unique_id = @old_alert"
-
-static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str)
-{
- sqlite3_stmt *res = NULL;
-
- char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, sizeof(sql) - 1, SQL_UPDATE_FILTERED_ALERT, uuid_str);
-
- if (!PREPARE_STATEMENT(db_meta, sql, &res))
- return;
-
- int param = 0;
- SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, ae->unique_id));
- SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
-
- param = 0;
- if (likely(sqlite3_step_monitored(res) == SQLITE_DONE))
- ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
-
-done:
- REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
-}
-
#define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \
"SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld " \
"WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \
@@ -47,9 +19,9 @@ done:
static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uuid_t *host_id)
{
- sqlite3_stmt *res = NULL;
+ static __thread sqlite3_stmt *res = NULL;
- if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res))
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res))
return false;
bool ret = false;
@@ -63,115 +35,141 @@ static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uui
done:
REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
+ SQLITE_RESET(res);
return ret;
}
#define MAX_REMOVED_PERIOD 604800 //a week
-//decide if some events should be sent or not
-#define SQL_SELECT_ALERT_BY_ID \
- "SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \
- "WHERE hl.host_id = @host_id AND hld.unique_id = aa.filtered_alert_unique_id " \
- "AND hld.alarm_id = @alarm_id AND hl.health_log_id = hld.health_log_id " \
- "ORDER BY hld.rowid DESC LIMIT 1"
+#define SQL_UPDATE_ALERT_VERSION_TRANSITION \
+ "UPDATE alert_version SET unique_id = @unique_id WHERE health_log_id = @health_log_id"
-static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
+static void update_alert_version_transition(int64_t health_log_id, int64_t unique_id)
{
- sqlite3_stmt *res = NULL;
+ static __thread sqlite3_stmt *res = NULL;
- if (ae->new_status == RRDCALC_STATUS_UNINITIALIZED ||
- (ae->new_status == RRDCALC_STATUS_REMOVED &&
- !(ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL)))
- return 0;
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION_TRANSITION, &res))
+ return;
- if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config))
- return 0;
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to update alert_version to latest transition");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+}
- char sql[ACLK_SYNC_QUERY_SIZE];
+//decide if some events should be sent or not
+
+#define SQL_SELECT_LAST_ALERT_STATUS "SELECT status FROM alert_version WHERE health_log_id = @health_log_id "
- //get the previous sent event of this alarm_id
- //base the search on the last filtered event
- snprintfz(sql, sizeof(sql) - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str);
+static bool cloud_status_matches(int64_t health_log_id, RRDCALC_STATUS status)
+{
+ static __thread sqlite3_stmt *res = NULL;
- if (!PREPARE_STATEMENT(db_meta, sql, &res))
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_LAST_ALERT_STATUS, &res))
return true;
bool send = false;
int param = 0;
- SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
- SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, (int) ae->alarm_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, health_log_id));
param = 0;
int rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW)) {
- nd_uuid_t config_hash_id;
- RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
-
- if (sqlite3_column_type(res, 1) != SQLITE_NULL)
- uuid_copy(config_hash_id, *((nd_uuid_t *)sqlite3_column_blob(res, 1)));
-
- int64_t unique_id = sqlite3_column_int64(res, 2);
-
- if (ae->new_status != (RRDCALC_STATUS)status || !uuid_eq(ae->config_hash_id, config_hash_id))
- send = true;
- else
- update_filtered(ae, unique_id, host->aclk_config->uuid_str);
- } else
- send = true;
+ RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
+ send = (current_status == status);
+ }
done:
REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
+ SQLITE_RESET(res);
return send;
}
#define SQL_QUEUE_ALERT_TO_CLOUD \
- "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
- "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING"
-
-void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
+ "INSERT INTO aclk_queue (host_id, health_log_id, unique_id, date_created)" \
+ " VALUES (@host_id, @health_log_id, @unique_id, UNIXEPOCH())" \
+ " ON CONFLICT(host_id, health_log_id) DO UPDATE SET unique_id=excluded.unique_id, " \
+ " date_created=excluded.date_created"
+
+//
+// Attempt to insert an alert to the submit queue to reach the cloud
+//
+// The alert will NOT be added in the submit queue if
+// - Cloud is already aware of the alert status
+// - The transition refers to a variable
+//
+static int insert_alert_to_submit_queue(RRDHOST *host, int64_t health_log_id, uint32_t unique_id, RRDCALC_STATUS status)
{
- sqlite3_stmt *res = NULL;
- char sql[ACLK_SYNC_QUERY_SIZE];
+ static __thread sqlite3_stmt *res = NULL;
- if (!service_running(SERVICE_ACLK))
- return;
+ if (cloud_status_matches(health_log_id, status)) {
+ update_alert_version_transition(health_log_id, unique_id);
+ return 1;
+ }
- if (!claimed() || ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
- return;
+ if (is_event_from_alert_variable_config(unique_id, &host->host_uuid))
+ return 2;
- if (false == skip_filter && !should_send_to_cloud(host, ae))
- return;
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_QUEUE_ALERT_TO_CLOUD, &res))
+ return -1;
- if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
- return;
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (int64_t) unique_id));
- snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str);
+ param = 0;
+ int rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to insert alert in the submit queue %"PRIu32", rc = %d", unique_id, rc);
- if (!PREPARE_STATEMENT(db_meta, sql, &res))
- return;
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+ return 0;
+}
+
+#define SQL_DELETE_QUEUE_ALERT_TO_CLOUD \
+ "DELETE FROM aclk_queue WHERE host_id = @host_id AND sequence_id BETWEEN @seq1 AND @seq2"
+
+//
+// Delete a range of alerts from the submit queue (after being sent to the the cloud)
+//
+static int delete_alert_from_submit_queue(RRDHOST *host, int64_t first_seq_id, int64_t last_seq_id)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_QUEUE_ALERT_TO_CLOUD, &res))
+ return -1;
int param = 0;
- SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, ae->unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, first_seq_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, last_seq_id));
param = 0;
- int rc = execute_insert(res);
- if (unlikely(rc == SQLITE_DONE)) {
- ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
- rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
- } else
- error_report("Failed to store alert event %"PRIu32", rc = %d", ae->unique_id, rc);
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to delete submitted to ACLK");
done:
REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
+ SQLITE_RESET(res);
+ return 0;
}
int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
{
-#ifdef ENABLE_ACLK
+
switch(status) {
case RRDCALC_STATUS_REMOVED:
return ALARM_STATUS_REMOVED;
@@ -191,10 +189,6 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
default:
return ALARM_STATUS_UNKNOWN;
}
-#else
- UNUSED(status);
- return 1;
-#endif
}
static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) {
@@ -219,245 +213,437 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
return strdupz(ret);
}
+#define SQL_UPDATE_ALERT_VERSION \
+ "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted)" \
+ " VALUES (@health_log_id, @unique_id, @status, @version, UNIXEPOCH())" \
+ " ON CONFLICT(health_log_id) DO UPDATE SET status = excluded.status, version = excluded.version, " \
+ " unique_id=excluded.unique_id, date_submitted=excluded.date_submitted"
+
+//
+// Store a new alert transition along with the version after sending to the cloud
+// - Update an existing alert with the updated version, status, transition and date submitted
+//
+static void sql_update_alert_version(int64_t health_log_id, int64_t unique_id, RRDCALC_STATUS status, uint64_t version)
+{
+ static __thread sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION, &res))
+ return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, status));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, version));
+
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to execute sql_update_alert_version");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+}
-static void aclk_push_alert_event(struct aclk_sync_cfg_t *wc __maybe_unused)
+#define SQL_SELECT_ALERT_TO_DUMMY \
+ "SELECT aq.sequence_id, hld.unique_id, hld.when_key, hld.new_status, hld.health_log_id" \
+ " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \
+ " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \
+ " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \
+ " ORDER BY aq.sequence_id ASC"
+
+//
+// Check all queued alerts for a host and commit them as if they have been send to the cloud
+// this will produce new versions as needed. We need this because we are about to send a
+// a snapshot so we can include the latest transition.
+//
+static void commit_alert_events(RRDHOST *host)
{
-#ifdef ENABLE_ACLK
- int rc;
+ sqlite3_stmt *res = NULL;
- if (unlikely(!wc->alert_updates)) {
- nd_log(NDLS_ACCESS, NDLP_NOTICE,
- "ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.",
- wc->node_id,
- wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_DUMMY, &res))
return;
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ int64_t first_sequence_id = 0;
+ int64_t last_sequence_id = 0;
+
+ param = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+
+ last_sequence_id = sqlite3_column_int64(res, 0);
+ if (first_sequence_id == 0)
+ first_sequence_id = last_sequence_id;
+
+ int64_t unique_id = sqlite3_column_int(res, 1);
+ int64_t version = sqlite3_column_int64(res, 2);
+ RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 3);
+ int64_t health_log_id = sqlite3_column_int64(res, 4);
+
+ sql_update_alert_version(health_log_id, unique_id, status, version);
}
+ if (first_sequence_id)
+ delete_alert_from_submit_queue(host, first_sequence_id, last_sequence_id);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+}
+
+typedef enum {
+ SEQUENCE_ID,
+ UNIQUE_ID,
+ ALARM_ID,
+ CONFIG_HASH_ID,
+ UPDATED_BY_ID,
+ WHEN_KEY,
+ DURATION,
+ NON_CLEAR_DURATION,
+ FLAGS,
+ EXEC_RUN_TIMESTAMP,
+ DELAY_UP_TO_TIMESTAMP,
+ NAME,
+ CHART,
+ EXEC,
+ RECIPIENT,
+ SOURCE,
+ UNITS,
+ INFO,
+ EXEC_CODE,
+ NEW_STATUS,
+ OLD_STATUS,
+ DELAY,
+ NEW_VALUE,
+ OLD_VALUE,
+ LAST_REPEAT,
+ CHART_CONTEXT,
+ TRANSITION_ID,
+ ALARM_EVENT_ID,
+ CHART_NAME,
+ SUMMARY,
+ HEALTH_LOG_ID,
+ VERSION
+} HealthLogDetails;
+
+void health_alarm_log_populate(
+ struct alarm_log_entry *alarm_log,
+ sqlite3_stmt *res,
+ RRDHOST *host,
+ RRDCALC_STATUS *status)
+{
+ char old_value_string[100 + 1];
+ char new_value_string[100 + 1];
+
+ RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, NEW_STATUS);
+ if (status)
+ *status = current_status;
+
+ char *source = (char *) sqlite3_column_text(res, SOURCE);
+ alarm_log->command = source ? health_edit_command_from_source(source) : strdupz("UNKNOWN=0=UNKNOWN");
+
+ alarm_log->chart = strdupz((char *) sqlite3_column_text(res, CHART));
+ alarm_log->name = strdupz((char *) sqlite3_column_text(res, NAME));
+
+ alarm_log->when = sqlite3_column_int64(res, WHEN_KEY);
+
+ alarm_log->config_hash = sqlite3_uuid_unparse_strdupz(res, CONFIG_HASH_ID);
+
+ alarm_log->utc_offset = host->utc_offset;
+ alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
+ alarm_log->exec_path = sqlite3_column_bytes(res, EXEC) ?
+ strdupz((char *)sqlite3_column_text(res, EXEC)) :
+ strdupz((char *)string2str(host->health.health_default_exec));
+
+ alarm_log->conf_source = source ? strdupz(source) : strdupz("");
+
+ time_t duration = sqlite3_column_int64(res, DURATION);
+ alarm_log->duration = (duration > 0) ? duration : 0;
+
+ alarm_log->non_clear_duration = sqlite3_column_int64(res, NON_CLEAR_DURATION);
+
+ alarm_log->status = rrdcalc_status_to_proto_enum(current_status);
+ alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)sqlite3_column_int64(res, OLD_STATUS));
+ alarm_log->delay = sqlite3_column_int64(res, DELAY);
+ alarm_log->delay_up_to_timestamp = sqlite3_column_int64(res, DELAY_UP_TO_TIMESTAMP);
+ alarm_log->last_repeat = sqlite3_column_int64(res, LAST_REPEAT);
+
+ uint64_t flags = sqlite3_column_int64(res, FLAGS);
+ char *recipient = (char *) sqlite3_column_text(res, RECIPIENT);
+ alarm_log->silenced =
+ ((flags & HEALTH_ENTRY_FLAG_SILENCED) || (recipient && !strncmp(recipient, "silent", 6))) ? 1 : 0;
+
+ double value = sqlite3_column_double(res, NEW_VALUE);
+ double old_value = sqlite3_column_double(res, OLD_VALUE);
+
+ alarm_log->value_string =
+ sqlite3_column_type(res, NEW_VALUE) == SQLITE_NULL ?
+ strdupz((char *)"-") :
+ strdupz((char *)format_value_and_unit(
+ new_value_string, 100, value, (char *)sqlite3_column_text(res, UNITS), -1));
+
+ alarm_log->old_value_string =
+ sqlite3_column_type(res, OLD_VALUE) == SQLITE_NULL ?
+ strdupz((char *)"-") :
+ strdupz((char *)format_value_and_unit(
+ old_value_string, 100, old_value, (char *)sqlite3_column_text(res, UNITS), -1));
+
+ alarm_log->value = (!isnan(value)) ? (NETDATA_DOUBLE)value : 0;
+ alarm_log->old_value = (!isnan(old_value)) ? (NETDATA_DOUBLE)old_value : 0;
+
+ alarm_log->updated = (flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
+ alarm_log->rendered_info = sqlite3_text_strdupz_empty(res, INFO);
+ alarm_log->chart_context = sqlite3_text_strdupz_empty(res, CHART_CONTEXT);
+ alarm_log->chart_name = sqlite3_text_strdupz_empty(res, CHART_NAME);
+
+ alarm_log->transition_id = sqlite3_uuid_unparse_strdupz(res, TRANSITION_ID);
+ alarm_log->event_id = sqlite3_column_int64(res, ALARM_EVENT_ID);
+ alarm_log->version = sqlite3_column_int64(res, VERSION);
+
+ alarm_log->summary = sqlite3_text_strdupz_empty(res, SUMMARY);
+
+ alarm_log->health_log_id = sqlite3_column_int64(res, HEALTH_LOG_ID);
+ alarm_log->unique_id = sqlite3_column_int64(res, UNIQUE_ID);
+ alarm_log->alarm_id = sqlite3_column_int64(res, ALARM_ID);
+ alarm_log->sequence_id = sqlite3_column_int64(res, SEQUENCE_ID);
+}
+
+#define SQL_SELECT_ALERT_TO_PUSH \
+ "SELECT aq.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key," \
+ " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name," \
+ " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status," \
+ " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id," \
+ " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, hld.when_key" \
+ " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \
+ " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \
+ " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \
+ " ORDER BY aq.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES
+
+static void aclk_push_alert_event(RRDHOST *host __maybe_unused)
+{
+
char *claim_id = get_agent_claimid();
- if (unlikely(!claim_id))
+ if (!claim_id || !host->node_id)
return;
- if (unlikely(!wc->host)) {
+ sqlite3_stmt *res = NULL;
+
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_PUSH, &res)) {
freez(claim_id);
return;
}
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
-
- sqlite3_stmt *res = NULL;
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
- buffer_sprintf(
- sql,
- "SELECT aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, "
- " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, "
- " hl.chart, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, "
- " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, "
- " hld.alarm_event_id, hl.chart_name, hld.summary "
- " FROM health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld "
- " WHERE hld.unique_id = aa.alert_unique_id AND hl.config_hash_id = ha.hash_id AND aa.date_submitted IS NULL "
- " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id "
- " ORDER BY aa.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES,
- wc->uuid_str);
-
- if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) {
-
- BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
- buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str);
-
- rc = db_execute(db_meta, buffer_tostring(sql_fix));
- if (unlikely(rc))
- error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
- buffer_free(sql_fix);
-
- // Try again
- if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) {
- buffer_free(sql);
- freez(claim_id);
- return;
- }
- }
+ char node_id_str[UUID_STR_LEN];
+ uuid_unparse_lower(*host->node_id, node_id_str);
- rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id for pushing alert event.");
- goto done;
- }
+ struct alarm_log_entry alarm_log;
+ alarm_log.node_id = node_id_str;
+ alarm_log.claim_id = claim_id;
- uint64_t first_sequence_id = 0;
- uint64_t last_sequence_id = 0;
+ int64_t first_id = 0;
+ int64_t last_id = 0;
+ param = 0;
+ RRDCALC_STATUS status;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
- struct alarm_log_entry alarm_log;
- char old_value_string[100 + 1];
- char new_value_string[100 + 1];
-
- alarm_log.node_id = wc->node_id;
- alarm_log.claim_id = claim_id;
- alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
- alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
- alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
- alarm_log.config_hash = sqlite3_uuid_unparse_strdupz(res, 3);
- alarm_log.utc_offset = wc->host->utc_offset;
- alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host));
- alarm_log.exec_path = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) :
- strdupz((char *)string2str(wc->host->health.health_default_exec));
- alarm_log.conf_source = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : strdupz("");
-
- char *edit_command = sqlite3_column_bytes(res, 15) > 0 ?
- health_edit_command_from_source((char *)sqlite3_column_text(res, 15)) :
- strdupz("UNKNOWN=0=UNKNOWN");
- alarm_log.command = strdupz(edit_command);
-
- time_t duration = (time_t) sqlite3_column_int64(res, 6);
- alarm_log.duration = (duration > 0) ? duration : 0;
- alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
- alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 19));
- alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
- alarm_log.delay = (int) sqlite3_column_int(res, 21);
- alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
- alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 24);
- alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
- (sqlite3_column_type(res, 14) != SQLITE_NULL &&
- !strncmp((char *)sqlite3_column_text(res, 14), "silent", 6))) ?
- 1 :
- 0;
- alarm_log.value_string =
- sqlite3_column_type(res, 22) == SQLITE_NULL ?
- strdupz((char *)"-") :
- strdupz((char *)format_value_and_unit(
- new_value_string, 100, sqlite3_column_double(res, 22), (char *)sqlite3_column_text(res, 16), -1));
- alarm_log.old_value_string =
- sqlite3_column_type(res, 23) == SQLITE_NULL ?
- strdupz((char *)"-") :
- strdupz((char *)format_value_and_unit(
- old_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 16), -1));
- alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 22);
- alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
- alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log.rendered_info = sqlite3_text_strdupz_empty(res, 17);
- alarm_log.chart_context = sqlite3_text_strdupz_empty(res, 25);
- alarm_log.transition_id = sqlite3_uuid_unparse_strdupz(res, 26);
- alarm_log.event_id = (time_t) sqlite3_column_int64(res, 27);
- alarm_log.chart_name = sqlite3_text_strdupz_empty(res, 28);
- alarm_log.summary = sqlite3_text_strdupz_empty(res, 29);
-
+ health_alarm_log_populate(&alarm_log, res, host, &status);
aclk_send_alarm_log_entry(&alarm_log);
+ wc->alert_count++;
- if (first_sequence_id == 0)
- first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
+ last_id = alarm_log.sequence_id;
+ if (first_id == 0)
+ first_id = last_id;
- if (wc->alerts_log_first_sequence_id == 0)
- wc->alerts_log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
-
- last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
- wc->alerts_log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
+ sql_update_alert_version(alarm_log.health_log_id, alarm_log.unique_id, status, alarm_log.version);
destroy_alarm_log_entry(&alarm_log);
- freez(edit_command);
}
- if (first_sequence_id) {
- buffer_flush(sql);
- buffer_sprintf(
- sql,
- "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
- "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64,
- wc->uuid_str,
- first_sequence_id,
- last_sequence_id);
-
- if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
- error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));
-
+ if (first_id) {
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "ACLK RES [%s (%s)]: ALERTS SENT from %ld - %ld",
+ node_id_str,
+ rrdhost_hostname(host),
+ first_id,
+ last_id);
+
+ delete_alert_from_submit_queue(host, first_id, last_id);
// Mark to do one more check
- rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
-
- } else {
- if (wc->alerts_log_first_sequence_id)
- nd_log(NDLS_ACCESS, NDLP_DEBUG,
- "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
- wc->node_id,
- wc->host ? rrdhost_hostname(wc->host) : "N/A",
- wc->alerts_log_first_sequence_id,
- wc->alerts_log_last_sequence_id);
- wc->alerts_log_first_sequence_id = 0;
- wc->alerts_log_last_sequence_id = 0;
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
}
done:
+ REPORT_BIND_FAIL(res, param);
SQLITE_FINALIZE(res);
freez(claim_id);
- buffer_free(sql);
-#endif
}
-void aclk_push_alert_events_for_all_hosts(void)
+#define SQL_DELETE_PROCESSED_ROWS \
+ "DELETE FROM alert_queue WHERE host_id = @host_id AND rowid between @row1 AND @row2"
+
+static void delete_alert_from_pending_queue(RRDHOST *host, int64_t row1, int64_t row2)
{
- RRDHOST *host;
+ static __thread sqlite3_stmt *res = NULL;
- dfe_start_reentrant(rrdhost_root_index, host) {
- if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) ||
- !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS))
- continue;
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_PROCESSED_ROWS, &res))
+ return;
- rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row1));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row2));
- struct aclk_sync_cfg_t *wc = host->aclk_config;
- if (likely(wc))
- aclk_push_alert_event(wc);
- }
- dfe_done(host);
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to delete processed rows, rc = %d", rc);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
}
-void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
+#define SQL_REBUILD_HOST_ALERT_VERSION_TABLE \
+ "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted) " \
+ " SELECT hl.health_log_id, hld.unique_id, hld.new_status, hld.when_key, UNIXEPOCH() " \
+ " FROM health_log hl, health_log_detail hld WHERE " \
+ " hl.host_id = @host_id AND hld.health_log_id = hl.health_log_id AND hld.transition_id = hl.last_transition_id"
+
+#define SQL_DELETE_HOST_ALERT_VERSION_TABLE \
+ "DELETE FROM alert_version WHERE health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id)"
+
+void rebuild_host_alert_version_table(RRDHOST *host)
{
sqlite3_stmt *res = NULL;
- int rc;
- struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!PREPARE_STATEMENT(db_meta, SQL_DELETE_HOST_ALERT_VERSION_TABLE, &res))
+ return;
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
- rw_spinlock_write_lock(&host->health_log.spinlock);
+ param = 0;
+ int rc = execute_insert(res);
+ if (rc != SQLITE_DONE) {
+ netdata_log_error("Failed to delete the host alert version table");
+ goto done;
+ }
- buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str);
- if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
- goto skip;
+ SQLITE_FINALIZE(res);
+ if (!PREPARE_STATEMENT(db_meta, SQL_REBUILD_HOST_ALERT_VERSION_TABLE, &res))
+ return;
+
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+
+ param = 0;
+ rc = execute_insert(res);
+ if (rc != SQLITE_DONE)
+ netdata_log_error("Failed to rebuild the host alert version table");
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
+}
- buffer_flush(sql);
+#define SQL_PROCESS_ALERT_PENDING_QUEUE \
+ "SELECT health_log_id, unique_id, status, rowid" \
+ " FROM alert_queue WHERE host_id = @host_id AND date_scheduled <= UNIXEPOCH() ORDER BY rowid ASC"
- buffer_sprintf(
- sql,
- "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
- "SELECT hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id FROM health_log_detail hld, health_log hl "
- "WHERE hld.new_status <> 0 AND hld.new_status <> -2 AND hl.health_log_id = hld.health_log_id AND hl.config_hash_id IS NOT NULL "
- "AND hld.updated_by_id = 0 AND hl.host_id = @host_id ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING",
- wc->uuid_str);
+bool process_alert_pending_queue(RRDHOST *host)
+{
+ static __thread sqlite3_stmt *res = NULL;
- if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res))
- goto skip;
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_PROCESS_ALERT_PENDING_QUEUE, &res))
+ return false;
int param = 0;
+ int added =0, count = 0;
SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
param = 0;
- rc = execute_insert(res);
- if (unlikely(rc != SQLITE_DONE))
- error_report("Failed to queue existing alerts, rc = %d", rc);
- else
- rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ int64_t start_row = 0;
+ int64_t end_row = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+
+ int64_t health_log_id = sqlite3_column_int64(res, 0);
+ uint32_t unique_id = sqlite3_column_int64(res, 1);
+ RRDCALC_STATUS new_status = sqlite3_column_int(res, 2);
+ int64_t row = sqlite3_column_int64(res, 3);
+
+ if (host->aclk_config) {
+ int ret = insert_alert_to_submit_queue(host, health_log_id, unique_id, new_status);
+ if (ret == 0)
+ added++;
+ }
+
+ if (!start_row)
+ start_row = row;
+ end_row = row;
+
+ count++;
+ }
+ if (start_row)
+ delete_alert_from_pending_queue(host, start_row, end_row);
+
+ if(count)
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Processed %d entries, queued %d", rrdhost_hostname(host), count, added);
done:
REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
+ SQLITE_RESET(res);
+ return added > 0;
+}
+
+void aclk_push_alert_events_for_all_hosts(void)
+{
+ RRDHOST *host;
-skip:
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
+ // Checking if we shutting down
+ if (!service_running(SERVICE_ACLK))
+ return;
+
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) ||
+ rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
+ continue;
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!wc || false == wc->stream_alerts || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
+ (void)process_alert_pending_queue(host);
+ commit_alert_events(host);
+ continue;
+ }
+
+ if (wc->send_snapshot) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ if (wc->send_snapshot == 1)
+ continue;
+ (void)process_alert_pending_queue(host);
+ commit_alert_events(host);
+ rebuild_host_alert_version_table(host);
+ send_alert_snapshot_to_cloud(host);
+ wc->snapshot_count++;
+ wc->send_snapshot = 0;
+ }
+ else
+ aclk_push_alert_event(host);
+ }
+ dfe_done(host);
}
-void aclk_send_alarm_configuration(char *config_hash)
+void aclk_send_alert_configuration(char *config_hash)
{
if (unlikely(!config_hash))
return;
@@ -484,7 +670,6 @@ void aclk_send_alarm_configuration(char *config_hash)
void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
{
-#ifdef ENABLE_ACLK
sqlite3_stmt *res = NULL;
struct aclk_sync_cfg_t *wc;
@@ -586,91 +771,60 @@ done:
SQLITE_FINALIZE(res);
freez(config_hash);
freez(node_id);
-#endif
}
+#define SQL_ALERT_VERSION_CALC \
+ "SELECT SUM(version) FROM health_log hl, alert_version av" \
+ " WHERE hl.host_id = @host_uuid AND hl.health_log_id = av.health_log_id AND av.status <> -2"
-// Start streaming alerts
-void aclk_start_alert_streaming(char *node_id, bool resets)
-{
- nd_uuid_t node_uuid;
-
- if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
- return;
-
- struct aclk_sync_cfg_t *wc;
-
- RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host || !(wc = host->aclk_config)))
- return;
-
- if (unlikely(!host->health.health_enabled)) {
- nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
- return;
- }
-
- if (resets) {
- nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
- sql_queue_existing_alerts_to_aclk(host);
- } else
- nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
-
- wc->alert_updates = 1;
- wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
-}
-
-#define SQL_QUEUE_REMOVE_ALERTS \
- "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
- "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \
- "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
- "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
- "AND hl.config_hash_id NOT IN (SELECT hash_id FROM alert_hash WHERE warn IS NULL AND crit IS NULL) " \
- "AND hl.name || hl.chart NOT IN (select name || chart FROM health_log WHERE name = hl.name AND " \
- "chart = hl.chart AND alarm_id > hl.alarm_id AND host_id = hl.host_id) " \
- "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING"
-
-void sql_process_queue_removed_alerts_to_aclk(char *node_id)
+static uint64_t calculate_node_alert_version(RRDHOST *host)
{
- struct aclk_sync_cfg_t *wc;
- RRDHOST *host = find_host_by_node_id(node_id);
- freez(node_id);
-
- if (unlikely(!host || !(wc = host->aclk_config)))
- return;
-
- sqlite3_stmt *res = NULL;
-
- CLEAN_BUFFER *wb = buffer_create(1024, NULL); // Note buffer auto free on function return
- buffer_sprintf(wb, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str);
+ static __thread sqlite3_stmt *res = NULL;
- if (!PREPARE_STATEMENT(db_meta, buffer_tostring(wb), &res))
- return;
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_ALERT_VERSION_CALC, &res))
+ return 0;
+ uint64_t version = 0;
int param = 0;
SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
param = 0;
- int rc = execute_insert(res);
- if (likely(rc == SQLITE_DONE)) {
- nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
- rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
- wc->alert_queue_removed = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ version = (uint64_t)sqlite3_column_int64(res, 0);
}
done:
REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
+ SQLITE_RESET(res);
+ return version;
}
-void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
+static void schedule_alert_snapshot_if_needed(struct aclk_sync_cfg_t *wc, uint64_t cloud_version)
{
- if (unlikely(!host->aclk_config || !claimed() || !host->node_id))
- return;
-
- char node_id[UUID_STR_LEN];
- uuid_unparse_lower(*host->node_id, node_id);
+ uint64_t local_version = calculate_node_alert_version(wc->host);
+ if (local_version != cloud_version) {
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_NOTICE,
+ "Scheduling alert snapshot for host \"%s\", node \"%s\" (version: cloud %zu, local %zu)",
+ rrdhost_hostname(wc->host),
+ wc->node_id,
+ cloud_version,
+ local_version);
- aclk_push_node_removed_alerts(node_id);
+ wc->send_snapshot = 1;
+ rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ }
+ else
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "Alert check on \"%s\", node \"%s\" (version: cloud %zu, local %zu)",
+ rrdhost_hostname(wc->host),
+ wc->node_id,
+ cloud_version,
+ local_version);
+ wc->checkpoint_count++;
}
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
@@ -699,371 +853,202 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unus
wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
- aclk_push_node_alert_snapshot(node_id);
+ wc->send_snapshot = 1;
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
}
-#ifdef ENABLE_ACLK
-void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
-{
- char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN");
- char config_hash_id[UUID_STR_LEN];
- uuid_unparse_lower(ae->config_hash_id, config_hash_id);
- char transition_id[UUID_STR_LEN];
- uuid_unparse_lower(ae->transition_id, transition_id);
-
- alarm_log->chart = strdupz(ae_chart_id(ae));
- alarm_log->name = strdupz(ae_name(ae));
-
- alarm_log->when = ae->when;
-
- alarm_log->config_hash = strdupz((char *)config_hash_id);
-
- alarm_log->utc_offset = host->utc_offset;
- alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
- alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health.health_default_exec));
- alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)"");
+#define SQL_COUNT_SNAPSHOT_ENTRIES \
+ "SELECT COUNT(1) FROM alert_version av, health_log hl " \
+ "WHERE hl.host_id = @host_id AND hl.health_log_id = av.health_log_id AND av.status <> -2"
- alarm_log->command = strdupz((char *)edit_command);
-
- alarm_log->duration = (time_t)ae->duration;
- alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
- alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
- alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
- alarm_log->delay = ae->delay;
- alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
- alarm_log->last_repeat = (time_t)ae->last_repeat;
-
- alarm_log->silenced =
- ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ?
- 1 :
- 0;
+static int calculate_alert_snapshot_entries(nd_uuid_t *host_uuid)
+{
+ int count = 0;
- alarm_log->value_string = strdupz(ae_new_value_string(ae));
- alarm_log->old_value_string = strdupz(ae_old_value_string(ae));
+ sqlite3_stmt *res = NULL;
- alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
- alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
+ if (!PREPARE_STATEMENT(db_meta, SQL_COUNT_SNAPSHOT_ENTRIES, &res))
+ return 0;
- alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log->rendered_info = strdupz(ae_info(ae));
- alarm_log->chart_context = strdupz(ae_chart_context(ae));
- alarm_log->chart_name = strdupz(ae_chart_name(ae));
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
- alarm_log->transition_id = strdupz((char *)transition_id);
- alarm_log->event_id = (uint64_t) ae->alarm_event_id;
+ param = 0;
+ int rc = sqlite3_step_monitored(res);
+ if (rc == SQLITE_ROW)
+ count = sqlite3_column_int(res, 0);
+ else
+ error_report("Failed to select snapshot count");
- alarm_log->summary = strdupz(ae_summary(ae));
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_FINALIZE(res);
- freez(edit_command);
+ return count;
}
-#endif
-
-#ifdef ENABLE_ACLK
-static bool have_recent_alarm_unsafe(RRDHOST *host, int64_t alarm_id, int64_t mark)
-{
- ALARM_ENTRY *ae = host->health_log.alarms;
-
- while (ae) {
- if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
- (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
- return true;
- ae = ae->next;
- }
- return false;
-}
-#endif
+#define SQL_GET_SNAPSHOT_ENTRIES \
+ " SELECT 0, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " \
+ " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " \
+ " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status, " \
+ " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " \
+ " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, av.version " \
+ " FROM health_log hl, alert_hash ah, health_log_detail hld, alert_version av " \
+ " WHERE hl.config_hash_id = ah.hash_id" \
+ " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " \
+ " AND hld.health_log_id = av.health_log_id AND av.unique_id = hld.unique_id AND av.status <> -2"
#define ALARM_EVENTS_PER_CHUNK 1000
-void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
+void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused)
{
-#ifdef ENABLE_ACLK
- RRDHOST *host = find_host_by_node_id(node_id);
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (unlikely(!host)) {
- nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", node_id);
- freez(node_id);
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", wc->node_id);
return;
}
- freez(node_id);
- struct aclk_sync_cfg_t *wc = host->aclk_config;
-
- // we perhaps we don't need this for snapshots
- if (unlikely(!wc->alert_updates)) {
- nd_log(NDLS_ACCESS, NDLP_NOTICE,
- "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.",
- wc->node_id,
- wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ char *claim_id = get_agent_claimid();
+ if (unlikely(!claim_id))
return;
- }
- if (unlikely(!wc->alerts_snapshot_uuid))
+ // Check database for this node to see how many alerts we will need to put in the snapshot
+ int cnt = calculate_alert_snapshot_entries(&host->host_uuid);
+ if (!cnt) {
+ freez(claim_id);
return;
+ }
- char *claim_id = get_agent_claimid();
- if (unlikely(!claim_id))
+ sqlite3_stmt *res = NULL;
+ if (!PREPARE_STATEMENT(db_meta, SQL_GET_SNAPSHOT_ENTRIES, &res))
return;
- nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
-
- uint32_t cnt = 0;
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
- rw_spinlock_read_lock(&host->health_log.spinlock);
+ nd_uuid_t local_snapshot_uuid;
+ char snapshot_uuid_str[UUID_STR_LEN];
+ uuid_generate_random(local_snapshot_uuid);
+ uuid_unparse_lower(local_snapshot_uuid, snapshot_uuid_str);
+ char *snapshot_uuid = &snapshot_uuid_str[0];
- ALARM_ENTRY *ae = host->health_log.alarms;
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
+ "ACLK REQ [%s (%s)]: Sending %d alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(host),
+ cnt, snapshot_uuid);
- for (; ae; ae = ae->next) {
- if (likely(ae->updated_by_id))
- continue;
+ uint32_t chunks;
+ chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
- if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
- continue;
+ alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
+ struct alarm_snapshot alarm_snap;
+ struct alarm_log_entry alarm_log;
- if (have_recent_alarm_unsafe(host, ae->alarm_id, ae->unique_id))
- continue;
+ alarm_snap.node_id = wc->node_id;
+ alarm_snap.claim_id = claim_id;
+ alarm_snap.snapshot_uuid = snapshot_uuid;
+ alarm_snap.chunks = chunks;
+ alarm_snap.chunk = 1;
- if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
- continue;
+ alarm_log.node_id = wc->node_id;
+ alarm_log.claim_id = claim_id;
+ cnt = 0;
+ param = 0;
+ uint64_t version = 0;
+ int total_count = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
cnt++;
- }
-
- if (cnt) {
- uint32_t chunks;
-
- chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
- ae = host->health_log.alarms;
-
- cnt = 0;
- struct alarm_snapshot alarm_snap;
- alarm_snap.node_id = wc->node_id;
- alarm_snap.claim_id = claim_id;
- alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
- alarm_snap.chunks = chunks;
- alarm_snap.chunk = 1;
-
- alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
-
- for (; ae; ae = ae->next) {
- if (likely(ae->updated_by_id) || unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
- continue;
-
- if (have_recent_alarm_unsafe(host, ae->alarm_id, ae->unique_id))
- continue;
-
- if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
- continue;
-
- cnt++;
+ total_count++;
- struct alarm_log_entry alarm_log;
- alarm_log.node_id = wc->node_id;
- alarm_log.claim_id = claim_id;
+ if (!snapshot_proto)
+ snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
- if (!snapshot_proto)
- snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+ health_alarm_log_populate(&alarm_log, res, host, NULL);
- health_alarm_entry2proto_nolock(&alarm_log, ae, host);
- add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
+ add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
+ version += alarm_log.version;
- if (cnt == ALARM_EVENTS_PER_CHUNK) {
+ if (cnt == ALARM_EVENTS_PER_CHUNK) {
+ if (aclk_connected)
aclk_send_alarm_snapshot(snapshot_proto);
- cnt = 0;
- if (alarm_snap.chunk < chunks) {
- alarm_snap.chunk++;
- snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
- }
+ cnt = 0;
+ if (alarm_snap.chunk < chunks) {
+ alarm_snap.chunk++;
+ snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
}
- destroy_alarm_log_entry(&alarm_log);
}
- if (cnt)
- aclk_send_alarm_snapshot(snapshot_proto);
+ destroy_alarm_log_entry(&alarm_log);
}
+ if (cnt)
+ aclk_send_alarm_snapshot(snapshot_proto);
- rw_spinlock_read_unlock(&host->health_log.spinlock);
- wc->alerts_snapshot_uuid = NULL;
-
- freez(claim_id);
-#endif
-}
-
-#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created < UNIXEPOCH() - @period"
-
-void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
-{
- struct aclk_sync_cfg_t *wc = host->aclk_config;
- if (unlikely(!wc))
- return;
-
- char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_ALERT_ENTRIES, wc->uuid_str);
-
- sqlite3_stmt *res = NULL;
-
- if (!PREPARE_STATEMENT(db_meta, sql, &res))
- return;
-
- int param = 0;
- SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, MAX_REMOVED_PERIOD));
-
- param = 0;
- int rc = sqlite3_step_monitored(res);
- if (rc != SQLITE_DONE)
- error_report("Failed to execute DELETE query for cleaning stale ACLK alert entries.");
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "ACLK REQ [%s (%s)]: Sent! %d alerts snapshot, snapshot_uuid %s (version = %zu)",
+ wc->node_id,
+ rrdhost_hostname(host),
+ cnt,
+ snapshot_uuid,
+ version);
done:
REPORT_BIND_FAIL(res, param);
SQLITE_FINALIZE(res);
-}
-#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
- "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
- "FROM aclk_alert_%s WHERE date_submitted IS NULL"
-int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
-{
-
- struct aclk_sync_cfg_t *wc = host->aclk_config;
- if (!wc)
- return 1;
-
- proto_alert_status->alert_updates = wc->alert_updates;
-
- char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, sizeof(sql) - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
-
- sqlite3_stmt *res = NULL;
- if (!PREPARE_STATEMENT(db_meta, sql, &res))
- return 1;
-
- while (sqlite3_step_monitored(res) == SQLITE_ROW) {
- proto_alert_status->pending_min_sequence_id =
- sqlite3_column_bytes(res, 0) > 0 ? (uint64_t)sqlite3_column_int64(res, 0) : 0;
- proto_alert_status->pending_max_sequence_id =
- sqlite3_column_bytes(res, 1) > 0 ? (uint64_t)sqlite3_column_int64(res, 1) : 0;
- proto_alert_status->last_submitted_sequence_id =
- sqlite3_column_bytes(res, 2) > 0 ? (uint64_t)sqlite3_column_int64(res, 2) : 0;
- }
-
- SQLITE_FINALIZE(res);
-
- return 0;
+ freez(claim_id);
}
-void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
+// Start streaming alerts
+void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version)
{
- if (unlikely(!node_id))
+ nd_uuid_t node_uuid;
+
+ if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
struct aclk_sync_cfg_t *wc;
RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host || !(wc = host->aclk_config)))
- nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
- else {
- nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
- wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
- }
-}
-
-typedef struct active_alerts {
- char *name;
- char *chart;
- RRDCALC_STATUS status;
-} active_alerts_t;
-
-static inline int compare_active_alerts(const void *a, const void *b)
-{
- active_alerts_t *active_alerts_a = (active_alerts_t *)a;
- active_alerts_t *active_alerts_b = (active_alerts_t *)b;
-
- if (!(strcmp(active_alerts_a->name, active_alerts_b->name))) {
- return strcmp(active_alerts_a->chart, active_alerts_b->chart);
- } else
- return strcmp(active_alerts_a->name, active_alerts_b->name);
-}
-
-#define BATCH_ALLOCATED 10
-void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
-{
-#ifdef ENABLE_ACLK
- struct aclk_sync_cfg_t *wc = host->aclk_config;
- if (unlikely(!wc)) {
- nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
+ if (unlikely(!host || !(wc = host->aclk_config))) {
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, invalid node.", node_id);
return;
}
- if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
- //postpone checkpoint send
- wc->alert_checkpoint_req += 3;
- nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
+ if (unlikely(!host->health.health_enabled)) {
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
return;
}
- RRDCALC *rc;
- uint32_t cnt = 0;
- size_t len = 0;
-
- active_alerts_t *active_alerts = callocz(BATCH_ALLOCATED, sizeof(active_alerts_t));
- foreach_rrdcalc_in_rrdhost_read(host, rc) {
- if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
- continue;
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ schedule_alert_snapshot_if_needed(wc, cloud_version);
+ wc->stream_alerts = true;
+}
- if (rc->status == RRDCALC_STATUS_WARNING ||
- rc->status == RRDCALC_STATUS_CRITICAL) {
+// Do checkpoint alert version check
+void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version)
+{
+ nd_uuid_t node_uuid;
- if (cnt && !(cnt % BATCH_ALLOCATED)) {
- active_alerts = reallocz(active_alerts, (BATCH_ALLOCATED * ((cnt / BATCH_ALLOCATED) + 1)) * sizeof(active_alerts_t));
- }
+ if (unlikely(!node_id || !claim_id || !claimed() || uuid_parse(node_id, node_uuid)))
+ return;
- active_alerts[cnt].name = (char *)rrdcalc_name(rc);
- len += string_strlen(rc->config.name);
- active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
- len += string_strlen(rc->chart);
- active_alerts[cnt].status = rc->status;
- len++;
- cnt++;
- }
- }
- foreach_rrdcalc_in_rrdhost_done(rc);
-
- BUFFER *alarms_to_hash;
- if (cnt) {
- qsort(active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
-
- alarms_to_hash = buffer_create(len, NULL);
- for (uint32_t i = 0; i < cnt; i++) {
- buffer_strcat(alarms_to_hash, active_alerts[i].name);
- buffer_strcat(alarms_to_hash, active_alerts[i].chart);
- if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
- buffer_fast_strcat(alarms_to_hash, "W", 1);
- else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
- buffer_fast_strcat(alarms_to_hash, "C", 1);
- }
- } else {
- alarms_to_hash = buffer_create(1, NULL);
- buffer_strcat(alarms_to_hash, "");
- len = 0;
+ char *agent_claim_id = get_agent_claimid();
+ if (claim_id && agent_claim_id && strcmp(agent_claim_id, claim_id) != 0) {
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED WITH INVALID CLAIM ID", node_id);
+ goto done;
}
- freez(active_alerts);
- char hash[SHA256_DIGEST_LENGTH + 1];
- if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
- hash[SHA256_DIGEST_LENGTH] = 0;
+ struct aclk_sync_cfg_t *wc;
+ RRDHOST *host = find_host_by_node_id(node_id);
- struct alarm_checkpoint alarm_checkpoint;
- char *claim_id = get_agent_claimid();
- alarm_checkpoint.claim_id = claim_id;
- alarm_checkpoint.node_id = wc->node_id;
- alarm_checkpoint.checksum = (char *)hash;
+ if ((!host || !(wc = host->aclk_config)))
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED FOR INVALID NODE", node_id);
+ else
+ schedule_alert_snapshot_if_needed(wc, cloud_version);
- aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
- freez(claim_id);
- nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
- } else
- nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
+done:
+ freez(agent_claim_id);
+}
- wc->alert_checkpoint_req = 0;
- buffer_free(alarms_to_hash);
#endif
-}
diff --git a/src/database/sqlite/sqlite_aclk_alert.h b/src/database/sqlite/sqlite_aclk_alert.h
index cfb3468b9..17a58154f 100644
--- a/src/database/sqlite/sqlite_aclk_alert.h
+++ b/src/database/sqlite/sqlite_aclk_alert.h
@@ -5,28 +5,14 @@
extern sqlite3 *db_meta;
-#define SEND_REMOVED_AFTER_HEALTH_LOOPS 3
-#define SEND_CHECKPOINT_AFTER_HEALTH_LOOPS 4
-
-struct proto_alert_status {
- int alert_updates;
- uint64_t pending_min_sequence_id;
- uint64_t pending_max_sequence_id;
- uint64_t last_submitted_sequence_id;
-};
-
-void aclk_send_alarm_configuration (char *config_hash);
+void aclk_send_alert_configuration(char *config_hash);
void aclk_push_alert_config_event(char *node_id, char *config_hash);
-void aclk_start_alert_streaming(char *node_id, bool resets);
-void sql_queue_removed_alerts_to_aclk(RRDHOST *host);
-void sql_process_queue_removed_alerts_to_aclk(char *node_id);
-void aclk_send_alarm_checkpoint(char *node_id, char *claim_id);
-void aclk_push_alarm_checkpoint(RRDHOST *host);
+void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version);
+void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version);
-void aclk_push_alert_snapshot_event(char *node_id);
+void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused);
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, char *snapshot_uuid);
-int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status);
-void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter);
+bool process_alert_pending_queue(RRDHOST *host);
void aclk_push_alert_events_for_all_hosts(void);
#endif //NETDATA_SQLITE_ACLK_ALERT_H
diff --git a/src/database/sqlite/sqlite_aclk_node.c b/src/database/sqlite/sqlite_aclk_node.c
index 3134438db..70d1ebda1 100644
--- a/src/database/sqlite/sqlite_aclk_node.c
+++ b/src/database/sqlite/sqlite_aclk_node.c
@@ -90,13 +90,7 @@ static void build_node_info(RRDHOST *host)
node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", "");
node_info.data.machine_guid = host->machine_guid;
- struct capability node_caps[] = {
- {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled},
- {.name = "mc",
- .version = host->system_info->mc_version ? host->system_info->mc_version : 0,
- .enabled = host->system_info->mc_version ? 1 : 0},
- {.name = NULL, .version = 0, .enabled = 0}};
- node_info.node_capabilities = node_caps;
+ node_info.node_capabilities = (struct capability *)aclk_get_agent_capas();
node_info.data.ml_info.ml_capable = host->system_info->ml_capable;
node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled;
diff --git a/src/database/sqlite/sqlite_context.c b/src/database/sqlite/sqlite_context.c
index 1e49dd2bf..1d0c768e5 100644
--- a/src/database/sqlite/sqlite_context.c
+++ b/src/database/sqlite/sqlite_context.c
@@ -43,7 +43,7 @@ int sql_init_context_database(int memory)
return 1;
}
- errno = 0;
+ errno_clear();
netdata_log_info("SQLite database %s initialization", sqlite_database);
char buf[1024 + 1] = "";
diff --git a/src/database/sqlite/sqlite_db_migration.c b/src/database/sqlite/sqlite_db_migration.c
index 88abd8492..44a5e97c2 100644
--- a/src/database/sqlite/sqlite_db_migration.c
+++ b/src/database/sqlite/sqlite_db_migration.c
@@ -518,7 +518,7 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name
}
if (likely(user_version == target_version)) {
- errno = 0;
+ errno_clear();
netdata_log_info("%s database version is %d (no migration needed)", db_name, target_version);
return target_version;
}
diff --git a/src/database/sqlite/sqlite_functions.c b/src/database/sqlite/sqlite_functions.c
index 5c18ff8ed..e62743f59 100644
--- a/src/database/sqlite/sqlite_functions.c
+++ b/src/database/sqlite/sqlite_functions.c
@@ -43,7 +43,7 @@ SQLITE_API int sqlite3_step_monitored(sqlite3_stmt *stmt) {
return rc;
}
-static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database)
+static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database, int rc)
{
if (!res && !database)
@@ -54,7 +54,7 @@ static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database)
if (db_meta == database) {
char recover_file[FILENAME_MAX + 1];
- snprintfz(recover_file, FILENAME_MAX, "%s/.netdata-meta.db.recover", netdata_configured_cache_dir);
+ snprintfz(recover_file, FILENAME_MAX, "%s/.netdata-meta.db.%s", netdata_configured_cache_dir, SQLITE_CORRUPT == rc ? "recover" : "delete" );
int fd = open(recover_file, O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 444);
if (fd >= 0) {
close(fd);
@@ -69,7 +69,7 @@ int execute_insert(sqlite3_stmt *res)
int rc;
rc = sqlite3_step_monitored(res);
if (rc == SQLITE_CORRUPT) {
- (void)mark_database_to_recover(res, NULL);
+ (void)mark_database_to_recover(res, NULL, rc);
error_report("SQLite error %d", rc);
}
return rc;
@@ -229,8 +229,8 @@ int init_database_batch(sqlite3 *database, const char *batch[], const char *desc
analytics_set_data_str(&analytics_data.netdata_fail_reason, error_str);
sqlite3_free(err_msg);
freez(error_str);
- if (SQLITE_CORRUPT == rc) {
- if (mark_database_to_recover(NULL, database))
+ if (SQLITE_CORRUPT == rc || SQLITE_NOTADB == rc) {
+ if (mark_database_to_recover(NULL, database, rc))
error_report("Database is corrupted will attempt to fix");
return SQLITE_CORRUPT;
}
@@ -263,7 +263,7 @@ int db_execute(sqlite3 *db, const char *cmd)
}
if (rc == SQLITE_CORRUPT)
- mark_database_to_recover(NULL, db);
+ mark_database_to_recover(NULL, db, rc);
break;
}
return (rc != SQLITE_OK);
diff --git a/src/database/sqlite/sqlite_health.c b/src/database/sqlite/sqlite_health.c
index 51e38d05a..a632fd494 100644
--- a/src/database/sqlite/sqlite_health.c
+++ b/src/database/sqlite/sqlite_health.c
@@ -55,15 +55,137 @@ done:
}
/* Health related SQL queries
- Inserts an entry in the table
+ *
+ * Inserts an entry in the tables
+ * alert_queue
+ * health_log
+ * health_log_detail
+ *
*/
+int calculate_delay(RRDCALC_STATUS old_status, RRDCALC_STATUS new_status)
+{
+ int delay = ALERT_TRANSITION_DELAY_NONE;
+ switch(old_status) {
+ case RRDCALC_STATUS_REMOVED:
+ switch (new_status) {
+ case RRDCALC_STATUS_UNINITIALIZED:
+ delay = ALERT_TRANSITION_DELAY_LONG;
+ break;
+ case RRDCALC_STATUS_CLEAR:
+ delay = ALERT_TRANSITION_DELAY_SHORT;
+ break;
+ default:
+ delay = ALERT_TRANSITION_DELAY_NONE;
+ break;
+ }
+ break;
+ case RRDCALC_STATUS_UNDEFINED:
+ case RRDCALC_STATUS_UNINITIALIZED:
+ switch (new_status) {
+ case RRDCALC_STATUS_REMOVED:
+ case RRDCALC_STATUS_UNINITIALIZED:
+ case RRDCALC_STATUS_UNDEFINED:
+ delay = ALERT_TRANSITION_DELAY_LONG;
+ break;
+ case RRDCALC_STATUS_CLEAR:
+ delay = ALERT_TRANSITION_DELAY_SHORT;
+ break;
+ default:
+ delay = ALERT_TRANSITION_DELAY_NONE;
+ break;
+ }
+ break;
+ case RRDCALC_STATUS_CLEAR:
+ switch (new_status) {
+ case RRDCALC_STATUS_REMOVED:
+ case RRDCALC_STATUS_UNINITIALIZED:
+ case RRDCALC_STATUS_UNDEFINED:
+ delay = ALERT_TRANSITION_DELAY_LONG;
+ break;
+ case RRDCALC_STATUS_WARNING:
+ case RRDCALC_STATUS_CRITICAL:
+ default:
+ delay = ALERT_TRANSITION_DELAY_NONE;
+ break;
+
+ }
+ break;
+ case RRDCALC_STATUS_WARNING:
+ case RRDCALC_STATUS_CRITICAL:
+ switch (new_status) {
+ case RRDCALC_STATUS_REMOVED:
+ case RRDCALC_STATUS_UNINITIALIZED:
+ case RRDCALC_STATUS_UNDEFINED:
+ delay = ALERT_TRANSITION_DELAY_LONG;
+ break;
+ case RRDCALC_STATUS_CLEAR:
+ delay = ALERT_TRANSITION_DELAY_SHORT;
+ break;
+ default:
+ delay = ALERT_TRANSITION_DELAY_NONE;
+ break;
+ }
+ break;
+ default:
+ delay = ALERT_TRANSITION_DELAY_NONE;
+ break;
+ }
+ return delay;
+}
+
+#ifdef ENABLE_ACLK
+#define SQL_INSERT_ALERT_PENDING_QUEUE \
+ "INSERT INTO alert_queue (host_id, health_log_id, unique_id, alarm_id, status, date_scheduled)" \
+ " VALUES (@host_id, @health_log_id, @unique_id, @alarm_id, @new_status, UNIXEPOCH() + @delay)" \
+ " ON CONFLICT (host_id, health_log_id, alarm_id)" \
+ " DO UPDATE SET status = excluded.status, unique_id = excluded.unique_id, " \
+ " date_scheduled = MIN(date_scheduled, excluded.date_scheduled)"
+
+static void insert_alert_queue(
+ RRDHOST *host,
+ uint64_t health_log_id,
+ int64_t unique_id,
+ uint32_t alarm_id,
+ RRDCALC_STATUS old_status,
+ RRDCALC_STATUS new_status)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int rc;
+
+ if (!host->aclk_config)
+ return;
+
+ if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_INSERT_ALERT_PENDING_QUEUE, &res))
+ return;
+
+ int submit_delay = calculate_delay(old_status, new_status);
+
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64)health_log_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, alarm_id));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, new_status));
+ SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, submit_delay));
+
+ param = 0;
+ rc = execute_insert(res);
+ if (rc != SQLITE_DONE)
+ error_report(
+ "HEALTH [%s]: Failed to execute insert_alert_queue, rc = %d", rrdhost_hostname(host), rc);
+
+done:
+ REPORT_BIND_FAIL(res, param);
+ SQLITE_RESET(res);
+}
+#endif
#define SQL_INSERT_HEALTH_LOG_DETAIL \
"INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, " \
"updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, " \
"info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \
- "VALUES (@health_log_id,@unique_id,@alarm_id,@alarm_event_id,@updated_by_id,@updates_id,@when_key,@duration," \
+ " VALUES (@health_log_id,@unique_id,@alarm_id,@alarm_event_id,@updated_by_id,@updates_id,@when_key,@duration," \
"@non_clear_duration,@flags,@exec_run_timestamp,@delay_up_to_timestamp, @info,@exec_code,@new_status,@old_status," \
"@delay,@new_value,@old_value,@last_repeat,@transition_id,@global_id,@summary)"
@@ -150,6 +272,11 @@ static void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae)
if (rc == SQLITE_ROW) {
health_log_id = (size_t)sqlite3_column_int64(res, 0);
sql_health_alarm_log_insert_detail(host, health_log_id, ae);
+#ifdef ENABLE_ACLK
+ if (netdata_cloud_enabled)
+ insert_alert_queue(
+ host, health_log_id, (int64_t)ae->unique_id, (int64_t)ae->alarm_id, ae->old_status, ae->new_status);
+#endif
} else
error_report("HEALTH [%s]: Failed to execute SQL_INSERT_HEALTH_LOG, rc = %d", rrdhost_hostname(host), rc);
@@ -162,14 +289,8 @@ void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae)
{
if (ae->flags & HEALTH_ENTRY_FLAG_SAVED)
sql_health_alarm_log_update(host, ae);
- else {
+ else
sql_health_alarm_log_insert(host, ae);
-#ifdef ENABLE_ACLK
- if (netdata_cloud_enabled) {
- sql_queue_alarm_to_aclk(host, ae, false);
- }
-#endif
- }
}
/*
@@ -179,44 +300,18 @@ void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae)
*
*/
-#define SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED \
+#define SQL_CLEANUP_HEALTH_LOG_DETAIL \
"DELETE FROM health_log_detail WHERE health_log_id IN " \
- "(SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < UNIXEPOCH() - @history " \
- "AND updated_by_id <> 0 AND transition_id NOT IN " \
- "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)"
-
-#define SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(guid) \
- "DELETE from health_log_detail WHERE unique_id NOT IN " \
- "(SELECT filtered_alert_unique_id FROM aclk_alert_%s) " \
- "AND unique_id IN (SELECT hld.unique_id FROM health_log hl, health_log_detail hld WHERE " \
- "hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id) " \
- "AND health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id) " \
- "AND when_key < unixepoch() - @history " \
- "AND updated_by_id <> 0 AND transition_id NOT IN " \
- "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)", \
- guid
-
-void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) {
+ " (SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < UNIXEPOCH() - @history " \
+ " AND updated_by_id <> 0 AND transition_id NOT IN " \
+ " (SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)"
+
+void sql_health_alarm_log_cleanup(RRDHOST *host)
+{
sqlite3_stmt *res = NULL;
int rc;
- char command[MAX_HEALTH_SQL_SIZE + 1];
-
- REQUIRE_DB(db_meta);
-
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- snprintfz(command, sizeof(command) - 1, "aclk_alert_%s", uuid_str);
- bool aclk_table_exists = table_exists_in_database(db_meta, command);
-
- char *sql = SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED;
-
- if (claimed && aclk_table_exists) {
- snprintfz(command, sizeof(command) - 1, SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(uuid_str));
- sql = command;
- }
-
- if (!PREPARE_STATEMENT(db_meta, sql, &res))
+ if (!PREPARE_STATEMENT(db_meta, SQL_CLEANUP_HEALTH_LOG_DETAIL, &res))
return;
int param = 0;
@@ -228,33 +323,21 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) {
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to cleanup health log detail table, rc = %d", rc);
- if (aclk_table_exists)
- sql_aclk_alert_clean_dead_entries(host);
-
done:
REPORT_BIND_FAIL(res, param);
SQLITE_FINALIZE(res);
}
-#define SQL_INJECT_REMOVED \
- "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \
- "duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, " \
- "delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \
- "SELECT health_log_id, ?1, ?2, ?3, 0, ?4, UNIXEPOCH(), 0, 0, flags, exec_run_timestamp, UNIXEPOCH(), info, exec_code, -2, " \
- "new_status, delay, NULL, new_value, 0, ?5, NOW_USEC(0), summary FROM health_log_detail WHERE unique_id = ?6 AND transition_id = ?7"
-
-#define SQL_INJECT_REMOVED_UPDATE_DETAIL \
- "UPDATE health_log_detail SET flags = flags | ?1, updated_by_id = ?2 WHERE unique_id = ?3 AND transition_id = ?4"
-
-#define SQL_INJECT_REMOVED_UPDATE_LOG \
- "UPDATE health_log SET last_transition_id = ?1 WHERE alarm_id = ?2 AND last_transition_id = ?3 AND host_id = ?4"
+#define SQL_UPDATE_TRANSITION_IN_HEALTH_LOG \
+ "UPDATE health_log SET last_transition_id = @transition WHERE alarm_id = @alarm_id AND " \
+ " last_transition_id = @prev_trans AND host_id = @host_id"
-bool sql_update_removed_in_health_log(RRDHOST *host, uint32_t alarm_id, nd_uuid_t *transition_id, nd_uuid_t *last_transition)
+bool sql_update_transition_in_health_log(RRDHOST *host, uint32_t alarm_id, nd_uuid_t *transition_id, nd_uuid_t *last_transition)
{
int rc = 0;
sqlite3_stmt *res;
- if (!PREPARE_STATEMENT(db_meta, SQL_INJECT_REMOVED_UPDATE_LOG, &res))
+ if (!PREPARE_STATEMENT(db_meta, SQL_UPDATE_TRANSITION_IN_HEALTH_LOG, &res))
return false;
int param = 0;
@@ -275,12 +358,16 @@ done:
return (param == 0 && rc == SQLITE_DONE);
}
-bool sql_update_removed_in_health_log_detail(uint32_t unique_id, uint32_t max_unique_id, nd_uuid_t *prev_transition_id)
+#define SQL_SET_UPDATED_BY_IN_HEALTH_LOG_DETAIL \
+ "UPDATE health_log_detail SET flags = flags | @flag, updated_by_id = @updated_by WHERE" \
+ " unique_id = @unique_id AND transition_id = @transition_id"
+
+bool sql_set_updated_by_in_health_log_detail(uint32_t unique_id, uint32_t max_unique_id, nd_uuid_t *prev_transition_id)
{
int rc = 0;
sqlite3_stmt *res;
- if (!PREPARE_STATEMENT(db_meta, SQL_INJECT_REMOVED_UPDATE_DETAIL, &res))
+ if (!PREPARE_STATEMENT(db_meta, SQL_SET_UPDATED_BY_IN_HEALTH_LOG_DETAIL, &res))
return false;
int param = 0;
@@ -301,7 +388,16 @@ done:
return (param == 0 && rc == SQLITE_DONE);
}
-void sql_inject_removed_status(
+#define SQL_INJECT_REMOVED \
+ "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \
+ "duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, " \
+ "delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \
+ "SELECT health_log_id, @max_unique_id, @alarm_id, @alarm_event_id, 0, @unique_id, UNIXEPOCH(), 0, 0, flags, " \
+ " exec_run_timestamp, UNIXEPOCH(), info, exec_code, -2, " \
+ " new_status, delay, NULL, new_value, 0, @transition_id, NOW_USEC(0), summary FROM health_log_detail " \
+ " WHERE unique_id = @unique_id AND transition_id = @last_transition_id RETURNING health_log_id, old_status"
+
+static void sql_inject_removed_status(
RRDHOST *host,
uint32_t alarm_id,
uint32_t alarm_event_id,
@@ -326,19 +422,27 @@ void sql_inject_removed_status(
SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64) alarm_event_id + 1));
SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64) unique_id));
SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &transition_id, sizeof(transition_id), SQLITE_STATIC));
- SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64) unique_id));
SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, last_transition, sizeof(*last_transition), SQLITE_STATIC));
param = 0;
- int rc = execute_insert(res);
- if (rc == SQLITE_DONE) {
+ //int rc = execute_insert(res);
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
//update the old entry in health_log_detail
- sql_update_removed_in_health_log_detail(unique_id, max_unique_id, last_transition);
+ sql_set_updated_by_in_health_log_detail(unique_id, max_unique_id, last_transition);
//update the old entry in health_log
- sql_update_removed_in_health_log(host, alarm_id, &transition_id, last_transition);
+ sql_update_transition_in_health_log(host, alarm_id, &transition_id, last_transition);
+
+#ifdef ENABLE_ACLK
+ if (netdata_cloud_enabled) {
+ int64_t health_log_id = sqlite3_column_int64(res, 0);
+ RRDCALC_STATUS old_status = (RRDCALC_STATUS)sqlite3_column_double(res, 1);
+ insert_alert_queue(
+ host, health_log_id, (int64_t)unique_id, (int64_t)alarm_id, old_status, RRDCALC_STATUS_REMOVED);
+ }
+#endif
}
- else
- error_report("HEALTH [N/A]: Failed to execute SQL_INJECT_REMOVED, rc = %d", rc);
+ //else
+ // error_report("HEALTH [N/A]: Failed to execute SQL_INJECT_REMOVED, rc = %d", rc);
done:
REPORT_BIND_FAIL(res, param);
@@ -461,7 +565,7 @@ void sql_alert_cleanup(bool cli)
{
UNUSED(cli);
- errno = 0;
+ errno_clear();
if (sql_init_meta_database(DB_CHECK_NONE, 0)) {
netdata_log_error("Failed to open database");
return;
@@ -489,7 +593,6 @@ void sql_alert_cleanup(bool cli)
void sql_health_alarm_log_load(RRDHOST *host)
{
sqlite3_stmt *res = NULL;
- int ret;
ssize_t errored = 0, loaded = 0;
if (!REQUIRE_DB(db_meta))
@@ -500,21 +603,19 @@ void sql_health_alarm_log_load(RRDHOST *host)
if (!PREPARE_STATEMENT(db_meta, SQL_LOAD_HEALTH_LOG, &res))
return;
- ret = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
- if (unlikely(ret != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter for SQL_LOAD_HEALTH_LOG.");
- SQLITE_FINALIZE(res);
- return;
- }
+ int param = 0;
+ SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC));
DICTIONARY *all_rrdcalcs = dictionary_create(
DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE);
+
RRDCALC *rc;
foreach_rrdcalc_in_rrdhost_read(host, rc) {
dictionary_set(all_rrdcalcs, rrdcalc_name(rc), rc, sizeof(*rc));
}
foreach_rrdcalc_in_rrdhost_done(rc);
+ param = 0;
rw_spinlock_read_lock(&host->health_log.spinlock);
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
@@ -618,8 +719,6 @@ void sql_health_alarm_log_load(RRDHOST *host)
ae->summary = SQLITE3_COLUMN_STRINGDUP_OR_NULL(res, 33);
char value_string[100 + 1];
- string_freez(ae->old_value_string);
- string_freez(ae->new_value_string);
ae->old_value_string = string_strdupz(format_value_and_unit(value_string, 100, ae->old_value, ae_units(ae), -1));
ae->new_value_string = string_strdupz(format_value_and_unit(value_string, 100, ae->new_value, ae_units(ae), -1));
@@ -631,7 +730,6 @@ void sql_health_alarm_log_load(RRDHOST *host)
if(unlikely(ae->alarm_id >= host->health_max_alarm_id))
host->health_max_alarm_id = ae->alarm_id;
-
loaded++;
}
@@ -650,7 +748,8 @@ void sql_health_alarm_log_load(RRDHOST *host)
nd_log(NDLS_DAEMON, errored ? NDLP_WARNING : NDLP_DEBUG,
"[%s]: Table health_log, loaded %zd alarm entries, errors in %zd entries.",
rrdhost_hostname(host), loaded, errored);
-
+done:
+ REPORT_BIND_FAIL(res, param);
SQLITE_FINALIZE(res);
}
diff --git a/src/database/sqlite/sqlite_health.h b/src/database/sqlite/sqlite_health.h
index 99f67a3a6..73a85e2b2 100644
--- a/src/database/sqlite/sqlite_health.h
+++ b/src/database/sqlite/sqlite_health.h
@@ -6,14 +6,17 @@
#include "daemon/common.h"
#include "sqlite3.h"
+#define ALERT_TRANSITION_DELAY_LONG (600)
+#define ALERT_TRANSITION_DELAY_SHORT (10)
+#define ALERT_TRANSITION_DELAY_NONE (0)
+
struct sql_alert_transition_data;
struct sql_alert_config_data;
struct rrd_alert_prototype;
void sql_health_alarm_log_load(RRDHOST *host);
void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae);
-void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed);
+void sql_health_alarm_log_cleanup(RRDHOST *host);
void sql_alert_store_config(struct rrd_alert_prototype *ap);
-void sql_aclk_alert_clean_dead_entries(RRDHOST *host);
int sql_health_get_last_executed_event(RRDHOST *host, ALARM_ENTRY *ae, RRDCALC_STATUS *last_executed_status);
void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, time_t after, const char *chart);
int health_migrate_old_health_log_table(char *table);
diff --git a/src/database/sqlite/sqlite_metadata.c b/src/database/sqlite/sqlite_metadata.c
index 5573f7994..1b801b731 100644
--- a/src/database/sqlite/sqlite_metadata.c
+++ b/src/database/sqlite/sqlite_metadata.c
@@ -28,6 +28,17 @@ const char *database_config[] = {
"CREATE TABLE IF NOT EXISTS chart_label(chart_id blob, source_type int, label_key text, "
"label_value text, date_created int, PRIMARY KEY (chart_id, label_key))",
+ "CREATE TRIGGER IF NOT EXISTS del_chart_label AFTER DELETE ON chart "
+ "BEGIN DELETE FROM chart_label WHERE chart_id = old.chart_id; END",
+
+ "CREATE TRIGGER IF NOT EXISTS del_chart "
+ "AFTER DELETE ON dimension "
+ "FOR EACH ROW "
+ "BEGIN"
+ " DELETE FROM chart WHERE chart_id = OLD.chart_id "
+ " AND NOT EXISTS (SELECT 1 FROM dimension WHERE chart_id = OLD.chart_id);"
+ "END",
+
"CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created)",
"CREATE TABLE IF NOT EXISTS alert_hash(hash_id blob PRIMARY KEY, date_updated int, alarm text, template text, "
@@ -67,6 +78,18 @@ const char *database_config[] = {
"CREATE INDEX IF NOT EXISTS health_log_d_ind_7 on health_log_detail (alarm_id)",
"CREATE INDEX IF NOT EXISTS health_log_d_ind_8 on health_log_detail (new_status, updated_by_id)",
+#ifdef ENABLE_ACLK
+ "CREATE TABLE IF NOT EXISTS alert_queue "
+ " (host_id BLOB, health_log_id INT, unique_id INT, alarm_id INT, status INT, date_scheduled INT, "
+ " UNIQUE(host_id, health_log_id, alarm_id))",
+
+ "CREATE TABLE IF NOT EXISTS alert_version (health_log_id INTEGER PRIMARY KEY, unique_id INT, status INT, "
+ "version INT, date_submitted INT)",
+
+ "CREATE TABLE IF NOT EXISTS aclk_queue (sequence_id INTEGER PRIMARY KEY, host_id blob, health_log_id INT, "
+ "unique_id INT, date_created INT, UNIQUE(host_id, health_log_id))",
+#endif
+
NULL
};
@@ -251,7 +274,7 @@ static inline void set_host_node_id(RRDHOST *host, nd_uuid_t *node_id)
}
if (unlikely(!wc))
- sql_create_aclk_table(host, &host->host_uuid, node_id);
+ create_aclk_config(host, &host->host_uuid, node_id);
else
uuid_unparse_lower(*node_id, wc->node_id);
}
@@ -668,6 +691,18 @@ int sql_init_meta_database(db_check_action_type_t rebuild, int memory)
if (rebuild & DB_CHECK_RECOVER)
return 0;
}
+
+ snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/.netdata-meta.db.delete", netdata_configured_cache_dir);
+ rc = unlink(sqlite_database);
+ snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir);
+ if (rc == 0) {
+ char new_sqlite_database[FILENAME_MAX + 1];
+ snprintfz(new_sqlite_database, sizeof(new_sqlite_database) - 1, "%s/netdata-meta.bad", netdata_configured_cache_dir);
+ rc = rename(sqlite_database, new_sqlite_database);
+ if (rc)
+ error_report("Failed to rename %s to %s", sqlite_database, new_sqlite_database);
+ }
+ // note: sqlite_database contains the right name
}
else
strncpyz(sqlite_database, ":memory:", sizeof(sqlite_database) - 1);
@@ -699,7 +734,7 @@ int sql_init_meta_database(db_check_action_type_t rebuild, int memory)
}
if (rebuild & DB_CHECK_ANALYZE) {
- errno = 0;
+ errno_clear();
netdata_log_info("Running ANALYZE on %s", sqlite_database);
rc = sqlite3_exec_monitored(db_meta, "ANALYZE", 0, 0, &err_msg);
if (rc != SQLITE_OK) {
@@ -713,7 +748,7 @@ int sql_init_meta_database(db_check_action_type_t rebuild, int memory)
return 1;
}
- errno = 0;
+ errno_clear();
netdata_log_info("SQLite database %s initialization", sqlite_database);
rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0);
@@ -1430,11 +1465,10 @@ static void cleanup_health_log(struct metadata_wc *wc)
RRDHOST *host;
- bool is_claimed = claimed();
dfe_start_reentrant(rrdhost_root_index, host){
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
continue;
- sql_health_alarm_log_cleanup(host, is_claimed);
+ sql_health_alarm_log_cleanup(host);
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
break;
}
@@ -1445,6 +1479,9 @@ static void cleanup_health_log(struct metadata_wc *wc)
(void) db_execute(db_meta,"DELETE FROM health_log WHERE host_id NOT IN (SELECT host_id FROM host)");
(void) db_execute(db_meta,"DELETE FROM health_log_detail WHERE health_log_id NOT IN (SELECT health_log_id FROM health_log)");
+#ifdef ENABLE_ACLK
+ (void) db_execute(db_meta,"DELETE FROM alert_version WHERE health_log_id NOT IN (SELECT health_log_id FROM health_log)");
+#endif
}
//