From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- database/sqlite/sqlite_aclk.c | 332 +++++++++++++---------- database/sqlite/sqlite_aclk.h | 35 +-- database/sqlite/sqlite_aclk_alert.c | 452 +++++++++++++++----------------- database/sqlite/sqlite_aclk_alert.h | 3 +- database/sqlite/sqlite_aclk_node.c | 130 +++++---- database/sqlite/sqlite_context.c | 54 ++-- database/sqlite/sqlite_db_migration.c | 163 ++++++++---- database/sqlite/sqlite_functions.c | 157 ++++++----- database/sqlite/sqlite_functions.h | 5 +- database/sqlite/sqlite_health.c | 479 +++++++++++++++------------------- database/sqlite/sqlite_health.h | 2 +- database/sqlite/sqlite_metadata.c | 354 +++++++++++++------------ database/sqlite/sqlite_metadata.h | 1 + 13 files changed, 1126 insertions(+), 1041 deletions(-) (limited to 'database/sqlite') diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 1298045c2..ac574879c 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -11,60 +11,46 @@ struct aclk_sync_config_s { uv_timer_t timer_req; time_t cleanup_after; // Start a cleanup after this timestamp uv_async_t async; - /* FIFO command queue */ - uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; bool initialized; - volatile unsigned queue_size; - struct aclk_database_cmdqueue cmd_queue; + SPINLOCK cmd_queue_lock; + struct aclk_database_cmd *cmd_base; } aclk_sync_config = { 0 }; - void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } - -int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd) +static struct aclk_database_cmd aclk_database_deq_cmd(void) { - unsigned queue_size; + struct aclk_database_cmd ret; - /* wait for free space in queue */ - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return 1; + spinlock_lock(&aclk_sync_config.cmd_queue_lock); + if(aclk_sync_config.cmd_base) { + struct aclk_database_cmd *t = aclk_sync_config.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next); + ret = *t; + freez(t); } + else { + ret.opcode = ACLK_DATABASE_NOOP; + ret.completion = NULL; + } + spinlock_unlock(&aclk_sync_config.cmd_queue_lock); - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; - aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.tail + 1 : 0; - aclk_sync_config.queue_size = queue_size + 1; - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return 0; + return ret; } static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) { - unsigned queue_size; + struct aclk_database_cmd *t = mallocz(sizeof(*t)); + *t = *cmd; + t->prev = t->next = NULL; + + spinlock_lock(&aclk_sync_config.cmd_queue_lock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next); + spinlock_unlock(&aclk_sync_config.cmd_queue_lock); - /* wait for free space in queue */ - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; - aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.tail + 1 : 0; - aclk_sync_config.queue_size = queue_size + 1; - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - - /* wake up event loop */ (void) uv_async_send(&aclk_sync_config.async); } @@ -84,6 +70,8 @@ enum { IDX_PROGRAM_VERSION, IDX_ENTRIES, IDX_HEALTH_ENABLED, + IDX_LAST_CONNECTED, + IDX_IS_EPHEMERAL, }; static int create_host_callback(void *data, int argc, char **argv, char **column) @@ -92,9 +80,31 @@ static int create_host_callback(void *data, int argc, char **argv, char **column UNUSED(argc); UNUSED(column); + time_t last_connected = + (time_t)(argv[IDX_LAST_CONNECTED] ? str2uint64_t(argv[IDX_LAST_CONNECTED], NULL) : 0); + + if (!last_connected) + last_connected = now_realtime_sec(); + + time_t age = now_realtime_sec() - last_connected; + int is_ephemeral = 0; + + if (argv[IDX_IS_EPHEMERAL]) + is_ephemeral = str2i(argv[IDX_IS_EPHEMERAL]); + char guid[UUID_STR_LEN]; uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); + if (is_ephemeral && age > rrdhost_free_ephemeral_time_s) { + netdata_log_info( + "Skipping ephemeral hostname \"%s\" with GUID \"%s\", age = %ld seconds (limit %ld seconds)", + (const char *)argv[IDX_HOSTNAME], + guid, + age, + rrdhost_free_ephemeral_time_s); + return 0; + } + struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); @@ -103,32 +113,48 @@ static int create_host_callback(void *data, int argc, char **argv, char **column sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); RRDHOST *host = rrdhost_find_or_create( - (const char *) argv[IDX_HOSTNAME] - , (const char *) argv[IDX_REGISTRY] - , guid - , (const char *) argv[IDX_OS] - , (const char *) argv[IDX_TIMEZONE] - , (const char *) argv[IDX_ABBREV_TIMEZONE] - , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0) - , (const char *) argv[IDX_TAGS] - , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") - , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") - , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 - , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0 - , default_rrd_memory_mode - , 0 // health - , 0 // rrdpush enabled - , NULL //destination - , NULL // api key - , NULL // send charts matching - , false // rrdpush_enable_replication - , 0 // rrdpush_seconds_to_replicate - , 0 // rrdpush_replication_step - , system_info - , 1 - ); - if (likely(host)) + (const char *)argv[IDX_HOSTNAME], + (const char *)argv[IDX_REGISTRY], + guid, + (const char *)argv[IDX_OS], + (const char *)argv[IDX_TIMEZONE], + (const char *)argv[IDX_ABBREV_TIMEZONE], + (int32_t)(argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0), + (const char *)argv[IDX_TAGS], + (const char *)(argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown"), + (const char *)(argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown"), + argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1, + argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0, + default_rrd_memory_mode, + 0 // health + , + 0 // rrdpush enabled + , + NULL //destination + , + NULL // api key + , + NULL // send charts matching + , + false // rrdpush_enable_replication + , + 0 // rrdpush_seconds_to_replicate + , + 0 // rrdpush_replication_step + , + system_info, + 1); + + if (likely(host)) { + if (is_ephemeral) + rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST); + + if (is_ephemeral) + host->child_disconnected_time = now_realtime_sec(); + host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); + host->last_connected = last_connected; + } (*number_of_chidren)++; @@ -136,43 +162,14 @@ static int create_host_callback(void *data, int argc, char **argv, char **column char node_str[UUID_STR_LEN] = ""; if (likely(host->node_id)) uuid_unparse_lower(*host->node_id, node_str); - internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\" ephemeral=%d", rrdhost_hostname(host), host->machine_guid, node_str, is_ephemeral); #endif return 0; } #ifdef ENABLE_ACLK -static struct aclk_database_cmd aclk_database_deq_cmd(void) -{ - struct aclk_database_cmd ret; - unsigned queue_size; - - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - queue_size = aclk_sync_config.queue_size; - if (queue_size == 0) { - memset(&ret, 0, sizeof(ret)); - ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; - - } else { - /* dequeue command */ - ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head]; - if (queue_size == 1) { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; - } else { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.head + 1 : 0; - } - aclk_sync_config.queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&aclk_sync_config.cmd_cond); - } - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return ret; -} - -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" +#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id" static int is_host_available(uuid_t *host_id) { sqlite3_stmt *res = NULL; @@ -231,7 +228,7 @@ static void sql_delete_aclk_table_list(char *host_guid) BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str); + "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index')", uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { @@ -255,18 +252,63 @@ fail: buffer_free(sql); } +// OPCODE: ACLK_DATABASE_NODE_UNREGISTER +static void sql_unregister_node(char *machine_guid) +{ + int rc; + uuid_t host_uuid; + + if (unlikely(!machine_guid)) + return; + + rc = uuid_parse(machine_guid, host_uuid); + if (rc) { + freez(machine_guid); + return; + } + + sqlite3_stmt *res = NULL; + + rc = sqlite3_prepare_v2(db_meta, "UPDATE node_instance SET node_id = NULL WHERE host_id = @host_id", -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to remove the host node id"); + freez(machine_guid); + return; + } + + rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to remove host node id"); + goto skip; + } + rc = sqlite3_step_monitored(res); + if (unlikely(rc != SQLITE_DONE)) { + error_report("Failed to execute command to remove host node id"); + } else { + // node: machine guid will be freed after processing + metadata_delete_host_chart_labels(machine_guid); + machine_guid = NULL; + } + +skip: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize statement to remove host node id"); + freez(machine_guid); +} + + static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused) { struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_DELETE_HOST; cmd.param[0] = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(&cmd); + aclk_database_enq_cmd(&cmd); return 0; } #define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" + "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table')" static void sql_check_aclk_table_list(void) { @@ -278,19 +320,18 @@ static void sql_check_aclk_table_list(void) } } -#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;" +#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d" static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - char sql[512]; - snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); + char sql[ACLK_SYNC_QUERY_SIZE]; + snprintfz(sql,sizeof(sql) - 1, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); if (unlikely(db_execute(db_meta, sql))) error_report("Failed to clean stale ACLK alert entries"); return 0; } - -#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');" +#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table')" static void sql_maint_aclk_sync_database_all(void) { @@ -304,7 +345,7 @@ static void sql_maint_aclk_sync_database_all(void) static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); RRDHOST *host = rrdhost_find_by_guid(uuid_str); @@ -332,18 +373,15 @@ static void timer_cb(uv_timer_t *handle) struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - time_t now = now_realtime_sec(); - - if (config->cleanup_after && config->cleanup_after < now) { + if (config->cleanup_after < now_realtime_sec()) { cmd.opcode = ACLK_DATABASE_CLEANUP; - if (!aclk_database_enq_cmd_noblock(&cmd)) - config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; + aclk_database_enq_cmd(&cmd); + config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } if (aclk_connected) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; - aclk_database_enq_cmd_noblock(&cmd); - + aclk_database_enq_cmd(&cmd); aclk_check_node_info_and_collectors(); } } @@ -414,12 +452,16 @@ static void aclk_synchronization(void *arg __maybe_unused) case ACLK_DATABASE_NODE_STATE:; RRDHOST *host = cmd.param[0]; int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; - struct aclk_sync_host_config *ahc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *ahc = host->aclk_config; if (unlikely(!ahc)) sql_create_aclk_table(host, &host->host_uuid, host->node_id); - aclk_host_state_update(host, live); + aclk_host_state_update(host, live, 1); break; -// ALERTS + case ACLK_DATABASE_NODE_UNREGISTER: + sql_unregister_node(cmd.param[0]); + + break; + // ALERTS case ACLK_DATABASE_PUSH_ALERT_CONFIG: aclk_push_alert_config_event(cmd.param[0], cmd.param[1]); break; @@ -444,8 +486,6 @@ static void aclk_synchronization(void *arg __maybe_unused) uv_close((uv_handle_t *)&config->timer_req, NULL); uv_close((uv_handle_t *)&config->async, NULL); -// uv_close((uv_handle_t *)&config->async_exit, NULL); - uv_cond_destroy(&config->cmd_cond); (void) uv_loop_close(loop); worker_unregister(); @@ -455,11 +495,7 @@ static void aclk_synchronization(void *arg __maybe_unused) static void aclk_synchronization_init(void) { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; - aclk_sync_config.queue_size = 0; - fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond)); - fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex)); - + memset(&aclk_sync_config, 0, sizeof(aclk_sync_config)); fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config)); } #endif @@ -469,8 +505,8 @@ static void aclk_synchronization_init(void) void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK - char uuid_str[GUID_LEN + 1]; - char host_guid[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; + char host_guid[UUID_STR_LEN]; int rc; uuid_unparse_lower_fix(host_uuid, uuid_str); @@ -478,37 +514,34 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, TABLE_ACLK_ALERT, uuid_str); + snprintfz(sql, sizeof(sql) - 1, TABLE_ACLK_ALERT, uuid_str); rc = db_execute(db_meta, sql); if (unlikely(rc)) error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid); else { - snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT, uuid_str, uuid_str); + snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT1, uuid_str, uuid_str); rc = db_execute(db_meta, sql); if (unlikely(rc)) - error_report("Failed to create ACLK alert table index for host %s", host ? string2str(host->hostname) : host_guid); + error_report( + "Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid); - snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT1, uuid_str, uuid_str); + snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT2, uuid_str, uuid_str); rc = db_execute(db_meta, sql); if (unlikely(rc)) - error_report("Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid); - - snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT2, uuid_str, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report("Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid); + error_report( + "Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid); } - if (likely(host) && unlikely(host->aclk_sync_host_config)) + if (likely(host) && unlikely(host->aclk_config)) return; if (unlikely(!host)) return; - struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config)); + struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t)); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); - host->aclk_sync_host_config = (void *)wc; + host->aclk_config = wc; if (node_id && !host->node_id) { host->node_id = mallocz(sizeof(*host->node_id)); uuid_copy(*host->node_id, *node_id); @@ -522,12 +555,18 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may #endif } -#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \ - "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ - "program_version, entries, health_enabled FROM host WHERE hops >0;" +#define SQL_FETCH_ALL_HOSTS \ + "SELECT host_id, hostname, registry_hostname, update_every, os, " \ + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ + "program_version, entries, health_enabled, last_connected, " \ + "(SELECT CASE WHEN hl.label_value = 'true' THEN 1 ELSE 0 END FROM " \ + "host_label hl WHERE hl.host_id = h.host_id AND hl.label_key = '_is_ephemeral') " \ + "FROM host h WHERE hops > 0" + +#define SQL_FETCH_ALL_INSTANCES \ + "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ + "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL" -#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ - "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " void sql_aclk_sync_init(void) { char *err_msg = NULL; @@ -622,3 +661,18 @@ void schedule_node_info_update(RRDHOST *host __maybe_unused) aclk_database_enq_cmd(&cmd); #endif } + +#ifdef ENABLE_ACLK +void unregister_node(const char *machine_guid) +{ + if (unlikely(!machine_guid)) + return; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_NODE_UNREGISTER; + cmd.param[0] = strdupz(machine_guid); + cmd.completion = NULL; + aclk_database_enq_cmd(&cmd); +} +#endif \ No newline at end of file diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 850ca434e..0db2647bf 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -5,14 +5,13 @@ #include "sqlite3.h" - #ifndef ACLK_MAX_CHART_BATCH #define ACLK_MAX_CHART_BATCH (200) #endif #ifndef ACLK_MAX_CHART_BATCH_COUNT #define ACLK_MAX_CHART_BATCH_COUNT (10) #endif -#define ACLK_MAX_ALERT_UPDATES (5) +#define ACLK_MAX_ALERT_UPDATES "5" #define ACLK_DATABASE_CLEANUP_FIRST (1200) #define ACLK_DATABASE_CLEANUP_INTERVAL (3600) #define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400) @@ -41,13 +40,13 @@ static inline int claimed() return localhost->aclk_state.claimed_id != NULL; } -#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ - "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ - "unique(alert_unique_id));" +#define TABLE_ACLK_ALERT \ + "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ + "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ + "UNIQUE(alert_unique_id))" -#define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);" -#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id);" -#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted);" +#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id)" +#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted)" enum aclk_database_opcode { ACLK_DATABASE_NOOP = 0, @@ -60,6 +59,7 @@ enum aclk_database_opcode { ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, ACLK_DATABASE_PUSH_ALERT_CHECKPOINT, ACLK_DATABASE_QUEUE_REMOVED_ALERTS, + ACLK_DATABASE_NODE_UNREGISTER, ACLK_DATABASE_TIMER, // leave this last @@ -71,16 +71,10 @@ struct aclk_database_cmd { enum aclk_database_opcode opcode; void *param[2]; struct completion *completion; + struct aclk_database_cmd *prev, *next; }; -#define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024) - -struct aclk_database_cmdqueue { - unsigned head, tail; - struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE]; -}; - -struct aclk_sync_host_config { +typedef struct aclk_sync_cfg_t { RRDHOST *host; int alert_updates; int alert_checkpoint_req; @@ -92,17 +86,16 @@ struct aclk_sync_host_config { char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested uint64_t alerts_log_first_sequence_id; uint64_t alerts_log_last_sequence_id; -}; - -extern sqlite3 *db_meta; +} aclk_sync_cfg_t; -int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd); void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); void sql_aclk_sync_init(void); void aclk_push_alert_config(const char *node_id, const char *config_hash); void aclk_push_node_alert_snapshot(const char *node_id); -void aclk_push_node_health_log(const char *node_id); void aclk_push_node_removed_alerts(const char *node_id); void schedule_node_info_update(RRDHOST *host); +#ifdef ENABLE_ACLK +void unregister_node(const char *machine_guid); +#endif #endif //NETDATA_SQLITE_ACLK_H diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 60bf5dbdc..9bd060f96 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -13,16 +13,42 @@ sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \ }) - #define SQL_UPDATE_FILTERED_ALERT \ - "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u, date_created = unixepoch() where filtered_alert_unique_id = %u" + "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \ + "WHERE filtered_alert_unique_id = @old_alert" -static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) +static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str) { + sqlite3_stmt *res = NULL; + char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id); - sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); - ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + snprintfz(sql, sizeof(sql) - 1, SQL_UPDATE_FILTERED_ALERT, uuid_str); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to update_filtered"); + return; + } + + rc = sqlite3_bind_int64(res, 1, ae->unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind ae unique_id for update_filtered"); + goto done; + } + + rc = sqlite3_bind_int64(res, 2, unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind unique_id for update_filtered"); + goto done; + } + + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_DONE)) + ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + +done: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to update_filtered, rc = %d", rc); } #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \ @@ -30,35 +56,35 @@ static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \ "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL" -static inline bool is_event_from_alert_variable_config(uint32_t unique_id, uuid_t *host_id) +static inline bool is_event_from_alert_variable_config(int64_t unique_id, uuid_t *host_id) { sqlite3_stmt *res = NULL; - int rc = 0; - bool ret = false; - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to check for alert variables."); return false; } - rc = sqlite3_bind_int(res, 1, (int) unique_id); + bool ret = false; + + rc = sqlite3_bind_int64(res, 1, unique_id); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind unique_id for checking alert variable."); - goto fail; + goto done; } rc = sqlite3_bind_blob(res, 2, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for checking alert variable."); - goto fail; + goto done; } rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) ret = true; -fail: +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc); @@ -71,32 +97,25 @@ fail: //decide if some events should be sent or not #define SQL_SELECT_ALERT_BY_ID \ "SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND +hld.unique_id = aa.filtered_alert_unique_id " \ + "WHERE hl.host_id = @host_id AND hld.unique_id = aa.filtered_alert_unique_id " \ "AND hld.alarm_id = @alarm_id AND hl.health_log_id = hld.health_log_id " \ - "ORDER BY hld.rowid DESC LIMIT 1;" + "ORDER BY hld.rowid DESC LIMIT 1" static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { sqlite3_stmt *res = NULL; - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - - bool send = false; if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) return 0; - if (unlikely(uuid_is_null(ae->config_hash_id))) + if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config)) return 0; char sql[ACLK_SYNC_QUERY_SIZE]; - uuid_t config_hash_id; - RRDCALC_STATUS status; - uint32_t unique_id; //get the previous sent event of this alarm_id //base the search on the last filtered event - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, uuid_str); + snprintfz(sql, sizeof(sql) - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -104,6 +123,8 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) return true; } + bool send = false; + rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for checking should_send_to_cloud"); @@ -119,17 +140,18 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { - status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); + uuid_t config_hash_id; + RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); if (sqlite3_column_type(res, 1) != SQLITE_NULL) uuid_copy(config_hash_id, *((uuid_t *)sqlite3_column_blob(res, 1))); - unique_id = (uint32_t)sqlite3_column_int64(res, 2); + int64_t unique_id = sqlite3_column_int64(res, 2); if (ae->new_status != (RRDCALC_STATUS)status || uuid_memcmp(&ae->config_hash_id, &config_hash_id)) send = true; else - update_filtered(ae, unique_id, uuid_str); + update_filtered(ae, unique_id, host->aclk_config->uuid_str); } else send = true; @@ -143,13 +165,12 @@ done: #define SQL_QUEUE_ALERT_TO_CLOUD \ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ - "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING;" + "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING" void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) { sqlite3_stmt *res_alert = NULL; char sql[ACLK_SYNC_QUERY_SIZE]; - char uuid_str[UUID_STR_LEN]; if (!service_running(SERVICE_ACLK)) return; @@ -163,8 +184,7 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid)) return; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); + snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0); if (unlikely(rc != SQLITE_OK)) { @@ -172,18 +192,18 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) return; } - rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id); + rc = sqlite3_bind_int64(res_alert, 1, ae->unique_id); if (unlikely(rc != SQLITE_OK)) - goto bind_fail; + goto done; rc = execute_insert(res_alert); if (unlikely(rc == SQLITE_DONE)) { ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } else - error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc); + error_report("Failed to store alert event %"PRId64", rc = %d", ae->unique_id, rc); -bind_fail: +done: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) error_report("Failed to reset statement in store alert event, rc = %d", rc); } @@ -239,15 +259,13 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) { } -void aclk_push_alert_event(struct aclk_sync_host_config *wc) +static void aclk_push_alert_event(struct aclk_sync_cfg_t *wc __maybe_unused) { -#ifndef ENABLE_ACLK - UNUSED(wc); -#else +#ifdef ENABLE_ACLK int rc; if (unlikely(!wc->alert_updates)) { - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); @@ -265,23 +283,20 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - int limit = ACLK_MAX_ALERT_UPDATES; - sqlite3_stmt *res = NULL; buffer_sprintf( sql, - "select aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " + "SELECT aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, " " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, " " hl.chart, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, " " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, " " hld.alarm_event_id, hl.chart_name, hld.summary " - " from health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld " - " where hld.unique_id = aa.alert_unique_id and hl.config_hash_id = ha.hash_id and aa.date_submitted is null " - " and hl.host_id = @host_id and hl.health_log_id = hld.health_log_id " - " order by aa.sequence_id asc limit %d;", - wc->uuid_str, - limit); + " FROM health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld " + " WHERE hld.unique_id = aa.alert_unique_id AND hl.config_hash_id = ha.hash_id AND aa.date_submitted IS NULL " + " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " + " ORDER BY aa.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES, + wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { @@ -292,13 +307,6 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) rc = db_execute(db_meta, buffer_tostring(sql_fix)); if (unlikely(rc)) error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); - - else { - buffer_flush(sql_fix); - buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str); - if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix)))) - error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); - } buffer_free(sql_fix); // Try again @@ -315,10 +323,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for pushing alert event."); - sqlite3_finalize(res); - buffer_free(sql); - freez(claim_id); - return; + goto done; } uint64_t first_sequence_id = 0; @@ -395,9 +400,13 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) if (first_sequence_id) { buffer_flush(sql); - buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " - "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", - wc->uuid_str, first_sequence_id, last_sequence_id); + buffer_sprintf( + sql, + "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " + "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64, + wc->uuid_str, + first_sequence_id, + last_sequence_id); if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host)); @@ -407,7 +416,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) } else { if (wc->alerts_log_first_sequence_id) - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", @@ -417,6 +426,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) wc->alerts_log_last_sequence_id = 0; } +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc); @@ -437,7 +447,7 @@ void aclk_push_alert_events_for_all_hosts(void) rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (likely(wc)) aclk_push_alert_event(wc); } @@ -446,59 +456,54 @@ void aclk_push_alert_events_for_all_hosts(void) void sql_queue_existing_alerts_to_aclk(RRDHOST *host) { - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); sqlite3_stmt *res = NULL; int rc; + struct aclk_sync_cfg_t *wc = host->aclk_config; + + BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + rw_spinlock_write_lock(&host->health_log.spinlock); - buffer_sprintf(sql, "delete from aclk_alert_%s; ", uuid_str); - if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) { - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; - } + buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str); + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + goto skip; buffer_flush(sql); + buffer_sprintf( sql, - "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " - "select hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id from health_log_detail hld, health_log hl " - "where hld.new_status <> 0 and hld.new_status <> -2 and hl.health_log_id = hld.health_log_id and hl.config_hash_id is not null " - "and hld.updated_by_id = 0 and hl.host_id = @host_id order by hld.unique_id asc on conflict (alert_unique_id) do nothing;", - uuid_str); + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " + "SELECT hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id FROM health_log_detail hld, health_log hl " + "WHERE hld.new_status <> 0 AND hld.new_status <> -2 AND hl.health_log_id = hld.health_log_id AND hl.config_hash_id IS NOT NULL " + "AND hld.updated_by_id = 0 AND hl.host_id = @host_id ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING", + wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to queue existing alerts."); - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; + goto skip; } rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for when trying to queue existing alerts."); - sqlite3_finalize(res); - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; + goto done; } rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) error_report("Failed to queue existing alerts, rc = %d", rc); - + else + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc); +skip: rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } void aclk_send_alarm_configuration(char *config_hash) @@ -506,12 +511,12 @@ void aclk_send_alarm_configuration(char *config_hash) if (unlikely(!config_hash)) return; - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = localhost->aclk_config; if (unlikely(!wc)) return; - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", @@ -524,43 +529,33 @@ void aclk_send_alarm_configuration(char *config_hash) "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \ "module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \ "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \ - "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id;" + "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id" -int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) +void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { - int rc = 0; - #ifdef ENABLE_ACLK - - CHECK_SQLITE_CONNECTION(db_meta); + int rc; sqlite3_stmt *res = NULL; + struct aclk_sync_cfg_t *wc; - struct aclk_sync_host_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) { + if (unlikely(!host || !(wc = host->aclk_config))) { freez(config_hash); freez(node_id); - return 1; - } - - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (unlikely(!wc)) { - freez(config_hash); - freez(node_id); - return 1; + return; } rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to fetch an alarm hash configuration"); - return 1; + return; } uuid_t hash_uuid; if (uuid_parse(config_hash, hash_uuid)) - return 1; + return; rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) @@ -632,13 +627,13 @@ int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash } if (likely(p_alarm_config.cfg_hash)) { - netdata_log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } else - netdata_log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); bind_fail: rc = sqlite3_finalize(res); @@ -648,7 +643,6 @@ bind_fail: freez(config_hash); freez(node_id); #endif - return rc; } @@ -660,51 +654,50 @@ void aclk_start_alert_streaming(char *node_id, bool resets) if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) - return; - - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc; - if (unlikely(!wc)) + RRDHOST *host = find_host_by_node_id(node_id); + if (unlikely(!host || !(wc = host->aclk_config))) return; if (unlikely(!host->health.health_enabled)) { - netdata_log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } if (resets) { - netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); sql_queue_existing_alerts_to_aclk(host); } else - netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); wc->alert_updates = 1; wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; } -#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ +#define SQL_QUEUE_REMOVE_ALERTS \ + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ - "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ - "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ - "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \ - "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING;" + "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ + "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ + "AND hl.config_hash_id NOT IN (SELECT hash_id FROM alert_hash WHERE warn IS NULL AND crit IS NULL) " \ + "AND hl.name || hl.chart NOT IN (select name || chart FROM health_log WHERE name = hl.name AND " \ + "chart = hl.chart AND alarm_id > hl.alarm_id AND host_id = hl.host_id) " \ + "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING" + void sql_process_queue_removed_alerts_to_aclk(char *node_id) { - struct aclk_sync_host_config *wc; + struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); freez(node_id); - if (unlikely(!host || !(wc = host->aclk_sync_host_config))) + if (unlikely(!host || !(wc = host->aclk_config))) return; char sql[ACLK_SYNC_QUERY_SIZE * 2]; sqlite3_stmt *res = NULL; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str); + snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -715,33 +708,25 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id) rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for when trying to queue remvoed alerts."); - sqlite3_finalize(res); - return; + goto skip; } rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) { - sqlite3_finalize(res); - error_report("Failed to queue removed alerts, rc = %d", rc); - return; + if (likely(rc == SQLITE_DONE)) { + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + wc->alert_queue_removed = 0; } +skip: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc); - - netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); - - rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - wc->alert_queue_removed = 0; } void sql_queue_removed_alerts_to_aclk(RRDHOST *host) { - if (unlikely(!host->aclk_sync_host_config)) - return; - - if (!claimed() || !host->node_id) + if (unlikely(!host->aclk_config || !claimed() || !host->node_id)) return; char node_id[UUID_STR_LEN]; @@ -753,32 +738,28 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { uuid_t node_uuid; + if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; + struct aclk_sync_cfg_t *wc; + RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) { - netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); + if (unlikely(!host || !(wc = host->aclk_config))) { + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); return; } - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - - if (unlikely(!wc)) { - netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); - return; - } - - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_DEBUG, "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", snapshot_uuid); + if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid)) return; - __sync_synchronize(); + wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); - __sync_synchronize(); aclk_push_node_alert_snapshot(node_id); } @@ -795,9 +776,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->chart = strdupz(ae_chart_id(ae)); alarm_log->name = strdupz(ae_name(ae)); - alarm_log->batch_id = 0; - alarm_log->sequence_id = 0; - alarm_log->when = (time_t)ae->when; + alarm_log->when = ae->when; alarm_log->config_hash = strdupz((char *)config_hash_id); @@ -812,7 +791,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->non_clear_duration = (time_t)ae->non_clear_duration; alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status); alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status); - alarm_log->delay = (int)ae->delay; + alarm_log->delay = ae->delay; alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp; alarm_log->last_repeat = (time_t)ae->last_repeat; @@ -842,18 +821,18 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN #endif #ifdef ENABLE_ACLK -static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) +static bool have_recent_alarm(RRDHOST *host, int64_t alarm_id, int64_t mark) { ALARM_ENTRY *ae = host->health_log.alarms; while (ae) { if (ae->alarm_id == alarm_id && ae->unique_id >mark && (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)) - return 1; + return true; ae = ae->next; } - return 0; + return false; } #endif @@ -864,17 +843,17 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) RRDHOST *host = find_host_by_node_id(node_id); if (unlikely(!host)) { - netdata_log_access("AC [%s (N/A)]: Node id not found", node_id); + nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", node_id); freez(node_id); return; } freez(node_id); - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; // we perhaps we don't need this for snapshots if (unlikely(!wc->alert_updates)) { - netdata_log_access( + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); @@ -888,11 +867,9 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) if (unlikely(!claim_id)) return; - netdata_log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); uint32_t cnt = 0; - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); rw_spinlock_read_lock(&host->health_log.spinlock); @@ -915,7 +892,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) } if (cnt) { - uint32_t chunk = 1, chunks = 0; + uint32_t chunks; chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0); ae = host->health_log.alarms; @@ -926,15 +903,12 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) alarm_snap.claim_id = claim_id; alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; - alarm_snap.chunk = chunk; + alarm_snap.chunk = 1; alarm_snapshot_proto_ptr_t snapshot_proto = NULL; for (; ae; ae = ae->next) { - if (likely(ae->updated_by_id)) - continue; - - if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) + if (likely(ae->updated_by_id) || unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) continue; if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) @@ -957,19 +931,9 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) if (cnt == ALARM_EVENTS_PER_CHUNK) { aclk_send_alarm_snapshot(snapshot_proto); - cnt = 0; - - if (chunk < chunks) { - chunk++; - - struct alarm_snapshot alarm_snap; - alarm_snap.node_id = wc->node_id; - alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; - alarm_snap.chunks = chunks; - alarm_snap.chunk = chunk; - + if (alarm_snap.chunk < chunks) { + alarm_snap.chunk++; snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); } } @@ -986,51 +950,70 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) #endif } -#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created + %d < UNIXEPOCH();" +#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created < UNIXEPOCH() - @period" + void sql_aclk_alert_clean_dead_entries(RRDHOST *host) { - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (unlikely(!wc)) + return; char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_DELETE_ALERT_ENTRIES, uuid_str, MAX_REMOVED_PERIOD); + snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_ALERT_ENTRIES, wc->uuid_str); - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg); + sqlite3_stmt *res = NULL; + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { - error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg); - sqlite3_free(err_msg); + error_report("Failed to prepare statement for cleaning stale ACLK alert entries."); + return; } + + rc = sqlite3_bind_int64(res, 1, MAX_REMOVED_PERIOD); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind MAX_REMOVED_PERIOD parameter."); + goto skip; + } + + rc = sqlite3_step_monitored(res); + if (rc != SQLITE_DONE) + error_report("Failed to execute DELETE query for cleaning stale ACLK alert entries."); + +skip: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement for cleaning stale ACLK alert entries."); } #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ - "FROM aclk_alert_%s WHERE date_submitted IS NULL;" + "FROM aclk_alert_%s WHERE date_submitted IS NULL" int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status) { - int rc; - struct aclk_sync_host_config *wc = NULL; - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + + struct aclk_sync_cfg_t *wc = host->aclk_config; if (!wc) return 1; proto_alert_status->alert_updates = wc->alert_updates; char sql[ACLK_SYNC_QUERY_SIZE]; - sqlite3_stmt *res = NULL; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); + sqlite3_stmt *res = NULL; + snprintfz(sql, sizeof(sql) - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to get alert log status from the database."); return 1; } while (sqlite3_step_monitored(res) == SQLITE_ROW) { - proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; - proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; - proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; + proto_alert_status->pending_min_sequence_id = + sqlite3_column_bytes(res, 0) > 0 ? (uint64_t)sqlite3_column_int64(res, 0) : 0; + proto_alert_status->pending_max_sequence_id = + sqlite3_column_bytes(res, 1) > 0 ? (uint64_t)sqlite3_column_int64(res, 1) : 0; + proto_alert_status->last_submitted_sequence_id = + sqlite3_column_bytes(res, 2) > 0 ? (uint64_t)sqlite3_column_int64(res, 2) : 0; } rc = sqlite3_finalize(res); @@ -1045,21 +1028,15 @@ void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused) if (unlikely(!node_id)) return; - struct aclk_sync_host_config *wc = NULL; + struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) - return; - - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (unlikely(!wc)) { - netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); - return; + if (unlikely(!host || !(wc = host->aclk_config))) + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); + else { + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); + wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; } - - netdata_log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); - - wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; } typedef struct active_alerts { @@ -1068,15 +1045,14 @@ typedef struct active_alerts { RRDCALC_STATUS status; } active_alerts_t; -static inline int compare_active_alerts(const void * a, const void * b) { +static inline int compare_active_alerts(const void *a, const void *b) +{ active_alerts_t *active_alerts_a = (active_alerts_t *)a; active_alerts_t *active_alerts_b = (active_alerts_t *)b; - if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) ) - { - return strcmp(active_alerts_a->chart, active_alerts_b->chart); - } - else + if (!(strcmp(active_alerts_a->name, active_alerts_b->name))) { + return strcmp(active_alerts_a->chart, active_alerts_b->chart); + } else return strcmp(active_alerts_a->name, active_alerts_b->name); } @@ -1084,16 +1060,16 @@ static inline int compare_active_alerts(const void * a, const void * b) { void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) { #ifdef ENABLE_ACLK - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!wc)) { - netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); return; } if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) { //postpone checkpoint send - wc->alert_checkpoint_req+=3; - netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); + wc->alert_checkpoint_req += 3; + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); return; } @@ -1126,16 +1102,16 @@ void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) BUFFER *alarms_to_hash; if (cnt) { - qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); + qsort(active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); alarms_to_hash = buffer_create(len, NULL); - for (uint32_t i=0;inode_id, rrdhost_hostname(host)); - } else { - netdata_log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); - } + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host)); + } else + nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); + wc->alert_checkpoint_req = 0; buffer_free(alarms_to_hash); #endif diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h index c92aef083..cfb3468b9 100644 --- a/database/sqlite/sqlite_aclk_alert.h +++ b/database/sqlite/sqlite_aclk_alert.h @@ -15,9 +15,8 @@ struct proto_alert_status { uint64_t last_submitted_sequence_id; }; -void aclk_push_alert_event(struct aclk_sync_host_config *wc); void aclk_send_alarm_configuration (char *config_hash); -int aclk_push_alert_config_event(char *node_id, char *config_hash); +void aclk_push_alert_config_event(char *node_id, char *config_hash); void aclk_start_alert_streaming(char *node_id, bool resets); void sql_queue_removed_alerts_to_aclk(RRDHOST *host); void sql_process_queue_removed_alerts_to_aclk(char *node_id); diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c index 82927854a..dcc8c375c 100644 --- a/database/sqlite/sqlite_aclk_node.c +++ b/database/sqlite/sqlite_aclk_node.c @@ -7,17 +7,16 @@ #include "../../aclk/aclk_capas.h" #ifdef ENABLE_ACLK + DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { RRDSET *st; char name[500]; - rrdset_foreach_read(st, host) { + rrdset_foreach_read(st, host) + { if (rrdset_is_available_for_viewers(st)) { - struct collector_info col = { - .plugin = rrdset_plugin_name(st), - .module = rrdset_module_name(st) - }; - snprintfz(name, 499, "%s:%s", col.plugin, col.module); + struct collector_info col = {.plugin = rrdset_plugin_name(st), .module = rrdset_module_name(st)}; + snprintfz(name, sizeof(name) - 1, "%s:%s", col.plugin, col.module); dictionary_set(dict, name, &col, sizeof(struct collector_info)); } } @@ -26,17 +25,9 @@ DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { return dict; } -static void build_node_collectors(char *node_id __maybe_unused) +static void build_node_collectors(RRDHOST *host) { - - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) - return; - - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; - if (unlikely(!wc)) - return; + struct aclk_sync_cfg_t *wc = host->aclk_config; struct update_node_collectors upd_node_collectors; DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED); @@ -50,45 +41,33 @@ static void build_node_collectors(char *node_id __maybe_unused) dictionary_destroy(dict); freez(upd_node_collectors.claim_id); - netdata_log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", node_id, rrdhost_hostname(host)); - - freez(node_id); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(host)); } -static void build_node_info(char *node_id __maybe_unused) +static void build_node_info(RRDHOST *host) { struct update_node_info node_info; - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely((!host))) { - freez(node_id); - return; - } - - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; - - if (unlikely(!wc)) { - freez(node_id); - return; - } + struct aclk_sync_cfg_t *wc = host->aclk_config; rrd_rdlock(); node_info.node_id = wc->node_id; node_info.claim_id = get_agent_claimid(); node_info.machine_guid = host->machine_guid; - node_info.child = (wc->host != localhost); + node_info.child = (host != localhost); node_info.ml_info.ml_capable = ml_capable(); - node_info.ml_info.ml_enabled = ml_enabled(wc->host); + node_info.ml_info.ml_enabled = ml_enabled(host); - node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host); + node_info.node_instance_capabilities = aclk_get_node_instance_capas(host); now_realtime_timeval(&node_info.updated_at); char *host_version = NULL; if (host != localhost) { netdata_mutex_lock(&host->receiver_lock); - host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : rrdhost_program_version(host)); + host_version = strdupz( + host->receiver && host->receiver->program_version ? host->receiver->program_version : + rrdhost_program_version(host)); netdata_mutex_unlock(&host->receiver_lock); } @@ -112,10 +91,11 @@ static void build_node_info(char *node_id __maybe_unused) node_info.data.machine_guid = host->machine_guid; struct capability node_caps[] = { - { .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled }, - { .name = "mc", .version = host->system_info->mc_version ? host->system_info->mc_version : 0, .enabled = host->system_info->mc_version ? 1 : 0 }, - { .name = NULL, .version = 0, .enabled = 0 } - }; + {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled}, + {.name = "mc", + .version = host->system_info->mc_version ? host->system_info->mc_version : 0, + .enabled = host->system_info->mc_version ? 1 : 0}, + {.name = NULL, .version = 0, .enabled = 0}}; node_info.node_capabilities = node_caps; node_info.data.ml_info.ml_capable = host->system_info->ml_capable; @@ -124,7 +104,14 @@ static void build_node_info(char *node_id __maybe_unused) node_info.data.host_labels_ptr = host->rrdlabels; aclk_update_node_info(&node_info); - netdata_log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), host->machine_guid, wc->host == localhost ? "parent" : "child"); + nd_log( + NDLS_ACCESS, + NDLP_DEBUG, + "ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", + wc->node_id, + rrdhost_hostname(host), + host->machine_guid, + host == localhost ? "parent" : "child"); rrd_unlock(); freez(node_info.claim_id); @@ -132,10 +119,21 @@ static void build_node_info(char *node_id __maybe_unused) freez(host_version); wc->node_collectors_send = now_realtime_sec(); - freez(node_id); - } +static bool host_is_replicating(RRDHOST *host) +{ + bool replicating = false; + RRDSET *st; + rrdset_foreach_reentrant(st, host) { + if (rrdset_is_replicating(st)) { + replicating = true; + break; + } + } + rrdset_foreach_done(st); + return replicating; +} void aclk_check_node_info_and_collectors(void) { @@ -144,35 +142,59 @@ void aclk_check_node_info_and_collectors(void) if (unlikely(!aclk_connected)) return; - size_t pending = 0; - dfe_start_reentrant(rrdhost_root_index, host) { + size_t context_loading = 0; + size_t replicating = 0; + size_t context_pp = 0; - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + time_t now = now_realtime_sec(); + dfe_start_reentrant(rrdhost_root_index, host) + { + struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!wc)) continue; if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host)); - pending++; + context_loading++; continue; } - if (wc->node_info_send_time && wc->node_info_send_time + 30 < now_realtime_sec()) { + if (unlikely(host_is_replicating(host))) { + internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host)); + replicating++; + continue; + } + + bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue)); + + if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send)) + context_pp++; + + if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) { wc->node_info_send_time = 0; - build_node_info(strdupz(wc->node_id)); + build_node_info(host); internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host)); } - if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) { - build_node_collectors(strdupz(wc->node_id)); + if (pp_queue_empty && wc->node_collectors_send && wc->node_collectors_send + 30 < now) { + build_node_collectors(host); internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host)); wc->node_collectors_send = 0; } } dfe_done(host); - if(pending) - netdata_log_info("ACLK: %zu nodes are pending for contexts to load, skipped sending node info for them", pending); + if (context_loading || replicating || context_pp) { + nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS); + nd_log_limit( + &erl, + NDLS_DAEMON, + NDLP_INFO, + "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing", + context_loading, + replicating, + context_pp); + } } #endif diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c index d4b15e99d..26ed8a96a 100644 --- a/database/sqlite/sqlite_context.c +++ b/database/sqlite/sqlite_context.c @@ -7,16 +7,16 @@ #define DB_CONTEXT_METADATA_VERSION 1 const char *database_context_config[] = { - "CREATE TABLE IF NOT EXISTS context (host_id BLOB, id TEXT NOT NULL, version INT NOT NULL, title TEXT NOT NULL, " \ + "CREATE TABLE IF NOT EXISTS context (host_id BLOB, id TEXT NOT NULL, version INT NOT NULL, title TEXT NOT NULL, " "chart_type TEXT NOT NULL, unit TEXT NOT NULL, priority INT NOT NULL, first_time_t INT NOT NULL, " "last_time_t INT NOT NULL, deleted INT NOT NULL, " - "family TEXT, PRIMARY KEY (host_id, id));", + "family TEXT, PRIMARY KEY (host_id, id))", NULL }; const char *database_context_cleanup[] = { - "VACUUM;", + "VACUUM", NULL }; @@ -31,7 +31,7 @@ int sql_init_context_database(int memory) int rc; if (likely(!memory)) - snprintfz(sqlite_database, FILENAME_MAX, "%s/context-meta.db", netdata_configured_cache_dir); + snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/context-meta.db", netdata_configured_cache_dir); else strcpy(sqlite_database, ":memory:"); @@ -56,9 +56,9 @@ int sql_init_context_database(int memory) return 1; if (likely(!memory)) - snprintfz(buf, 1024, "ATTACH DATABASE \"%s/netdata-meta.db\" as meta;", netdata_configured_cache_dir); + snprintfz(buf, sizeof(buf) - 1, "ATTACH DATABASE \"%s/netdata-meta.db\" as meta", netdata_configured_cache_dir); else - snprintfz(buf, 1024, "ATTACH DATABASE ':memory:' as meta;"); + snprintfz(buf, sizeof(buf) - 1, "ATTACH DATABASE ':memory:' as meta"); if(init_database_batch(db_context_meta, list)) return 1; @@ -92,7 +92,7 @@ void sql_close_context_database(void) // Fetching data // #define CTX_GET_CHART_LIST "SELECT c.chart_id, c.type||'.'||c.id, c.name, c.context, c.title, c.unit, c.priority, " \ - "c.update_every, c.chart_type, c.family FROM meta.chart c WHERE c.host_id = @host_id and c.chart_id is not null; " + "c.update_every, c.chart_type, c.family FROM chart c WHERE c.host_id = @host_id AND c.chart_id IS NOT NULL" void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, void *), void *data) { @@ -105,7 +105,7 @@ void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, voi } if (unlikely(!res)) { - rc = prepare_statement(db_context_meta, CTX_GET_CHART_LIST, &res); + rc = prepare_statement(db_meta, CTX_GET_CHART_LIST, &res); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to fetch chart list"); return; @@ -141,14 +141,14 @@ skip_load: // Dimension list #define CTX_GET_DIMENSION_LIST "SELECT d.dim_id, d.id, d.name, CASE WHEN INSTR(d.options,\"hidden\") > 0 THEN 1 ELSE 0 END " \ - "FROM meta.dimension d WHERE d.chart_id = @id and d.dim_id is not null ORDER BY d.rowid ASC;" + "FROM dimension d WHERE d.chart_id = @id AND d.dim_id IS NOT NULL ORDER BY d.rowid ASC" void ctx_get_dimension_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_DIMENSION_DATA *, void *), void *data) { int rc; static __thread sqlite3_stmt *res = NULL; if (unlikely(!res)) { - rc = prepare_statement(db_context_meta, CTX_GET_DIMENSION_LIST, &res); + rc = prepare_statement(db_meta, CTX_GET_DIMENSION_LIST, &res); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to fetch chart dimension data"); return; @@ -178,7 +178,8 @@ failed: } // LABEL LIST -#define CTX_GET_LABEL_LIST "SELECT l.label_key, l.label_value, l.source_type FROM meta.chart_label l WHERE l.chart_id = @id;" +#define CTX_GET_LABEL_LIST "SELECT l.label_key, l.label_value, l.source_type FROM meta.chart_label l WHERE l.chart_id = @id" + void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, void *), void *data) { int rc; @@ -215,7 +216,8 @@ failed: // CONTEXT LIST #define CTX_GET_CONTEXT_LIST "SELECT id, version, title, chart_type, unit, priority, first_time_t, " \ - "last_time_t, deleted, family FROM context c WHERE c.host_id = @host_id;" + "last_time_t, deleted, family FROM context c WHERE c.host_id = @host_id" + void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_DATA *, void *), void *data) { @@ -266,9 +268,10 @@ failed: // // Storing Data // -#define CTX_STORE_CONTEXT "INSERT OR REPLACE INTO context " \ - "(host_id, id, version, title, chart_type, unit, priority, first_time_t, last_time_t, deleted, family) " \ - "VALUES (@host_id, @context, @version, @title, @chart_type, @unit, @priority, @first_time_t, @last_time_t, @deleted, @family);" +#define CTX_STORE_CONTEXT \ + "INSERT OR REPLACE INTO context " \ + "(host_id, id, version, title, chart_type, unit, priority, first_time_t, last_time_t, deleted, family) " \ + "VALUES (@host_id, @context, @version, @title, @chart_type, @unit, @priority, @first_t, @last_t, @delete, @family)" int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) { @@ -292,7 +295,7 @@ int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) rc = bind_text_null(res, 2, context_data->id, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind context to store details"); + error_report("Failed to bind context to store context details"); goto skip_store; } @@ -304,19 +307,19 @@ int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) rc = bind_text_null(res, 4, context_data->title, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind context to store details"); + error_report("Failed to bind context to store context details"); goto skip_store; } rc = bind_text_null(res, 5, context_data->chart_type, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind context to store details"); + error_report("Failed to bind context to store context details"); goto skip_store; } rc = bind_text_null(res, 6, context_data->units, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind context to store details"); + error_report("Failed to bind context to store context details"); goto skip_store; } @@ -365,7 +368,7 @@ skip_store: // Delete a context -#define CTX_DELETE_CONTEXT "DELETE FROM context WHERE host_id = @host_id AND id = @context;" +#define CTX_DELETE_CONTEXT "DELETE FROM context WHERE host_id = @host_id AND id = @context" int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) { int rc, rc_stored = 1; @@ -382,13 +385,13 @@ int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id to delete context data"); + error_report("Failed to bind host_id for context data deletion"); goto skip_delete; } rc = sqlite3_bind_text(res, 2, context_data->id, -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind context id for data deletion"); + error_report("Failed to bind context id for context data deletion"); goto skip_delete; } @@ -396,13 +399,6 @@ int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) if (rc_stored != SQLITE_DONE) error_report("Failed to delete context %s, rc = %d", context_data->id, rc_stored); -#ifdef NETDATA_INTERNAL_CHECKS - else { - char host_uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(*host_uuid, host_uuid_str); - netdata_log_info("%s: Deleted context %s under host %s", __FUNCTION__, context_data->id, host_uuid_str); - } -#endif skip_delete: rc = sqlite3_finalize(res); diff --git a/database/sqlite/sqlite_db_migration.c b/database/sqlite/sqlite_db_migration.c index a011d0fef..29da6c249 100644 --- a/database/sqlite/sqlite_db_migration.c +++ b/database/sqlite/sqlite_db_migration.c @@ -7,7 +7,7 @@ static int return_int_cb(void *data, int argc, char **argv, char **column) int *status = data; UNUSED(argc); UNUSED(column); - *status = str2uint32_t(argv[0], NULL); + *status = (int) str2uint32_t(argv[0], NULL); return 0; } @@ -18,7 +18,7 @@ static int get_auto_vaccum(sqlite3 *database) int exists = 0; - snprintf(sql, 127, "PRAGMA auto_vacuum"); + snprintf(sql, sizeof(sql) - 1, "PRAGMA auto_vacuum"); int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &exists, &err_msg); if (rc != SQLITE_OK) { @@ -35,7 +35,7 @@ int db_table_count(sqlite3 *database) char sql[128]; int count = 0; - snprintf(sql, 127, "select count(1) from sqlite_schema where type = 'table'"); + snprintf(sql, sizeof(sql) - 1, "select count(1) from sqlite_schema where type = 'table'"); int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &count, &err_msg); if (rc != SQLITE_OK) { netdata_log_info("Error checking database table count; %s", err_msg); @@ -51,7 +51,7 @@ int table_exists_in_database(sqlite3 *database, const char *table) int exists = 0; - snprintf(sql, 127, "select 1 from sqlite_schema where type = 'table' and name = '%s';", table); + snprintf(sql, sizeof(sql) - 1, "select 1 from sqlite_schema where type = 'table' and name = '%s'", table); int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &exists, &err_msg); if (rc != SQLITE_OK) { @@ -69,7 +69,7 @@ static int column_exists_in_table(sqlite3 *database, const char *table, const ch int exists = 0; - snprintf(sql, 127, "SELECT 1 FROM pragma_table_info('%s') where name = '%s';", table, column); + snprintf(sql, sizeof(sql) - 1, "SELECT 1 FROM pragma_table_info('%s') where name = '%s'", table, column); int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &exists, &err_msg); if (rc != SQLITE_OK) { @@ -92,64 +92,64 @@ static int get_database_user_version(sqlite3 *database) } const char *database_migrate_v1_v2[] = { - "ALTER TABLE host ADD hops INTEGER NOT NULL DEFAULT 0;", + "ALTER TABLE host ADD hops INTEGER NOT NULL DEFAULT 0", NULL }; const char *database_migrate_v2_v3[] = { - "ALTER TABLE host ADD memory_mode INT NOT NULL DEFAULT 0;", - "ALTER TABLE host ADD abbrev_timezone TEXT NOT NULL DEFAULT '';", - "ALTER TABLE host ADD utc_offset INT NOT NULL DEFAULT 0;", - "ALTER TABLE host ADD program_name TEXT NOT NULL DEFAULT 'unknown';", - "ALTER TABLE host ADD program_version TEXT NOT NULL DEFAULT 'unknown';", - "ALTER TABLE host ADD entries INT NOT NULL DEFAULT 0;", - "ALTER TABLE host ADD health_enabled INT NOT NULL DEFAULT 0;", + "ALTER TABLE host ADD memory_mode INT NOT NULL DEFAULT 0", + "ALTER TABLE host ADD abbrev_timezone TEXT NOT NULL DEFAULT ''", + "ALTER TABLE host ADD utc_offset INT NOT NULL DEFAULT 0", + "ALTER TABLE host ADD program_name TEXT NOT NULL DEFAULT 'unknown'", + "ALTER TABLE host ADD program_version TEXT NOT NULL DEFAULT 'unknown'", + "ALTER TABLE host ADD entries INT NOT NULL DEFAULT 0", + "ALTER TABLE host ADD health_enabled INT NOT NULL DEFAULT 0", NULL }; const char *database_migrate_v4_v5[] = { - "DROP TABLE IF EXISTS chart_active;", - "DROP TABLE IF EXISTS dimension_active;", - "DROP TABLE IF EXISTS chart_hash;", - "DROP TABLE IF EXISTS chart_hash_map;", - "DROP VIEW IF EXISTS v_chart_hash;", + "DROP TABLE IF EXISTS chart_active", + "DROP TABLE IF EXISTS dimension_active", + "DROP TABLE IF EXISTS chart_hash", + "DROP TABLE IF EXISTS chart_hash_map", + "DROP VIEW IF EXISTS v_chart_hash", NULL }; const char *database_migrate_v5_v6[] = { - "DROP TRIGGER IF EXISTS tr_dim_del;", - "DROP TABLE IF EXISTS dimension_delete;", + "DROP TRIGGER IF EXISTS tr_dim_del", + "DROP TABLE IF EXISTS dimension_delete", NULL }; const char *database_migrate_v9_v10[] = { - "ALTER TABLE alert_hash ADD chart_labels TEXT;", + "ALTER TABLE alert_hash ADD chart_labels TEXT", NULL }; const char *database_migrate_v10_v11[] = { - "ALTER TABLE health_log ADD chart_name TEXT;", + "ALTER TABLE health_log ADD chart_name TEXT", NULL }; const char *database_migrate_v11_v12[] = { - "ALTER TABLE health_log_detail ADD summary TEXT;", - "ALTER TABLE alert_hash ADD summary TEXT;", + "ALTER TABLE health_log_detail ADD summary TEXT", + "ALTER TABLE alert_hash ADD summary TEXT", NULL }; const char *database_migrate_v12_v13_detail[] = { - "ALTER TABLE health_log_detail ADD summary TEXT;", + "ALTER TABLE health_log_detail ADD summary TEXT", NULL }; const char *database_migrate_v12_v13_hash[] = { - "ALTER TABLE alert_hash ADD summary TEXT;", + "ALTER TABLE alert_hash ADD summary TEXT", NULL }; const char *database_migrate_v13_v14[] = { - "ALTER TABLE host ADD last_connected INT NOT NULL DEFAULT 0;", + "ALTER TABLE host ADD last_connected INT NOT NULL DEFAULT 0", NULL }; @@ -173,7 +173,7 @@ static int do_migration_v3_v4(sqlite3 *database) int rc; sqlite3_stmt *res = NULL; - snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%';"); + snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%'"); rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to alter health_log tables"); @@ -183,7 +183,7 @@ static int do_migration_v3_v4(sqlite3 *database) while (sqlite3_step_monitored(res) == SQLITE_ROW) { char *table = strdupz((char *) sqlite3_column_text(res, 0)); if (!column_exists_in_table(database, table, "chart_context")) { - snprintfz(sql, 255, "ALTER TABLE %s ADD chart_context text", table); + snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE %s ADD chart_context text", table); sqlite3_exec_monitored(database, sql, 0, 0, NULL); } freez(table); @@ -212,7 +212,7 @@ static int do_migration_v6_v7(sqlite3 *database) int rc; sqlite3_stmt *res = NULL; - snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'aclk_alert_%%';"); + snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'aclk_alert_%%'"); rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to alter aclk_alert tables"); @@ -222,9 +222,9 @@ static int do_migration_v6_v7(sqlite3 *database) while (sqlite3_step_monitored(res) == SQLITE_ROW) { char *table = strdupz((char *) sqlite3_column_text(res, 0)); if (!column_exists_in_table(database, table, "filtered_alert_unique_id")) { - snprintfz(sql, 255, "ALTER TABLE %s ADD filtered_alert_unique_id", table); + snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE %s ADD filtered_alert_unique_id", table); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 255, "UPDATE %s SET filtered_alert_unique_id = alert_unique_id", table); + snprintfz(sql, sizeof(sql) - 1, "UPDATE %s SET filtered_alert_unique_id = alert_unique_id", table); sqlite3_exec_monitored(database, sql, 0, 0, NULL); } freez(table); @@ -243,7 +243,7 @@ static int do_migration_v7_v8(sqlite3 *database) int rc; sqlite3_stmt *res = NULL; - snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%';"); + snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%'"); rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to alter health_log tables"); @@ -253,7 +253,7 @@ static int do_migration_v7_v8(sqlite3 *database) while (sqlite3_step_monitored(res) == SQLITE_ROW) { char *table = strdupz((char *) sqlite3_column_text(res, 0)); if (!column_exists_in_table(database, table, "transition_id")) { - snprintfz(sql, 255, "ALTER TABLE %s ADD transition_id blob", table); + snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE %s ADD transition_id blob", table); sqlite3_exec_monitored(database, sql, 0, 0, NULL); } freez(table); @@ -273,38 +273,38 @@ static int do_migration_v8_v9(sqlite3 *database) sqlite3_stmt *res = NULL; //create the health_log table and it's index - snprintfz(sql, 2047, "CREATE TABLE IF NOT EXISTS health_log (health_log_id INTEGER PRIMARY KEY, host_id blob, alarm_id int, " \ + snprintfz(sql, sizeof(sql) - 1, "CREATE TABLE IF NOT EXISTS health_log (health_log_id INTEGER PRIMARY KEY, host_id blob, alarm_id int, " \ "config_hash_id blob, name text, chart text, family text, recipient text, units text, exec text, " \ - "chart_context text, last_transition_id blob, UNIQUE (host_id, alarm_id)) ;"); + "chart_context text, last_transition_id blob, UNIQUE (host_id, alarm_id))"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); //TODO indexes - snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id);"); + snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "CREATE TABLE IF NOT EXISTS health_log_detail (health_log_id int, unique_id int, alarm_id int, alarm_event_id int, " \ + snprintfz(sql, sizeof(sql) - 1, "CREATE TABLE IF NOT EXISTS health_log_detail (health_log_id int, unique_id int, alarm_id int, alarm_event_id int, " \ "updated_by_id int, updates_id int, when_key int, duration int, non_clear_duration int, " \ "flags int, exec_run_timestamp int, delay_up_to_timestamp int, " \ "info text, exec_code int, new_status real, old_status real, delay int, " \ - "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text, host_id blob);"); + "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text, host_id blob)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_1 ON health_log_detail (unique_id);"); + snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_1 ON health_log_detail (unique_id)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id);"); + snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id);"); + snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_4 ON health_log_detail (health_log_id);"); + snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_4 ON health_log_detail (health_log_id)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "ALTER TABLE alert_hash ADD source text;"); + snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE alert_hash ADD source text"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS alert_hash_index ON alert_hash (hash_id);"); + snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS alert_hash_index ON alert_hash (hash_id)"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); - snprintfz(sql, 2047, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%' AND name <> 'health_log_detail';"); + snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%' AND name <> 'health_log_detail'"); rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to alter health_log tables"); @@ -332,7 +332,7 @@ static int do_migration_v8_v9(sqlite3 *database) dfe_done(table); dictionary_destroy(dict_tables); - snprintfz(sql, 2047, "ALTER TABLE health_log_detail DROP COLUMN host_id;"); + snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE health_log_detail DROP COLUMN host_id"); sqlite3_exec_monitored(database, sql, 0, 0, NULL); return 0; @@ -353,7 +353,7 @@ static int do_migration_v10_v11(sqlite3 *database) return 0; } -#define MIGR_11_12_UPD_HEALTH_LOG_DETAIL "UPDATE health_log_detail SET summary = (select name from health_log where health_log_id = health_log_detail.health_log_id);" +#define MIGR_11_12_UPD_HEALTH_LOG_DETAIL "UPDATE health_log_detail SET summary = (select name from health_log where health_log_id = health_log_detail.health_log_id)" static int do_migration_v11_v12(sqlite3 *database) { int rc = 0; @@ -368,6 +368,68 @@ static int do_migration_v11_v12(sqlite3 *database) return rc; } +static int do_migration_v14_v15(sqlite3 *database) +{ + char sql[256]; + + int rc; + sqlite3_stmt *res = NULL; + snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type = \"index\" AND name LIKE \"aclk_alert_index@_%%\" ESCAPE \"@\""); + rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to drop unused indices"); + return 1; + } + + BUFFER *wb = buffer_create(128, NULL); + size_t count = 0; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + buffer_sprintf(wb, "DROP INDEX IF EXISTS %s; ", (char *)sqlite3_column_text(res, 0)); + count++; + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when dropping unused indices, rc = %d", rc); + + if (count) + (void) db_execute(database, buffer_tostring(wb)); + + buffer_free(wb); + return 0; +} + +static int do_migration_v15_v16(sqlite3 *database) +{ + char sql[256]; + + int rc; + sqlite3_stmt *res = NULL; + snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type = \"table\" AND name LIKE \"aclk_alert_%%\""); + rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to drop unused indices"); + return 1; + } + + BUFFER *wb = buffer_create(128, NULL); + size_t count = 0; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + buffer_sprintf(wb, "ANALYZE %s ; ", (char *)sqlite3_column_text(res, 0)); + count++; + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when running ANALYZE on aclk_alert_tables, rc = %d", rc); + + if (count) + (void) db_execute(database, buffer_tostring(wb)); + + buffer_free(wb); + return 0; +} + static int do_migration_v12_v13(sqlite3 *database) { int rc = 0; @@ -425,7 +487,7 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name int user_version = 0; char *err_msg = NULL; - int rc = sqlite3_exec_monitored(database, "PRAGMA user_version;", return_int_cb, (void *) &user_version, &err_msg); + int rc = sqlite3_exec_monitored(database, "PRAGMA user_version", return_int_cb, (void *) &user_version, &err_msg); if (rc != SQLITE_OK) { netdata_log_info("Error checking the %s database version; %s", db_name, err_msg); sqlite3_free(err_msg); @@ -446,7 +508,6 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name } } return target_version; - } DATABASE_FUNC_MIGRATION_LIST migration_action[] = { @@ -464,6 +525,8 @@ DATABASE_FUNC_MIGRATION_LIST migration_action[] = { {.name = "v11 to v12", .func = do_migration_v11_v12}, {.name = "v12 to v13", .func = do_migration_v12_v13}, {.name = "v13 to v14", .func = do_migration_v13_v14}, + {.name = "v14 to v15", .func = do_migration_v14_v15}, + {.name = "v15 to v16", .func = do_migration_v15_v16}, // the terminator of this array {.name = NULL, .func = NULL} }; diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index 393d6a238..e3537bf5a 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -4,7 +4,7 @@ #include "sqlite3recover.h" #include "sqlite_db_migration.h" -#define DB_METADATA_VERSION 14 +#define DB_METADATA_VERSION 16 const char *database_config[] = { "CREATE TABLE IF NOT EXISTS host(host_id BLOB PRIMARY KEY, hostname TEXT NOT NULL, " @@ -14,70 +14,77 @@ const char *database_config[] = { "memory_mode INT DEFAULT 0, abbrev_timezone TEXT DEFAULT '', utc_offset INT NOT NULL DEFAULT 0," "program_name TEXT NOT NULL DEFAULT 'unknown', program_version TEXT NOT NULL DEFAULT 'unknown', " "entries INT NOT NULL DEFAULT 0," - "health_enabled INT NOT NULL DEFAULT 0, last_connected INT NOT NULL DEFAULT 0);", + "health_enabled INT NOT NULL DEFAULT 0, last_connected INT NOT NULL DEFAULT 0)", "CREATE TABLE IF NOT EXISTS chart(chart_id blob PRIMARY KEY, host_id blob, type text, id text, name text, " "family text, context text, title text, unit text, plugin text, module text, priority int, update_every int, " - "chart_type int, memory_mode int, history_entries);", + "chart_type int, memory_mode int, history_entries)", + "CREATE TABLE IF NOT EXISTS dimension(dim_id blob PRIMARY KEY, chart_id blob, id text, name text, " - "multiplier int, divisor int , algorithm int, options text);", + "multiplier int, divisor int , algorithm int, options text)", + + "CREATE TABLE IF NOT EXISTS metadata_migration(filename text, file_size, date_created int)", + + "CREATE INDEX IF NOT EXISTS ind_d2 on dimension (chart_id)", + + "CREATE INDEX IF NOT EXISTS ind_c3 on chart (host_id)", - "CREATE TABLE IF NOT EXISTS metadata_migration(filename text, file_size, date_created int);", - "CREATE INDEX IF NOT EXISTS ind_d2 on dimension (chart_id);", - "CREATE INDEX IF NOT EXISTS ind_c3 on chart (host_id);", "CREATE TABLE IF NOT EXISTS chart_label(chart_id blob, source_type int, label_key text, " - "label_value text, date_created int, PRIMARY KEY (chart_id, label_key));", - "CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created);", + "label_value text, date_created int, PRIMARY KEY (chart_id, label_key))", + + "CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created)", + "CREATE TABLE IF NOT EXISTS alert_hash(hash_id blob PRIMARY KEY, date_updated int, alarm text, template text, " "on_key text, class text, component text, type text, os text, hosts text, lookup text, " "every text, units text, calc text, families text, plugin text, module text, charts text, green text, " "red text, warn text, crit text, exec text, to_key text, info text, delay text, options text, " "repeat text, host_labels text, p_db_lookup_dimensions text, p_db_lookup_method text, p_db_lookup_options int, " - "p_db_lookup_after int, p_db_lookup_before int, p_update_every int, source text, chart_labels text, summary text);", + "p_db_lookup_after int, p_db_lookup_before int, p_update_every int, source text, chart_labels text, summary text)", "CREATE TABLE IF NOT EXISTS host_info(host_id blob, system_key text NOT NULL, system_value text NOT NULL, " - "date_created INT, PRIMARY KEY(host_id, system_key));", + "date_created INT, PRIMARY KEY(host_id, system_key))", "CREATE TABLE IF NOT EXISTS host_label(host_id blob, source_type int, label_key text NOT NULL, " - "label_value text NOT NULL, date_created INT, PRIMARY KEY (host_id, label_key));", + "label_value text NOT NULL, date_created INT, PRIMARY KEY (host_id, label_key))", "CREATE TRIGGER IF NOT EXISTS ins_host AFTER INSERT ON host BEGIN INSERT INTO node_instance (host_id, date_created)" - " SELECT new.host_id, unixepoch() WHERE new.host_id NOT IN (SELECT host_id FROM node_instance); END;", + " SELECT new.host_id, unixepoch() WHERE new.host_id NOT IN (SELECT host_id FROM node_instance); END", "CREATE TABLE IF NOT EXISTS health_log (health_log_id INTEGER PRIMARY KEY, host_id blob, alarm_id int, " "config_hash_id blob, name text, chart text, family text, recipient text, units text, exec text, " - "chart_context text, last_transition_id blob, chart_name text, UNIQUE (host_id, alarm_id)) ;", + "chart_context text, last_transition_id blob, chart_name text, UNIQUE (host_id, alarm_id))", - "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id);", + "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id)", "CREATE TABLE IF NOT EXISTS health_log_detail (health_log_id int, unique_id int, alarm_id int, alarm_event_id int, " "updated_by_id int, updates_id int, when_key int, duration int, non_clear_duration int, " "flags int, exec_run_timestamp int, delay_up_to_timestamp int, " "info text, exec_code int, new_status real, old_status real, delay int, " - "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text);", + "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text)", - "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id);", - "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id);", - "CREATE INDEX IF NOT EXISTS health_log_d_ind_5 ON health_log_detail (health_log_id, unique_id DESC);", + "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id)", + "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id)", + "CREATE INDEX IF NOT EXISTS health_log_d_ind_9 ON health_log_detail (unique_id DESC, health_log_id)", "CREATE INDEX IF NOT EXISTS health_log_d_ind_6 on health_log_detail (health_log_id, when_key)", - "CREATE INDEX IF NOT EXISTS health_log_d_ind_7 on health_log_detail (alarm_id);", - "CREATE INDEX IF NOT EXISTS health_log_d_ind_8 on health_log_detail (new_status, updated_by_id);", + "CREATE INDEX IF NOT EXISTS health_log_d_ind_7 on health_log_detail (alarm_id)", + "CREATE INDEX IF NOT EXISTS health_log_d_ind_8 on health_log_detail (new_status, updated_by_id)", NULL }; const char *database_cleanup[] = { - "DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart);", - "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);", - "DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host);", - "DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host);", - "DROP TRIGGER IF EXISTS tr_dim_del;", - "DROP INDEX IF EXISTS ind_d1;", - "DROP INDEX IF EXISTS ind_c1;", - "DROP INDEX IF EXISTS ind_c2;", - "DROP INDEX IF EXISTS alert_hash_index;", - "DROP INDEX IF EXISTS health_log_d_ind_4;", - "DROP INDEX IF EXISTS health_log_d_ind_1;", + "DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart)", + "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host)", + "DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host)", + "DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host)", + "DROP TRIGGER IF EXISTS tr_dim_del", + "DROP INDEX IF EXISTS ind_d1", + "DROP INDEX IF EXISTS ind_c1", + "DROP INDEX IF EXISTS ind_c2", + "DROP INDEX IF EXISTS alert_hash_index", + "DROP INDEX IF EXISTS health_log_d_ind_4", + "DROP INDEX IF EXISTS health_log_d_ind_1", + "DROP INDEX IF EXISTS health_log_d_ind_5", NULL }; @@ -202,42 +209,42 @@ int configure_sqlite_database(sqlite3 *database, int target_version) // https://www.sqlite.org/pragma.html#pragma_auto_vacuum // PRAGMA schema.auto_vacuum = 0 | NONE | 1 | FULL | 2 | INCREMENTAL; - snprintfz(buf, 1024, "PRAGMA auto_vacuum=%s;", config_get(CONFIG_SECTION_SQLITE, "auto vacuum", "INCREMENTAL")); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA auto_vacuum=%s", config_get(CONFIG_SECTION_SQLITE, "auto vacuum", "INCREMENTAL")); if (init_database_batch(database, list)) return 1; // https://www.sqlite.org/pragma.html#pragma_synchronous // PRAGMA schema.synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL | 3 | EXTRA; - snprintfz(buf, 1024, "PRAGMA synchronous=%s;", config_get(CONFIG_SECTION_SQLITE, "synchronous", "NORMAL")); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA synchronous=%s", config_get(CONFIG_SECTION_SQLITE, "synchronous", "NORMAL")); if (init_database_batch(database, list)) return 1; // https://www.sqlite.org/pragma.html#pragma_journal_mode // PRAGMA schema.journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF - snprintfz(buf, 1024, "PRAGMA journal_mode=%s;", config_get(CONFIG_SECTION_SQLITE, "journal mode", "WAL")); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_mode=%s", config_get(CONFIG_SECTION_SQLITE, "journal mode", "WAL")); if (init_database_batch(database, list)) return 1; // https://www.sqlite.org/pragma.html#pragma_temp_store // PRAGMA temp_store = 0 | DEFAULT | 1 | FILE | 2 | MEMORY; - snprintfz(buf, 1024, "PRAGMA temp_store=%s;", config_get(CONFIG_SECTION_SQLITE, "temp store", "MEMORY")); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA temp_store=%s", config_get(CONFIG_SECTION_SQLITE, "temp store", "MEMORY")); if (init_database_batch(database, list)) return 1; // https://www.sqlite.org/pragma.html#pragma_journal_size_limit // PRAGMA schema.journal_size_limit = N ; - snprintfz(buf, 1024, "PRAGMA journal_size_limit=%lld;", config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", 16777216)); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_size_limit=%lld", config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", 16777216)); if (init_database_batch(database, list)) return 1; // https://www.sqlite.org/pragma.html#pragma_cache_size // PRAGMA schema.cache_size = pages; // PRAGMA schema.cache_size = -kibibytes; - snprintfz(buf, 1024, "PRAGMA cache_size=%lld;", config_get_number(CONFIG_SECTION_SQLITE, "cache size", -2000)); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA cache_size=%lld", config_get_number(CONFIG_SECTION_SQLITE, "cache size", -2000)); if (init_database_batch(database, list)) return 1; - snprintfz(buf, 1024, "PRAGMA user_version=%d;", target_version); + snprintfz(buf, sizeof(buf) - 1, "PRAGMA user_version=%d", target_version); if (init_database_batch(database, list)) return 1; @@ -384,13 +391,13 @@ int sql_init_database(db_check_action_type_t rebuild, int memory) int rc; if (likely(!memory)) { - snprintfz(sqlite_database, FILENAME_MAX, "%s/.netdata-meta.db.recover", netdata_configured_cache_dir); + snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/.netdata-meta.db.recover", netdata_configured_cache_dir); rc = unlink(sqlite_database); snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir); if (rc == 0 || (rebuild & DB_CHECK_RECOVER)) { char new_sqlite_database[FILENAME_MAX + 1]; - snprintfz(new_sqlite_database, FILENAME_MAX, "%s/netdata-meta-recover.db", netdata_configured_cache_dir); + snprintfz(new_sqlite_database, sizeof(new_sqlite_database) - 1, "%s/netdata-meta-recover.db", netdata_configured_cache_dir); recover_database(sqlite_database, new_sqlite_database); if (rebuild & DB_CHECK_RECOVER) return 0; @@ -410,7 +417,7 @@ int sql_init_database(db_check_action_type_t rebuild, int memory) if (rebuild & DB_CHECK_RECLAIM_SPACE) { netdata_log_info("Reclaiming space of %s", sqlite_database); - rc = sqlite3_exec_monitored(db_meta, "VACUUM;", 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, "VACUUM", 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("Failed to execute VACUUM rc = %d (%s)", rc, err_msg); sqlite3_free(err_msg); @@ -422,6 +429,20 @@ int sql_init_database(db_check_action_type_t rebuild, int memory) return 1; } + if (rebuild & DB_CHECK_ANALYZE) { + netdata_log_info("Running ANALYZE on %s", sqlite_database); + rc = sqlite3_exec_monitored(db_meta, "ANALYZE", 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error_report("Failed to execute ANALYZE rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + else { + (void) db_execute(db_meta, "select count(*) from sqlite_master limit 0"); + (void) sqlite3_close(db_meta); + } + return 1; + } + netdata_log_info("SQLite database %s initialization", sqlite_database); rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0); @@ -471,6 +492,9 @@ void sql_close_database(void) add_stmt_to_list(NULL); + (void) db_execute(db_meta, "PRAGMA analysis_limit=10000"); + (void) db_execute(db_meta, "PRAGMA optimize"); + rc = sqlite3_close_v2(db_meta); if (unlikely(rc != SQLITE_OK)) error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc)); @@ -511,22 +535,25 @@ int db_execute(sqlite3 *db, const char *cmd) { int rc; int cnt = 0; + while (cnt < SQL_MAX_RETRY) { char *err_msg; rc = sqlite3_exec_monitored(db, cmd, 0, 0, &err_msg); - if (rc != SQLITE_OK) { - error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt); - sqlite3_free(err_msg); - if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) { - usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); - } - else - break; - } - else + if (likely(rc == SQLITE_OK)) break; ++cnt; + error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt); + sqlite3_free(err_msg); + + if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) { + usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); + continue; + } + + if (rc == SQLITE_CORRUPT) + mark_database_to_recover(NULL, db); + break; } return (rc != SQLITE_OK); } @@ -542,7 +569,7 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id) return; } - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!host->node_id)) { uuid_t *t = mallocz(sizeof(*host->node_id)); @@ -559,7 +586,7 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id) uuid_unparse_lower(*node_id, wc->node_id); } -#define SQL_UPDATE_NODE_ID "update node_instance set node_id = @node_id where host_id = @host_id;" +#define SQL_UPDATE_NODE_ID "UPDATE node_instance SET node_id = @node_id WHERE host_id = @host_id" int update_node_id(uuid_t *host_id, uuid_t *node_id) { @@ -611,7 +638,7 @@ failed: return rc - 1; } -#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL;" +#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL" int get_node_id(uuid_t *host_id, uuid_t *node_id) { @@ -647,8 +674,9 @@ failed: return (rc == SQLITE_ROW) ? 0 : -1; } -#define SQL_INVALIDATE_NODE_INSTANCES "UPDATE node_instance SET node_id = NULL WHERE EXISTS " \ - "(SELECT host_id FROM node_instance WHERE host_id = @host_id AND (@claim_id IS NULL OR claim_id <> @claim_id));" +#define SQL_INVALIDATE_NODE_INSTANCES \ + "UPDATE node_instance SET node_id = NULL WHERE EXISTS " \ + "(SELECT host_id FROM node_instance WHERE host_id = @host_id AND (@claim_id IS NULL OR claim_id <> @claim_id))" void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id) { @@ -692,8 +720,9 @@ failed: error_report("Failed to finalize the prepared statement when invalidating node instance information"); } -#define SQL_GET_NODE_INSTANCE_LIST "SELECT ni.node_id, ni.host_id, h.hostname " \ - "FROM node_instance ni, host h WHERE ni.host_id = h.host_id AND h.hops >=0;" +#define SQL_GET_NODE_INSTANCE_LIST \ + "SELECT ni.node_id, ni.host_id, h.hostname " \ + "FROM node_instance ni, host h WHERE ni.host_id = h.host_id AND h.hops >=0" struct node_instance_list *get_node_list(void) { @@ -762,7 +791,7 @@ failed: return node_list; }; -#define SQL_GET_HOST_NODE_ID "select node_id from node_instance where host_id = @host_id;" +#define SQL_GET_HOST_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id" void sql_load_node_id(RRDHOST *host) { @@ -801,7 +830,7 @@ failed: }; -#define SELECT_HOST_INFO "SELECT system_key, system_value FROM host_info WHERE host_id = @host_id;" +#define SELECT_HOST_INFO "SELECT system_key, system_value FROM host_info WHERE host_id = @host_id" void sql_build_host_system_info(uuid_t *host_id, struct rrdhost_system_info *system_info) { @@ -832,7 +861,7 @@ skip: } #define SELECT_HOST_LABELS "SELECT label_key, label_value, source_type FROM host_label WHERE host_id = @host_id " \ - "AND label_key IS NOT NULL AND label_value IS NOT NULL;" + "AND label_key IS NOT NULL AND label_value IS NOT NULL" RRDLABELS *sql_load_host_labels(uuid_t *host_id) { @@ -888,7 +917,7 @@ int sql_metadata_cache_stats(int op) return count; } -#define SQL_DROP_TABLE "DROP table %s;" +#define SQL_DROP_TABLE "DROP table %s" void sql_drop_table(const char *table) { @@ -896,7 +925,7 @@ void sql_drop_table(const char *table) return; char wstr[255]; - snprintfz(wstr, 254, SQL_DROP_TABLE, table); + snprintfz(wstr, sizeof(wstr) - 1, SQL_DROP_TABLE, table); int rc = sqlite3_exec_monitored(db_meta, wstr, 0, 0, NULL); if (rc != SQLITE_OK) { diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index 9cd1f7ad4..40b5af464 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -21,8 +21,9 @@ struct node_instance_list { typedef enum db_check_action_type { DB_CHECK_NONE = (1 << 0), DB_CHECK_RECLAIM_SPACE = (1 << 1), - DB_CHECK_CONT = (1 << 2), - DB_CHECK_RECOVER = (1 << 3), + DB_CHECK_ANALYZE = (1 << 2), + DB_CHECK_CONT = (1 << 3), + DB_CHECK_RECOVER = (1 << 4), } db_check_action_type_t; #define SQL_MAX_RETRY (100) diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c index 6fc6a2e64..7d79ff70b 100644 --- a/database/sqlite/sqlite_health.c +++ b/database/sqlite/sqlite_health.c @@ -95,18 +95,22 @@ failed: /* Health related SQL queries Inserts an entry in the table */ + #define SQL_INSERT_HEALTH_LOG \ "INSERT INTO health_log (host_id, alarm_id, " \ - "config_hash_id, name, chart, exec, recipient, units, chart_context, last_transition_id, chart_name) " \ - "VALUES (?,?,?,?,?,?,?,?,?,?,?) " \ - "ON CONFLICT (host_id, alarm_id) DO UPDATE SET last_transition_id = excluded.last_transition_id, " \ - "chart_name = excluded.chart_name RETURNING health_log_id; " - -#define SQL_INSERT_HEALTH_LOG_DETAIL \ - "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, " \ - "updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, " \ + "config_hash_id, name, chart, exec, recipient, units, chart_context, last_transition_id, chart_name) " \ + "VALUES (@host_id,@alarm_id, @config_hash_id,@name,@chart,@exec,@recipient,@units,@chart_context," \ + "@last_transition_id,@chart_name) ON CONFLICT (host_id, alarm_id) DO UPDATE " \ + "SET last_transition_id = excluded.last_transition_id, chart_name = excluded.chart_name RETURNING health_log_id" + +#define SQL_INSERT_HEALTH_LOG_DETAIL \ + "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, " \ + "updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, " \ "info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \ - "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,@global_id,?); " + "VALUES (@health_log_id,@unique_id,@alarm_id,@alarm_event_id,@updated_by_id,@updates_id,@when_key,@duration," \ + "@non_clear_duration,@flags,@exec_run_timestamp,@delay_up_to_timestamp, @info,@exec_code,@new_status,@old_status," \ + "@delay,@new_value,@old_value,@last_repeat,@transition_id,@global_id,@summary)" + static void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { sqlite3_stmt *res = NULL; int rc; @@ -353,7 +357,6 @@ static void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { } ae->flags |= HEALTH_ENTRY_FLAG_SAVED; - host->health.health_log_entries_written++; failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) @@ -374,48 +377,6 @@ void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) } } -/* Health related SQL queries - Get a count of rows from health log table -*/ -#define SQL_COUNT_HEALTH_LOG_DETAIL "SELECT count(1) FROM health_log_detail hld, health_log hl " \ - "where hl.host_id = @host_id and hl.health_log_id = hld.health_log_id" - -static int sql_health_alarm_log_count(RRDHOST *host) { - sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return -1; - } - - int entries_in_db = -1; - - rc = sqlite3_prepare_v2(db_meta, SQL_COUNT_HEALTH_LOG_DETAIL, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to count health log entries from db"); - goto done; - } - - rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id for SQL_COUNT_HEALTH_LOG."); - goto done; - } - - rc = sqlite3_step_monitored(res); - if (likely(rc == SQLITE_ROW)) - entries_in_db = (int) sqlite3_column_int64(res, 0); - -done: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize the prepared statement to count health log entries from db"); - - return entries_in_db; -} - /* * * Health related SQL queries @@ -423,19 +384,22 @@ done: * */ -#define SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED "DELETE FROM health_log_detail WHERE health_log_id IN " \ - "(SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < unixepoch() - @history " \ - "AND updated_by_id <> 0 AND transition_id NOT IN " \ - "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id);" - -#define SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(guid) "DELETE from health_log_detail WHERE unique_id NOT IN " \ - "(SELECT filtered_alert_unique_id FROM aclk_alert_%s) " \ - "AND unique_id IN (SELECT hld.unique_id FROM health_log hl, health_log_detail hld WHERE " \ - "hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id) " \ - "AND health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id) " \ - "AND when_key < unixepoch() - @history " \ - "AND updated_by_id <> 0 AND transition_id NOT IN " \ - "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id);", guid +#define SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED \ + "DELETE FROM health_log_detail WHERE health_log_id IN " \ + "(SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < UNIXEPOCH() - @history " \ + "AND updated_by_id <> 0 AND transition_id NOT IN " \ + "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)" + +#define SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(guid) \ + "DELETE from health_log_detail WHERE unique_id NOT IN " \ + "(SELECT filtered_alert_unique_id FROM aclk_alert_%s) " \ + "AND unique_id IN (SELECT hld.unique_id FROM health_log hl, health_log_detail hld WHERE " \ + "hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id) " \ + "AND health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id) " \ + "AND when_key < unixepoch() - @history " \ + "AND updated_by_id <> 0 AND transition_id NOT IN " \ + "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)", \ + guid void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) { sqlite3_stmt *res = NULL; @@ -450,14 +414,14 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) { char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - snprintfz(command, MAX_HEALTH_SQL_SIZE, "aclk_alert_%s", uuid_str); + snprintfz(command, sizeof(command) - 1, "aclk_alert_%s", uuid_str); bool aclk_table_exists = table_exists_in_database(db_meta, command); char *sql = SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED; if (claimed && aclk_table_exists) { - snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(uuid_str)); + snprintfz(command, sizeof(command) - 1, SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(uuid_str)); sql = command; } @@ -483,10 +447,6 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) { if (unlikely(rc != SQLITE_DONE)) error_report("Failed to cleanup health log detail table, rc = %d", rc); - int rows = sql_health_alarm_log_count(host); - if (rows >= 0) - host->health.health_log_entries_written = rows; - if (aclk_table_exists) sql_aclk_alert_clean_dead_entries(host); @@ -497,17 +457,25 @@ done: } #define SQL_INJECT_REMOVED \ - "insert into health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \ + "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \ "duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, " \ "delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \ - "select health_log_id, ?1, ?2, ?3, 0, ?4, unixepoch(), 0, 0, flags, exec_run_timestamp, unixepoch(), info, exec_code, -2, " \ - "new_status, delay, NULL, new_value, 0, ?5, now_usec(0), summary from health_log_detail where unique_id = ?6 and transition_id = ?7;" - -#define SQL_INJECT_REMOVED_UPDATE_DETAIL "update health_log_detail set flags = flags | ?1, updated_by_id = ?2 where unique_id = ?3 and transition_id = ?4;" - -#define SQL_INJECT_REMOVED_UPDATE_LOG "update health_log set last_transition_id = ?1 where alarm_id = ?2 and last_transition_id = ?3 and host_id = ?4;" - -void sql_inject_removed_status(RRDHOST *host, uint32_t alarm_id, uint32_t alarm_event_id, uint32_t unique_id, uint32_t max_unique_id, uuid_t *prev_transition_id) + "SELECT health_log_id, ?1, ?2, ?3, 0, ?4, UNIXEPOCH(), 0, 0, flags, exec_run_timestamp, UNIXEPOCH(), info, exec_code, -2, " \ + "new_status, delay, NULL, new_value, 0, ?5, NOW_USEC(0), summary FROM health_log_detail WHERE unique_id = ?6 AND transition_id = ?7" + +#define SQL_INJECT_REMOVED_UPDATE_DETAIL \ + "UPDATE health_log_detail SET flags = flags | ?1, updated_by_id = ?2 WHERE unique_id = ?3 AND transition_id = ?4" + +#define SQL_INJECT_REMOVED_UPDATE_LOG \ + "UPDATE health_log SET last_transition_id = ?1 WHERE alarm_id = ?2 AND last_transition_id = ?3 AND host_id = ?4" + +void sql_inject_removed_status( + RRDHOST *host, + uint32_t alarm_id, + uint32_t alarm_event_id, + uint32_t unique_id, + uint32_t max_unique_id, + uuid_t *prev_transition_id) { int rc; @@ -737,13 +705,14 @@ void sql_check_removed_alerts_state(RRDHOST *host) /* Health related SQL queries Load from the health log table */ -#define SQL_LOAD_HEALTH_LOG "SELECT hld.unique_id, hld.alarm_id, hld.alarm_event_id, hl.config_hash_id, hld.updated_by_id, " \ - "hld.updates_id, hld.when_key, hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, " \ - "hld.delay_up_to_timestamp, hl.name, hl.chart, hl.exec, hl.recipient, ah.source, hl.units, " \ - "hld.info, hld.exec_code, hld.new_status, hld.old_status, hld.delay, hld.new_value, hld.old_value, " \ - "hld.last_repeat, ah.class, ah.component, ah.type, hl.chart_context, hld.transition_id, hld.global_id, hl.chart_name, hld.summary " \ - "FROM health_log hl, alert_hash ah, health_log_detail hld " \ - "WHERE hl.config_hash_id = ah.hash_id and hl.host_id = @host_id and hl.last_transition_id = hld.transition_id;" +#define SQL_LOAD_HEALTH_LOG \ + "SELECT hld.unique_id, hld.alarm_id, hld.alarm_event_id, hl.config_hash_id, hld.updated_by_id, " \ + "hld.updates_id, hld.when_key, hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, " \ + "hld.delay_up_to_timestamp, hl.name, hl.chart, hl.exec, hl.recipient, ah.source, hl.units, " \ + "hld.info, hld.exec_code, hld.new_status, hld.old_status, hld.delay, hld.new_value, hld.old_value, " \ + "hld.last_repeat, ah.class, ah.component, ah.type, hl.chart_context, hld.transition_id, hld.global_id, " \ + "hl.chart_name, hld.summary FROM health_log hl, alert_hash ah, health_log_detail hld " \ + "WHERE hl.config_hash_id = ah.hash_id and hl.host_id = @host_id and hl.last_transition_id = hld.transition_id" void sql_health_alarm_log_load(RRDHOST *host) { @@ -751,8 +720,6 @@ void sql_health_alarm_log_load(RRDHOST *host) int ret; ssize_t errored = 0, loaded = 0; - host->health.health_log_entries_written = 0; - if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) error_report("HEALTH [%s]: Database has not been initialized", rrdhost_hostname(host)); @@ -914,27 +881,28 @@ void sql_health_alarm_log_load(RRDHOST *host) if (unlikely(!host->health_log.next_alarm_id || host->health_log.next_alarm_id <= host->health_max_alarm_id)) host->health_log.next_alarm_id = host->health_max_alarm_id + 1; - netdata_log_health("[%s]: Table health_log, loaded %zd alarm entries, errors in %zd entries.", rrdhost_hostname(host), loaded, errored); + nd_log(NDLS_DAEMON, errored ? NDLP_WARNING : NDLP_DEBUG, + "[%s]: Table health_log, loaded %zd alarm entries, errors in %zd entries.", + rrdhost_hostname(host), loaded, errored); ret = sqlite3_finalize(res); if (unlikely(ret != SQLITE_OK)) error_report("Failed to finalize the health log read statement"); - - int rows = sql_health_alarm_log_count(host); - - if (rows >= 0) - host->health.health_log_entries_written = rows; } /* * Store an alert config hash in the database */ -#define SQL_STORE_ALERT_CONFIG_HASH "insert or replace into alert_hash (hash_id, date_updated, alarm, template, " \ - "on_key, class, component, type, os, hosts, lookup, every, units, calc, plugin, module, " \ - "charts, green, red, warn, crit, exec, to_key, info, delay, options, repeat, host_labels, " \ - "p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, " \ - "p_db_lookup_before, p_update_every, source, chart_labels, summary) values (?1,unixepoch(),?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12," \ - "?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,?29,?30,?31,?32,?33,?34,?35,?36);" +#define SQL_STORE_ALERT_CONFIG_HASH \ + "insert or replace into alert_hash (hash_id, date_updated, alarm, template, " \ + "on_key, class, component, type, os, hosts, lookup, every, units, calc, plugin, module, " \ + "charts, green, red, warn, crit, exec, to_key, info, delay, options, repeat, host_labels, " \ + "p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, " \ + "p_db_lookup_before, p_update_every, source, chart_labels, summary) values (@hash_id,UNIXEPOCH(),@alarm,@template," \ + "@on_key,@class,@component,@type,@os,@hosts,@lookup,@every,@units,@calc,@plugin,@module," \ + "@charts,@green,@red,@warn,@crit,@exec,@to_key,@info,@delay,@options,@repeat,@host_labels," \ + "@p_db_lookup_dimensions,@p_db_lookup_method,@p_db_lookup_options,@p_db_lookup_after," \ + "@p_db_lookup_before,@p_update_every,@source,@chart_labels,@summary)" int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg) { @@ -1212,7 +1180,7 @@ int alert_hash_and_store_config( #define SQL_SELECT_HEALTH_LAST_EXECUTED_EVENT \ "SELECT hld.new_status FROM health_log hl, health_log_detail hld " \ "WHERE hl.host_id = @host_id AND hl.alarm_id = @alarm_id AND hld.unique_id != @unique_id AND hld.flags & @flags " \ - "AND hl.health_log_id = hld.health_log_id ORDER BY hld.unique_id DESC LIMIT 1;" + "AND hl.health_log_id = hld.health_log_id ORDER BY hld.unique_id DESC LIMIT 1" int sql_health_get_last_executed_event(RRDHOST *host, ALARM_ENTRY *ae, RRDCALC_STATUS *last_executed_status) { @@ -1270,191 +1238,162 @@ done: "hl.units, hld.info, hld.exec_code, hld.new_status, hld.old_status, hld.delay, hld.new_value, hld.old_value, " \ "hld.last_repeat, ah.class, ah.component, ah.type, hl.chart_context, hld.transition_id, hld.summary " \ "FROM health_log hl, alert_hash ah, health_log_detail hld WHERE hl.config_hash_id = ah.hash_id and " \ - "hl.health_log_id = hld.health_log_id and hl.host_id = @host_id " + "hl.health_log_id = hld.health_log_id and hl.host_id = @host_id AND hld.unique_id > @after " -void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, uint32_t after, char *chart) { +void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, time_t after, const char *chart) +{ + unsigned int max = host->health_log.max; - buffer_strcat(wb, "["); + static __thread sqlite3_stmt *stmt_no_chart = NULL; + static __thread sqlite3_stmt *stmt_with_chart = NULL; - unsigned int max = host->health_log.max; - unsigned int count = 0; + sqlite3_stmt **active_stmt; + sqlite3_stmt *stmt_query; - sqlite3_stmt *res = NULL; - int rc; + int rc; - BUFFER *command = buffer_create(MAX_HEALTH_SQL_SIZE, NULL); - buffer_sprintf(command, SQL_SELECT_HEALTH_LOG); + active_stmt = chart ? &stmt_with_chart : &stmt_no_chart; - if (chart) { - char chart_sql[MAX_HEALTH_SQL_SIZE + 1]; - snprintfz(chart_sql, MAX_HEALTH_SQL_SIZE, "AND hl.chart = '%s' ", chart); - buffer_strcat(command, chart_sql); - } + if (!*active_stmt) { - if (after) { - char after_sql[MAX_HEALTH_SQL_SIZE + 1]; - snprintfz(after_sql, MAX_HEALTH_SQL_SIZE, "AND hld.unique_id > %u ", after); - buffer_strcat(command, after_sql); - } + BUFFER *command = buffer_create(MAX_HEALTH_SQL_SIZE, NULL); + buffer_sprintf(command, SQL_SELECT_HEALTH_LOG); - { - char limit_sql[MAX_HEALTH_SQL_SIZE + 1]; - snprintfz(limit_sql, MAX_HEALTH_SQL_SIZE, "ORDER BY hld.unique_id DESC LIMIT %u ", max); - buffer_strcat(command, limit_sql); - } + if (chart) + buffer_strcat(command, " AND hl.chart = @chart "); - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(command), -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement SQL_SELECT_HEALTH_LOG"); - buffer_free(command); - return; - } + buffer_strcat(command, " ORDER BY hld.unique_id DESC LIMIT @limit"); - rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id for SQL_SELECT_HEALTH_LOG."); - sqlite3_finalize(res); - buffer_free(command); - return; - } + rc = prepare_statement(db_meta, buffer_tostring(command), active_stmt); + buffer_free(command); - while (sqlite3_step(res) == SQLITE_ROW) { + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement SQL_SELECT_HEALTH_LOG"); + return; + } + } - char old_value_string[100 + 1]; - char new_value_string[100 + 1]; + stmt_query = *active_stmt; - char config_hash_id[UUID_STR_LEN]; - uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), config_hash_id); + int param = 0; + rc = sqlite3_bind_blob(stmt_query, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id for SQL_SELECT_HEALTH_LOG."); + goto finish; + } - char transition_id[UUID_STR_LEN] = {0}; - if (sqlite3_column_type(res, 30) != SQLITE_NULL) - uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 30)), transition_id); - - char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) : strdupz("UNKNOWN=0=UNKNOWN"); - - if (count) - buffer_sprintf(wb, ","); - - count++; - - buffer_sprintf( - wb, - "\n\t{\n" - "\t\t\"hostname\": \"%s\",\n" - "\t\t\"utc_offset\": %d,\n" - "\t\t\"timezone\": \"%s\",\n" - "\t\t\"unique_id\": %u,\n" - "\t\t\"alarm_id\": %u,\n" - "\t\t\"alarm_event_id\": %u,\n" - "\t\t\"config_hash_id\": \"%s\",\n" - "\t\t\"transition_id\": \"%s\",\n" - "\t\t\"name\": \"%s\",\n" - "\t\t\"chart\": \"%s\",\n" - "\t\t\"context\": \"%s\",\n" - "\t\t\"class\": \"%s\",\n" - "\t\t\"component\": \"%s\",\n" - "\t\t\"type\": \"%s\",\n" - "\t\t\"processed\": %s,\n" - "\t\t\"updated\": %s,\n" - "\t\t\"exec_run\": %lu,\n" - "\t\t\"exec_failed\": %s,\n" - "\t\t\"exec\": \"%s\",\n" - "\t\t\"recipient\": \"%s\",\n" - "\t\t\"exec_code\": %d,\n" - "\t\t\"source\": \"%s\",\n" - "\t\t\"command\": \"%s\",\n" - "\t\t\"units\": \"%s\",\n" - "\t\t\"when\": %lu,\n" - "\t\t\"duration\": %lu,\n" - "\t\t\"non_clear_duration\": %lu,\n" - "\t\t\"status\": \"%s\",\n" - "\t\t\"old_status\": \"%s\",\n" - "\t\t\"delay\": %d,\n" - "\t\t\"delay_up_to_timestamp\": %lu,\n" - "\t\t\"updated_by_id\": %u,\n" - "\t\t\"updates_id\": %u,\n" - "\t\t\"value_string\": \"%s\",\n" - "\t\t\"old_value_string\": \"%s\",\n" - "\t\t\"last_repeat\": %lu,\n" - "\t\t\"silenced\": \"%s\",\n", - rrdhost_hostname(host), - host->utc_offset, - rrdhost_abbrev_timezone(host), - (unsigned int) sqlite3_column_int64(res, 0), - (unsigned int) sqlite3_column_int64(res, 1), - (unsigned int) sqlite3_column_int64(res, 2), - config_hash_id, - transition_id, - sqlite3_column_text(res, 12), - sqlite3_column_text(res, 13), - sqlite3_column_text(res, 29), - sqlite3_column_text(res, 26) ? (const char *) sqlite3_column_text(res, 26) : (char *) "Unknown", - sqlite3_column_text(res, 27) ? (const char *) sqlite3_column_text(res, 27) : (char *) "Unknown", - sqlite3_column_text(res, 28) ? (const char *) sqlite3_column_text(res, 28) : (char *) "Unknown", - (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_PROCESSED)?"true":"false", - (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_UPDATED)?"true":"false", - (long unsigned int)sqlite3_column_int64(res, 10), - (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_EXEC_FAILED)?"true":"false", - sqlite3_column_text(res, 14) ? (const char *) sqlite3_column_text(res, 14) : string2str(host->health.health_default_exec), - sqlite3_column_text(res, 15) ? (const char *) sqlite3_column_text(res, 15) : string2str(host->health.health_default_recipient), - sqlite3_column_int(res, 19), - sqlite3_column_text(res, 16) ? (const char *) sqlite3_column_text(res, 16) : (char *) "Unknown", - edit_command, - sqlite3_column_text(res, 17), - (long unsigned int)sqlite3_column_int64(res, 6), - (long unsigned int)sqlite3_column_int64(res, 7), - (long unsigned int)sqlite3_column_int64(res, 8), - rrdcalc_status2string(sqlite3_column_int(res, 20)), - rrdcalc_status2string(sqlite3_column_int(res, 21)), - sqlite3_column_int(res, 22), - (long unsigned int)sqlite3_column_int64(res, 11), - (unsigned int)sqlite3_column_int64(res, 4), - (unsigned int)sqlite3_column_int64(res, 5), - sqlite3_column_type(res, 23) == SQLITE_NULL ? "-" : format_value_and_unit(new_value_string, 100, sqlite3_column_double(res, 23), (char *) sqlite3_column_text(res, 17), -1), - sqlite3_column_type(res, 24) == SQLITE_NULL ? "-" : format_value_and_unit(old_value_string, 100, sqlite3_column_double(res, 24), (char *) sqlite3_column_text(res, 17), -1), - (long unsigned int)sqlite3_column_int64(res, 25), - (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_SILENCED)?"true":"false"); - - health_string2json(wb, "\t\t", "summary", (char *) sqlite3_column_text(res, 31), ",\n"); - health_string2json(wb, "\t\t", "info", (char *) sqlite3_column_text(res, 18), ",\n"); - - if(unlikely(sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION)) { - buffer_strcat(wb, "\t\t\"no_clear_notification\": true,\n"); - } + rc = sqlite3_bind_int64(stmt_query, ++param, after); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind after for SQL_SELECT_HEALTH_LOG."); + goto finish; + } - buffer_strcat(wb, "\t\t\"value\":"); - if (sqlite3_column_type(res, 23) == SQLITE_NULL) - buffer_strcat(wb, "null"); - else - buffer_print_netdata_double(wb, sqlite3_column_double(res, 23)); - buffer_strcat(wb, ",\n"); + if (chart) { + rc = sqlite3_bind_text(stmt_query, ++param, chart, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind after for SQL_SELECT_HEALTH_LOG."); + goto finish; + } + } - buffer_strcat(wb, "\t\t\"old_value\":"); - if (sqlite3_column_type(res, 24) == SQLITE_NULL) - buffer_strcat(wb, "null"); + rc = sqlite3_bind_int64(stmt_query, ++param, max); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind max lines for SQL_SELECT_HEALTH_LOG."); + goto finish; + } + + buffer_json_initialize(wb, "\"", "\"", 0, false, BUFFER_JSON_OPTIONS_DEFAULT); + buffer_json_member_add_array(wb, NULL); + + while (sqlite3_step(stmt_query) == SQLITE_ROW) { + char old_value_string[100 + 1]; + char new_value_string[100 + 1]; + + char config_hash_id[UUID_STR_LEN]; + uuid_unparse_lower(*((uuid_t *)sqlite3_column_blob(stmt_query, 3)), config_hash_id); + + char transition_id[UUID_STR_LEN] = {0}; + if (sqlite3_column_type(stmt_query, 30) != SQLITE_NULL) + uuid_unparse_lower(*((uuid_t *)sqlite3_column_blob(stmt_query, 30)), transition_id); + + char *edit_command = sqlite3_column_bytes(stmt_query, 16) > 0 ? + health_edit_command_from_source((char *)sqlite3_column_text(stmt_query, 16)) : + strdupz("UNKNOWN=0=UNKNOWN"); + + buffer_json_add_array_item_object(wb); // this node + + buffer_json_member_add_string_or_empty(wb, "hostname", rrdhost_hostname(host)); + buffer_json_member_add_int64(wb, "utc_offset", (int64_t)host->utc_offset); + buffer_json_member_add_string_or_empty(wb, "timezone", rrdhost_abbrev_timezone(host)); + buffer_json_member_add_int64(wb, "unique_id", (int64_t) sqlite3_column_int64(stmt_query, 0)); + buffer_json_member_add_int64(wb, "alarm_id", (int64_t) sqlite3_column_int64(stmt_query, 1)); + buffer_json_member_add_int64(wb, "alarm_event_id", (int64_t) sqlite3_column_int64(stmt_query, 2)); + buffer_json_member_add_string_or_empty(wb, "config_hash_id", config_hash_id); + buffer_json_member_add_string_or_empty(wb, "transition_id", transition_id); + buffer_json_member_add_string_or_empty(wb, "name", (const char *) sqlite3_column_text(stmt_query, 12)); + buffer_json_member_add_string_or_empty(wb, "chart", (const char *) sqlite3_column_text(stmt_query, 13)); + buffer_json_member_add_string_or_empty(wb, "context", (const char *) sqlite3_column_text(stmt_query, 29)); + buffer_json_member_add_string_or_empty(wb, "class", sqlite3_column_text(stmt_query, 26) ? (const char *) sqlite3_column_text(stmt_query, 26) : (char *) "Unknown"); + buffer_json_member_add_string_or_empty(wb, "component", sqlite3_column_text(stmt_query, 27) ? (const char *) sqlite3_column_text(stmt_query, 27) : (char *) "Unknown"); + buffer_json_member_add_string_or_empty(wb, "type", sqlite3_column_text(stmt_query, 28) ? (const char *) sqlite3_column_text(stmt_query, 28) : (char *) "Unknown"); + buffer_json_member_add_boolean(wb, "processed", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_PROCESSED)); + buffer_json_member_add_boolean(wb, "updated", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_UPDATED)); + buffer_json_member_add_int64(wb, "exec_run", (int64_t)sqlite3_column_int64(stmt_query, 10)); + buffer_json_member_add_boolean(wb, "exec_failed", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_EXEC_FAILED)); + buffer_json_member_add_string_or_empty(wb, "exec", sqlite3_column_text(stmt_query, 14) ? (const char *) sqlite3_column_text(stmt_query, 14) : string2str(host->health.health_default_exec)); + buffer_json_member_add_string_or_empty(wb, "recipient", sqlite3_column_text(stmt_query, 15) ? (const char *) sqlite3_column_text(stmt_query, 15) : string2str(host->health.health_default_recipient)); + buffer_json_member_add_int64(wb, "exec_code", sqlite3_column_int(stmt_query, 19)); + buffer_json_member_add_string_or_empty(wb, "source", sqlite3_column_text(stmt_query, 16) ? (const char *) sqlite3_column_text(stmt_query, 16) : (char *) "Unknown"); + buffer_json_member_add_string_or_empty(wb, "command", edit_command); + buffer_json_member_add_string_or_empty(wb, "units", (const char *) sqlite3_column_text(stmt_query, 17)); + buffer_json_member_add_int64(wb, "when", (int64_t)sqlite3_column_int64(stmt_query, 6)); + buffer_json_member_add_int64(wb, "duration", (int64_t)sqlite3_column_int64(stmt_query, 7)); + buffer_json_member_add_int64(wb, "non_clear_duration", (int64_t)sqlite3_column_int64(stmt_query, 8)); + buffer_json_member_add_string_or_empty(wb, "status", rrdcalc_status2string(sqlite3_column_int(stmt_query, 20))); + buffer_json_member_add_string_or_empty(wb, "old_status", rrdcalc_status2string(sqlite3_column_int(stmt_query, 21))); + buffer_json_member_add_int64(wb, "delay", sqlite3_column_int(stmt_query, 22)); + buffer_json_member_add_int64(wb, "delay_up_to_timestamp",(int64_t)sqlite3_column_int64(stmt_query, 11)); + buffer_json_member_add_int64(wb, "updated_by_id", (unsigned int)sqlite3_column_int64(stmt_query, 4)); + buffer_json_member_add_int64(wb, "updates_id", (unsigned int)sqlite3_column_int64(stmt_query, 5)); + buffer_json_member_add_string_or_empty(wb, "value_string", sqlite3_column_type(stmt_query, 23) == SQLITE_NULL ? "-" : + format_value_and_unit(new_value_string, 100, sqlite3_column_double(stmt_query, 23), (char *) sqlite3_column_text(stmt_query, 17), -1)); + buffer_json_member_add_string_or_empty(wb, "old_value_string", sqlite3_column_type(stmt_query, 24) == SQLITE_NULL ? "-" : + format_value_and_unit(old_value_string, 100, sqlite3_column_double(stmt_query, 24), (char *) sqlite3_column_text(stmt_query, 17), -1)); + buffer_json_member_add_int64(wb, "last_repeat", (int64_t)sqlite3_column_int64(stmt_query, 25)); + buffer_json_member_add_boolean(wb, "silenced", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_SILENCED)); + buffer_json_member_add_string_or_empty(wb, "summary", (const char *) sqlite3_column_text(stmt_query, 31)); + buffer_json_member_add_string_or_empty(wb, "info", (const char *) sqlite3_column_text(stmt_query, 18)); + buffer_json_member_add_boolean(wb, "no_clear_notification",(sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION)); + + if (sqlite3_column_type(stmt_query, 23) == SQLITE_NULL) + buffer_json_member_add_string(wb, "value", NULL); else - buffer_print_netdata_double(wb, sqlite3_column_double(res, 24)); - buffer_strcat(wb, "\n"); + buffer_json_member_add_double(wb, "value", sqlite3_column_double(stmt_query, 23)); - buffer_strcat(wb, "\t}"); + if (sqlite3_column_type(stmt_query, 24) == SQLITE_NULL) + buffer_json_member_add_string(wb, "old_value", NULL); + else + buffer_json_member_add_double(wb, "old_value", sqlite3_column_double(stmt_query, 23)); freez(edit_command); + + buffer_json_object_close(wb); } - buffer_strcat(wb, "\n]"); + buffer_json_array_close(wb); + buffer_json_finalize(wb); - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement for SQL_SELECT_HEALTH_LOG"); - - buffer_free(command); +finish: + rc = sqlite3_reset(stmt_query); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement for SQL_SELECT_HEALTH_LOG"); } -#define SQL_COPY_HEALTH_LOG(table) "INSERT OR IGNORE INTO health_log (host_id, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context) SELECT ?1, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context from %s;", table -#define SQL_COPY_HEALTH_LOG_DETAIL(table) "INSERT INTO health_log_detail (unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, host_id) SELECT unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, now_usec(1), ?1 from %s;", table -#define SQL_UPDATE_HEALTH_LOG_DETAIL_TRANSITION_ID "update health_log_detail set transition_id = uuid_random() where transition_id is null;" -#define SQL_UPDATE_HEALTH_LOG_DETAIL_HEALTH_LOG_ID "update health_log_detail set health_log_id = (select health_log_id from health_log where host_id = ?1 and alarm_id = health_log_detail.alarm_id) where health_log_id is null and host_id = ?2;" -#define SQL_UPDATE_HEALTH_LOG_LAST_TRANSITION_ID "update health_log set last_transition_id = (select transition_id from health_log_detail where health_log_id = health_log.health_log_id and alarm_id = health_log.alarm_id group by (alarm_id) having max(alarm_event_id)) where host_id = ?1;" +#define SQL_COPY_HEALTH_LOG(table) "INSERT OR IGNORE INTO health_log (host_id, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context) SELECT ?1, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context from %s", table +#define SQL_COPY_HEALTH_LOG_DETAIL(table) "INSERT INTO health_log_detail (unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, host_id) SELECT unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, now_usec(1), ?1 from %s", table +#define SQL_UPDATE_HEALTH_LOG_DETAIL_TRANSITION_ID "update health_log_detail set transition_id = uuid_random() where transition_id is null" +#define SQL_UPDATE_HEALTH_LOG_DETAIL_HEALTH_LOG_ID "update health_log_detail set health_log_id = (select health_log_id from health_log where host_id = ?1 and alarm_id = health_log_detail.alarm_id) where health_log_id is null and host_id = ?2" +#define SQL_UPDATE_HEALTH_LOG_LAST_TRANSITION_ID "update health_log set last_transition_id = (select transition_id from health_log_detail where health_log_id = health_log.health_log_id and alarm_id = health_log.alarm_id group by (alarm_id) having max(alarm_event_id)) where host_id = ?1" int health_migrate_old_health_log_table(char *table) { if (!table) return 0; @@ -1476,7 +1415,7 @@ int health_migrate_old_health_log_table(char *table) { int rc; char command[MAX_HEALTH_SQL_SIZE + 1]; sqlite3_stmt *res = NULL; - snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_COPY_HEALTH_LOG(table)); + snprintfz(command, sizeof(command) - 1, SQL_COPY_HEALTH_LOG(table)); rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to prepare statement to copy health log, rc = %d", rc); @@ -1503,7 +1442,7 @@ int health_migrate_old_health_log_table(char *table) { } //detail - snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_COPY_HEALTH_LOG_DETAIL(table)); + snprintfz(command, sizeof(command) - 1, SQL_COPY_HEALTH_LOG_DETAIL(table)); rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to prepare statement to copy health log detail, rc = %d", rc); @@ -1913,12 +1852,12 @@ void sql_alert_transitions( goto run_query; } - snprintfz(sql, 511, SQL_BUILD_ALERT_TRANSITION, nodes); + snprintfz(sql, sizeof(sql) - 1, SQL_BUILD_ALERT_TRANSITION, nodes); rc = db_execute(db_meta, sql); if (rc) return; - snprintfz(sql, 511, SQL_POPULATE_TEMP_ALERT_TRANSITION_TABLE, nodes); + snprintfz(sql, sizeof(sql) - 1, SQL_POPULATE_TEMP_ALERT_TRANSITION_TABLE, nodes); // Prepare statement to add things rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); @@ -2046,7 +1985,7 @@ done: done_only_drop: if (likely(!transition)) { - (void)snprintfz(sql, 511, "DROP TABLE IF EXISTS v_%p", nodes); + (void)snprintfz(sql, sizeof(sql) - 1, "DROP TABLE IF EXISTS v_%p", nodes); (void)db_execute(db_meta, sql); buffer_free(command); } @@ -2078,12 +2017,12 @@ int sql_get_alert_configuration( if (unlikely(!configs)) return added; - snprintfz(sql, 511, SQL_BUILD_CONFIG_TARGET_LIST, configs); + snprintfz(sql, sizeof(sql) - 1, SQL_BUILD_CONFIG_TARGET_LIST, configs); rc = db_execute(db_meta, sql); if (rc) return added; - snprintfz(sql, 511, SQL_POPULATE_TEMP_CONFIG_TARGET_TABLE, configs); + snprintfz(sql, sizeof(sql) - 1, SQL_POPULATE_TEMP_CONFIG_TARGET_TABLE, configs); // Prepare statement to add things rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); @@ -2180,7 +2119,7 @@ int sql_get_alert_configuration( error_report("Failed to finalize statement for sql_get_alert_configuration"); fail_only_drop: - (void)snprintfz(sql, 511, "DROP TABLE IF EXISTS c_%p", configs); + (void)snprintfz(sql, sizeof(sql) - 1, "DROP TABLE IF EXISTS c_%p", configs); (void)db_execute(db_meta, sql); buffer_free(command); return added; diff --git a/database/sqlite/sqlite_health.h b/database/sqlite/sqlite_health.h index e21912368..5549b7525 100644 --- a/database/sqlite/sqlite_health.h +++ b/database/sqlite/sqlite_health.h @@ -13,7 +13,7 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed); int alert_hash_and_store_config(uuid_t hash_id, struct alert_config *cfg, int store_hash); void sql_aclk_alert_clean_dead_entries(RRDHOST *host); int sql_health_get_last_executed_event(RRDHOST *host, ALARM_ENTRY *ae, RRDCALC_STATUS *last_executed_status); -void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, uint32_t after, char *chart); +void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, time_t after, const char *chart); int health_migrate_old_health_log_table(char *table); uint32_t sql_get_alarm_id(RRDHOST *host, STRING *chart, STRING *name, uint32_t *next_event_id, uuid_t *config_hash_id); uint32_t sql_get_alarm_id_check_zero_hash(RRDHOST *host, STRING *chart, STRING *name, uint32_t *next_event_id, uuid_t *config_hash_id); diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 143783163..636f51966 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -4,11 +4,12 @@ // SQL statements -#define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \ - "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \ - "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;" +#define SQL_STORE_CLAIM_ID \ + "INSERT INTO node_instance " \ + "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, UNIXEPOCH()) " \ + "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id" -#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;" +#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid" #define STORE_HOST_LABEL \ "INSERT INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES " @@ -18,13 +19,13 @@ #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())" -#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;" +#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid" #define SQL_STORE_HOST_INFO \ "INSERT OR REPLACE INTO host (host_id, hostname, registry_hostname, update_every, os, timezone, tags, hops, " \ "memory_mode, abbrev_timezone, utc_offset, program_name, program_version, entries, health_enabled, last_connected) " \ "VALUES (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, " \ - "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected);" + "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected)" #define SQL_STORE_CHART \ "INSERT INTO chart (chart_id, host_id, type, id, name, family, context, title, unit, plugin, module, priority, " \ @@ -51,11 +52,10 @@ "(@uuid, @name, @value, UNIXEPOCH())" #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \ - "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);" -#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;" -#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);" + "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0)" +#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id" +#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host)" -#define METADATA_CMD_Q_MAX_SIZE (2048) // Max queue size; callers will block until there is room #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds #define METADATA_MAINTENANCE_REPEAT (60) // Repeat if last run for dimensions, charts, labels needs more work #define METADATA_HEALTH_LOG_INTERVAL (3600) // Repeat maintenance for health @@ -81,10 +81,10 @@ enum metadata_opcode { METADATA_ADD_HOST_INFO, METADATA_SCAN_HOSTS, METADATA_LOAD_HOST_CONTEXT, + METADATA_DELETE_HOST_CHART_LABELS, METADATA_MAINTENANCE, METADATA_SYNC_SHUTDOWN, METADATA_UNITTEST, - METADATA_ML_LOAD_MODELS, // leave this last // we need it to check for worker utilization METADATA_MAX_ENUMERATIONS_DEFINED @@ -98,14 +98,9 @@ struct metadata_cmd { struct metadata_cmd *prev, *next; }; -struct metadata_database_cmdqueue { - struct metadata_cmd *cmd_base; -}; - typedef enum { METADATA_FLAG_PROCESSING = (1 << 0), // store or cleanup METADATA_FLAG_SHUTDOWN = (1 << 1), // Shutting down - METADATA_FLAG_ML_LOADING = (1 << 2), // ML model load in progress } METADATA_FLAG; struct metadata_wc { @@ -114,19 +109,20 @@ struct metadata_wc { uv_async_t async; uv_timer_t timer_req; time_t metadata_check_after; - volatile unsigned queue_size; METADATA_FLAG flags; - struct completion init_complete; + struct completion start_stop_complete; struct completion *scan_complete; /* FIFO command queue */ - uv_mutex_t cmd_mutex; - struct metadata_database_cmdqueue cmd_queue; + SPINLOCK cmd_queue_lock; + struct metadata_cmd *cmd_base; }; #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag)) #define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST) #define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST) +struct metadata_wc metasync_worker = {.loop = NULL}; + // // For unittest // @@ -146,6 +142,33 @@ struct query_build { char uuid_str[UUID_STR_LEN]; }; +#define SQL_DELETE_CHART_LABELS_BY_HOST \ + "DELETE FROM chart_label WHERE chart_id in (SELECT chart_id FROM chart WHERE host_id = @host_id)" + +static void delete_host_chart_labels(uuid_t *host_uuid) +{ + sqlite3_stmt *res = NULL; + + int rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_LABELS_BY_HOST, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to delete chart labels by host"); + return; + } + + rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to host chart labels"); + goto failed; + } + rc = sqlite3_step_monitored(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to execute command to remove host chart labels"); + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize statement to remove host chart labels"); +} + static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { struct query_build *lb = data; if (unlikely(!lb->count)) @@ -168,8 +191,8 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value return 1; } -#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;" -#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;" +#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id" +#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id" static void clean_old_chart_labels(RRDSET *st) { @@ -177,9 +200,9 @@ static void clean_old_chart_labels(RRDSET *st) time_t first_time_s = rrdset_first_entry_s(st); if (unlikely(!first_time_s)) - snprintfz(sql, 511,SQL_DELETE_CHART_LABEL); + snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL); else - snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s); + snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL_HISTORY, first_time_s); int rc = exec_statement_with_uuid(sql, &st->chart_uuid); if (unlikely(rc)) @@ -873,7 +896,7 @@ static void check_dimension_metadata(struct metadata_wc *wc) next_execution_t = now + METADATA_DIM_CHECK_INTERVAL; } - netdata_log_info( + internal_error(true, "METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds", total_checked, total_deleted, @@ -940,7 +963,7 @@ static void check_chart_metadata(struct metadata_wc *wc) next_execution_t = now + METADATA_CHART_CHECK_INTERVAL; } - netdata_log_info( + internal_error(true, "METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds", total_checked, total_deleted, @@ -1009,7 +1032,7 @@ static void check_label_metadata(struct metadata_wc *wc) next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL; } - netdata_log_info( + internal_error(true, "METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds", total_checked, total_deleted, @@ -1059,21 +1082,15 @@ static void cleanup_health_log(struct metadata_wc *wc) // EVENT LOOP STARTS HERE // -static void metadata_init_cmd_queue(struct metadata_wc *wc) -{ - wc->cmd_queue.cmd_base = NULL; - fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); -} - static void metadata_free_cmd_queue(struct metadata_wc *wc) { - uv_mutex_lock(&wc->cmd_mutex); - while(wc->cmd_queue.cmd_base) { - struct metadata_cmd *t = wc->cmd_queue.cmd_base; - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + spinlock_lock(&wc->cmd_queue_lock); + while(wc->cmd_base) { + struct metadata_cmd *t = wc->cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next); freez(t); } - uv_mutex_unlock(&wc->cmd_mutex); + spinlock_unlock(&wc->cmd_queue_lock); } static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) @@ -1090,9 +1107,9 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) *t = *cmd; t->prev = t->next = NULL; - uv_mutex_lock(&wc->cmd_mutex); - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); - uv_mutex_unlock(&wc->cmd_mutex); + spinlock_lock(&wc->cmd_queue_lock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_base, t, prev, next); + spinlock_unlock(&wc->cmd_queue_lock); wakeup_event_loop: (void) uv_async_send(&wc->async); @@ -1102,10 +1119,10 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) { struct metadata_cmd ret; - uv_mutex_lock(&wc->cmd_mutex); - if(wc->cmd_queue.cmd_base) { - struct metadata_cmd *t = wc->cmd_queue.cmd_base; - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + spinlock_lock(&wc->cmd_queue_lock); + if(wc->cmd_base) { + struct metadata_cmd *t = wc->cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next); ret = *t; freez(t); } @@ -1113,7 +1130,7 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) ret.opcode = METADATA_DATABASE_NOOP; ret.completion = NULL; } - uv_mutex_unlock(&wc->cmd_mutex); + spinlock_unlock(&wc->cmd_queue_lock); return ret; } @@ -1136,9 +1153,7 @@ static void timer_cb(uv_timer_t* handle) struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - time_t now = now_realtime_sec(); - - if (wc->metadata_check_after && wc->metadata_check_after < now) { + if (wc->metadata_check_after < now_realtime_sec()) { cmd.opcode = METADATA_SCAN_HOSTS; metadata_enq_cmd(wc, &cmd); } @@ -1158,10 +1173,10 @@ void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int if (free_pages > (total_pages * threshold / 100)) { int do_free_pages = (int) (free_pages * vacuum_pc / 100); - netdata_log_info("%s: Freeing %d database pages", db_alias, do_free_pages); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s: Freeing %d database pages", db_alias, do_free_pages); char sql[128]; - snprintfz(sql, 127, "PRAGMA incremental_vacuum(%d)", do_free_pages); + snprintfz(sql, sizeof(sql) - 1, "PRAGMA incremental_vacuum(%d)", do_free_pages); (void) db_execute(database, sql); } } @@ -1184,16 +1199,10 @@ void run_metadata_cleanup(struct metadata_wc *wc) (void) sqlite3_wal_checkpoint(db_meta, NULL); } -struct ml_model_payload { - uv_work_t request; - struct metadata_wc *wc; - Pvoid_t JudyL; - size_t count; -}; - struct scan_metadata_payload { uv_work_t request; struct metadata_wc *wc; + void *data; BUFFER *work_buffer; uint32_t max_count; }; @@ -1271,7 +1280,7 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused) register_libuv_worker_jobs(); struct scan_metadata_payload *data = req->data; - UNUSED(data); + struct metadata_wc *wc = data->wc; worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD); usec_t started_ut = now_monotonic_usec(); (void)started_ut; @@ -1279,6 +1288,9 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused) RRDHOST *host; size_t max_threads = MIN(get_netdata_cpus() / 2, 6); + if (max_threads < 1) + max_threads = 1; + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Using %zu threads for context loading", max_threads); struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt)); size_t thread_index; @@ -1290,25 +1302,28 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused) rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS); internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host)); - cleanup_finished_threads(hclt, max_threads, false); - bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index); + bool found_slot = false; + do { + if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)) + break; - if (unlikely(!found_slot)) { - struct host_context_load_thread hclt_sync = {.host = host}; - restore_host_context(&hclt_sync); - } - else { - __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED); - hclt[thread_index].host = host; - assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index])); - } + cleanup_finished_threads(hclt, max_threads, false); + found_slot = find_available_thread_slot(hclt, max_threads, &thread_index); + } while (!found_slot); + + if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)) + break; + + __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED); + hclt[thread_index].host = host; + fatal_assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index])); } dfe_done(host); cleanup_finished_threads(hclt, max_threads, true); freez(hclt); usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; - internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS); worker_is_idle(); } @@ -1335,6 +1350,10 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans bool more_to_do = false; uint32_t scan_count = 1; + sqlite3_stmt *ml_load_stmt = NULL; + + bool load_ml_models = max_count; + if (use_transaction) (void)db_execute(db_meta, "BEGIN TRANSACTION"); @@ -1379,6 +1398,14 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans rrdhost_hostname(host), rrdset_name(st), rrddim_name(rd)); } + + if(rrddim_flag_check(rd, RRDDIM_FLAG_ML_MODEL_LOAD)) { + rrddim_flag_clear(rd, RRDDIM_FLAG_ML_MODEL_LOAD); + if (likely(load_ml_models)) + (void) ml_dimension_load_models(rd, &ml_load_stmt); + } + + worker_is_idle(); } rrddim_foreach_done(rd); } @@ -1387,6 +1414,11 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans if (use_transaction) (void)db_execute(db_meta, "COMMIT TRANSACTION"); + if (ml_load_stmt) { + sqlite3_finalize(ml_load_stmt); + ml_load_stmt = NULL; + } + return more_to_do; } @@ -1411,6 +1443,11 @@ static void store_host_and_system_info(RRDHOST *host, size_t *query_counter) } } +struct host_chart_label_cleanup { + Pvoid_t JudyL; + Word_t count; +}; + // Worker thread to scan hosts for pending metadata to store static void start_metadata_hosts(uv_work_t *req __maybe_unused) { @@ -1427,11 +1464,33 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) internal_error(true, "METADATA: checking all hosts..."); usec_t started_ut = now_monotonic_usec(); (void)started_ut; + struct host_chart_label_cleanup *cl_cleanup_data = data->data; + + if (cl_cleanup_data) { + Word_t Index = 0; + bool first = true; + Pvoid_t *PValue; + while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) { + char *machine_guid = *PValue; + + host = rrdhost_find_by_guid(machine_guid); + if (likely(!host)) { + uuid_t host_uuid; + if (!uuid_parse(machine_guid, host_uuid)) + delete_host_chart_labels(&host_uuid); + } + + freez(machine_guid); + } + JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0); + freez(cl_cleanup_data); + } + bool run_again = false; worker_is_busy(UV_EVENT_METADATA_STORE); if (!data->max_count) - transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;"); + transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION"); dfe_start_reentrant(rrdhost_root_index, host) { if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) @@ -1501,7 +1560,7 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) dfe_done(host); if (!data->max_count && transaction_started) - transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;"); + transaction_started = db_execute(db_meta, "COMMIT TRANSACTION"); usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut; internal_error(true, "METADATA: checking all hosts completed in %0.2f ms", @@ -1516,42 +1575,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) worker_is_idle(); } -// Callback after scan of hosts is done -static void after_start_ml_model_load(uv_work_t *req, int status __maybe_unused) -{ - struct ml_model_payload *ml_data = req->data; - struct metadata_wc *wc = ml_data->wc; - metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING); - JudyLFreeArray(&ml_data->JudyL, PJE0); - freez(ml_data); -} - -static void start_ml_model_load(uv_work_t *req __maybe_unused) -{ - register_libuv_worker_jobs(); - - struct ml_model_payload *ml_data = req->data; - - worker_is_busy(UV_EVENT_METADATA_ML_LOAD); - - Pvoid_t *PValue; - Word_t Index = 0; - bool first = true; - RRDDIM *rd; - RRDDIM_ACQUIRED *rda; - internal_error(true, "Batch ML load loader, %zu items", ml_data->count); - while((PValue = JudyLFirstThenNext(ml_data->JudyL, &Index, &first))) { - UNUSED(PValue); - rda = (RRDDIM_ACQUIRED *) Index; - rd = rrddim_acquired_to_rrddim(rda); - ml_dimension_load_models(rd); - rrddim_acquired_release(rda); - } - worker_is_idle(); -} - - - static void metadata_event_loop(void *arg) { worker_register("METASYNC"); @@ -1561,7 +1584,6 @@ static void metadata_event_loop(void *arg) worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info"); worker_register_job_name(METADATA_MAINTENANCE, "maintenance"); - worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models"); int ret; uv_loop_t *loop; @@ -1593,7 +1615,7 @@ static void metadata_event_loop(void *arg) wc->timer_req.data = wc; fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS)); - netdata_log_info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Starting metadata sync thread"); struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -1602,11 +1624,11 @@ static void metadata_event_loop(void *arg) wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK; int shutdown = 0; - completion_mark_complete(&wc->init_complete); + completion_mark_complete(&wc->start_stop_complete); BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); struct scan_metadata_payload *data; + struct host_chart_label_cleanup *cl_cleanup_data = NULL; - struct ml_model_payload *ml_data = NULL; while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) { uuid_t *uuid; RRDHOST *host = NULL; @@ -1633,43 +1655,10 @@ static void metadata_event_loop(void *arg) if (likely(opcode != METADATA_DATABASE_NOOP)) worker_is_busy(opcode); - // Have pending ML models to load? - if (opcode != METADATA_ML_LOAD_MODELS && ml_data && ml_data->count) { - static usec_t ml_submit_last = 0; - usec_t now = now_monotonic_usec(); - if (!ml_submit_last) - ml_submit_last = now; - - if (!metadata_flag_check(wc, METADATA_FLAG_ML_LOADING) && (now - ml_submit_last > 150 * USEC_PER_MS)) { - metadata_flag_set(wc, METADATA_FLAG_ML_LOADING); - if (unlikely(uv_queue_work(loop, &ml_data->request, start_ml_model_load, after_start_ml_model_load))) - metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING); - else { - ml_submit_last = now; - ml_data = NULL; - } - } - } - switch (opcode) { case METADATA_DATABASE_NOOP: case METADATA_DATABASE_TIMER: break; - - case METADATA_ML_LOAD_MODELS: { - RRDDIM *rd = (RRDDIM *) cmd.param[0]; - RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(rd->rrdset, rrddim_id(rd)); - if (likely(rda)) { - if (!ml_data) { - ml_data = callocz(1,sizeof(*ml_data)); - ml_data->request.data = ml_data; - ml_data->wc = wc; - } - JudyLIns(&ml_data->JudyL, (Word_t)rda, PJE0); - ml_data->count++; - } - break; - } case METADATA_DEL_DIMENSION: uuid = (uuid_t *) cmd.param[0]; if (likely(dimension_can_be_deleted(uuid, NULL, false))) @@ -1695,7 +1684,9 @@ static void metadata_event_loop(void *arg) data = mallocz(sizeof(*data)); data->request.data = data; data->wc = wc; + data->data = cl_cleanup_data; data->work_buffer = work_buffer; + cl_cleanup_data = NULL; if (unlikely(cmd.completion)) { data->max_count = 0; // 0 will process all pending updates @@ -1711,6 +1702,7 @@ static void metadata_event_loop(void *arg) after_metadata_hosts))) { // Failed to launch worker -- let the event loop handle completion cmd.completion = wc->scan_complete; + cl_cleanup_data = data->data; freez(data); metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); } @@ -1727,6 +1719,15 @@ static void metadata_event_loop(void *arg) after_start_host_load_context))) { freez(data); } + break; + case METADATA_DELETE_HOST_CHART_LABELS:; + if (!cl_cleanup_data) + cl_cleanup_data = callocz(1,sizeof(*cl_cleanup_data)); + + Pvoid_t *PValue = JudyLIns(&cl_cleanup_data->JudyL, (Word_t) ++cl_cleanup_data->count, PJE0); + if (PValue) + *PValue = (void *) cmd.param[0]; + break; case METADATA_UNITTEST:; struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0]; @@ -1755,10 +1756,12 @@ static void metadata_event_loop(void *arg) freez(loop); worker_unregister(); - netdata_log_info("METADATA: Shutting down event loop"); - completion_mark_complete(&wc->init_complete); - completion_destroy(wc->scan_complete); - freez(wc->scan_complete); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down metadata thread"); + completion_mark_complete(&wc->start_stop_complete); + if (wc->scan_complete) { + completion_destroy(wc->scan_complete); + freez(wc->scan_complete); + } metadata_free_cmd_queue(wc); return; @@ -1771,23 +1774,21 @@ error_after_loop_init: worker_unregister(); } -struct metadata_wc metasync_worker = {.loop = NULL}; - void metadata_sync_shutdown(void) { - completion_init(&metasync_worker.init_complete); + completion_init(&metasync_worker.start_stop_complete); struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - netdata_log_info("METADATA: Sending a shutdown command"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a shutdown command"); cmd.opcode = METADATA_SYNC_SHUTDOWN; metadata_enq_cmd(&metasync_worker, &cmd); /* wait for metadata thread to shut down */ - netdata_log_info("METADATA: Waiting for shutdown ACK"); - completion_wait_for(&metasync_worker.init_complete); - completion_destroy(&metasync_worker.init_complete); - netdata_log_info("METADATA: Shutdown complete"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for shutdown ACK"); + completion_wait_for(&metasync_worker.start_stop_complete); + completion_destroy(&metasync_worker.start_stop_complete); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Shutdown complete"); } void metadata_sync_shutdown_prepare(void) @@ -1804,20 +1805,20 @@ void metadata_sync_shutdown_prepare(void) completion_init(compl); __atomic_store_n(&wc->scan_complete, compl, __ATOMIC_RELAXED); - netdata_log_info("METADATA: Sending a scan host command"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a scan host command"); uint32_t max_wait_iterations = 2000; while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_PROCESSING)) && max_wait_iterations--) { if (max_wait_iterations == 1999) - netdata_log_info("METADATA: Current worker is running; waiting to finish"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Current worker is running; waiting to finish"); sleep_usec(1000); } cmd.opcode = METADATA_SCAN_HOSTS; metadata_enq_cmd(&metasync_worker, &cmd); - netdata_log_info("METADATA: Waiting for host scan completion"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for host scan completion"); completion_wait_for(wc->scan_complete); - netdata_log_info("METADATA: Host scan complete; can continue with shutdown"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Host scan complete; can continue with shutdown"); } // ------------------------------------------------------------- @@ -1828,15 +1829,14 @@ void metadata_sync_init(void) struct metadata_wc *wc = &metasync_worker; memset(wc, 0, sizeof(*wc)); - metadata_init_cmd_queue(wc); - completion_init(&wc->init_complete); + completion_init(&wc->start_stop_complete); fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc)); - completion_wait_for(&wc->init_complete); - completion_destroy(&wc->init_complete); + completion_wait_for(&wc->start_stop_complete); + completion_destroy(&wc->start_stop_complete); - netdata_log_info("SQLite metadata sync initialization complete"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "SQLite metadata sync initialization complete"); } @@ -1887,9 +1887,7 @@ void metaqueue_host_update_info(RRDHOST *host) void metaqueue_ml_load_models(RRDDIM *rd) { - if (unlikely(!metasync_worker.loop)) - return; - queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL); + rrddim_flag_set(rd, RRDDIM_FLAG_ML_MODEL_LOAD); } void metadata_queue_load_host_context(RRDHOST *host) @@ -1897,8 +1895,22 @@ void metadata_queue_load_host_context(RRDHOST *host) if (unlikely(!metasync_worker.loop)) return; queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command to load host contexts"); } +void metadata_delete_host_chart_labels(char *machine_guid) +{ + if (unlikely(!metasync_worker.loop)) { + freez(machine_guid); + return; + } + + // Node machine guid is already strdup-ed + queue_metadata_cmd(METADATA_DELETE_HOST_CHART_LABELS, machine_guid, NULL); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command delete chart labels for host %s", machine_guid); +} + + // // unitests // @@ -1946,7 +1958,7 @@ static void *metadata_unittest_threads(void) tu.join = 0; for (int i = 0; i < threads_to_create; i++) { char buf[100 + 1]; - snprintf(buf, 100, "META[%d]", i); + snprintf(buf, sizeof(buf) - 1, "META[%d]", i); netdata_thread_create( &threads[i], buf, diff --git a/database/sqlite/sqlite_metadata.h b/database/sqlite/sqlite_metadata.h index f75a9ab00..6860cfedf 100644 --- a/database/sqlite/sqlite_metadata.h +++ b/database/sqlite/sqlite_metadata.h @@ -17,6 +17,7 @@ void metaqueue_host_update_info(RRDHOST *host); void metaqueue_ml_load_models(RRDDIM *rd); void migrate_localhost(uuid_t *host_uuid); void metadata_queue_load_host_context(RRDHOST *host); +void metadata_delete_host_chart_labels(char *machine_guid); void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int vacuum_pc); // UNIT TEST -- cgit v1.2.3