diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:20 +0000 |
commit | 87d772a7d708fec12f48cd8adc0dedff6e1025da (patch) | |
tree | 1fee344c64cc3f43074a01981e21126c8482a522 /src/database | |
parent | Adding upstream version 1.46.3. (diff) | |
download | netdata-87d772a7d708fec12f48cd8adc0dedff6e1025da.tar.xz netdata-87d772a7d708fec12f48cd8adc0dedff6e1025da.zip |
Adding upstream version 1.47.0.upstream/1.47.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/database')
-rw-r--r-- | src/database/engine/rrdengine.c | 2 | ||||
-rw-r--r-- | src/database/rrd.h | 3 | ||||
-rw-r--r-- | src/database/rrdhost.c | 16 | ||||
-rw-r--r-- | src/database/rrdlabels.c | 38 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_aclk.c | 228 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_aclk.h | 44 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_aclk_alert.c | 1269 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_aclk_alert.h | 24 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_aclk_node.c | 8 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_context.c | 2 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_db_migration.c | 2 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_functions.c | 12 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_health.c | 263 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_health.h | 7 | ||||
-rw-r--r-- | src/database/sqlite/sqlite_metadata.c | 47 |
15 files changed, 915 insertions, 1050 deletions
diff --git a/src/database/engine/rrdengine.c b/src/database/engine/rrdengine.c index 2d6583ea..a989877f 100644 --- a/src/database/engine/rrdengine.c +++ b/src/database/engine/rrdengine.c @@ -1517,7 +1517,7 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb break; } - errno = 0; + errno_clear(); if(count) nd_log(NDLS_DAEMON, NDLP_DEBUG, "DBENGINE: journal indexing done; %u files processed", diff --git a/src/database/rrd.h b/src/database/rrd.h index 097e2502..bd31e21e 100644 --- a/src/database/rrd.h +++ b/src/database/rrd.h @@ -1043,7 +1043,6 @@ struct alarm_entry { STRING *recipient; time_t exec_run_timestamp; int exec_code; - uint64_t exec_spawn_serial; STRING *source; STRING *units; @@ -1069,6 +1068,8 @@ struct alarm_entry { time_t last_repeat; + POPEN_INSTANCE *popen_instance; + struct alarm_entry *next; struct alarm_entry *next_in_progress; struct alarm_entry *prev_in_progress; diff --git a/src/database/rrdhost.c b/src/database/rrdhost.c index 6bf2c255..b3d786cf 100644 --- a/src/database/rrdhost.c +++ b/src/database/rrdhost.c @@ -935,7 +935,13 @@ void dbengine_init(char *hostname) { config_set_number(CONFIG_SECTION_DB, "dbengine tier 0 disk space MB", default_multidb_disk_quota_mb); } +#ifdef OS_WINDOWS + // FIXME: for whatever reason joining the initialization threads + // fails on Windows. + bool parallel_initialization = false; +#else bool parallel_initialization = (storage_tiers <= (size_t)get_netdata_cpus()) ? true : false; +#endif struct dbengine_initialization tiers_init[RRD_STORAGE_TIERS] = {}; @@ -1488,18 +1494,16 @@ static void rrdhost_load_kubernetes_labels(void) { return; } - pid_t pid; - FILE *fp_child_input; - FILE *fp_child_output = netdata_popen(label_script, &pid, &fp_child_input); - if(!fp_child_output) return; + POPEN_INSTANCE *instance = spawn_popen_run(label_script); + if(!instance) return; char buffer[1000 + 1]; - while (fgets(buffer, 1000, fp_child_output) != NULL) + while (fgets(buffer, 1000, instance->child_stdout_fp) != NULL) rrdlabels_add_pair(localhost->rrdlabels, buffer, RRDLABEL_SRC_AUTO|RRDLABEL_SRC_K8S); // Non-zero exit code means that all the script output is error messages. We've shown already any message that didn't include a ':' // Here we'll inform with an ERROR that the script failed, show whatever (if anything) was added to the list of labels, free the memory and set the return to null - int rc = netdata_pclose(fp_child_input, fp_child_output, pid); + int rc = spawn_popen_wait(instance); if(rc) nd_log(NDLS_DAEMON, NDLP_ERR, "%s exited abnormally. Failed to get kubernetes labels.", diff --git a/src/database/rrdlabels.c b/src/database/rrdlabels.c index b82fa76d..65e2dc9e 100644 --- a/src/database/rrdlabels.c +++ b/src/database/rrdlabels.c @@ -412,32 +412,6 @@ __attribute__((constructor)) void initialize_labels_keys_char_map(void) { label_names_char_map[i] = label_values_char_map[i]; // apply overrides to the label names map - label_names_char_map['A'] = 'a'; - label_names_char_map['B'] = 'b'; - label_names_char_map['C'] = 'c'; - label_names_char_map['D'] = 'd'; - label_names_char_map['E'] = 'e'; - label_names_char_map['F'] = 'f'; - label_names_char_map['G'] = 'g'; - label_names_char_map['H'] = 'h'; - label_names_char_map['I'] = 'i'; - label_names_char_map['J'] = 'j'; - label_names_char_map['K'] = 'k'; - label_names_char_map['L'] = 'l'; - label_names_char_map['M'] = 'm'; - label_names_char_map['N'] = 'n'; - label_names_char_map['O'] = 'o'; - label_names_char_map['P'] = 'p'; - label_names_char_map['Q'] = 'q'; - label_names_char_map['R'] = 'r'; - label_names_char_map['S'] = 's'; - label_names_char_map['T'] = 't'; - label_names_char_map['U'] = 'u'; - label_names_char_map['V'] = 'v'; - label_names_char_map['W'] = 'w'; - label_names_char_map['X'] = 'x'; - label_names_char_map['Y'] = 'y'; - label_names_char_map['Z'] = 'z'; label_names_char_map['='] = '_'; label_names_char_map[':'] = '_'; label_names_char_map['+'] = '_'; @@ -1652,13 +1626,13 @@ static int rrdlabels_unittest_add_pairs() { errors += rrdlabels_unittest_add_a_pair("\"tag=1\": country:\"Gre\\\"ece\"", "tag_1", "country:Gre_ece"); errors += rrdlabels_unittest_add_a_pair("\"tag=1\" = country:\"Gre\\\"ece\"", "tag_1", "country:Gre_ece"); - errors += rrdlabels_unittest_add_a_pair("\t'LABE=L'\t=\t\"World\" peace", "labe_l", "World peace"); - errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry:\"World\":\"Europe\":\"Greece\"", "la_b_el", "country:World:Europe:Greece"); - errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry\\\"World\"\\\"Europe\"\\\"Greece\"", "la_b_el", "country/World/Europe/Greece"); + errors += rrdlabels_unittest_add_a_pair("\t'LABE=L'\t=\t\"World\" peace", "LABE_L", "World peace"); + errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry:\"World\":\"Europe\":\"Greece\"", "LA_B_EL", "country:World:Europe:Greece"); + errors += rrdlabels_unittest_add_a_pair("\t'LA\\'B:EL'\t=\tcountry\\\"World\"\\\"Europe\"\\\"Greece\"", "LA_B_EL", "country/World/Europe/Greece"); - errors += rrdlabels_unittest_add_a_pair("NAME=\"VALUE\"", "name", "VALUE"); - errors += rrdlabels_unittest_add_a_pair("\"NAME\" : \"VALUE\"", "name", "VALUE"); - errors += rrdlabels_unittest_add_a_pair("NAME: \"VALUE\"", "name", "VALUE"); + errors += rrdlabels_unittest_add_a_pair("NAME=\"VALUE\"", "NAME", "VALUE"); + errors += rrdlabels_unittest_add_a_pair("\"NAME\" : \"VALUE\"", "NAME", "VALUE"); + errors += rrdlabels_unittest_add_a_pair("NAME: \"VALUE\"", "NAME", "VALUE"); return errors; } diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index 8dc2231b..027ee8f9 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -9,7 +9,6 @@ struct aclk_sync_config_s { uv_thread_t thread; uv_loop_t loop; uv_timer_t timer_req; - time_t cleanup_after; // Start a cleanup after this timestamp uv_async_t async; bool initialized; SPINLOCK cmd_queue_lock; @@ -24,7 +23,7 @@ void sanity_check(void) { #ifdef ENABLE_ACLK static struct aclk_database_cmd aclk_database_deq_cmd(void) { - struct aclk_database_cmd ret; + struct aclk_database_cmd ret = { 0 }; spinlock_lock(&aclk_sync_config.cmd_queue_lock); if(aclk_sync_config.cmd_base) { @@ -35,7 +34,6 @@ static struct aclk_database_cmd aclk_database_deq_cmd(void) } else { ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; } spinlock_unlock(&aclk_sync_config.cmd_queue_lock); @@ -176,70 +174,24 @@ static int create_host_callback(void *data, int argc, char **argv, char **column #ifdef ENABLE_ACLK -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id" -static int is_host_available(nd_uuid_t *host_id) -{ - sqlite3_stmt *res = NULL; - int rc = 0; - - if (!REQUIRE_DB(db_meta)) - return 1; - - if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_HOST_BY_UUID, &res)) - return 1; - - int param = 0; - SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC)); - - param = 0; - rc = sqlite3_step_monitored(res); - -done: - REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); - return (rc == SQLITE_ROW); -} +#define SQL_SELECT_ACLK_ALERT_TABLES \ + "SELECT 'DROP '||type||' IF EXISTS '||name||';' FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table', 'trigger', 'index')" -// OPCODE: ACLK_DATABASE_DELETE_HOST -static void sql_delete_aclk_table_list(char *host_guid) +static void sql_delete_aclk_table_list(void) { - char uuid_str[UUID_STR_LEN]; - char host_str[UUID_STR_LEN]; - - int rc; - nd_uuid_t host_uuid; - - if (unlikely(!host_guid)) - return; - - rc = uuid_parse(host_guid, host_uuid); - freez(host_guid); - if (rc) - return; - - uuid_unparse_lower(host_uuid, host_str); - uuid_unparse_lower_fix(&host_uuid, uuid_str); - - if (is_host_available(&host_uuid)) - return; - sqlite3_stmt *res = NULL; - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); - buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index')", uuid_str); + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, NULL); - if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) + if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ACLK_ALERT_TABLES, &res)) goto fail; - buffer_flush(sql); - while (sqlite3_step_monitored(res) == SQLITE_ROW) buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); SQLITE_FINALIZE(res); - rc = db_execute(db_meta, buffer_tostring(sql)); + int rc = db_execute(db_meta, buffer_tostring(sql)); if (unlikely(rc)) netdata_log_error("Failed to drop unused ACLK tables"); @@ -285,53 +237,6 @@ skip: freez(machine_guid); } - -static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused) -{ - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.param[0] = strdupz((char *) argv[0]); - aclk_database_enq_cmd(&cmd); - return 0; -} - -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table')" - -static void sql_check_aclk_table_list(void) -{ - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } -} - -#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d" - -static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) -{ - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,sizeof(sql) - 1, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); - if (unlikely(db_execute(db_meta, sql))) - error_report("Failed to clean stale ACLK alert entries"); - return 0; -} - -#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table')" - -static void sql_maint_aclk_sync_database_all(void) -{ - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } -} - static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { char uuid_str[UUID_STR_LEN]; @@ -339,7 +244,7 @@ static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_un RRDHOST *host = rrdhost_find_by_guid(uuid_str); if (host != localhost) - sql_create_aclk_table(host, (nd_uuid_t *) argv[0], (nd_uuid_t *) argv[1]); + create_aclk_config(host, (nd_uuid_t *)argv[0], (nd_uuid_t *)argv[1]); return 0; } @@ -356,16 +261,7 @@ static void timer_cb(uv_timer_t *handle) uv_stop(handle->loop); uv_update_time(handle->loop); - struct aclk_sync_config_s *config = handle->data; - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - - if (config->cleanup_after < now_realtime_sec()) { - cmd.opcode = ACLK_DATABASE_CLEANUP; - aclk_database_enq_cmd(&cmd); - config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; - } - + struct aclk_database_cmd cmd = { 0 }; if (aclk_connected) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; aclk_database_enq_cmd(&cmd); @@ -373,7 +269,7 @@ static void timer_cb(uv_timer_t *handle) } } -static void aclk_synchronization(void *arg __maybe_unused) +static void aclk_synchronization(void *arg) { struct aclk_sync_config_s *config = arg; uv_thread_set_name_np("ACLKSYNC"); @@ -381,14 +277,9 @@ static void aclk_synchronization(void *arg __maybe_unused) service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); - worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); - worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); - worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint"); - worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); - worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); uv_loop_t *loop = &config->loop; @@ -401,9 +292,10 @@ static void aclk_synchronization(void *arg __maybe_unused) netdata_log_info("Starting ACLK synchronization thread"); - config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST; config->initialized = true; + sql_delete_aclk_table_list(); + while (likely(service_running(SERVICE_ACLKSYNC))) { enum aclk_database_opcode opcode; worker_is_idle(); @@ -422,26 +314,17 @@ static void aclk_synchronization(void *arg __maybe_unused) worker_is_busy(opcode); switch (opcode) { + default: case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ break; -// MAINTENANCE - case ACLK_DATABASE_CLEANUP: - // Scan all aclk_alert_ tables and cleanup as needed - sql_maint_aclk_sync_database_all(); - sql_check_aclk_table_list(); - break; - - case ACLK_DATABASE_DELETE_HOST: - sql_delete_aclk_table_list(cmd.param[0]); - break; // NODE STATE case ACLK_DATABASE_NODE_STATE:; RRDHOST *host = cmd.param[0]; int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; struct aclk_sync_cfg_t *ahc = host->aclk_config; if (unlikely(!ahc)) - sql_create_aclk_table(host, &host->host_uuid, host->node_id); + create_aclk_config(host, &host->host_uuid, host->node_id); aclk_host_state_update(host, live, 1); break; case ACLK_DATABASE_NODE_UNREGISTER: @@ -455,17 +338,7 @@ static void aclk_synchronization(void *arg __maybe_unused) case ACLK_DATABASE_PUSH_ALERT: aclk_push_alert_events_for_all_hosts(); break; - case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:; - aclk_push_alert_snapshot_event(cmd.param[0]); - break; - case ACLK_DATABASE_QUEUE_REMOVED_ALERTS: - sql_process_queue_removed_alerts_to_aclk(cmd.param[0]); - break; - default: - break; } - if (cmd.completion) - completion_mark_complete(cmd.completion); } while (opcode != ACLK_DATABASE_NOOP); } @@ -489,39 +362,11 @@ static void aclk_synchronization_init(void) // ------------------------------------------------------------- -void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused) +void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK - char uuid_str[UUID_STR_LEN]; - char host_guid[UUID_STR_LEN]; - int rc; - - uuid_unparse_lower_fix(host_uuid, uuid_str); - uuid_unparse_lower(*host_uuid, host_guid); - - char sql[ACLK_SYNC_QUERY_SIZE]; - - snprintfz(sql, sizeof(sql) - 1, TABLE_ACLK_ALERT, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid); - else { - snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT1, uuid_str, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report( - "Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid); - - snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT2, uuid_str, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report( - "Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid); - } - if (likely(host) && unlikely(host->aclk_config)) - return; - if (unlikely(!host)) + if (!host || host->aclk_config) return; struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t)); @@ -535,8 +380,7 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __ } wc->host = host; - strcpy(wc->uuid_str, uuid_str); - wc->alert_updates = 0; + wc->stream_alerts = false; time_t now = now_realtime_sec(); wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now; #endif @@ -579,7 +423,7 @@ void sql_aclk_sync_init(void) if (!number_of_children) aclk_queue_node_info(localhost, true); - rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg); + rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES, aclk_config_parameters, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg); @@ -591,15 +435,12 @@ void sql_aclk_sync_init(void) #endif } -// Public - static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1) { struct aclk_database_cmd cmd; cmd.opcode = opcode; cmd.param[0] = (void *) param0; cmd.param[1] = (void *) param1; - cmd.completion = NULL; aclk_database_enq_cmd(&cmd); } @@ -612,35 +453,12 @@ void aclk_push_alert_config(const char *node_id, const char *config_hash) queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash)); } -void aclk_push_node_alert_snapshot(const char *node_id) -{ - if (unlikely(!aclk_sync_config.initialized)) - return; - - queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL); -} - - -void aclk_push_node_removed_alerts(const char *node_id) -{ - if (unlikely(!aclk_sync_config.initialized)) - return; - - queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL); -} - void schedule_node_info_update(RRDHOST *host __maybe_unused) { #ifdef ENABLE_ACLK if (unlikely(!host)) return; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_NODE_STATE; - cmd.param[0] = host; - cmd.completion = NULL; - aclk_database_enq_cmd(&cmd); + queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL); #endif } @@ -649,12 +467,6 @@ void unregister_node(const char *machine_guid) { if (unlikely(!machine_guid)) return; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_NODE_UNREGISTER; - cmd.param[0] = strdupz(machine_guid); - cmd.completion = NULL; - aclk_database_enq_cmd(&cmd); + queue_aclk_sync_cmd(ACLK_DATABASE_NODE_UNREGISTER, strdupz(machine_guid), NULL); } #endif diff --git a/src/database/sqlite/sqlite_aclk.h b/src/database/sqlite/sqlite_aclk.h index ce9fed84..ec8cfa9d 100644 --- a/src/database/sqlite/sqlite_aclk.h +++ b/src/database/sqlite/sqlite_aclk.h @@ -3,21 +3,9 @@ #ifndef NETDATA_SQLITE_ACLK_H #define NETDATA_SQLITE_ACLK_H -#define ACLK_MAX_ALERT_UPDATES "5" -#define ACLK_DATABASE_CLEANUP_FIRST (1200) -#define ACLK_DATABASE_CLEANUP_INTERVAL (3600) -#define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400) +#define ACLK_MAX_ALERT_UPDATES "50" #define ACLK_SYNC_QUERY_SIZE 512 -static inline void uuid_unparse_lower_fix(nd_uuid_t *uuid, char *out) -{ - uuid_unparse_lower(*uuid, out); - out[8] = '_'; - out[13] = '_'; - out[18] = '_'; - out[23] = '_'; -} - static inline int uuid_parse_fix(char *in, nd_uuid_t uuid) { in[8] = '-'; @@ -32,25 +20,11 @@ static inline int claimed() return localhost->aclk_state.claimed_id != NULL; } -#define TABLE_ACLK_ALERT \ - "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ - "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ - "UNIQUE(alert_unique_id))" - -#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id)" -#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted)" - enum aclk_database_opcode { ACLK_DATABASE_NOOP = 0, - - ACLK_DATABASE_CLEANUP, - ACLK_DATABASE_DELETE_HOST, ACLK_DATABASE_NODE_STATE, ACLK_DATABASE_PUSH_ALERT, ACLK_DATABASE_PUSH_ALERT_CONFIG, - ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, - ACLK_DATABASE_PUSH_ALERT_CHECKPOINT, - ACLK_DATABASE_QUEUE_REMOVED_ALERTS, ACLK_DATABASE_NODE_UNREGISTER, ACLK_DATABASE_TIMER, @@ -62,29 +36,25 @@ enum aclk_database_opcode { struct aclk_database_cmd { enum aclk_database_opcode opcode; void *param[2]; - struct completion *completion; struct aclk_database_cmd *prev, *next; }; typedef struct aclk_sync_cfg_t { RRDHOST *host; - int alert_updates; - int alert_checkpoint_req; - int alert_queue_removed; + int8_t send_snapshot; + bool stream_alerts; + int alert_count; + int snapshot_count; + int checkpoint_count; time_t node_info_send_time; time_t node_collectors_send; - char uuid_str[UUID_STR_LEN]; char node_id[UUID_STR_LEN]; char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested - uint64_t alerts_log_first_sequence_id; - uint64_t alerts_log_last_sequence_id; } aclk_sync_cfg_t; -void sql_create_aclk_table(RRDHOST *host, nd_uuid_t *host_uuid, nd_uuid_t *node_id); +void create_aclk_config(RRDHOST *host, nd_uuid_t *host_uuid, nd_uuid_t *node_id); void sql_aclk_sync_init(void); void aclk_push_alert_config(const char *node_id, const char *config_hash); -void aclk_push_node_alert_snapshot(const char *node_id); -void aclk_push_node_removed_alerts(const char *node_id); void schedule_node_info_update(RRDHOST *host); #ifdef ENABLE_ACLK void unregister_node(const char *machine_guid); diff --git a/src/database/sqlite/sqlite_aclk_alert.c b/src/database/sqlite/sqlite_aclk_alert.c index 0982d32b..3e707616 100644 --- a/src/database/sqlite/sqlite_aclk_alert.c +++ b/src/database/sqlite/sqlite_aclk_alert.c @@ -5,7 +5,6 @@ #ifdef ENABLE_ACLK #include "../../aclk/aclk_alarm_api.h" -#endif #define SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param) \ ({ \ @@ -13,33 +12,6 @@ sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \ }) -#define SQL_UPDATE_FILTERED_ALERT \ - "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \ - "WHERE filtered_alert_unique_id = @old_alert" - -static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str) -{ - sqlite3_stmt *res = NULL; - - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, sizeof(sql) - 1, SQL_UPDATE_FILTERED_ALERT, uuid_str); - - if (!PREPARE_STATEMENT(db_meta, sql, &res)) - return; - - int param = 0; - SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, ae->unique_id)); - SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); - - param = 0; - if (likely(sqlite3_step_monitored(res) == SQLITE_DONE)) - ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; - -done: - REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); -} - #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \ "SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld " \ "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \ @@ -47,9 +19,9 @@ done: static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uuid_t *host_id) { - sqlite3_stmt *res = NULL; + static __thread sqlite3_stmt *res = NULL; - if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res)) + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, &res)) return false; bool ret = false; @@ -63,115 +35,141 @@ static inline bool is_event_from_alert_variable_config(int64_t unique_id, nd_uui done: REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); + SQLITE_RESET(res); return ret; } #define MAX_REMOVED_PERIOD 604800 //a week -//decide if some events should be sent or not -#define SQL_SELECT_ALERT_BY_ID \ - "SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND hld.unique_id = aa.filtered_alert_unique_id " \ - "AND hld.alarm_id = @alarm_id AND hl.health_log_id = hld.health_log_id " \ - "ORDER BY hld.rowid DESC LIMIT 1" +#define SQL_UPDATE_ALERT_VERSION_TRANSITION \ + "UPDATE alert_version SET unique_id = @unique_id WHERE health_log_id = @health_log_id" -static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) +static void update_alert_version_transition(int64_t health_log_id, int64_t unique_id) { - sqlite3_stmt *res = NULL; + static __thread sqlite3_stmt *res = NULL; - if (ae->new_status == RRDCALC_STATUS_UNINITIALIZED || - (ae->new_status == RRDCALC_STATUS_REMOVED && - !(ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL))) - return 0; + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION_TRANSITION, &res)) + return; - if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config)) - return 0; + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id)); + + param = 0; + int rc = sqlite3_step_monitored(res); + if (rc != SQLITE_DONE) + error_report("Failed to update alert_version to latest transition"); + +done: + REPORT_BIND_FAIL(res, param); + SQLITE_RESET(res); +} - char sql[ACLK_SYNC_QUERY_SIZE]; +//decide if some events should be sent or not + +#define SQL_SELECT_LAST_ALERT_STATUS "SELECT status FROM alert_version WHERE health_log_id = @health_log_id " - //get the previous sent event of this alarm_id - //base the search on the last filtered event - snprintfz(sql, sizeof(sql) - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str); +static bool cloud_status_matches(int64_t health_log_id, RRDCALC_STATUS status) +{ + static __thread sqlite3_stmt *res = NULL; - if (!PREPARE_STATEMENT(db_meta, sql, &res)) + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_SELECT_LAST_ALERT_STATUS, &res)) return true; bool send = false; int param = 0; - SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); - SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, (int) ae->alarm_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, health_log_id)); param = 0; int rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { - nd_uuid_t config_hash_id; - RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); - - if (sqlite3_column_type(res, 1) != SQLITE_NULL) - uuid_copy(config_hash_id, *((nd_uuid_t *)sqlite3_column_blob(res, 1))); - - int64_t unique_id = sqlite3_column_int64(res, 2); - - if (ae->new_status != (RRDCALC_STATUS)status || !uuid_eq(ae->config_hash_id, config_hash_id)) - send = true; - else - update_filtered(ae, unique_id, host->aclk_config->uuid_str); - } else - send = true; + RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); + send = (current_status == status); + } done: REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); + SQLITE_RESET(res); return send; } #define SQL_QUEUE_ALERT_TO_CLOUD \ - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ - "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING" - -void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) + "INSERT INTO aclk_queue (host_id, health_log_id, unique_id, date_created)" \ + " VALUES (@host_id, @health_log_id, @unique_id, UNIXEPOCH())" \ + " ON CONFLICT(host_id, health_log_id) DO UPDATE SET unique_id=excluded.unique_id, " \ + " date_created=excluded.date_created" + +// +// Attempt to insert an alert to the submit queue to reach the cloud +// +// The alert will NOT be added in the submit queue if +// - Cloud is already aware of the alert status +// - The transition refers to a variable +// +static int insert_alert_to_submit_queue(RRDHOST *host, int64_t health_log_id, uint32_t unique_id, RRDCALC_STATUS status) { - sqlite3_stmt *res = NULL; - char sql[ACLK_SYNC_QUERY_SIZE]; + static __thread sqlite3_stmt *res = NULL; - if (!service_running(SERVICE_ACLK)) - return; + if (cloud_status_matches(health_log_id, status)) { + update_alert_version_transition(health_log_id, unique_id); + return 1; + } - if (!claimed() || ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) - return; + if (is_event_from_alert_variable_config(unique_id, &host->host_uuid)) + return 2; - if (false == skip_filter && !should_send_to_cloud(host, ae)) - return; + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_QUEUE_ALERT_TO_CLOUD, &res)) + return -1; - if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid)) - return; + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (int64_t) unique_id)); - snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str); + param = 0; + int rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to insert alert in the submit queue %"PRIu32", rc = %d", unique_id, rc); - if (!PREPARE_STATEMENT(db_meta, sql, &res)) - return; +done: + REPORT_BIND_FAIL(res, param); + SQLITE_RESET(res); + return 0; +} + +#define SQL_DELETE_QUEUE_ALERT_TO_CLOUD \ + "DELETE FROM aclk_queue WHERE host_id = @host_id AND sequence_id BETWEEN @seq1 AND @seq2" + +// +// Delete a range of alerts from the submit queue (after being sent to the the cloud) +// +static int delete_alert_from_submit_queue(RRDHOST *host, int64_t first_seq_id, int64_t last_seq_id) +{ + static __thread sqlite3_stmt *res = NULL; + + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_QUEUE_ALERT_TO_CLOUD, &res)) + return -1; int param = 0; - SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, ae->unique_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, first_seq_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, last_seq_id)); param = 0; - int rc = execute_insert(res); - if (unlikely(rc == SQLITE_DONE)) { - ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; - rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - } else - error_report("Failed to store alert event %"PRIu32", rc = %d", ae->unique_id, rc); + int rc = sqlite3_step_monitored(res); + if (rc != SQLITE_DONE) + error_report("Failed to delete submitted to ACLK"); done: REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); + SQLITE_RESET(res); + return 0; } int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) { -#ifdef ENABLE_ACLK + switch(status) { case RRDCALC_STATUS_REMOVED: return ALARM_STATUS_REMOVED; @@ -191,10 +189,6 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) default: return ALARM_STATUS_UNKNOWN; } -#else - UNUSED(status); - return 1; -#endif } static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) { @@ -219,245 +213,437 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) { return strdupz(ret); } +#define SQL_UPDATE_ALERT_VERSION \ + "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted)" \ + " VALUES (@health_log_id, @unique_id, @status, @version, UNIXEPOCH())" \ + " ON CONFLICT(health_log_id) DO UPDATE SET status = excluded.status, version = excluded.version, " \ + " unique_id=excluded.unique_id, date_submitted=excluded.date_submitted" + +// +// Store a new alert transition along with the version after sending to the cloud +// - Update an existing alert with the updated version, status, transition and date submitted +// +static void sql_update_alert_version(int64_t health_log_id, int64_t unique_id, RRDCALC_STATUS status, uint64_t version) +{ + static __thread sqlite3_stmt *res = NULL; + + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_UPDATE_ALERT_VERSION, &res)) + return; + + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, health_log_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, status)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, version)); + + param = 0; + int rc = sqlite3_step_monitored(res); + if (rc != SQLITE_DONE) + error_report("Failed to execute sql_update_alert_version"); + +done: + REPORT_BIND_FAIL(res, param); + SQLITE_RESET(res); +} -static void aclk_push_alert_event(struct aclk_sync_cfg_t *wc __maybe_unused) +#define SQL_SELECT_ALERT_TO_DUMMY \ + "SELECT aq.sequence_id, hld.unique_id, hld.when_key, hld.new_status, hld.health_log_id" \ + " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \ + " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \ + " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \ + " ORDER BY aq.sequence_id ASC" + +// +// Check all queued alerts for a host and commit them as if they have been send to the cloud +// this will produce new versions as needed. We need this because we are about to send a +// a snapshot so we can include the latest transition. +// +static void commit_alert_events(RRDHOST *host) { -#ifdef ENABLE_ACLK - int rc; + sqlite3_stmt *res = NULL; - if (unlikely(!wc->alert_updates)) { - nd_log(NDLS_ACCESS, NDLP_NOTICE, - "ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", - wc->node_id, - wc->host ? rrdhost_hostname(wc->host) : "N/A"); + if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_DUMMY, &res)) return; + + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); + + int64_t first_sequence_id = 0; + int64_t last_sequence_id = 0; + + param = 0; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + + last_sequence_id = sqlite3_column_int64(res, 0); + if (first_sequence_id == 0) + first_sequence_id = last_sequence_id; + + int64_t unique_id = sqlite3_column_int(res, 1); + int64_t version = sqlite3_column_int64(res, 2); + RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 3); + int64_t health_log_id = sqlite3_column_int64(res, 4); + + sql_update_alert_version(health_log_id, unique_id, status, version); } + if (first_sequence_id) + delete_alert_from_submit_queue(host, first_sequence_id, last_sequence_id); + +done: + REPORT_BIND_FAIL(res, param); + SQLITE_FINALIZE(res); +} + +typedef enum { + SEQUENCE_ID, + UNIQUE_ID, + ALARM_ID, + CONFIG_HASH_ID, + UPDATED_BY_ID, + WHEN_KEY, + DURATION, + NON_CLEAR_DURATION, + FLAGS, + EXEC_RUN_TIMESTAMP, + DELAY_UP_TO_TIMESTAMP, + NAME, + CHART, + EXEC, + RECIPIENT, + SOURCE, + UNITS, + INFO, + EXEC_CODE, + NEW_STATUS, + OLD_STATUS, + DELAY, + NEW_VALUE, + OLD_VALUE, + LAST_REPEAT, + CHART_CONTEXT, + TRANSITION_ID, + ALARM_EVENT_ID, + CHART_NAME, + SUMMARY, + HEALTH_LOG_ID, + VERSION +} HealthLogDetails; + +void health_alarm_log_populate( + struct alarm_log_entry *alarm_log, + sqlite3_stmt *res, + RRDHOST *host, + RRDCALC_STATUS *status) +{ + char old_value_string[100 + 1]; + char new_value_string[100 + 1]; + + RRDCALC_STATUS current_status = (RRDCALC_STATUS)sqlite3_column_int(res, NEW_STATUS); + if (status) + *status = current_status; + + char *source = (char *) sqlite3_column_text(res, SOURCE); + alarm_log->command = source ? health_edit_command_from_source(source) : strdupz("UNKNOWN=0=UNKNOWN"); + + alarm_log->chart = strdupz((char *) sqlite3_column_text(res, CHART)); + alarm_log->name = strdupz((char *) sqlite3_column_text(res, NAME)); + + alarm_log->when = sqlite3_column_int64(res, WHEN_KEY); + + alarm_log->config_hash = sqlite3_uuid_unparse_strdupz(res, CONFIG_HASH_ID); + + alarm_log->utc_offset = host->utc_offset; + alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host)); + alarm_log->exec_path = sqlite3_column_bytes(res, EXEC) ? + strdupz((char *)sqlite3_column_text(res, EXEC)) : + strdupz((char *)string2str(host->health.health_default_exec)); + + alarm_log->conf_source = source ? strdupz(source) : strdupz(""); + + time_t duration = sqlite3_column_int64(res, DURATION); + alarm_log->duration = (duration > 0) ? duration : 0; + + alarm_log->non_clear_duration = sqlite3_column_int64(res, NON_CLEAR_DURATION); + + alarm_log->status = rrdcalc_status_to_proto_enum(current_status); + alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)sqlite3_column_int64(res, OLD_STATUS)); + alarm_log->delay = sqlite3_column_int64(res, DELAY); + alarm_log->delay_up_to_timestamp = sqlite3_column_int64(res, DELAY_UP_TO_TIMESTAMP); + alarm_log->last_repeat = sqlite3_column_int64(res, LAST_REPEAT); + + uint64_t flags = sqlite3_column_int64(res, FLAGS); + char *recipient = (char *) sqlite3_column_text(res, RECIPIENT); + alarm_log->silenced = + ((flags & HEALTH_ENTRY_FLAG_SILENCED) || (recipient && !strncmp(recipient, "silent", 6))) ? 1 : 0; + + double value = sqlite3_column_double(res, NEW_VALUE); + double old_value = sqlite3_column_double(res, OLD_VALUE); + + alarm_log->value_string = + sqlite3_column_type(res, NEW_VALUE) == SQLITE_NULL ? + strdupz((char *)"-") : + strdupz((char *)format_value_and_unit( + new_value_string, 100, value, (char *)sqlite3_column_text(res, UNITS), -1)); + + alarm_log->old_value_string = + sqlite3_column_type(res, OLD_VALUE) == SQLITE_NULL ? + strdupz((char *)"-") : + strdupz((char *)format_value_and_unit( + old_value_string, 100, old_value, (char *)sqlite3_column_text(res, UNITS), -1)); + + alarm_log->value = (!isnan(value)) ? (NETDATA_DOUBLE)value : 0; + alarm_log->old_value = (!isnan(old_value)) ? (NETDATA_DOUBLE)old_value : 0; + + alarm_log->updated = (flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; + alarm_log->rendered_info = sqlite3_text_strdupz_empty(res, INFO); + alarm_log->chart_context = sqlite3_text_strdupz_empty(res, CHART_CONTEXT); + alarm_log->chart_name = sqlite3_text_strdupz_empty(res, CHART_NAME); + + alarm_log->transition_id = sqlite3_uuid_unparse_strdupz(res, TRANSITION_ID); + alarm_log->event_id = sqlite3_column_int64(res, ALARM_EVENT_ID); + alarm_log->version = sqlite3_column_int64(res, VERSION); + + alarm_log->summary = sqlite3_text_strdupz_empty(res, SUMMARY); + + alarm_log->health_log_id = sqlite3_column_int64(res, HEALTH_LOG_ID); + alarm_log->unique_id = sqlite3_column_int64(res, UNIQUE_ID); + alarm_log->alarm_id = sqlite3_column_int64(res, ALARM_ID); + alarm_log->sequence_id = sqlite3_column_int64(res, SEQUENCE_ID); +} + +#define SQL_SELECT_ALERT_TO_PUSH \ + "SELECT aq.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key," \ + " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name," \ + " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status," \ + " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id," \ + " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, hld.when_key" \ + " FROM health_log hl, aclk_queue aq, alert_hash ah, health_log_detail hld" \ + " WHERE hld.unique_id = aq.unique_id AND hl.config_hash_id = ah.hash_id" \ + " AND hl.host_id = @host_id AND aq.host_id = hl.host_id AND hl.health_log_id = hld.health_log_id" \ + " ORDER BY aq.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES + +static void aclk_push_alert_event(RRDHOST *host __maybe_unused) +{ + char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) + if (!claim_id || !host->node_id) return; - if (unlikely(!wc->host)) { + sqlite3_stmt *res = NULL; + + if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ALERT_TO_PUSH, &res)) { freez(claim_id); return; } - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - - sqlite3_stmt *res = NULL; + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); - buffer_sprintf( - sql, - "SELECT aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " - " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " - " hl.chart, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, " - " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " - " hld.alarm_event_id, hl.chart_name, hld.summary " - " FROM health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld " - " WHERE hld.unique_id = aa.alert_unique_id AND hl.config_hash_id = ha.hash_id AND aa.date_submitted IS NULL " - " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " - " ORDER BY aa.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES, - wc->uuid_str); - - if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) { - - BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str); - - rc = db_execute(db_meta, buffer_tostring(sql_fix)); - if (unlikely(rc)) - error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); - buffer_free(sql_fix); - - // Try again - if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) { - buffer_free(sql); - freez(claim_id); - return; - } - } + char node_id_str[UUID_STR_LEN]; + uuid_unparse_lower(*host->node_id, node_id_str); - rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id for pushing alert event."); - goto done; - } + struct alarm_log_entry alarm_log; + alarm_log.node_id = node_id_str; + alarm_log.claim_id = claim_id; - uint64_t first_sequence_id = 0; - uint64_t last_sequence_id = 0; + int64_t first_id = 0; + int64_t last_id = 0; + param = 0; + RRDCALC_STATUS status; + struct aclk_sync_cfg_t *wc = host->aclk_config; while (sqlite3_step_monitored(res) == SQLITE_ROW) { - struct alarm_log_entry alarm_log; - char old_value_string[100 + 1]; - char new_value_string[100 + 1]; - - alarm_log.node_id = wc->node_id; - alarm_log.claim_id = claim_id; - alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12)); - alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11)); - alarm_log.when = (time_t) sqlite3_column_int64(res, 5); - alarm_log.config_hash = sqlite3_uuid_unparse_strdupz(res, 3); - alarm_log.utc_offset = wc->host->utc_offset; - alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host)); - alarm_log.exec_path = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : - strdupz((char *)string2str(wc->host->health.health_default_exec)); - alarm_log.conf_source = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : strdupz(""); - - char *edit_command = sqlite3_column_bytes(res, 15) > 0 ? - health_edit_command_from_source((char *)sqlite3_column_text(res, 15)) : - strdupz("UNKNOWN=0=UNKNOWN"); - alarm_log.command = strdupz(edit_command); - - time_t duration = (time_t) sqlite3_column_int64(res, 6); - alarm_log.duration = (duration > 0) ? duration : 0; - alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7); - alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 19)); - alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20)); - alarm_log.delay = (int) sqlite3_column_int(res, 21); - alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10); - alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 24); - alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) || - (sqlite3_column_type(res, 14) != SQLITE_NULL && - !strncmp((char *)sqlite3_column_text(res, 14), "silent", 6))) ? - 1 : - 0; - alarm_log.value_string = - sqlite3_column_type(res, 22) == SQLITE_NULL ? - strdupz((char *)"-") : - strdupz((char *)format_value_and_unit( - new_value_string, 100, sqlite3_column_double(res, 22), (char *)sqlite3_column_text(res, 16), -1)); - alarm_log.old_value_string = - sqlite3_column_type(res, 23) == SQLITE_NULL ? - strdupz((char *)"-") : - strdupz((char *)format_value_and_unit( - old_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 16), -1)); - alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 22); - alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23); - alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; - alarm_log.rendered_info = sqlite3_text_strdupz_empty(res, 17); - alarm_log.chart_context = sqlite3_text_strdupz_empty(res, 25); - alarm_log.transition_id = sqlite3_uuid_unparse_strdupz(res, 26); - alarm_log.event_id = (time_t) sqlite3_column_int64(res, 27); - alarm_log.chart_name = sqlite3_text_strdupz_empty(res, 28); - alarm_log.summary = sqlite3_text_strdupz_empty(res, 29); - + health_alarm_log_populate(&alarm_log, res, host, &status); aclk_send_alarm_log_entry(&alarm_log); + wc->alert_count++; - if (first_sequence_id == 0) - first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + last_id = alarm_log.sequence_id; + if (first_id == 0) + first_id = last_id; - if (wc->alerts_log_first_sequence_id == 0) - wc->alerts_log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); - - last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); - wc->alerts_log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + sql_update_alert_version(alarm_log.health_log_id, alarm_log.unique_id, status, alarm_log.version); destroy_alarm_log_entry(&alarm_log); - freez(edit_command); } - if (first_sequence_id) { - buffer_flush(sql); - buffer_sprintf( - sql, - "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " - "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64, - wc->uuid_str, - first_sequence_id, - last_sequence_id); - - if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) - error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host)); - + if (first_id) { + nd_log( + NDLS_ACCESS, + NDLP_DEBUG, + "ACLK RES [%s (%s)]: ALERTS SENT from %ld - %ld", + node_id_str, + rrdhost_hostname(host), + first_id, + last_id); + + delete_alert_from_submit_queue(host, first_id, last_id); // Mark to do one more check - rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - - } else { - if (wc->alerts_log_first_sequence_id) - nd_log(NDLS_ACCESS, NDLP_DEBUG, - "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "", - wc->node_id, - wc->host ? rrdhost_hostname(wc->host) : "N/A", - wc->alerts_log_first_sequence_id, - wc->alerts_log_last_sequence_id); - wc->alerts_log_first_sequence_id = 0; - wc->alerts_log_last_sequence_id = 0; + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } done: + REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); freez(claim_id); - buffer_free(sql); -#endif } -void aclk_push_alert_events_for_all_hosts(void) +#define SQL_DELETE_PROCESSED_ROWS \ + "DELETE FROM alert_queue WHERE host_id = @host_id AND rowid between @row1 AND @row2" + +static void delete_alert_from_pending_queue(RRDHOST *host, int64_t row1, int64_t row2) { - RRDHOST *host; + static __thread sqlite3_stmt *res = NULL; - dfe_start_reentrant(rrdhost_root_index, host) { - if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || - !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) - continue; + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_DELETE_PROCESSED_ROWS, &res)) + return; - rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row1)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, row2)); - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (likely(wc)) - aclk_push_alert_event(wc); - } - dfe_done(host); + param = 0; + int rc = sqlite3_step_monitored(res); + if (rc != SQLITE_DONE) + error_report("Failed to delete processed rows, rc = %d", rc); + +done: + REPORT_BIND_FAIL(res, param); + SQLITE_RESET(res); } -void sql_queue_existing_alerts_to_aclk(RRDHOST *host) +#define SQL_REBUILD_HOST_ALERT_VERSION_TABLE \ + "INSERT INTO alert_version (health_log_id, unique_id, status, version, date_submitted) " \ + " SELECT hl.health_log_id, hld.unique_id, hld.new_status, hld.when_key, UNIXEPOCH() " \ + " FROM health_log hl, health_log_detail hld WHERE " \ + " hl.host_id = @host_id AND hld.health_log_id = hl.health_log_id AND hld.transition_id = hl.last_transition_id" + +#define SQL_DELETE_HOST_ALERT_VERSION_TABLE \ + "DELETE FROM alert_version WHERE health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id)" + +void rebuild_host_alert_version_table(RRDHOST *host) { sqlite3_stmt *res = NULL; - int rc; - struct aclk_sync_cfg_t *wc = host->aclk_config; + if (!PREPARE_STATEMENT(db_meta, SQL_DELETE_HOST_ALERT_VERSION_TABLE, &res)) + return; - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); - rw_spinlock_write_lock(&host->health_log.spinlock); + param = 0; + int rc = execute_insert(res); + if (rc != SQLITE_DONE) { + netdata_log_error("Failed to delete the host alert version table"); + goto done; + } - buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str); - if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) - goto skip; + SQLITE_FINALIZE(res); + if (!PREPARE_STATEMENT(db_meta, SQL_REBUILD_HOST_ALERT_VERSION_TABLE, &res)) + return; + + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); + + param = 0; + rc = execute_insert(res); + if (rc != SQLITE_DONE) + netdata_log_error("Failed to rebuild the host alert version table"); + +done: + REPORT_BIND_FAIL(res, param); + SQLITE_FINALIZE(res); +} - buffer_flush(sql); +#define SQL_PROCESS_ALERT_PENDING_QUEUE \ + "SELECT health_log_id, unique_id, status, rowid" \ + " FROM alert_queue WHERE host_id = @host_id AND date_scheduled <= UNIXEPOCH() ORDER BY rowid ASC" - buffer_sprintf( - sql, - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " - "SELECT hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id FROM health_log_detail hld, health_log hl " - "WHERE hld.new_status <> 0 AND hld.new_status <> -2 AND hl.health_log_id = hld.health_log_id AND hl.config_hash_id IS NOT NULL " - "AND hld.updated_by_id = 0 AND hl.host_id = @host_id ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING", - wc->uuid_str); +bool process_alert_pending_queue(RRDHOST *host) +{ + static __thread sqlite3_stmt *res = NULL; - if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) - goto skip; + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_PROCESS_ALERT_PENDING_QUEUE, &res)) + return false; int param = 0; + int added =0, count = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); param = 0; - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to queue existing alerts, rc = %d", rc); - else - rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + int64_t start_row = 0; + int64_t end_row = 0; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + + int64_t health_log_id = sqlite3_column_int64(res, 0); + uint32_t unique_id = sqlite3_column_int64(res, 1); + RRDCALC_STATUS new_status = sqlite3_column_int(res, 2); + int64_t row = sqlite3_column_int64(res, 3); + + if (host->aclk_config) { + int ret = insert_alert_to_submit_queue(host, health_log_id, unique_id, new_status); + if (ret == 0) + added++; + } + + if (!start_row) + start_row = row; + end_row = row; + + count++; + } + if (start_row) + delete_alert_from_pending_queue(host, start_row, end_row); + + if(count) + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Processed %d entries, queued %d", rrdhost_hostname(host), count, added); done: REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); + SQLITE_RESET(res); + return added > 0; +} + +void aclk_push_alert_events_for_all_hosts(void) +{ + RRDHOST *host; -skip: - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); + // Checking if we shutting down + if (!service_running(SERVICE_ACLK)) + return; + + dfe_start_reentrant(rrdhost_root_index, host) { + if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) || + rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) + continue; + + rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (!wc || false == wc->stream_alerts || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) { + (void)process_alert_pending_queue(host); + commit_alert_events(host); + continue; + } + + if (wc->send_snapshot) { + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + if (wc->send_snapshot == 1) + continue; + (void)process_alert_pending_queue(host); + commit_alert_events(host); + rebuild_host_alert_version_table(host); + send_alert_snapshot_to_cloud(host); + wc->snapshot_count++; + wc->send_snapshot = 0; + } + else + aclk_push_alert_event(host); + } + dfe_done(host); } -void aclk_send_alarm_configuration(char *config_hash) +void aclk_send_alert_configuration(char *config_hash) { if (unlikely(!config_hash)) return; @@ -484,7 +670,6 @@ void aclk_send_alarm_configuration(char *config_hash) void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { -#ifdef ENABLE_ACLK sqlite3_stmt *res = NULL; struct aclk_sync_cfg_t *wc; @@ -586,91 +771,60 @@ done: SQLITE_FINALIZE(res); freez(config_hash); freez(node_id); -#endif } +#define SQL_ALERT_VERSION_CALC \ + "SELECT SUM(version) FROM health_log hl, alert_version av" \ + " WHERE hl.host_id = @host_uuid AND hl.health_log_id = av.health_log_id AND av.status <> -2" -// Start streaming alerts -void aclk_start_alert_streaming(char *node_id, bool resets) -{ - nd_uuid_t node_uuid; - - if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) - return; - - struct aclk_sync_cfg_t *wc; - - RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host || !(wc = host->aclk_config))) - return; - - if (unlikely(!host->health.health_enabled)) { - nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); - return; - } - - if (resets) { - nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); - sql_queue_existing_alerts_to_aclk(host); - } else - nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); - - wc->alert_updates = 1; - wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; -} - -#define SQL_QUEUE_REMOVE_ALERTS \ - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ - "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ - "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ - "AND hl.config_hash_id NOT IN (SELECT hash_id FROM alert_hash WHERE warn IS NULL AND crit IS NULL) " \ - "AND hl.name || hl.chart NOT IN (select name || chart FROM health_log WHERE name = hl.name AND " \ - "chart = hl.chart AND alarm_id > hl.alarm_id AND host_id = hl.host_id) " \ - "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING" - -void sql_process_queue_removed_alerts_to_aclk(char *node_id) +static uint64_t calculate_node_alert_version(RRDHOST *host) { - struct aclk_sync_cfg_t *wc; - RRDHOST *host = find_host_by_node_id(node_id); - freez(node_id); - - if (unlikely(!host || !(wc = host->aclk_config))) - return; - - sqlite3_stmt *res = NULL; - - CLEAN_BUFFER *wb = buffer_create(1024, NULL); // Note buffer auto free on function return - buffer_sprintf(wb, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str); + static __thread sqlite3_stmt *res = NULL; - if (!PREPARE_STATEMENT(db_meta, buffer_tostring(wb), &res)) - return; + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_ALERT_VERSION_CALC, &res)) + return 0; + uint64_t version = 0; int param = 0; SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); param = 0; - int rc = execute_insert(res); - if (likely(rc == SQLITE_DONE)) { - nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); - rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - wc->alert_queue_removed = 0; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + version = (uint64_t)sqlite3_column_int64(res, 0); } done: REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); + SQLITE_RESET(res); + return version; } -void sql_queue_removed_alerts_to_aclk(RRDHOST *host) +static void schedule_alert_snapshot_if_needed(struct aclk_sync_cfg_t *wc, uint64_t cloud_version) { - if (unlikely(!host->aclk_config || !claimed() || !host->node_id)) - return; - - char node_id[UUID_STR_LEN]; - uuid_unparse_lower(*host->node_id, node_id); + uint64_t local_version = calculate_node_alert_version(wc->host); + if (local_version != cloud_version) { + nd_log( + NDLS_ACCESS, + NDLP_NOTICE, + "Scheduling alert snapshot for host \"%s\", node \"%s\" (version: cloud %zu, local %zu)", + rrdhost_hostname(wc->host), + wc->node_id, + cloud_version, + local_version); - aclk_push_node_removed_alerts(node_id); + wc->send_snapshot = 1; + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + } + else + nd_log( + NDLS_ACCESS, + NDLP_DEBUG, + "Alert check on \"%s\", node \"%s\" (version: cloud %zu, local %zu)", + rrdhost_hostname(wc->host), + wc->node_id, + cloud_version, + local_version); + wc->checkpoint_count++; } void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) @@ -699,371 +853,202 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unus wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); - aclk_push_node_alert_snapshot(node_id); + wc->send_snapshot = 1; + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } -#ifdef ENABLE_ACLK -void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host) -{ - char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN"); - char config_hash_id[UUID_STR_LEN]; - uuid_unparse_lower(ae->config_hash_id, config_hash_id); - char transition_id[UUID_STR_LEN]; - uuid_unparse_lower(ae->transition_id, transition_id); - - alarm_log->chart = strdupz(ae_chart_id(ae)); - alarm_log->name = strdupz(ae_name(ae)); - - alarm_log->when = ae->when; - - alarm_log->config_hash = strdupz((char *)config_hash_id); - - alarm_log->utc_offset = host->utc_offset; - alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host)); - alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health.health_default_exec)); - alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)""); +#define SQL_COUNT_SNAPSHOT_ENTRIES \ + "SELECT COUNT(1) FROM alert_version av, health_log hl " \ + "WHERE hl.host_id = @host_id AND hl.health_log_id = av.health_log_id AND av.status <> -2" - alarm_log->command = strdupz((char *)edit_command); - - alarm_log->duration = (time_t)ae->duration; - alarm_log->non_clear_duration = (time_t)ae->non_clear_duration; - alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status); - alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status); - alarm_log->delay = ae->delay; - alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp; - alarm_log->last_repeat = (time_t)ae->last_repeat; - - alarm_log->silenced = - ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ? - 1 : - 0; +static int calculate_alert_snapshot_entries(nd_uuid_t *host_uuid) +{ + int count = 0; - alarm_log->value_string = strdupz(ae_new_value_string(ae)); - alarm_log->old_value_string = strdupz(ae_old_value_string(ae)); + sqlite3_stmt *res = NULL; - alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0; - alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0; + if (!PREPARE_STATEMENT(db_meta, SQL_COUNT_SNAPSHOT_ENTRIES, &res)) + return 0; - alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; - alarm_log->rendered_info = strdupz(ae_info(ae)); - alarm_log->chart_context = strdupz(ae_chart_context(ae)); - alarm_log->chart_name = strdupz(ae_chart_name(ae)); + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC)); - alarm_log->transition_id = strdupz((char *)transition_id); - alarm_log->event_id = (uint64_t) ae->alarm_event_id; + param = 0; + int rc = sqlite3_step_monitored(res); + if (rc == SQLITE_ROW) + count = sqlite3_column_int(res, 0); + else + error_report("Failed to select snapshot count"); - alarm_log->summary = strdupz(ae_summary(ae)); +done: + REPORT_BIND_FAIL(res, param); + SQLITE_FINALIZE(res); - freez(edit_command); + return count; } -#endif - -#ifdef ENABLE_ACLK -static bool have_recent_alarm_unsafe(RRDHOST *host, int64_t alarm_id, int64_t mark) -{ - ALARM_ENTRY *ae = host->health_log.alarms; - - while (ae) { - if (ae->alarm_id == alarm_id && ae->unique_id >mark && - (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)) - return true; - ae = ae->next; - } - return false; -} -#endif +#define SQL_GET_SNAPSHOT_ENTRIES \ + " SELECT 0, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " \ + " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " \ + " hl.chart, hl.exec, hl.recipient, ah.source, hl.units, hld.info, hld.exec_code, hld.new_status, " \ + " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " \ + " hld.alarm_event_id, hl.chart_name, hld.summary, hld.health_log_id, av.version " \ + " FROM health_log hl, alert_hash ah, health_log_detail hld, alert_version av " \ + " WHERE hl.config_hash_id = ah.hash_id" \ + " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " \ + " AND hld.health_log_id = av.health_log_id AND av.unique_id = hld.unique_id AND av.status <> -2" #define ALARM_EVENTS_PER_CHUNK 1000 -void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) +void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused) { -#ifdef ENABLE_ACLK - RRDHOST *host = find_host_by_node_id(node_id); + struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!host)) { - nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", node_id); - freez(node_id); + nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", wc->node_id); return; } - freez(node_id); - struct aclk_sync_cfg_t *wc = host->aclk_config; - - // we perhaps we don't need this for snapshots - if (unlikely(!wc->alert_updates)) { - nd_log(NDLS_ACCESS, NDLP_NOTICE, - "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", - wc->node_id, - wc->host ? rrdhost_hostname(wc->host) : "N/A"); + char *claim_id = get_agent_claimid(); + if (unlikely(!claim_id)) return; - } - if (unlikely(!wc->alerts_snapshot_uuid)) + // Check database for this node to see how many alerts we will need to put in the snapshot + int cnt = calculate_alert_snapshot_entries(&host->host_uuid); + if (!cnt) { + freez(claim_id); return; + } - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) + sqlite3_stmt *res = NULL; + if (!PREPARE_STATEMENT(db_meta, SQL_GET_SNAPSHOT_ENTRIES, &res)) return; - nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); - - uint32_t cnt = 0; + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); - rw_spinlock_read_lock(&host->health_log.spinlock); + nd_uuid_t local_snapshot_uuid; + char snapshot_uuid_str[UUID_STR_LEN]; + uuid_generate_random(local_snapshot_uuid); + uuid_unparse_lower(local_snapshot_uuid, snapshot_uuid_str); + char *snapshot_uuid = &snapshot_uuid_str[0]; - ALARM_ENTRY *ae = host->health_log.alarms; + nd_log(NDLS_ACCESS, NDLP_DEBUG, + "ACLK REQ [%s (%s)]: Sending %d alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(host), + cnt, snapshot_uuid); - for (; ae; ae = ae->next) { - if (likely(ae->updated_by_id)) - continue; + uint32_t chunks; + chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0); - if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) - continue; + alarm_snapshot_proto_ptr_t snapshot_proto = NULL; + struct alarm_snapshot alarm_snap; + struct alarm_log_entry alarm_log; - if (have_recent_alarm_unsafe(host, ae->alarm_id, ae->unique_id)) - continue; + alarm_snap.node_id = wc->node_id; + alarm_snap.claim_id = claim_id; + alarm_snap.snapshot_uuid = snapshot_uuid; + alarm_snap.chunks = chunks; + alarm_snap.chunk = 1; - if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid)) - continue; + alarm_log.node_id = wc->node_id; + alarm_log.claim_id = claim_id; + cnt = 0; + param = 0; + uint64_t version = 0; + int total_count = 0; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { cnt++; - } - - if (cnt) { - uint32_t chunks; - - chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0); - ae = host->health_log.alarms; - - cnt = 0; - struct alarm_snapshot alarm_snap; - alarm_snap.node_id = wc->node_id; - alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; - alarm_snap.chunks = chunks; - alarm_snap.chunk = 1; - - alarm_snapshot_proto_ptr_t snapshot_proto = NULL; - - for (; ae; ae = ae->next) { - if (likely(ae->updated_by_id) || unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) - continue; - - if (have_recent_alarm_unsafe(host, ae->alarm_id, ae->unique_id)) - continue; - - if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid)) - continue; - - cnt++; + total_count++; - struct alarm_log_entry alarm_log; - alarm_log.node_id = wc->node_id; - alarm_log.claim_id = claim_id; + if (!snapshot_proto) + snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); - if (!snapshot_proto) - snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); + health_alarm_log_populate(&alarm_log, res, host, NULL); - health_alarm_entry2proto_nolock(&alarm_log, ae, host); - add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log); + add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log); + version += alarm_log.version; - if (cnt == ALARM_EVENTS_PER_CHUNK) { + if (cnt == ALARM_EVENTS_PER_CHUNK) { + if (aclk_connected) aclk_send_alarm_snapshot(snapshot_proto); - cnt = 0; - if (alarm_snap.chunk < chunks) { - alarm_snap.chunk++; - snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); - } + cnt = 0; + if (alarm_snap.chunk < chunks) { + alarm_snap.chunk++; + snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); } - destroy_alarm_log_entry(&alarm_log); } - if (cnt) - aclk_send_alarm_snapshot(snapshot_proto); + destroy_alarm_log_entry(&alarm_log); } + if (cnt) + aclk_send_alarm_snapshot(snapshot_proto); - rw_spinlock_read_unlock(&host->health_log.spinlock); - wc->alerts_snapshot_uuid = NULL; - - freez(claim_id); -#endif -} - -#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created < UNIXEPOCH() - @period" - -void sql_aclk_alert_clean_dead_entries(RRDHOST *host) -{ - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (unlikely(!wc)) - return; - - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_ALERT_ENTRIES, wc->uuid_str); - - sqlite3_stmt *res = NULL; - - if (!PREPARE_STATEMENT(db_meta, sql, &res)) - return; - - int param = 0; - SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, MAX_REMOVED_PERIOD)); - - param = 0; - int rc = sqlite3_step_monitored(res); - if (rc != SQLITE_DONE) - error_report("Failed to execute DELETE query for cleaning stale ACLK alert entries."); + nd_log( + NDLS_ACCESS, + NDLP_DEBUG, + "ACLK REQ [%s (%s)]: Sent! %d alerts snapshot, snapshot_uuid %s (version = %zu)", + wc->node_id, + rrdhost_hostname(host), + cnt, + snapshot_uuid, + version); done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); -} -#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ - "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ - "FROM aclk_alert_%s WHERE date_submitted IS NULL" -int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status) -{ - - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (!wc) - return 1; - - proto_alert_status->alert_updates = wc->alert_updates; - - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, sizeof(sql) - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); - - sqlite3_stmt *res = NULL; - if (!PREPARE_STATEMENT(db_meta, sql, &res)) - return 1; - - while (sqlite3_step_monitored(res) == SQLITE_ROW) { - proto_alert_status->pending_min_sequence_id = - sqlite3_column_bytes(res, 0) > 0 ? (uint64_t)sqlite3_column_int64(res, 0) : 0; - proto_alert_status->pending_max_sequence_id = - sqlite3_column_bytes(res, 1) > 0 ? (uint64_t)sqlite3_column_int64(res, 1) : 0; - proto_alert_status->last_submitted_sequence_id = - sqlite3_column_bytes(res, 2) > 0 ? (uint64_t)sqlite3_column_int64(res, 2) : 0; - } - - SQLITE_FINALIZE(res); - - return 0; + freez(claim_id); } -void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused) +// Start streaming alerts +void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version) { - if (unlikely(!node_id)) + nd_uuid_t node_uuid; + + if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host || !(wc = host->aclk_config))) - nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); - else { - nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); - wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; - } -} - -typedef struct active_alerts { - char *name; - char *chart; - RRDCALC_STATUS status; -} active_alerts_t; - -static inline int compare_active_alerts(const void *a, const void *b) -{ - active_alerts_t *active_alerts_a = (active_alerts_t *)a; - active_alerts_t *active_alerts_b = (active_alerts_t *)b; - - if (!(strcmp(active_alerts_a->name, active_alerts_b->name))) { - return strcmp(active_alerts_a->chart, active_alerts_b->chart); - } else - return strcmp(active_alerts_a->name, active_alerts_b->name); -} - -#define BATCH_ALLOCATED 10 -void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) -{ -#ifdef ENABLE_ACLK - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (unlikely(!wc)) { - nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); + if (unlikely(!host || !(wc = host->aclk_config))) { + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, invalid node.", node_id); return; } - if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) { - //postpone checkpoint send - wc->alert_checkpoint_req += 3; - nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); + if (unlikely(!host->health.health_enabled)) { + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } - RRDCALC *rc; - uint32_t cnt = 0; - size_t len = 0; - - active_alerts_t *active_alerts = callocz(BATCH_ALLOCATED, sizeof(active_alerts_t)); - foreach_rrdcalc_in_rrdhost_read(host, rc) { - if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec)) - continue; + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + schedule_alert_snapshot_if_needed(wc, cloud_version); + wc->stream_alerts = true; +} - if (rc->status == RRDCALC_STATUS_WARNING || - rc->status == RRDCALC_STATUS_CRITICAL) { +// Do checkpoint alert version check +void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version) +{ + nd_uuid_t node_uuid; - if (cnt && !(cnt % BATCH_ALLOCATED)) { - active_alerts = reallocz(active_alerts, (BATCH_ALLOCATED * ((cnt / BATCH_ALLOCATED) + 1)) * sizeof(active_alerts_t)); - } + if (unlikely(!node_id || !claim_id || !claimed() || uuid_parse(node_id, node_uuid))) + return; - active_alerts[cnt].name = (char *)rrdcalc_name(rc); - len += string_strlen(rc->config.name); - active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc); - len += string_strlen(rc->chart); - active_alerts[cnt].status = rc->status; - len++; - cnt++; - } - } - foreach_rrdcalc_in_rrdhost_done(rc); - - BUFFER *alarms_to_hash; - if (cnt) { - qsort(active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); - - alarms_to_hash = buffer_create(len, NULL); - for (uint32_t i = 0; i < cnt; i++) { - buffer_strcat(alarms_to_hash, active_alerts[i].name); - buffer_strcat(alarms_to_hash, active_alerts[i].chart); - if (active_alerts[i].status == RRDCALC_STATUS_WARNING) - buffer_fast_strcat(alarms_to_hash, "W", 1); - else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL) - buffer_fast_strcat(alarms_to_hash, "C", 1); - } - } else { - alarms_to_hash = buffer_create(1, NULL); - buffer_strcat(alarms_to_hash, ""); - len = 0; + char *agent_claim_id = get_agent_claimid(); + if (claim_id && agent_claim_id && strcmp(agent_claim_id, claim_id) != 0) { + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED WITH INVALID CLAIM ID", node_id); + goto done; } - freez(active_alerts); - char hash[SHA256_DIGEST_LENGTH + 1]; - if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) { - hash[SHA256_DIGEST_LENGTH] = 0; + struct aclk_sync_cfg_t *wc; + RRDHOST *host = find_host_by_node_id(node_id); - struct alarm_checkpoint alarm_checkpoint; - char *claim_id = get_agent_claimid(); - alarm_checkpoint.claim_id = claim_id; - alarm_checkpoint.node_id = wc->node_id; - alarm_checkpoint.checksum = (char *)hash; + if ((!host || !(wc = host->aclk_config))) + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT VALIDATION REQUEST RECEIVED FOR INVALID NODE", node_id); + else + schedule_alert_snapshot_if_needed(wc, cloud_version); - aclk_send_provide_alarm_checkpoint(&alarm_checkpoint); - freez(claim_id); - nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host)); - } else - nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); +done: + freez(agent_claim_id); +} - wc->alert_checkpoint_req = 0; - buffer_free(alarms_to_hash); #endif -} diff --git a/src/database/sqlite/sqlite_aclk_alert.h b/src/database/sqlite/sqlite_aclk_alert.h index cfb3468b..17a58154 100644 --- a/src/database/sqlite/sqlite_aclk_alert.h +++ b/src/database/sqlite/sqlite_aclk_alert.h @@ -5,28 +5,14 @@ extern sqlite3 *db_meta; -#define SEND_REMOVED_AFTER_HEALTH_LOOPS 3 -#define SEND_CHECKPOINT_AFTER_HEALTH_LOOPS 4 - -struct proto_alert_status { - int alert_updates; - uint64_t pending_min_sequence_id; - uint64_t pending_max_sequence_id; - uint64_t last_submitted_sequence_id; -}; - -void aclk_send_alarm_configuration (char *config_hash); +void aclk_send_alert_configuration(char *config_hash); void aclk_push_alert_config_event(char *node_id, char *config_hash); -void aclk_start_alert_streaming(char *node_id, bool resets); -void sql_queue_removed_alerts_to_aclk(RRDHOST *host); -void sql_process_queue_removed_alerts_to_aclk(char *node_id); -void aclk_send_alarm_checkpoint(char *node_id, char *claim_id); -void aclk_push_alarm_checkpoint(RRDHOST *host); +void aclk_start_alert_streaming(char *node_id, uint64_t cloud_version); +void aclk_alert_version_check(char *node_id, char *claim_id, uint64_t cloud_version); -void aclk_push_alert_snapshot_event(char *node_id); +void send_alert_snapshot_to_cloud(RRDHOST *host __maybe_unused); void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, char *snapshot_uuid); -int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); -void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter); +bool process_alert_pending_queue(RRDHOST *host); void aclk_push_alert_events_for_all_hosts(void); #endif //NETDATA_SQLITE_ACLK_ALERT_H diff --git a/src/database/sqlite/sqlite_aclk_node.c b/src/database/sqlite/sqlite_aclk_node.c index 3134438d..70d1ebda 100644 --- a/src/database/sqlite/sqlite_aclk_node.c +++ b/src/database/sqlite/sqlite_aclk_node.c @@ -90,13 +90,7 @@ static void build_node_info(RRDHOST *host) node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", ""); node_info.data.machine_guid = host->machine_guid; - struct capability node_caps[] = { - {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled}, - {.name = "mc", - .version = host->system_info->mc_version ? host->system_info->mc_version : 0, - .enabled = host->system_info->mc_version ? 1 : 0}, - {.name = NULL, .version = 0, .enabled = 0}}; - node_info.node_capabilities = node_caps; + node_info.node_capabilities = (struct capability *)aclk_get_agent_capas(); node_info.data.ml_info.ml_capable = host->system_info->ml_capable; node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled; diff --git a/src/database/sqlite/sqlite_context.c b/src/database/sqlite/sqlite_context.c index 1e49dd2b..1d0c768e 100644 --- a/src/database/sqlite/sqlite_context.c +++ b/src/database/sqlite/sqlite_context.c @@ -43,7 +43,7 @@ int sql_init_context_database(int memory) return 1; } - errno = 0; + errno_clear(); netdata_log_info("SQLite database %s initialization", sqlite_database); char buf[1024 + 1] = ""; diff --git a/src/database/sqlite/sqlite_db_migration.c b/src/database/sqlite/sqlite_db_migration.c index 88abd849..44a5e97c 100644 --- a/src/database/sqlite/sqlite_db_migration.c +++ b/src/database/sqlite/sqlite_db_migration.c @@ -518,7 +518,7 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name } if (likely(user_version == target_version)) { - errno = 0; + errno_clear(); netdata_log_info("%s database version is %d (no migration needed)", db_name, target_version); return target_version; } diff --git a/src/database/sqlite/sqlite_functions.c b/src/database/sqlite/sqlite_functions.c index 5c18ff8e..e62743f5 100644 --- a/src/database/sqlite/sqlite_functions.c +++ b/src/database/sqlite/sqlite_functions.c @@ -43,7 +43,7 @@ SQLITE_API int sqlite3_step_monitored(sqlite3_stmt *stmt) { return rc; } -static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database) +static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database, int rc) { if (!res && !database) @@ -54,7 +54,7 @@ static bool mark_database_to_recover(sqlite3_stmt *res, sqlite3 *database) if (db_meta == database) { char recover_file[FILENAME_MAX + 1]; - snprintfz(recover_file, FILENAME_MAX, "%s/.netdata-meta.db.recover", netdata_configured_cache_dir); + snprintfz(recover_file, FILENAME_MAX, "%s/.netdata-meta.db.%s", netdata_configured_cache_dir, SQLITE_CORRUPT == rc ? "recover" : "delete" ); int fd = open(recover_file, O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 444); if (fd >= 0) { close(fd); @@ -69,7 +69,7 @@ int execute_insert(sqlite3_stmt *res) int rc; rc = sqlite3_step_monitored(res); if (rc == SQLITE_CORRUPT) { - (void)mark_database_to_recover(res, NULL); + (void)mark_database_to_recover(res, NULL, rc); error_report("SQLite error %d", rc); } return rc; @@ -229,8 +229,8 @@ int init_database_batch(sqlite3 *database, const char *batch[], const char *desc analytics_set_data_str(&analytics_data.netdata_fail_reason, error_str); sqlite3_free(err_msg); freez(error_str); - if (SQLITE_CORRUPT == rc) { - if (mark_database_to_recover(NULL, database)) + if (SQLITE_CORRUPT == rc || SQLITE_NOTADB == rc) { + if (mark_database_to_recover(NULL, database, rc)) error_report("Database is corrupted will attempt to fix"); return SQLITE_CORRUPT; } @@ -263,7 +263,7 @@ int db_execute(sqlite3 *db, const char *cmd) } if (rc == SQLITE_CORRUPT) - mark_database_to_recover(NULL, db); + mark_database_to_recover(NULL, db, rc); break; } return (rc != SQLITE_OK); diff --git a/src/database/sqlite/sqlite_health.c b/src/database/sqlite/sqlite_health.c index 51e38d05..a632fd49 100644 --- a/src/database/sqlite/sqlite_health.c +++ b/src/database/sqlite/sqlite_health.c @@ -55,15 +55,137 @@ done: } /* Health related SQL queries - Inserts an entry in the table + * + * Inserts an entry in the tables + * alert_queue + * health_log + * health_log_detail + * */ +int calculate_delay(RRDCALC_STATUS old_status, RRDCALC_STATUS new_status) +{ + int delay = ALERT_TRANSITION_DELAY_NONE; + switch(old_status) { + case RRDCALC_STATUS_REMOVED: + switch (new_status) { + case RRDCALC_STATUS_UNINITIALIZED: + delay = ALERT_TRANSITION_DELAY_LONG; + break; + case RRDCALC_STATUS_CLEAR: + delay = ALERT_TRANSITION_DELAY_SHORT; + break; + default: + delay = ALERT_TRANSITION_DELAY_NONE; + break; + } + break; + case RRDCALC_STATUS_UNDEFINED: + case RRDCALC_STATUS_UNINITIALIZED: + switch (new_status) { + case RRDCALC_STATUS_REMOVED: + case RRDCALC_STATUS_UNINITIALIZED: + case RRDCALC_STATUS_UNDEFINED: + delay = ALERT_TRANSITION_DELAY_LONG; + break; + case RRDCALC_STATUS_CLEAR: + delay = ALERT_TRANSITION_DELAY_SHORT; + break; + default: + delay = ALERT_TRANSITION_DELAY_NONE; + break; + } + break; + case RRDCALC_STATUS_CLEAR: + switch (new_status) { + case RRDCALC_STATUS_REMOVED: + case RRDCALC_STATUS_UNINITIALIZED: + case RRDCALC_STATUS_UNDEFINED: + delay = ALERT_TRANSITION_DELAY_LONG; + break; + case RRDCALC_STATUS_WARNING: + case RRDCALC_STATUS_CRITICAL: + default: + delay = ALERT_TRANSITION_DELAY_NONE; + break; + + } + break; + case RRDCALC_STATUS_WARNING: + case RRDCALC_STATUS_CRITICAL: + switch (new_status) { + case RRDCALC_STATUS_REMOVED: + case RRDCALC_STATUS_UNINITIALIZED: + case RRDCALC_STATUS_UNDEFINED: + delay = ALERT_TRANSITION_DELAY_LONG; + break; + case RRDCALC_STATUS_CLEAR: + delay = ALERT_TRANSITION_DELAY_SHORT; + break; + default: + delay = ALERT_TRANSITION_DELAY_NONE; + break; + } + break; + default: + delay = ALERT_TRANSITION_DELAY_NONE; + break; + } + return delay; +} + +#ifdef ENABLE_ACLK +#define SQL_INSERT_ALERT_PENDING_QUEUE \ + "INSERT INTO alert_queue (host_id, health_log_id, unique_id, alarm_id, status, date_scheduled)" \ + " VALUES (@host_id, @health_log_id, @unique_id, @alarm_id, @new_status, UNIXEPOCH() + @delay)" \ + " ON CONFLICT (host_id, health_log_id, alarm_id)" \ + " DO UPDATE SET status = excluded.status, unique_id = excluded.unique_id, " \ + " date_scheduled = MIN(date_scheduled, excluded.date_scheduled)" + +static void insert_alert_queue( + RRDHOST *host, + uint64_t health_log_id, + int64_t unique_id, + uint32_t alarm_id, + RRDCALC_STATUS old_status, + RRDCALC_STATUS new_status) +{ + static __thread sqlite3_stmt *res = NULL; + int rc; + + if (!host->aclk_config) + return; + + if (!PREPARE_COMPILED_STATEMENT(db_meta, SQL_INSERT_ALERT_PENDING_QUEUE, &res)) + return; + + int submit_delay = calculate_delay(old_status, new_status); + + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64)health_log_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, unique_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, alarm_id)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, new_status)); + SQLITE_BIND_FAIL(done, sqlite3_bind_int(res, ++param, submit_delay)); + + param = 0; + rc = execute_insert(res); + if (rc != SQLITE_DONE) + error_report( + "HEALTH [%s]: Failed to execute insert_alert_queue, rc = %d", rrdhost_hostname(host), rc); + +done: + REPORT_BIND_FAIL(res, param); + SQLITE_RESET(res); +} +#endif #define SQL_INSERT_HEALTH_LOG_DETAIL \ "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, " \ "updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, " \ "info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \ - "VALUES (@health_log_id,@unique_id,@alarm_id,@alarm_event_id,@updated_by_id,@updates_id,@when_key,@duration," \ + " VALUES (@health_log_id,@unique_id,@alarm_id,@alarm_event_id,@updated_by_id,@updates_id,@when_key,@duration," \ "@non_clear_duration,@flags,@exec_run_timestamp,@delay_up_to_timestamp, @info,@exec_code,@new_status,@old_status," \ "@delay,@new_value,@old_value,@last_repeat,@transition_id,@global_id,@summary)" @@ -150,6 +272,11 @@ static void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) if (rc == SQLITE_ROW) { health_log_id = (size_t)sqlite3_column_int64(res, 0); sql_health_alarm_log_insert_detail(host, health_log_id, ae); +#ifdef ENABLE_ACLK + if (netdata_cloud_enabled) + insert_alert_queue( + host, health_log_id, (int64_t)ae->unique_id, (int64_t)ae->alarm_id, ae->old_status, ae->new_status); +#endif } else error_report("HEALTH [%s]: Failed to execute SQL_INSERT_HEALTH_LOG, rc = %d", rrdhost_hostname(host), rc); @@ -162,14 +289,8 @@ void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) { if (ae->flags & HEALTH_ENTRY_FLAG_SAVED) sql_health_alarm_log_update(host, ae); - else { + else sql_health_alarm_log_insert(host, ae); -#ifdef ENABLE_ACLK - if (netdata_cloud_enabled) { - sql_queue_alarm_to_aclk(host, ae, false); - } -#endif - } } /* @@ -179,44 +300,18 @@ void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) * */ -#define SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED \ +#define SQL_CLEANUP_HEALTH_LOG_DETAIL \ "DELETE FROM health_log_detail WHERE health_log_id IN " \ - "(SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < UNIXEPOCH() - @history " \ - "AND updated_by_id <> 0 AND transition_id NOT IN " \ - "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)" - -#define SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(guid) \ - "DELETE from health_log_detail WHERE unique_id NOT IN " \ - "(SELECT filtered_alert_unique_id FROM aclk_alert_%s) " \ - "AND unique_id IN (SELECT hld.unique_id FROM health_log hl, health_log_detail hld WHERE " \ - "hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id) " \ - "AND health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id) " \ - "AND when_key < unixepoch() - @history " \ - "AND updated_by_id <> 0 AND transition_id NOT IN " \ - "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)", \ - guid - -void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) { + " (SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < UNIXEPOCH() - @history " \ + " AND updated_by_id <> 0 AND transition_id NOT IN " \ + " (SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)" + +void sql_health_alarm_log_cleanup(RRDHOST *host) +{ sqlite3_stmt *res = NULL; int rc; - char command[MAX_HEALTH_SQL_SIZE + 1]; - - REQUIRE_DB(db_meta); - - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - snprintfz(command, sizeof(command) - 1, "aclk_alert_%s", uuid_str); - bool aclk_table_exists = table_exists_in_database(db_meta, command); - - char *sql = SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED; - - if (claimed && aclk_table_exists) { - snprintfz(command, sizeof(command) - 1, SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(uuid_str)); - sql = command; - } - - if (!PREPARE_STATEMENT(db_meta, sql, &res)) + if (!PREPARE_STATEMENT(db_meta, SQL_CLEANUP_HEALTH_LOG_DETAIL, &res)) return; int param = 0; @@ -228,33 +323,21 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) { if (unlikely(rc != SQLITE_DONE)) error_report("Failed to cleanup health log detail table, rc = %d", rc); - if (aclk_table_exists) - sql_aclk_alert_clean_dead_entries(host); - done: REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); } -#define SQL_INJECT_REMOVED \ - "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \ - "duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, " \ - "delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \ - "SELECT health_log_id, ?1, ?2, ?3, 0, ?4, UNIXEPOCH(), 0, 0, flags, exec_run_timestamp, UNIXEPOCH(), info, exec_code, -2, " \ - "new_status, delay, NULL, new_value, 0, ?5, NOW_USEC(0), summary FROM health_log_detail WHERE unique_id = ?6 AND transition_id = ?7" - -#define SQL_INJECT_REMOVED_UPDATE_DETAIL \ - "UPDATE health_log_detail SET flags = flags | ?1, updated_by_id = ?2 WHERE unique_id = ?3 AND transition_id = ?4" - -#define SQL_INJECT_REMOVED_UPDATE_LOG \ - "UPDATE health_log SET last_transition_id = ?1 WHERE alarm_id = ?2 AND last_transition_id = ?3 AND host_id = ?4" +#define SQL_UPDATE_TRANSITION_IN_HEALTH_LOG \ + "UPDATE health_log SET last_transition_id = @transition WHERE alarm_id = @alarm_id AND " \ + " last_transition_id = @prev_trans AND host_id = @host_id" -bool sql_update_removed_in_health_log(RRDHOST *host, uint32_t alarm_id, nd_uuid_t *transition_id, nd_uuid_t *last_transition) +bool sql_update_transition_in_health_log(RRDHOST *host, uint32_t alarm_id, nd_uuid_t *transition_id, nd_uuid_t *last_transition) { int rc = 0; sqlite3_stmt *res; - if (!PREPARE_STATEMENT(db_meta, SQL_INJECT_REMOVED_UPDATE_LOG, &res)) + if (!PREPARE_STATEMENT(db_meta, SQL_UPDATE_TRANSITION_IN_HEALTH_LOG, &res)) return false; int param = 0; @@ -275,12 +358,16 @@ done: return (param == 0 && rc == SQLITE_DONE); } -bool sql_update_removed_in_health_log_detail(uint32_t unique_id, uint32_t max_unique_id, nd_uuid_t *prev_transition_id) +#define SQL_SET_UPDATED_BY_IN_HEALTH_LOG_DETAIL \ + "UPDATE health_log_detail SET flags = flags | @flag, updated_by_id = @updated_by WHERE" \ + " unique_id = @unique_id AND transition_id = @transition_id" + +bool sql_set_updated_by_in_health_log_detail(uint32_t unique_id, uint32_t max_unique_id, nd_uuid_t *prev_transition_id) { int rc = 0; sqlite3_stmt *res; - if (!PREPARE_STATEMENT(db_meta, SQL_INJECT_REMOVED_UPDATE_DETAIL, &res)) + if (!PREPARE_STATEMENT(db_meta, SQL_SET_UPDATED_BY_IN_HEALTH_LOG_DETAIL, &res)) return false; int param = 0; @@ -301,7 +388,16 @@ done: return (param == 0 && rc == SQLITE_DONE); } -void sql_inject_removed_status( +#define SQL_INJECT_REMOVED \ + "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \ + "duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, " \ + "delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \ + "SELECT health_log_id, @max_unique_id, @alarm_id, @alarm_event_id, 0, @unique_id, UNIXEPOCH(), 0, 0, flags, " \ + " exec_run_timestamp, UNIXEPOCH(), info, exec_code, -2, " \ + " new_status, delay, NULL, new_value, 0, @transition_id, NOW_USEC(0), summary FROM health_log_detail " \ + " WHERE unique_id = @unique_id AND transition_id = @last_transition_id RETURNING health_log_id, old_status" + +static void sql_inject_removed_status( RRDHOST *host, uint32_t alarm_id, uint32_t alarm_event_id, @@ -326,19 +422,27 @@ void sql_inject_removed_status( SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64) alarm_event_id + 1)); SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64) unique_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &transition_id, sizeof(transition_id), SQLITE_STATIC)); - SQLITE_BIND_FAIL(done, sqlite3_bind_int64(res, ++param, (sqlite3_int64) unique_id)); SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, last_transition, sizeof(*last_transition), SQLITE_STATIC)); param = 0; - int rc = execute_insert(res); - if (rc == SQLITE_DONE) { + //int rc = execute_insert(res); + while (sqlite3_step_monitored(res) == SQLITE_ROW) { //update the old entry in health_log_detail - sql_update_removed_in_health_log_detail(unique_id, max_unique_id, last_transition); + sql_set_updated_by_in_health_log_detail(unique_id, max_unique_id, last_transition); //update the old entry in health_log - sql_update_removed_in_health_log(host, alarm_id, &transition_id, last_transition); + sql_update_transition_in_health_log(host, alarm_id, &transition_id, last_transition); + +#ifdef ENABLE_ACLK + if (netdata_cloud_enabled) { + int64_t health_log_id = sqlite3_column_int64(res, 0); + RRDCALC_STATUS old_status = (RRDCALC_STATUS)sqlite3_column_double(res, 1); + insert_alert_queue( + host, health_log_id, (int64_t)unique_id, (int64_t)alarm_id, old_status, RRDCALC_STATUS_REMOVED); + } +#endif } - else - error_report("HEALTH [N/A]: Failed to execute SQL_INJECT_REMOVED, rc = %d", rc); + //else + // error_report("HEALTH [N/A]: Failed to execute SQL_INJECT_REMOVED, rc = %d", rc); done: REPORT_BIND_FAIL(res, param); @@ -461,7 +565,7 @@ void sql_alert_cleanup(bool cli) { UNUSED(cli); - errno = 0; + errno_clear(); if (sql_init_meta_database(DB_CHECK_NONE, 0)) { netdata_log_error("Failed to open database"); return; @@ -489,7 +593,6 @@ void sql_alert_cleanup(bool cli) void sql_health_alarm_log_load(RRDHOST *host) { sqlite3_stmt *res = NULL; - int ret; ssize_t errored = 0, loaded = 0; if (!REQUIRE_DB(db_meta)) @@ -500,21 +603,19 @@ void sql_health_alarm_log_load(RRDHOST *host) if (!PREPARE_STATEMENT(db_meta, SQL_LOAD_HEALTH_LOG, &res)) return; - ret = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); - if (unlikely(ret != SQLITE_OK)) { - error_report("Failed to bind host_id parameter for SQL_LOAD_HEALTH_LOG."); - SQLITE_FINALIZE(res); - return; - } + int param = 0; + SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC)); DICTIONARY *all_rrdcalcs = dictionary_create( DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE); + RRDCALC *rc; foreach_rrdcalc_in_rrdhost_read(host, rc) { dictionary_set(all_rrdcalcs, rrdcalc_name(rc), rc, sizeof(*rc)); } foreach_rrdcalc_in_rrdhost_done(rc); + param = 0; rw_spinlock_read_lock(&host->health_log.spinlock); while (sqlite3_step_monitored(res) == SQLITE_ROW) { @@ -618,8 +719,6 @@ void sql_health_alarm_log_load(RRDHOST *host) ae->summary = SQLITE3_COLUMN_STRINGDUP_OR_NULL(res, 33); char value_string[100 + 1]; - string_freez(ae->old_value_string); - string_freez(ae->new_value_string); ae->old_value_string = string_strdupz(format_value_and_unit(value_string, 100, ae->old_value, ae_units(ae), -1)); ae->new_value_string = string_strdupz(format_value_and_unit(value_string, 100, ae->new_value, ae_units(ae), -1)); @@ -631,7 +730,6 @@ void sql_health_alarm_log_load(RRDHOST *host) if(unlikely(ae->alarm_id >= host->health_max_alarm_id)) host->health_max_alarm_id = ae->alarm_id; - loaded++; } @@ -650,7 +748,8 @@ void sql_health_alarm_log_load(RRDHOST *host) nd_log(NDLS_DAEMON, errored ? NDLP_WARNING : NDLP_DEBUG, "[%s]: Table health_log, loaded %zd alarm entries, errors in %zd entries.", rrdhost_hostname(host), loaded, errored); - +done: + REPORT_BIND_FAIL(res, param); SQLITE_FINALIZE(res); } diff --git a/src/database/sqlite/sqlite_health.h b/src/database/sqlite/sqlite_health.h index 99f67a3a..73a85e2b 100644 --- a/src/database/sqlite/sqlite_health.h +++ b/src/database/sqlite/sqlite_health.h @@ -6,14 +6,17 @@ #include "daemon/common.h" #include "sqlite3.h" +#define ALERT_TRANSITION_DELAY_LONG (600) +#define ALERT_TRANSITION_DELAY_SHORT (10) +#define ALERT_TRANSITION_DELAY_NONE (0) + struct sql_alert_transition_data; struct sql_alert_config_data; struct rrd_alert_prototype; void sql_health_alarm_log_load(RRDHOST *host); void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae); -void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed); +void sql_health_alarm_log_cleanup(RRDHOST *host); void sql_alert_store_config(struct rrd_alert_prototype *ap); -void sql_aclk_alert_clean_dead_entries(RRDHOST *host); int sql_health_get_last_executed_event(RRDHOST *host, ALARM_ENTRY *ae, RRDCALC_STATUS *last_executed_status); void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, time_t after, const char *chart); int health_migrate_old_health_log_table(char *table); diff --git a/src/database/sqlite/sqlite_metadata.c b/src/database/sqlite/sqlite_metadata.c index 5573f799..1b801b73 100644 --- a/src/database/sqlite/sqlite_metadata.c +++ b/src/database/sqlite/sqlite_metadata.c @@ -28,6 +28,17 @@ const char *database_config[] = { "CREATE TABLE IF NOT EXISTS chart_label(chart_id blob, source_type int, label_key text, " "label_value text, date_created int, PRIMARY KEY (chart_id, label_key))", + "CREATE TRIGGER IF NOT EXISTS del_chart_label AFTER DELETE ON chart " + "BEGIN DELETE FROM chart_label WHERE chart_id = old.chart_id; END", + + "CREATE TRIGGER IF NOT EXISTS del_chart " + "AFTER DELETE ON dimension " + "FOR EACH ROW " + "BEGIN" + " DELETE FROM chart WHERE chart_id = OLD.chart_id " + " AND NOT EXISTS (SELECT 1 FROM dimension WHERE chart_id = OLD.chart_id);" + "END", + "CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created)", "CREATE TABLE IF NOT EXISTS alert_hash(hash_id blob PRIMARY KEY, date_updated int, alarm text, template text, " @@ -67,6 +78,18 @@ const char *database_config[] = { "CREATE INDEX IF NOT EXISTS health_log_d_ind_7 on health_log_detail (alarm_id)", "CREATE INDEX IF NOT EXISTS health_log_d_ind_8 on health_log_detail (new_status, updated_by_id)", +#ifdef ENABLE_ACLK + "CREATE TABLE IF NOT EXISTS alert_queue " + " (host_id BLOB, health_log_id INT, unique_id INT, alarm_id INT, status INT, date_scheduled INT, " + " UNIQUE(host_id, health_log_id, alarm_id))", + + "CREATE TABLE IF NOT EXISTS alert_version (health_log_id INTEGER PRIMARY KEY, unique_id INT, status INT, " + "version INT, date_submitted INT)", + + "CREATE TABLE IF NOT EXISTS aclk_queue (sequence_id INTEGER PRIMARY KEY, host_id blob, health_log_id INT, " + "unique_id INT, date_created INT, UNIQUE(host_id, health_log_id))", +#endif + NULL }; @@ -251,7 +274,7 @@ static inline void set_host_node_id(RRDHOST *host, nd_uuid_t *node_id) } if (unlikely(!wc)) - sql_create_aclk_table(host, &host->host_uuid, node_id); + create_aclk_config(host, &host->host_uuid, node_id); else uuid_unparse_lower(*node_id, wc->node_id); } @@ -668,6 +691,18 @@ int sql_init_meta_database(db_check_action_type_t rebuild, int memory) if (rebuild & DB_CHECK_RECOVER) return 0; } + + snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/.netdata-meta.db.delete", netdata_configured_cache_dir); + rc = unlink(sqlite_database); + snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir); + if (rc == 0) { + char new_sqlite_database[FILENAME_MAX + 1]; + snprintfz(new_sqlite_database, sizeof(new_sqlite_database) - 1, "%s/netdata-meta.bad", netdata_configured_cache_dir); + rc = rename(sqlite_database, new_sqlite_database); + if (rc) + error_report("Failed to rename %s to %s", sqlite_database, new_sqlite_database); + } + // note: sqlite_database contains the right name } else strncpyz(sqlite_database, ":memory:", sizeof(sqlite_database) - 1); @@ -699,7 +734,7 @@ int sql_init_meta_database(db_check_action_type_t rebuild, int memory) } if (rebuild & DB_CHECK_ANALYZE) { - errno = 0; + errno_clear(); netdata_log_info("Running ANALYZE on %s", sqlite_database); rc = sqlite3_exec_monitored(db_meta, "ANALYZE", 0, 0, &err_msg); if (rc != SQLITE_OK) { @@ -713,7 +748,7 @@ int sql_init_meta_database(db_check_action_type_t rebuild, int memory) return 1; } - errno = 0; + errno_clear(); netdata_log_info("SQLite database %s initialization", sqlite_database); rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0); @@ -1430,11 +1465,10 @@ static void cleanup_health_log(struct metadata_wc *wc) RRDHOST *host; - bool is_claimed = claimed(); dfe_start_reentrant(rrdhost_root_index, host){ if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) continue; - sql_health_alarm_log_cleanup(host, is_claimed); + sql_health_alarm_log_cleanup(host); if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) break; } @@ -1445,6 +1479,9 @@ static void cleanup_health_log(struct metadata_wc *wc) (void) db_execute(db_meta,"DELETE FROM health_log WHERE host_id NOT IN (SELECT host_id FROM host)"); (void) db_execute(db_meta,"DELETE FROM health_log_detail WHERE health_log_id NOT IN (SELECT health_log_id FROM health_log)"); +#ifdef ENABLE_ACLK + (void) db_execute(db_meta,"DELETE FROM alert_version WHERE health_log_id NOT IN (SELECT health_log_id FROM health_log)"); +#endif } // |