diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /database/sqlite | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 908 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 125 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 719 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 26 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 100 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.h | 3 | ||||
-rw-r--r-- | database/sqlite/sqlite_context.c | 24 | ||||
-rw-r--r-- | database/sqlite/sqlite_db_migration.c | 4 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.c | 92 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.h | 3 | ||||
-rw-r--r-- | database/sqlite/sqlite_health.c | 163 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 549 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.h | 2 |
13 files changed, 1329 insertions, 1389 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 3b0c40522..a33e09f5d 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -5,58 +5,176 @@ #include "sqlite_aclk_node.h" +struct aclk_sync_config_s { + uv_thread_t thread; + uv_loop_t loop; + uv_timer_t timer_req; + time_t cleanup_after; // Start a cleanup after this timestamp + uv_async_t async; + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + bool initialized; + volatile unsigned queue_size; + struct aclk_database_cmdqueue cmd_queue; +} aclk_sync_config = { 0 }; + + void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } -static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) + +int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd) { - struct aclk_database_worker_config *wc = data; - UNUSED(argc); - UNUSED(column); + unsigned queue_size; - debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.data = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(wc, &cmd); + /* wait for free space in queue */ + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + return 1; + } + + fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); + /* enqueue command */ + aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; + aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.tail + 1 : 0; + aclk_sync_config.queue_size = queue_size + 1; + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); return 0; } -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" - -static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) +static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) { - char *err_msg = NULL; - debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex); } + fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); + /* enqueue command */ + aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; + aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.tail + 1 : 0; + aclk_sync_config.queue_size = queue_size + 1; + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + + /* wake up event loop */ + int rc = uv_async_send(&aclk_sync_config.async); + if (unlikely(rc)) + debug(D_ACLK_SYNC, "Failed to wake up event loop"); } -static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +enum { + IDX_HOST_ID, + IDX_HOSTNAME, + IDX_REGISTRY, + IDX_UPDATE_EVERY, + IDX_OS, + IDX_TIMEZONE, + IDX_TAGS, + IDX_HOPS, + IDX_MEMORY_MODE, + IDX_ABBREV_TIMEZONE, + IDX_UTC_OFFSET, + IDX_PROGRAM_NAME, + IDX_PROGRAM_VERSION, + IDX_ENTRIES, + IDX_HEALTH_ENABLED, +}; + +static int create_host_callback(void *data, int argc, char **argv, char **column) { - UNUSED(cmd); + int *number_of_chidren = data; + UNUSED(argc); + UNUSED(column); - debug(D_ACLK, "Checking database for %s", wc->host_guid); + char guid[UUID_STR_LEN]; + uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); - buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " - "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); - db_execute(buffer_tostring(sql)); + system_info->hops = str2i((const char *) argv[IDX_HOPS]); - buffer_free(sql); + sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); + + RRDHOST *host = rrdhost_find_or_create( + (const char *) argv[IDX_HOSTNAME] + , (const char *) argv[IDX_REGISTRY] + , guid + , (const char *) argv[IDX_OS] + , (const char *) argv[IDX_TIMEZONE] + , (const char *) argv[IDX_ABBREV_TIMEZONE] + , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0) + , (const char *) argv[IDX_TAGS] + , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") + , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") + , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 + , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0 + , default_rrd_memory_mode + , 0 // health + , 0 // rrdpush enabled + , NULL //destination + , NULL // api key + , NULL // send charts matching + , false // rrdpush_enable_replication + , 0 // rrdpush_seconds_to_replicate + , 0 // rrdpush_replication_step + , system_info + , 1 + ); + if (likely(host)) + host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); + + (*number_of_chidren)++; + +#ifdef NETDATA_INTERNAL_CHECKS + char node_str[UUID_STR_LEN] = "<none>"; + if (likely(host->node_id)) + uuid_unparse_lower(*host->node_id, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); +#endif + return 0; } +#ifdef ENABLE_ACLK +static struct aclk_database_cmd aclk_database_deq_cmd(void) +{ + struct aclk_database_cmd ret; + unsigned queue_size; -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + queue_size = aclk_sync_config.queue_size; + if (queue_size == 0) { + memset(&ret, 0, sizeof(ret)); + ret.opcode = ACLK_DATABASE_NOOP; + ret.completion = NULL; + + } else { + /* dequeue command */ + ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head]; + if (queue_size == 1) { + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; + } else { + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.head + 1 : 0; + } + aclk_sync_config.queue_size = queue_size - 1; + /* wake up producers */ + uv_cond_signal(&aclk_sync_config.cmd_cond); + } + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + return ret; +} + +#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" static int is_host_available(uuid_t *host_id) { sqlite3_stmt *res = NULL; @@ -76,7 +194,7 @@ static int is_host_available(uuid_t *host_id) rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to select node instance information"); + error_report("Failed to bind host_id parameter to check host existence"); goto failed; } rc = sqlite3_step_monitored(res); @@ -89,15 +207,13 @@ failed: } // OPCODE: ACLK_DATABASE_DELETE_HOST -void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +static void sql_delete_aclk_table_list(char *host_guid) { - UNUSED(wc); - char uuid_str[GUID_LEN + 1]; - char host_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; + char host_str[UUID_STR_LEN]; int rc; uuid_t host_uuid; - char *host_guid = (char *)cmd.data; if (unlikely(!host_guid)) return; @@ -139,273 +255,67 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); - db_execute(buffer_tostring(sql)); + rc = db_execute(db_meta, buffer_tostring(sql)); + if (unlikely(rc)) + error("Failed to drop unused ACLK tables"); fail: buffer_free(sql); } -uv_mutex_t aclk_async_lock; -struct aclk_database_worker_config *aclk_thread_head = NULL; - -int claimed() +static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused) { - int rc; - rrdhost_aclk_state_lock(localhost); - rc = (localhost->aclk_state.claimed_id != NULL); - rrdhost_aclk_state_unlock(localhost); - return rc; -} - -void aclk_add_worker_thread(struct aclk_database_worker_config *wc) -{ - if (unlikely(!wc)) - return; - - uv_mutex_lock(&aclk_async_lock); - if (unlikely(!wc->host)) { - wc->next = aclk_thread_head; - aclk_thread_head = wc; - } - uv_mutex_unlock(&aclk_async_lock); + debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DELETE_HOST; + cmd.param[0] = strdupz((char *) argv[0]); + aclk_database_enq_cmd_noblock(&cmd); + return 0; } -void aclk_del_worker_thread(struct aclk_database_worker_config *wc) -{ - if (unlikely(!wc)) - return; - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config **tmp = &aclk_thread_head; - while (*tmp && (*tmp) != wc) - tmp = &(*tmp)->next; - if (*tmp) - *tmp = wc->next; - uv_mutex_unlock(&aclk_async_lock); -} +#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" -int aclk_worker_thread_exists(char *guid) +static void sql_check_aclk_table_list(void) { - int rc = 0; - uv_mutex_lock(&aclk_async_lock); - - struct aclk_database_worker_config *tmp = aclk_thread_head; - - while (tmp && !rc) { - rc = strcmp(tmp->uuid_str, guid) == 0; - tmp = tmp->next; + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); } - uv_mutex_unlock(&aclk_async_lock); - return rc; } -void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc) -{ - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - wc->queue_size = 0; - fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); - fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); -} +#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;" -int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) +static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) { - uv_mutex_unlock(&wc->cmd_mutex); - return 1; - } - - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); + char sql[512]; + snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); + if (unlikely(db_execute(db_meta, sql))) + error_report("Failed to clean stale ACLK alert entries"); return 0; } -void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) -{ - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if (wc->is_shutting_down) { - uv_mutex_unlock(&wc->cmd_mutex); - return; - } - while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); - - /* wake up event loop */ - int rc = uv_async_send(&wc->async); - if (unlikely(rc)) - debug(D_ACLK_SYNC, "Failed to wake up event loop"); -} - -struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_config* wc) -{ - struct aclk_database_cmd ret; - unsigned queue_size; - uv_mutex_lock(&wc->cmd_mutex); - queue_size = wc->queue_size; - if (queue_size == 0 || wc->is_shutting_down) { - memset(&ret, 0, sizeof(ret)); - ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; - if (wc->is_shutting_down) - uv_cond_signal(&wc->cmd_cond); - } else { - /* dequeue command */ - ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; - if (queue_size == 1) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - } else { - wc->cmd_queue.head = wc->cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.head + 1 : 0; - } - wc->queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&wc->cmd_cond); - } - uv_mutex_unlock(&wc->cmd_mutex); - - return ret; -} +#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');" -struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id) +static void sql_maint_aclk_sync_database_all(void) { - if (unlikely(!node_id)) - return NULL; - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config *wc = aclk_thread_head; - - while (wc) { - if (!strcmp(wc->node_id, node_id)) - break; - wc = wc->next; + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); } - uv_mutex_unlock(&aclk_async_lock); - - return (wc); } -void aclk_sync_exit_all() -{ - rrd_rdlock(); - RRDHOST *host; - rrdhost_foreach_read(host) { - struct aclk_database_worker_config *wc = host->dbsync_worker; - if (wc) { - wc->is_shutting_down = 1; - (void) aclk_database_deq_cmd(wc); - uv_cond_signal(&wc->cmd_cond); - } - } - rrd_unlock(); - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config *wc = aclk_thread_head; - while (wc) { - wc->is_shutting_down = 1; - wc = wc->next; - } - uv_mutex_unlock(&aclk_async_lock); -} - -enum { - IDX_HOST_ID, - IDX_HOSTNAME, - IDX_REGISTRY, - IDX_UPDATE_EVERY, - IDX_OS, - IDX_TIMEZONE, - IDX_TAGS, - IDX_HOPS, - IDX_MEMORY_MODE, - IDX_ABBREV_TIMEZONE, - IDX_UTC_OFFSET, - IDX_PROGRAM_NAME, - IDX_PROGRAM_VERSION, - IDX_ENTRIES, - IDX_HEALTH_ENABLED, -}; - -static int create_host_callback(void *data, int argc, char **argv, char **column) -{ - UNUSED(data); - UNUSED(argc); - UNUSED(column); - - char guid[UUID_STR_LEN]; - uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); - - struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); - __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); - - system_info->hops = str2i((const char *) argv[IDX_HOPS]); - - sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); - - RRDHOST *host = rrdhost_find_or_create( - (const char *) argv[IDX_HOSTNAME] - , (const char *) argv[IDX_REGISTRY] - , guid - , (const char *) argv[IDX_OS] - , (const char *) argv[IDX_TIMEZONE] - , (const char *) argv[IDX_ABBREV_TIMEZONE] - , argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET]) : 0 - , (const char *) argv[IDX_TAGS] - , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") - , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") - , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 - , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0 - , default_rrd_memory_mode - , 0 // health - , 0 // rrdpush enabled - , NULL //destination - , NULL // api key - , NULL // send charts matching - , false // rrdpush_enable_replication - , 0 // rrdpush_seconds_to_replicate - , 0 // rrdpush_replication_step - , system_info - , 1 - ); - if (likely(host)) - host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); - -#ifdef NETDATA_INTERNAL_CHECKS - char node_str[UUID_STR_LEN] = "<none>"; - if (likely(host->node_id)) - uuid_unparse_lower(*host->node_id, node_str); - internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); -#endif - return 0; -} - -#ifdef ENABLE_ACLK -static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) +static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { char uuid_str[GUID_LEN + 1]; - UNUSED(data); - UNUSED(argc); - UNUSED(column); - uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); RRDHOST *host = rrdhost_find_by_guid(uuid_str); @@ -415,156 +325,81 @@ static int aclk_start_sync_thread(void *data, int argc, char **argv, char **colu sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]); return 0; } -#endif -void sql_aclk_sync_init(void) -{ - char *err_msg = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { - return; - } - error_report("Database has not been initialized"); - return; - } - - info("Creating archived hosts"); - rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " - "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " - "program_version, entries, health_enabled FROM host WHERE hops >0;", - create_host_callback, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); - sqlite3_free(err_msg); - } - -#ifdef ENABLE_ACLK - fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); - rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " - "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg); - sqlite3_free(err_msg); - } - info("ACLK sync initialization completed"); -#endif -} static void async_cb(uv_async_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); - debug(D_ACLK_SYNC, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } #define TIMER_PERIOD_MS (1000) -static void timer_cb(uv_timer_t* handle) +static void timer_cb(uv_timer_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); -#ifdef ENABLE_ACLK - struct aclk_database_worker_config *wc = handle->data; + struct aclk_sync_config_s *config = handle->data; struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_TIMER; - aclk_database_enq_cmd_noblock(wc, &cmd); time_t now = now_realtime_sec(); - if (wc->cleanup_after && wc->cleanup_after < now) { + if (config->cleanup_after && config->cleanup_after < now) { cmd.opcode = ACLK_DATABASE_CLEANUP; - if (!aclk_database_enq_cmd_noblock(wc, &cmd)) - wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; + if (!aclk_database_enq_cmd_noblock(&cmd)) + config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } if (aclk_connected) { - if (wc->alert_updates && !wc->pause_alert_updates) { - cmd.opcode = ACLK_DATABASE_PUSH_ALERT; - cmd.count = ACLK_MAX_ALERT_UPDATES; - aclk_database_enq_cmd_noblock(wc, &cmd); - } + cmd.opcode = ACLK_DATABASE_PUSH_ALERT; + aclk_database_enq_cmd_noblock(&cmd); + + aclk_check_node_info_and_collectors(); } -#endif } -static void aclk_database_worker(void *arg) +static void aclk_synchronization(void *arg __maybe_unused) { - service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); + struct aclk_sync_config_s *config = arg; + uv_thread_set_name_np(config->thread, "ACLKSYNC"); worker_register("ACLKSYNC"); + service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); + worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); - worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); - worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); - worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info"); - worker_register_job_name(ACLK_DATABASE_NODE_COLLECTORS, "node collectors"); + worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); - struct aclk_database_worker_config *wc = arg; - uv_loop_t *loop; - int ret; - enum aclk_database_opcode opcode; - uv_timer_t timer_req; - struct aclk_database_cmd cmd; + uv_loop_t *loop = &config->loop; + fatal_assert(0 == uv_loop_init(loop)); + fatal_assert(0 == uv_async_init(loop, &config->async, async_cb)); - char threadname[NETDATA_THREAD_NAME_MAX+1]; - if (wc->host) - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); - else { - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str); - threadname[11] = '\0'; - } - uv_thread_set_name_np(wc->thread, threadname); - - loop = wc->loop = mallocz(sizeof(uv_loop_t)); - ret = uv_loop_init(loop); - if (ret) { - error("uv_loop_init(): %s", uv_strerror(ret)); - goto error_after_loop_init; - } - loop->data = wc; + fatal_assert(0 == uv_timer_init(loop, &config->timer_req)); + config->timer_req.data = config; + fatal_assert(0 == uv_timer_start(&config->timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); - ret = uv_async_init(wc->loop, &wc->async, async_cb); - if (ret) { - error("uv_async_init(): %s", uv_strerror(ret)); - goto error_after_async_init; - } - wc->async.data = wc; - - ret = uv_timer_init(loop, &timer_req); - if (ret) { - error("uv_timer_init(): %s", uv_strerror(ret)); - goto error_after_timer_init; - } - timer_req.data = wc; - fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); - - wc->node_info_send = 1; - info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, (unsigned long int) sizeof(*wc)); - - memset(&cmd, 0, sizeof(cmd)); + info("Starting ACLK synchronization thread"); - wc->startup_time = now_realtime_sec(); - wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST; + config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST; + config->initialized = true; - debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); - - while (likely(!netdata_exit)) { + while (likely(service_running(SERVICE_ACLKSYNC))) { + enum aclk_database_opcode opcode; worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ do { - cmd = aclk_database_deq_cmd(wc); + struct aclk_database_cmd cmd = aclk_database_deq_cmd(); - if (netdata_exit) + if (unlikely(!service_running(SERVICE_ACLKSYNC))) break; opcode = cmd.opcode; @@ -576,201 +411,216 @@ static void aclk_database_worker(void *arg) case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ break; - // MAINTENANCE case ACLK_DATABASE_CLEANUP: - debug(D_ACLK_SYNC, "Database cleanup for %s", wc->host_guid); - - if (wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST + 2 < now_realtime_sec() && claimed() && aclk_connected) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - cmd.completion = NULL; - (void) aclk_database_enq_cmd_noblock(wc, &cmd); - } - - sql_maint_aclk_sync_database(wc, cmd); - if (wc->host == localhost) - sql_check_aclk_table_list(wc); + // Scan all aclk_alert_ tables and cleanup as needed + sql_maint_aclk_sync_database_all(); + sql_check_aclk_table_list(); break; case ACLK_DATABASE_DELETE_HOST: - debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data); - sql_delete_aclk_table_list(wc, cmd); + sql_delete_aclk_table_list(cmd.param[0]); + break; +// NODE STATE + case ACLK_DATABASE_NODE_STATE:; + RRDHOST *host = cmd.param[0]; + int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; + struct aclk_sync_host_config *ahc = host->aclk_sync_host_config; + if (unlikely(!ahc)) + sql_create_aclk_table(host, &host->host_uuid, host->node_id); + aclk_host_state_update(host, live); break; - // ALERTS case ACLK_DATABASE_PUSH_ALERT_CONFIG: - debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid); - aclk_push_alert_config_event(wc, cmd); + aclk_push_alert_config_event(cmd.param[0], cmd.param[1]); break; case ACLK_DATABASE_PUSH_ALERT: - debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid); - aclk_push_alert_event(wc, cmd); - break; - case ACLK_DATABASE_ALARM_HEALTH_LOG: - debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid); - aclk_push_alarm_health_log(wc, cmd); + aclk_push_alert_events_for_all_hosts(); break; - case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT: - debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid); - aclk_push_alert_snapshot_event(wc, cmd); + case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:; + aclk_push_alert_snapshot_event(cmd.param[0]); break; case ACLK_DATABASE_QUEUE_REMOVED_ALERTS: - debug(D_ACLK_SYNC, "Queueing removed alerts for node %s", wc->host_guid); - sql_process_queue_removed_alerts_to_aclk(wc, cmd); - break; - -// NODE OPERATIONS - case ACLK_DATABASE_NODE_INFO: - debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str); - sql_build_node_info(wc, cmd); - break; - case ACLK_DATABASE_NODE_COLLECTORS: - debug(D_ACLK_SYNC,"Sending node collectors info for %s", wc->uuid_str); - sql_build_node_collectors(wc); - break; -#ifdef ENABLE_ACLK - -// NODE_INSTANCE DETECTION - case ACLK_DATABASE_ORPHAN_HOST: - wc->host = NULL; - wc->is_orphan = 1; - aclk_add_worker_thread(wc); - break; -#endif - case ACLK_DATABASE_TIMER: - if (unlikely(localhost && !wc->host && !wc->is_orphan)) { - if (claimed()) { - wc->host = rrdhost_find_by_guid(wc->host_guid); - if (wc->host) { - info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid); - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); - uv_thread_set_name_np(wc->thread, threadname); - wc->host->dbsync_worker = wc; - if (unlikely(!wc->hostname)) - wc->hostname = strdupz(rrdhost_hostname(wc->host)); - aclk_del_worker_thread(wc); - wc->node_info_send = 1; - } - } - } - if (wc->node_info_send && localhost && claimed() && aclk_connected) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - cmd.completion = NULL; - wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); - } - if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) { - cmd.opcode = ACLK_DATABASE_NODE_COLLECTORS; - cmd.completion = NULL; - wc->node_collectors_send = aclk_database_enq_cmd_noblock(wc, &cmd); - } - if (localhost == wc->host) - (void) sqlite3_wal_checkpoint(db_meta, NULL); + sql_process_queue_removed_alerts_to_aclk(cmd.param[0]); break; default: debug(D_ACLK_SYNC, "%s: default.", __func__); break; } if (cmd.completion) - aclk_complete(cmd.completion); + completion_mark_complete(cmd.completion); } while (opcode != ACLK_DATABASE_NOOP); } - if (!uv_timer_stop(&timer_req)) - uv_close((uv_handle_t *)&timer_req, NULL); - - /* cleanup operations of the event loop */ - //info("Shutting down ACLK sync event loop for %s", wc->host_guid); + if (!uv_timer_stop(&config->timer_req)) + uv_close((uv_handle_t *)&config->timer_req, NULL); - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&wc->async, NULL); - uv_run(loop, UV_RUN_DEFAULT); + uv_close((uv_handle_t *)&config->async, NULL); +// uv_close((uv_handle_t *)&config->async_exit, NULL); + uv_cond_destroy(&config->cmd_cond); + (void) uv_loop_close(loop); - info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid); - /* TODO: don't let the API block by waiting to enqueue commands */ - uv_cond_destroy(&wc->cmd_cond); - - int rc; - do { - rc = uv_loop_close(loop); - } while (rc != UV_EBUSY); - - freez(loop); + worker_unregister(); + service_exits(); + info("ACLK SYNC: Shutting down ACLK synchronization event loop"); +} - rrd_rdlock(); - if (likely(wc->host)) - wc->host->dbsync_worker = NULL; - freez(wc->hostname); - freez(wc); - rrd_unlock(); +static void aclk_synchronization_init(void) +{ + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; + aclk_sync_config.queue_size = 0; + fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond)); + fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex)); - worker_unregister(); - return; - -error_after_timer_init: - uv_close((uv_handle_t *)&wc->async, NULL); -error_after_async_init: - fatal_assert(0 == uv_loop_close(loop)); -error_after_loop_init: - freez(loop); - worker_unregister(); + fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config)); } +#endif // ------------------------------------------------------------- -void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) +void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK char uuid_str[GUID_LEN + 1]; char host_guid[GUID_LEN + 1]; + int rc; uuid_unparse_lower_fix(host_uuid, uuid_str); - - if (aclk_worker_thread_exists(uuid_str)) - return; - uuid_unparse_lower(*host_uuid, host_guid); - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + char sql[ACLK_SYNC_QUERY_SIZE]; - buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, INDEX_ACLK_ALERT, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - - buffer_free(sql); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, TABLE_ACLK_ALERT, uuid_str); + rc = db_execute(db_meta, sql); + if (unlikely(rc)) + error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid); + else { + snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT, uuid_str, uuid_str); + rc = db_execute(db_meta, sql); + if (unlikely(rc)) + error_report("Failed to create ACLK alert table index for host %s", host ? string2str(host->hostname) : host_guid); + } + if (likely(host) && unlikely(host->aclk_sync_host_config)) + return; - if (likely(host) && unlikely(host->dbsync_worker)) + if (unlikely(!host)) return; - struct aclk_database_worker_config *wc = callocz(1, sizeof(struct aclk_database_worker_config)); + struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config)); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); - if (likely(host)) { - host->dbsync_worker = (void *)wc; - wc->hostname = strdupz(rrdhost_hostname(host)); - if (node_id && !host->node_id) { - host->node_id = mallocz(sizeof(*host->node_id)); - uuid_copy(*host->node_id, *node_id); - } + + host->aclk_sync_host_config = (void *)wc; + if (node_id && !host->node_id) { + host->node_id = mallocz(sizeof(*host->node_id)); + uuid_copy(*host->node_id, *node_id); } - else - wc->hostname = get_hostname_by_node_id(wc->node_id); + wc->host = host; strcpy(wc->uuid_str, uuid_str); - strcpy(wc->host_guid, host_guid); wc->alert_updates = 0; - aclk_database_init_cmd_queue(wc); - aclk_add_worker_thread(wc); - fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc)); -#else - UNUSED(host); - UNUSED(host_uuid); - UNUSED(node_id); + time_t now = now_realtime_sec(); + wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now; #endif -}
\ No newline at end of file +} + +#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \ + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ + "program_version, entries, health_enabled FROM host WHERE hops >0;" + +#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ + "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " +void sql_aclk_sync_init(void) +{ + char *err_msg = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { + return; + } + error_report("Database has not been initialized"); + return; + } + + info("Creating archived hosts"); + int number_of_children = 0; + rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_HOSTS, create_host_callback, &number_of_children, &err_msg); + + if (rc != SQLITE_OK) { + error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + + info("Created %d archived hosts", number_of_children); + // Trigger host context load for hosts that have been created + metadata_queue_load_host_context(NULL); + +#ifdef ENABLE_ACLK + if (!number_of_children) + aclk_queue_node_info(localhost, true); + + rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg); + + if (rc != SQLITE_OK) { + error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + aclk_synchronization_init(); + + info("ACLK sync initialization completed"); +#endif +} + +// Public + +static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1) +{ + struct aclk_database_cmd cmd; + cmd.opcode = opcode; + cmd.param[0] = (void *) param0; + cmd.param[1] = (void *) param1; + cmd.completion = NULL; + aclk_database_enq_cmd(&cmd); +} + +// Public +void aclk_push_alert_config(const char *node_id, const char *config_hash) +{ + if (unlikely(!aclk_sync_config.initialized)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash)); +} + +void aclk_push_node_alert_snapshot(const char *node_id) +{ + if (unlikely(!aclk_sync_config.initialized)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL); +} + + +void aclk_push_node_removed_alerts(const char *node_id) +{ + if (unlikely(!aclk_sync_config.initialized)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL); +} + +void schedule_node_info_update(RRDHOST *host __maybe_unused) +{ +#ifdef ENABLE_ACLK + if (unlikely(!host)) + return; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_NODE_STATE; + cmd.param[0] = host; + cmd.completion = NULL; + aclk_database_enq_cmd(&cmd); +#endif +} diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 208177e45..d555a0cef 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -18,45 +18,6 @@ #define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400) #define ACLK_SYNC_QUERY_SIZE 512 -struct aclk_completion { - uv_mutex_t mutex; - uv_cond_t cond; - volatile unsigned completed; -}; - -static inline void init_aclk_completion(struct aclk_completion *p) -{ - p->completed = 0; - fatal_assert(0 == uv_cond_init(&p->cond)); - fatal_assert(0 == uv_mutex_init(&p->mutex)); -} - -static inline void destroy_aclk_completion(struct aclk_completion *p) -{ - uv_cond_destroy(&p->cond); - uv_mutex_destroy(&p->mutex); -} - -static inline void wait_for_aclk_completion(struct aclk_completion *p) -{ - uv_mutex_lock(&p->mutex); - while (0 == p->completed) { - uv_cond_wait(&p->cond, &p->mutex); - } - fatal_assert(1 == p->completed); - uv_mutex_unlock(&p->mutex); -} - -static inline void aclk_complete(struct aclk_completion *p) -{ - uv_mutex_lock(&p->mutex); - p->completed = 1; - uv_mutex_unlock(&p->mutex); - uv_cond_broadcast(&p->cond); -} - -extern uv_mutex_t aclk_async_lock; - static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) { uuid_unparse_lower(*uuid, out); @@ -66,6 +27,12 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) out[23] = '_'; } +static inline int claimed() +{ + return localhost->aclk_state.claimed_id != NULL; +} + + #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ "unique(alert_unique_id));" @@ -74,16 +41,14 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) enum aclk_database_opcode { ACLK_DATABASE_NOOP = 0, - ACLK_DATABASE_ORPHAN_HOST, - ACLK_DATABASE_ALARM_HEALTH_LOG, ACLK_DATABASE_CLEANUP, ACLK_DATABASE_DELETE_HOST, - ACLK_DATABASE_NODE_INFO, + ACLK_DATABASE_NODE_STATE, ACLK_DATABASE_PUSH_ALERT, ACLK_DATABASE_PUSH_ALERT_CONFIG, ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, + ACLK_DATABASE_PUSH_ALERT_CHECKPOINT, ACLK_DATABASE_QUEUE_REMOVED_ALERTS, - ACLK_DATABASE_NODE_COLLECTORS, ACLK_DATABASE_TIMER, // leave this last @@ -93,10 +58,8 @@ enum aclk_database_opcode { struct aclk_database_cmd { enum aclk_database_opcode opcode; - void *data; - void *data_param; - int count; - struct aclk_completion *completion; + void *param[2]; + struct completion *completion; }; #define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024) @@ -106,67 +69,27 @@ struct aclk_database_cmdqueue { struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE]; }; -struct aclk_database_worker_config { - uv_thread_t thread; - char uuid_str[GUID_LEN + 1]; - char node_id[GUID_LEN + 1]; - char host_guid[GUID_LEN + 1]; - char *hostname; // hostname to avoid constant lookups - time_t cleanup_after; // Start a cleanup after this timestamp - time_t startup_time; // When the sync thread started - uint64_t alerts_batch_id; // batch id for alerts to use - uint64_t alerts_start_seq_id; // cloud has asked to start streaming from - uint64_t alert_sequence_id; // last alert sequence_id - int pause_alert_updates; - uint32_t chart_payload_count; - uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested - uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message - uv_loop_t *loop; +struct aclk_sync_host_config { RRDHOST *host; - uv_async_t async; - /* FIFO command queue */ - uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; - volatile unsigned queue_size; - struct aclk_database_cmdqueue cmd_queue; int alert_updates; - int node_info_send; + int alert_checkpoint_req; + int alert_queue_removed; + time_t node_info_send_time; time_t node_collectors_send; - volatile unsigned is_shutting_down; - volatile unsigned is_orphan; - struct aclk_database_worker_config *next; + char uuid_str[UUID_STR_LEN]; + char node_id[UUID_STR_LEN]; + char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested }; -static inline RRDHOST *find_host_by_node_id(char *node_id) -{ - uuid_t node_uuid; - if (unlikely(!node_id)) - return NULL; - - if (uuid_parse(node_id, node_uuid)) - return NULL; - - rrd_rdlock(); - RRDHOST *host, *ret = NULL; - rrdhost_foreach_read(host) { - if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) { - ret = host; - break; - } - } - rrd_unlock(); - - return ret; -} - - extern sqlite3 *db_meta; -int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); -void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); +int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd); void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); void sql_aclk_sync_init(void); -int claimed(); -void aclk_sync_exit_all(); -struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id); +void aclk_push_alert_config(const char *node_id, const char *config_hash); +void aclk_push_node_alert_snapshot(const char *node_id); +void aclk_push_node_health_log(const char *node_id); +void aclk_push_node_removed_alerts(const char *node_id); +void schedule_node_info_update(RRDHOST *host); + #endif //NETDATA_SQLITE_ACLK_H diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index ce284ebc3..62f1df29d 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -5,20 +5,20 @@ #ifdef ENABLE_ACLK #include "../../aclk/aclk_alarm_api.h" -#include "../../aclk/aclk.h" #endif +#define SQL_GET_ALERT_REMOVE_TIME "SELECT when_key FROM health_log_%s WHERE alarm_id = %u " \ + "AND unique_id > %u AND unique_id < %u " \ + "AND new_status = -2;" + time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) { sqlite3_stmt *res = NULL; - int rc = 0; time_t when = 0; char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \ - "and unique_id > %u and unique_id < %u " \ - "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_GET_ALERT_REMOVE_TIME, uuid_str, alarm_id, after_unique_id, before_unique_id); - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to find removed gap."); return 0; @@ -36,22 +36,26 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after return when; } +#define SQL_UPDATE_FILTERED_ALERT "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u" + void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) { char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id); sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; } +#define SQL_SELECT_ALERT_BY_UNIQUE_ID "SELECT hl.unique_id FROM health_log_%s hl, alert_hash ah WHERE hl.unique_id = %u " \ + "AND hl.config_hash_id = ah.hash_id " \ + "AND ah.warn IS NULL AND ah.crit IS NULL;" + static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char *uuid_str) { sqlite3_stmt *res = NULL; int rc = 0; bool ret = false; char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.unique_id from health_log_%s hl, alert_hash ah where hl.unique_id = %u " \ - "and hl.config_hash_id = ah.hash_id " \ - "and ah.warn is null and ah.crit is null;", uuid_str, unique_id); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_UNIQUE_ID, uuid_str, unique_id); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -73,12 +77,18 @@ static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char #define MAX_REMOVED_PERIOD 86400 //decide if some events should be sent or not + +#define SQL_SELECT_ALERT_BY_ID "SELECT hl.new_status, hl.config_hash_id, hl.unique_id FROM health_log_%s hl, aclk_alert_%s aa " \ + "WHERE hl.unique_id = aa.filtered_alert_unique_id " \ + "AND hl.alarm_id = %u " \ + "ORDER BY alarm_event_id DESC LIMIT 1;" + int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { sqlite3_stmt *res = NULL; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - int send = 1, rc = 0; + int send = 1; if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) { return 0; @@ -87,9 +97,6 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) if (unlikely(uuid_is_null(ae->config_hash_id))) return 0; - if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) - return 0; - char sql[ACLK_SYNC_QUERY_SIZE]; uuid_t config_hash_id; RRDCALC_STATUS status; @@ -97,12 +104,9 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) //get the previous sent event of this alarm_id //base the search on the last filtered event - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ - where hl.unique_id = aa.filtered_alert_unique_id \ - and hl.alarm_id = %u \ - order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_ID, uuid_str, uuid_str, ae->alarm_id); - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to filter alert events."); send = 1; @@ -126,7 +130,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) goto done; } - if (uuid_compare(ae->config_hash_id, config_hash_id)) { + if (uuid_memcmp(&ae->config_hash_id, &config_hash_id)) { send = 1; goto done; } @@ -162,6 +166,10 @@ done: // will replace call to aclk_update_alarm in health/health_log.c // and handle both cases + +#define SQL_QUEUE_ALERT_TO_CLOUD "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) ON CONFLICT (alert_unique_id) do nothing;" + int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) { if(!service_running(SERVICE_ACLK)) @@ -182,27 +190,24 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } } - int rc = 0; - sqlite3_stmt *res_alert = NULL; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + return 0; + + sqlite3_stmt *res_alert = NULL; + char sql[ACLK_SYNC_QUERY_SIZE]; - buffer_sprintf( - sql, - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " - "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ", - uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to prepare statement to store alert event"); - buffer_free(sql); return 1; } - rc = sqlite3_bind_int(res_alert, 1, ae->unique_id); + rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id); if (unlikely(rc != SQLITE_OK)) goto bind_fail; @@ -213,16 +218,12 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; - if (wc) { - wc->pause_alert_updates = 0; - } + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); bind_fail: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) error_report("Failed to reset statement in store alert event, rc = %d", rc); - buffer_free(sql); return 0; } @@ -254,11 +255,10 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) #endif } -void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_push_alert_event(struct aclk_sync_host_config *wc) { #ifndef ENABLE_ACLK UNUSED(wc); - UNUSED(cmd); #else int rc; @@ -278,26 +278,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - if (wc->alerts_start_seq_id != 0) { - buffer_sprintf( - sql, - "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64 - "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64 - " and date_cloud_ack is null " - "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64 - " and date_submitted is null", - wc->uuid_str, - wc->alerts_start_seq_id, - wc->uuid_str, - wc->alerts_start_seq_id, - wc->uuid_str, - wc->alerts_start_seq_id); - db_execute(buffer_tostring(sql)); - buffer_reset(sql); - wc->alerts_start_seq_id = 0; - } - - int limit = cmd.count > 0 ? cmd.count : 1; + int limit = ACLK_MAX_ALERT_UPDATES; sqlite3_stmt *res = NULL; @@ -318,10 +299,15 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str); - db_execute(buffer_tostring(sql_fix)); - buffer_flush(sql_fix); - buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str); - db_execute(buffer_tostring(sql_fix)); + rc = db_execute(db_meta, buffer_tostring(sql_fix)); + if (unlikely(rc)) + error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); + else { + buffer_flush(sql_fix); + buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str); + if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix)))) + error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host)); + } buffer_free(sql_fix); // Try again @@ -353,8 +339,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11)); alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL; - alarm_log.batch_id = wc->alerts_batch_id; - alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + //alarm_log.batch_id = wc->alerts_batch_id; + //alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); alarm_log.when = (time_t) sqlite3_column_int64(res, 5); uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str); @@ -429,19 +415,23 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", wc->uuid_str, first_sequence_id, last_sequence_id); - db_execute(buffer_tostring(sql)); + + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host)); + + // Mark to do one more check + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + } else { if (log_first_sequence_id) log_access( - "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, + "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", log_first_sequence_id, - log_last_sequence_id, - wc->alerts_batch_id); + log_last_sequence_id); log_first_sequence_id = 0; log_last_sequence_id = 0; - wc->pause_alert_updates = 1; } rc = sqlite3_finalize(res); @@ -451,8 +441,24 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d freez(claim_id); buffer_free(sql); #endif +} + +void aclk_push_alert_events_for_all_hosts(void) +{ + RRDHOST *host; + + dfe_start_reentrant(rrdhost_root_index, host) { + if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) + continue; - return; + internal_error(true, "ACLK SYNC: Scanning host %s", rrdhost_hostname(host)); + rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + if (likely(wc)) + aclk_push_alert_event(wc); + } + dfe_done(host); } void sql_queue_existing_alerts_to_aclk(RRDHOST *host) @@ -467,137 +473,15 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host) "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - - buffer_free(sql); - - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; - if (wc) { - wc->pause_alert_updates = 0; - } -} - -void aclk_send_alarm_health_log(char *node_id) -{ - if (unlikely(!node_id)) - return; - - struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); - - if (likely(!wc)) { - RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) - wc = (struct aclk_database_worker_config *)host->dbsync_worker; - } - - if (!wc) { - log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id); - return; - } - - log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, wc->hostname ? wc->hostname : "N/A"); - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG; - - aclk_database_enq_cmd(wc, &cmd); - return; -} - -void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(cmd); -#ifndef ENABLE_ACLK - UNUSED(wc); -#else - int rc; - - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) - return; - - RRDHOST *host = wc->host; - if (unlikely(!host)) { - host = find_host_by_node_id(wc->node_id); - - if (unlikely(!host)) { - log_access( - "AC [%s (N/A)]: ACLK synchronization thread for %s is not yet linked to HOST.", - wc->node_id, - wc->host_guid); - freez(claim_id); - return; - } - } - - uint64_t first_sequence = 0; - uint64_t last_sequence = 0; - struct timeval first_timestamp; - struct timeval last_timestamp; - - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - - sqlite3_stmt *res = NULL; - - //TODO: make this better: include info from health log too - buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \ - "FROM aclk_alert_%s;", wc->uuid_str); - - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to get health log statistics from the database"); - buffer_free(sql); - freez(claim_id); - return; - } - - first_timestamp.tv_sec = 0; - first_timestamp.tv_usec = 0; - last_timestamp.tv_sec = 0; - last_timestamp.tv_usec = 0; - - while (sqlite3_step_monitored(res) == SQLITE_ROW) { - first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; - if (sqlite3_column_bytes(res, 1) > 0) { - first_timestamp.tv_sec = sqlite3_column_int64(res, 1); - } - - last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; - if (sqlite3_column_bytes(res, 3) > 0) { - last_timestamp.tv_sec = sqlite3_column_int64(res, 3); - } - } - - struct alarm_log_entries log_entries; - log_entries.first_seq_id = first_sequence; - log_entries.first_when = first_timestamp; - log_entries.last_seq_id = last_sequence; - log_entries.last_when = last_timestamp; - - struct alarm_log_health alarm_log; - alarm_log.claim_id = claim_id; - alarm_log.node_id = wc->node_id; - alarm_log.log_entries = log_entries; - alarm_log.status = wc->alert_updates == 0 ? 2 : 1; - alarm_log.enabled = (int)host->health.health_enabled; - - wc->alert_sequence_id = last_sequence; + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - aclk_send_alarm_log_health(&alarm_log); - log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence); + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host)); - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - freez(claim_id); buffer_free(sql); - - aclk_alert_reloaded = 1; -#endif - - return; + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } void aclk_send_alarm_configuration(char *config_hash) @@ -605,22 +489,14 @@ void aclk_send_alarm_configuration(char *config_hash) if (unlikely(!config_hash)) return; - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker; + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config; - if (unlikely(!wc)) { + if (unlikely(!wc)) return; - } log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG; - cmd.data_param = (void *) strdupz(config_hash); - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); - - return; + aclk_push_alert_config(wc->node_id, config_hash); } #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \ @@ -628,19 +504,31 @@ void aclk_send_alarm_configuration(char *config_hash) "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \ "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;" -int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { - UNUSED(wc); -#ifndef ENABLE_ACLK - UNUSED(cmd); -#else int rc = 0; +#ifdef ENABLE_ACLK + CHECK_SQLITE_CONNECTION(db_meta); sqlite3_stmt *res = NULL; - char *config_hash = (char *) cmd.data_param; + struct aclk_sync_host_config *wc = NULL; + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) { + freez(config_hash); + freez(node_id); + return 1; + } + + wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + if (unlikely(!wc)) { + freez(config_hash); + freez(node_id); + return 1; + } rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0); if (rc != SQLITE_OK) { @@ -723,7 +611,6 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct if (likely(p_alarm_config.cfg_hash)) { log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); - freez((char *) cmd.data_param); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } @@ -735,150 +622,125 @@ bind_fail: if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc); - return rc; + freez(config_hash); + freez(node_id); #endif - return 0; + return rc; } // Start streaming alerts -void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id) +void aclk_start_alert_streaming(char *node_id, bool resets) { if (unlikely(!node_id)) return; - //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id); - uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) return; - struct aclk_database_worker_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) { - wc = (struct aclk_database_worker_config *)host->dbsync_worker ? - (struct aclk_database_worker_config *)host->dbsync_worker : - (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - if (unlikely(!host->health.health_enabled)) { - log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); - return; - } + if (unlikely(!host)) + return; - if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) - sql_queue_existing_alerts_to_aclk(host); - } else - wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - - if (likely(wc)) { - log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id); - __sync_synchronize(); - wc->alerts_batch_id = batch_id; - wc->alerts_start_seq_id = start_seq_id; - wc->alert_updates = 1; - wc->pause_alert_updates = 0; - __sync_synchronize(); + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + + if (unlikely(!wc)) + return; + + if (unlikely(!host->health.health_enabled)) { + log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); + return; } - else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); - return; + if (resets) { + log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + sql_queue_existing_alerts_to_aclk(host); + } else + log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + + wc->alert_updates = 1; + wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; } -void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(cmd); +#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \ + "WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \ + "(SELECT alert_unique_id FROM aclk_alert_%s) " \ + "AND config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ + "ORDER BY unique_id ASC " \ + "ON CONFLICT (alert_unique_id) DO NOTHING;" - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); +void sql_process_queue_removed_alerts_to_aclk(char *node_id) +{ + struct aclk_sync_host_config *wc; + RRDHOST *host = find_host_by_node_id(node_id); + freez(node_id); - buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ - "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \ - "where new_status = -2 and updated_by_id = 0 and unique_id not in " \ - "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \ - "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str); + if (unlikely(!host || !(wc = host->aclk_sync_host_config))) + return; - db_execute(buffer_tostring(sql)); + char sql[ACLK_SYNC_QUERY_SIZE * 2]; - log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str, wc->uuid_str); - buffer_free(sql); + if (unlikely(db_execute(db_meta, sql))) { + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS FAILED", wc->node_id, rrdhost_hostname(wc->host)); + error_report("Failed to queue ACLK alert removed entries for host %s", rrdhost_hostname(wc->host)); + } + else + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); - wc->pause_alert_updates = 0; - return; + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + wc->alert_queue_removed = 0; } void sql_queue_removed_alerts_to_aclk(RRDHOST *host) { - if (unlikely(!host->dbsync_worker)) + if (unlikely(!host->aclk_sync_host_config)) return; - if (!claimed()) + if (!claimed() || !host->node_id) return; - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS; - cmd.data = NULL; - cmd.data_param = NULL; - cmd.completion = NULL; - aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd); + char node_id[UUID_STR_LEN]; + uuid_unparse_lower(*host->node_id, node_id); + + aclk_push_node_removed_alerts(node_id); } -void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id) +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { - UNUSED(claim_id); - if (unlikely(!node_id)) - return; - uuid_t node_uuid; - if (uuid_parse(node_id, node_uuid)) + if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; - struct aclk_database_worker_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) - wc = (struct aclk_database_worker_config *)host->dbsync_worker; - - if (likely(wc)) { - log_access( - "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, - wc->node_id, - wc->host ? rrdhost_hostname(wc->host) : "N/A", - snapshot_id, - sequence_id); - if (wc->alerts_snapshot_id == snapshot_id) - return; - __sync_synchronize(); - wc->alerts_snapshot_id = snapshot_id; - wc->alerts_ack_sequence_id = sequence_id; - __sync_synchronize(); - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT; - cmd.data_param = NULL; - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); - } else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); - - return; -} + if (unlikely(!host)) { + log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); + return; + } -void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id) -{ - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (alerts_ack_sequence_id != 0) { - buffer_sprintf( - sql, - "UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id <= %" PRIu64 "", - uuid_str, - alerts_ack_sequence_id); - db_execute(buffer_tostring(sql)); + if (unlikely(!wc)) { + log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); + return; } - buffer_free(sql); + log_access( + "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", + node_id, + wc->host ? rrdhost_hostname(wc->host) : "N/A", + snapshot_uuid); + if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid)) + return; + __sync_synchronize(); + wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); + __sync_synchronize(); + + aclk_push_node_alert_snapshot(node_id); } #ifdef ENABLE_ACLK @@ -949,37 +811,41 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) #endif #define ALARM_EVENTS_PER_CHUNK 10 -void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) { -#ifndef ENABLE_ACLK - UNUSED(wc); - UNUSED(cmd); -#else - UNUSED(cmd); - // we perhaps we don't need this for snapshots - if (unlikely(!wc->alert_updates)) { - log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); +#ifdef ENABLE_ACLK + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) { + log_access("AC [%s (N/A)]: Node id not found", node_id); + freez(node_id); return; } + freez(node_id); - if (unlikely(!wc->host)) { - error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + + // we perhaps we don't need this for snapshots + if (unlikely(!wc->alert_updates)) { + log_access( + "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", + wc->node_id, + wc->host ? rrdhost_hostname(wc->host) : "N/A"); return; } - if (unlikely(!wc->alerts_snapshot_id)) + if (unlikely(!wc->alerts_snapshot_uuid)) return; char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; - log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id); + log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); - aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); - - RRDHOST *host = wc->host; uint32_t cnt = 0; + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); @@ -995,6 +861,9 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) continue; + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + continue; + cnt++; } @@ -1008,7 +877,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru struct alarm_snapshot alarm_snap; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = chunk; @@ -1024,6 +893,9 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) continue; + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + continue; + cnt++; struct alarm_log_entry alarm_log; @@ -1047,7 +919,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru struct alarm_snapshot alarm_snap; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = chunk; @@ -1061,73 +933,204 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru } netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - wc->alerts_snapshot_id = 0; + wc->alerts_snapshot_uuid = NULL; freez(claim_id); #endif - return; } +#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE filtered_alert_unique_id NOT IN (SELECT unique_id FROM health_log_%s);" + void sql_aclk_alert_clean_dead_entries(RRDHOST *host) { if (!claimed()) return; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + char sql[512]; + snprintfz(sql,511,SQL_DELETE_ALERT_ENTRIES, uuid_str, uuid_str); - buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in " - " (select unique_id from health_log_%s); ", uuid_str, uuid_str); - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg); if (rc != SQLITE_OK) { - error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""", - uuid_str, err_msg); + error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg); sqlite3_free(err_msg); } - buffer_free(sql); } +#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ + "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ + "FROM aclk_alert_%s WHERE date_submitted IS NULL;" + int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status) { int rc; - struct aclk_database_worker_config *wc = NULL; - wc = (struct aclk_database_worker_config *)host->dbsync_worker; + struct aclk_sync_host_config *wc = NULL; + wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; if (!wc) return 1; proto_alert_status->alert_updates = wc->alert_updates; - proto_alert_status->alerts_batch_id = wc->alerts_batch_id; - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + char sql[ACLK_SYNC_QUERY_SIZE]; sqlite3_stmt *res = NULL; - buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \ - "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \ - "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \ - "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to get alert log status from the database."); - buffer_free(sql); return 1; } while (sqlite3_step_monitored(res) == SQLITE_ROW) { proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; - proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; - proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0; + proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; } rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc); - buffer_free(sql); return 0; } + +void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused) +{ + if (unlikely(!node_id)) + return; + + struct aclk_sync_host_config *wc = NULL; + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) + return; + + wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + if (unlikely(!wc)) { + log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id); + return; + } + + log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host)); + + wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS; +} + +typedef struct active_alerts { + char *name; + char *chart; + RRDCALC_STATUS status; +} active_alerts_t; + +static inline int compare_active_alerts(const void * a, const void * b) { + active_alerts_t *active_alerts_a = (active_alerts_t *)a; + active_alerts_t *active_alerts_b = (active_alerts_t *)b; + + if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) ) + { + return strcmp(active_alerts_a->chart, active_alerts_b->chart); + } + else + return strcmp(active_alerts_a->name, active_alerts_b->name); +} + +void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused) +{ +#ifdef ENABLE_ACLK + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + if (unlikely(!wc)) { + log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host)); + return; + } + + //TODO: make sure all pending events are sent. + if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) { + //postpone checkpoint send + wc->alert_checkpoint_req++; + log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host)); + return; + } + + //TODO: lock rc here, or make sure it's called when health decides + //count them + RRDCALC *rc; + uint32_t cnt = 0; + size_t len = 0; + active_alerts_t *active_alerts = NULL; + + foreach_rrdcalc_in_rrdhost_read(host, rc) { + if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec)) + continue; + + if (rc->status == RRDCALC_STATUS_WARNING || + rc->status == RRDCALC_STATUS_CRITICAL) { + + cnt++; + } + } + foreach_rrdcalc_in_rrdhost_done(rc); + + if (cnt) { + active_alerts = callocz(cnt, sizeof(active_alerts_t)); + cnt = 0; + foreach_rrdcalc_in_rrdhost_read(host, rc) { + if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec)) + continue; + + if (rc->status == RRDCALC_STATUS_WARNING || + rc->status == RRDCALC_STATUS_CRITICAL) { + + active_alerts[cnt].name = (char *)rrdcalc_name(rc); + len += string_strlen(rc->name); + active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc); + len += string_strlen(rc->chart); + active_alerts[cnt].status = rc->status; + len++; + cnt++; + } + } + foreach_rrdcalc_in_rrdhost_done(rc); + } + + BUFFER *alarms_to_hash; + if (cnt) { + qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts); + + alarms_to_hash = buffer_create(len, NULL); + for (uint32_t i=0;i<cnt;i++) { + buffer_strcat(alarms_to_hash, active_alerts[i].name); + buffer_strcat(alarms_to_hash, active_alerts[i].chart); + if (active_alerts[i].status == RRDCALC_STATUS_WARNING) + buffer_strcat(alarms_to_hash, "W"); + else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL) + buffer_strcat(alarms_to_hash, "C"); + } + } else { + alarms_to_hash = buffer_create(1, NULL); + buffer_strcat(alarms_to_hash, ""); + len = 0; + } + + char hash[SHA256_DIGEST_LENGTH + 1]; + if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) { + hash[SHA256_DIGEST_LENGTH] = 0; + + struct alarm_checkpoint alarm_checkpoint; + char *claim_id = get_agent_claimid(); + alarm_checkpoint.claim_id = claim_id; + alarm_checkpoint.node_id = wc->node_id; + alarm_checkpoint.checksum = (char *)hash; + + aclk_send_provide_alarm_checkpoint(&alarm_checkpoint); + log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host)); + } else { + log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host)); + } + wc->alert_checkpoint_req = 0; + buffer_free(alarms_to_hash); +#endif +} diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h index 88a939e87..d7252aad6 100644 --- a/database/sqlite/sqlite_aclk_alert.h +++ b/database/sqlite/sqlite_aclk_alert.h @@ -5,27 +5,31 @@ extern sqlite3 *db_meta; +#define SEND_REMOVED_AFTER_HEALTH_LOOPS 3 +#define SEND_CHECKPOINT_AFTER_HEALTH_LOOPS 4 + struct proto_alert_status { int alert_updates; - uint64_t alerts_batch_id; - uint64_t last_acked_sequence_id; uint64_t pending_min_sequence_id; uint64_t pending_max_sequence_id; uint64_t last_submitted_sequence_id; }; -int aclk_add_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_send_alarm_health_log(char *node_id); -void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); +int aclk_add_alert_event(struct aclk_sync_host_config *wc, struct aclk_database_cmd cmd); +void aclk_push_alert_event(struct aclk_sync_host_config *wc); void aclk_send_alarm_configuration (char *config_hash); -int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id); +int aclk_push_alert_config_event(char *node_id, char *config_hash); +void aclk_start_alert_streaming(char *node_id, bool resets); void sql_queue_removed_alerts_to_aclk(RRDHOST *host); -void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); +void sql_process_queue_removed_alerts_to_aclk(char *node_id); +void aclk_send_alarm_checkpoint(char *node_id, char *claim_id); +void aclk_push_alarm_checkpoint(RRDHOST *host); + +void aclk_push_alert_snapshot_event(char *node_id); +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, char *snapshot_uuid); int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); +void aclk_push_alert_events_for_all_hosts(void); + #endif //NETDATA_SQLITE_ACLK_ALERT_H diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c index afe774997..3817296da 100644 --- a/database/sqlite/sqlite_aclk_node.c +++ b/database/sqlite/sqlite_aclk_node.c @@ -25,12 +25,17 @@ DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { return dict; } -#endif -void sql_build_node_collectors(struct aclk_database_worker_config *wc) +static void build_node_collectors(char *node_id __maybe_unused) { -#ifdef ENABLE_ACLK - if (!wc->host) + + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely(!host)) + return; + + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; + if (unlikely(!wc)) return; struct update_node_collectors upd_node_collectors; @@ -39,48 +44,51 @@ void sql_build_node_collectors(struct aclk_database_worker_config *wc) upd_node_collectors.node_id = wc->node_id; upd_node_collectors.claim_id = get_agent_claimid(); - upd_node_collectors.node_collectors = collectors_from_charts(wc->host, dict); + upd_node_collectors.node_collectors = collectors_from_charts(host, dict); aclk_update_node_collectors(&upd_node_collectors); dictionary_destroy(dict); freez(upd_node_collectors.claim_id); - log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(wc->host)); -#else - UNUSED(wc); -#endif - return; + log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", node_id, rrdhost_hostname(host)); + + freez(node_id); } -void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +static void build_node_info(char *node_id __maybe_unused) { - UNUSED(cmd); - -#ifdef ENABLE_ACLK struct update_node_info node_info; - if (!wc->host) { - wc->node_info_send = 1; + RRDHOST *host = find_host_by_node_id(node_id); + + if (unlikely((!host))) { + freez(node_id); + return; + } + + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; + + if (unlikely(!wc)) { + freez(node_id); return; } rrd_rdlock(); node_info.node_id = wc->node_id; node_info.claim_id = get_agent_claimid(); - node_info.machine_guid = wc->host_guid; + node_info.machine_guid = host->machine_guid; node_info.child = (wc->host != localhost); - node_info.ml_info.ml_capable = ml_capable(localhost); + node_info.ml_info.ml_capable = ml_capable(); node_info.ml_info.ml_enabled = ml_enabled(wc->host); node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host); now_realtime_timeval(&node_info.updated_at); - RRDHOST *host = wc->host; char *host_version = NULL; if (host != localhost) { netdata_mutex_lock(&host->receiver_lock); - host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : "unknown"); + host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : rrdhost_program_version(host)); netdata_mutex_unlock(&host->receiver_lock); } @@ -91,7 +99,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.kernel_name = host->system_info->kernel_name; node_info.data.kernel_version = host->system_info->kernel_version; node_info.data.architecture = host->system_info->architecture; - node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores) : 0; + node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores, NULL) : 0; node_info.data.cpu_frequency = host->system_info->host_cpu_freq ? host->system_info->host_cpu_freq : "0"; node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0"; node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0"; @@ -101,7 +109,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown"; node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown"; node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", ""); - node_info.data.machine_guid = wc->host_guid; + node_info.data.machine_guid = host->machine_guid; struct capability node_caps[] = { { .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled }, @@ -116,7 +124,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.host_labels_ptr = host->rrdlabels; aclk_update_node_info(&node_info); - log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), wc->host_guid, wc->host == localhost ? "parent" : "child"); + log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), host->machine_guid, wc->host == localhost ? "parent" : "child"); rrd_unlock(); freez(node_info.claim_id); @@ -124,9 +132,47 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat freez(host_version); wc->node_collectors_send = now_realtime_sec(); -#else - UNUSED(wc); -#endif + freez(node_id); + +} + - return; +void aclk_check_node_info_and_collectors(void) +{ + RRDHOST *host; + + if (unlikely(!aclk_connected)) + return; + + size_t pending = 0; + dfe_start_reentrant(rrdhost_root_index, host) { + + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + if (unlikely(!wc)) + continue; + + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { + internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host)); + pending++; + continue; + } + + if (wc->node_info_send_time && wc->node_info_send_time + 30 < now_realtime_sec()) { + wc->node_info_send_time = 0; + build_node_info(strdupz(wc->node_id)); + internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host)); + } + + if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) { + build_node_collectors(strdupz(wc->node_id)); + internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host)); + wc->node_collectors_send = 0; + } + } + dfe_done(host); + + if(pending) + info("ACLK: %zu nodes are pending for contexts to load, skipped sending node info for them", pending); } + +#endif diff --git a/database/sqlite/sqlite_aclk_node.h b/database/sqlite/sqlite_aclk_node.h index c2c54f8c7..6afdf8d78 100644 --- a/database/sqlite/sqlite_aclk_node.h +++ b/database/sqlite/sqlite_aclk_node.h @@ -3,6 +3,5 @@ #ifndef NETDATA_SQLITE_ACLK_NODE_H #define NETDATA_SQLITE_ACLK_NODE_H -void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void sql_build_node_collectors(struct aclk_database_worker_config *wc); +void aclk_check_node_info_and_collectors(void); #endif //NETDATA_SQLITE_ACLK_NODE_H diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c index 892292cc7..b72726dc2 100644 --- a/database/sqlite/sqlite_context.c +++ b/database/sqlite/sqlite_context.c @@ -117,7 +117,6 @@ void sql_close_context_database(void) rc = sqlite3_close_v2(db_context_meta); if (unlikely(rc != SQLITE_OK)) error_report("Error %d while closing the context SQLite database, %s", rc, sqlite3_errstr(rc)); - return; } // @@ -243,8 +242,6 @@ failed: rc = sqlite3_reset(res); if (rc != SQLITE_OK) error_report("Failed to reset statement that fetches chart label data, rc = %d", rc); - - return; } // CONTEXT LIST @@ -372,9 +369,9 @@ int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) goto skip_store; } - rc = sqlite3_bind_int(res, 10, (time_t) context_data->deleted); + rc = sqlite3_bind_int(res, 10, context_data->deleted); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind last_time_t to store context details"); + error_report("Failed to bind deleted flag to store context details"); goto skip_store; } @@ -431,11 +428,11 @@ int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data) if (rc_stored != SQLITE_DONE) error_report("Failed to delete context %s, rc = %d", context_data->id, rc_stored); #ifdef NETDATA_INTERNAL_CHECKS - else { - char host_uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(*host_uuid, host_uuid_str); - info("%s: Deleted context %s under host %s", __FUNCTION__ , context_data->id, host_uuid_str); - } + else { + char host_uuid_str[UUID_STR_LEN]; + uuid_unparse_lower(*host_uuid, host_uuid_str); + info("%s: Deleted context %s under host %s", __FUNCTION__, context_data->id, host_uuid_str); + } #endif skip_delete: @@ -449,6 +446,10 @@ skip_delete: int sql_context_cache_stats(int op) { int count, dummy; + + if (unlikely(!db_context_meta)) + return 0; + netdata_thread_disable_cancelability(); sqlite3_db_status(db_context_meta, op, &count, &dummy, 0); netdata_thread_enable_cancelability(); @@ -489,6 +490,8 @@ int ctx_unittest(void) uuid_t host_uuid; uuid_generate(host_uuid); + initialize_thread_key_pool(); + int rc = sql_init_context_database(1); if (rc != SQLITE_OK) @@ -556,6 +559,7 @@ int ctx_unittest(void) freez((void *)context_data.title); freez((void *)context_data.chart_type); freez((void *)context_data.family); + freez((void *)context_data.units); // The list should be empty info("List context start after delete"); diff --git a/database/sqlite/sqlite_db_migration.c b/database/sqlite/sqlite_db_migration.c index 8b1d01594..3132ae2d0 100644 --- a/database/sqlite/sqlite_db_migration.c +++ b/database/sqlite/sqlite_db_migration.c @@ -7,7 +7,7 @@ static int return_int_cb(void *data, int argc, char **argv, char **column) int *status = data; UNUSED(argc); UNUSED(column); - *status = str2uint32_t(argv[0]); + *status = str2uint32_t(argv[0], NULL); return 0; } @@ -49,7 +49,7 @@ static int column_exists_in_table(const char *table, const char *column) } const char *database_migrate_v1_v2[] = { - "ALTER TABLE host ADD hops INTEGER;", + "ALTER TABLE host ADD hops INTEGER NOT NULL DEFAULT 0;", NULL }; diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index 1d03cfc2a..2fca2dfc8 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -49,7 +49,6 @@ const char *database_config[] = { const char *database_cleanup[] = { "DELETE FROM chart WHERE chart_id NOT IN (SELECT chart_id FROM dimension);", "DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart);", - "DELETE FROM chart_label WHERE chart_id NOT IN (SELECT chart_id FROM chart);", "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);", "DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host);", "DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host);", @@ -117,7 +116,6 @@ int execute_insert(sqlite3_stmt *res) break; } } - return rc; } @@ -156,6 +154,12 @@ static void release_statement(void *statement) error_report("Failed to finalize statement, rc = %d", rc); } +void initialize_thread_key_pool(void) +{ + for (int i = 0; i < MAX_PREPARED_STATEMENTS; i++) + (void)pthread_key_create(&key_pool[i], release_statement); +} + int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement) { static __thread uint32_t keys_used = 0; @@ -448,8 +452,7 @@ int sql_init_database(db_check_action_type_t rebuild, int memory) info("SQLite database initialization completed"); - for (int i = 0; i < MAX_PREPARED_STATEMENTS; i++) - (void)pthread_key_create(&key_pool[i], release_statement); + initialize_thread_key_pool(); rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0); if (unlikely(rc != SQLITE_OK)) @@ -505,14 +508,15 @@ skip: error_report("Failed to finalize statement %s, rc = %d", sql, rc); return result; } - -void db_execute(const char *cmd) +// Return 0 OK +// Return 1 Failed +int db_execute(sqlite3 *db, const char *cmd) { int rc; int cnt = 0; while (cnt < SQL_MAX_RETRY) { char *err_msg; - rc = sqlite3_exec_monitored(db_meta, cmd, 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db, cmd, 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt); sqlite3_free(err_msg); @@ -527,6 +531,7 @@ void db_execute(const char *cmd) ++cnt; } + return (rc != SQLITE_OK); } static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id) @@ -540,7 +545,7 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id) return; } - struct aclk_database_worker_config *wc = host->dbsync_worker; + struct aclk_sync_host_config *wc = host->aclk_sync_host_config; if (unlikely(!host->node_id)) host->node_id = mallocz(sizeof(*host->node_id)); @@ -594,7 +599,7 @@ int update_node_id(uuid_t *host_id, uuid_t *node_id) rrd_wrlock(); host = rrdhost_find_by_guid(host_guid); if (likely(host)) - set_host_node_id(host, node_id); + set_host_node_id(host, node_id); rrd_unlock(); failed: @@ -604,48 +609,6 @@ failed: return rc - 1; } -#define SQL_SELECT_HOSTNAME_BY_NODE_ID "SELECT h.hostname FROM node_instance ni, " \ -"host h WHERE ni.host_id = h.host_id AND ni.node_id = @node_id;" - -char *get_hostname_by_node_id(char *node) -{ - sqlite3_stmt *res = NULL; - char *hostname = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return NULL; - } - - uuid_t node_id; - if (uuid_parse(node, node_id)) - return NULL; - - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOSTNAME_BY_NODE_ID, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch hostname by node id"); - return NULL; - } - - rc = sqlite3_bind_blob(res, 1, &node_id, sizeof(node_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to select node instance information"); - goto failed; - } - - rc = sqlite3_step_monitored(res); - if (likely(rc == SQLITE_ROW)) - hostname = strdupz((char *)sqlite3_column_text(res, 0)); - -failed: - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when search for hostname by node id"); - - return hostname; -} - #define SQL_SELECT_HOST_BY_NODE_ID "select host_id from node_instance where node_id = @node_id;" int get_host_id(uuid_t *node_id, uuid_t *host_id) @@ -684,7 +647,7 @@ failed: return (rc == SQLITE_ROW) ? 0 : -1; } -#define SQL_SELECT_NODE_ID "select node_id from node_instance where host_id = @host_id and node_id not null;" +#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL;" int get_node_id(uuid_t *host_id, uuid_t *node_id) { @@ -720,8 +683,8 @@ failed: return (rc == SQLITE_ROW) ? 0 : -1; } -#define SQL_INVALIDATE_NODE_INSTANCES "update node_instance set node_id = NULL where exists " \ - "(select host_id from node_instance where host_id = @host_id and (@claim_id is null or claim_id <> @claim_id));" +#define SQL_INVALIDATE_NODE_INSTANCES "UPDATE node_instance SET node_id = NULL WHERE EXISTS " \ + "(SELECT host_id FROM node_instance WHERE host_id = @host_id AND (@claim_id IS NULL OR claim_id <> @claim_id));" void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id) { @@ -765,8 +728,8 @@ failed: error_report("Failed to finalize the prepared statement when invalidating node instance information"); } -#define SQL_GET_NODE_INSTANCE_LIST "select ni.node_id, ni.host_id, h.hostname " \ - "from node_instance ni, host h where ni.host_id = h.host_id;" +#define SQL_GET_NODE_INSTANCE_LIST "SELECT ni.node_id, ni.host_id, h.hostname " \ + "FROM node_instance ni, host h WHERE ni.host_id = h.host_id AND h.hops >=0;" struct node_instance_list *get_node_list(void) { @@ -805,13 +768,18 @@ struct node_instance_list *get_node_list(void) uuid_copy(node_list[row].node_id, *((uuid_t *)sqlite3_column_blob(res, 0))); if (sqlite3_column_bytes(res, 1) == sizeof(uuid_t)) { uuid_t *host_id = (uuid_t *)sqlite3_column_blob(res, 1); - uuid_copy(node_list[row].host_id, *host_id); - node_list[row].queryable = 1; uuid_unparse_lower(*host_id, host_guid); RRDHOST *host = rrdhost_find_by_guid(host_guid); - node_list[row].live = host && (host == localhost || host->receiver) ? 1 : 0; + if (rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) { + info("ACLK: 'host:%s' skipping get node list because context is initializing", rrdhost_hostname(host)); + continue; + } + uuid_copy(node_list[row].host_id, *host_id); + node_list[row].queryable = 1; + node_list[row].live = (host && (host == localhost || host->receiver + || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)))) ? 1 : 0; node_list[row].hops = (host && host->system_info) ? host->system_info->hops : - uuid_compare(*host_id, localhost->host_uuid) ? 1 : 0; + uuid_memcmp(host_id, &localhost->host_uuid) ? 1 : 0; node_list[row].hostname = sqlite3_column_bytes(res, 2) ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL; } @@ -950,6 +918,10 @@ int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_b int sql_metadata_cache_stats(int op) { int count, dummy; + + if (unlikely(!db_meta)) + return 0; + netdata_thread_disable_cancelability(); sqlite3_db_status(db_meta, op, &count, &dummy, 0); netdata_thread_enable_cancelability(); diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index 40abd010d..ee63a397c 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -55,7 +55,8 @@ int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_b int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement); int execute_insert(sqlite3_stmt *res); int exec_statement_with_uuid(const char *sql, uuid_t *uuid); -void db_execute(const char *cmd); +int db_execute(sqlite3 *database, const char *cmd); +void initialize_thread_key_pool(void); // Look up functions int get_node_id(uuid_t *host_id, uuid_t *node_id); diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c index 471fa3add..dd08f63ec 100644 --- a/database/sqlite/sqlite_health.c +++ b/database/sqlite/sqlite_health.c @@ -12,7 +12,7 @@ #define SQL_CREATE_HEALTH_LOG_TABLE(guid) "CREATE TABLE IF NOT EXISTS health_log_%s(hostname text, unique_id int, alarm_id int, alarm_event_id int, config_hash_id blob, updated_by_id int, updates_id int, when_key int, duration int, non_clear_duration int, flags int, exec_run_timestamp int, delay_up_to_timestamp int, name text, chart text, family text, exec text, recipient text, source text, units text, info text, exec_code int, new_status real, old_status real, delay int, new_value double, old_value double, last_repeat int, class text, component text, type text, chart_context text);", guid int sql_create_health_log_table(RRDHOST *host) { int rc; - char *err_msg = NULL, command[MAX_HEALTH_SQL_SIZE + 1]; + char command[MAX_HEALTH_SQL_SIZE + 1]; if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) @@ -25,18 +25,17 @@ int sql_create_health_log_table(RRDHOST *host) { snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CREATE_HEALTH_LOG_TABLE(uuid_str)); - rc = sqlite3_exec_monitored(db_meta, command, 0, 0, &err_msg); - if (rc != SQLITE_OK) { - error_report("HEALTH [%s]: SQLite error during creation of health log table, rc = %d (%s)", rrdhost_hostname(host), rc, err_msg); - sqlite3_free(err_msg); - return 1; + rc = db_execute(db_meta, command); + if (unlikely(rc)) + error_report("HEALTH [%s]: SQLite error during creation of health log table", rrdhost_hostname(host)); + else { + snprintfz(command, MAX_HEALTH_SQL_SIZE, "CREATE INDEX IF NOT EXISTS health_log_index_%s ON health_log_%s (unique_id); ", uuid_str, uuid_str); + rc = db_execute(db_meta, command); + if (unlikely(unlikely(rc))) + error_report("HEALTH [%s]: SQLite error during creation of health log table index", rrdhost_hostname(host)); } - snprintfz(command, MAX_HEALTH_SQL_SIZE, "CREATE INDEX IF NOT EXISTS " - "health_log_index_%s ON health_log_%s (unique_id); ", uuid_str, uuid_str); - db_execute(command); - - return 0; + return rc; } /* Health related SQL queries @@ -104,7 +103,7 @@ void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae) { error_report("HEALTH [%s]: Failed to update health log, rc = %d", rrdhost_hostname(host), rc); } - failed: +failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("HEALTH [%s]: Failed to finalize the prepared statement for updating health log.", rrdhost_hostname(host)); } @@ -345,7 +344,7 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { ae->flags |= HEALTH_ENTRY_FLAG_SAVED; host->health.health_log_entries_written++; - failed: +failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("HEALTH [%s]: Failed to finalize the prepared statement for inserting to health log.", rrdhost_hostname(host)); } @@ -452,7 +451,7 @@ void sql_health_alarm_log_count(RRDHOST *host) { #define SQL_INJECT_REMOVED_UPDATE(guid) "update health_log_%s set flags = flags | ?1, updated_by_id = ?2 where unique_id = ?3; ", guid void sql_inject_removed_status(char *uuid_str, uint32_t alarm_id, uint32_t alarm_event_id, uint32_t unique_id, uint32_t max_unique_id) { - int rc = 0; + int rc; char command[MAX_HEALTH_SQL_SIZE + 1]; if (!alarm_id || !alarm_event_id || !unique_id || !max_unique_id) @@ -546,7 +545,7 @@ failed: #define SQL_SELECT_MAX_UNIQUE_ID(guid) "SELECT MAX(unique_id) from health_log_%s", guid uint32_t sql_get_max_unique_id (char *uuid_str) { - int rc = 0; + int rc; char command[MAX_HEALTH_SQL_SIZE + 1]; uint32_t max_unique_id = 0; @@ -573,10 +572,9 @@ uint32_t sql_get_max_unique_id (char *uuid_str) #define SQL_SELECT_LAST_STATUSES(guid) "SELECT new_status, unique_id, alarm_id, alarm_event_id from health_log_%s group by alarm_id having max(alarm_event_id)", guid void sql_check_removed_alerts_state(char *uuid_str) { - int rc = 0; + int rc; char command[MAX_HEALTH_SQL_SIZE + 1]; - RRDCALC_STATUS status; - uint32_t alarm_id = 0, alarm_event_id = 0, unique_id = 0, max_unique_id = 0; + uint32_t max_unique_id = 0; sqlite3_stmt *res = NULL; @@ -588,6 +586,9 @@ void sql_check_removed_alerts_state(char *uuid_str) } while (sqlite3_step_monitored(res) == SQLITE_ROW) { + uint32_t alarm_id, alarm_event_id, unique_id; + RRDCALC_STATUS status; + status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); unique_id = (uint32_t) sqlite3_column_int64(res, 1); alarm_id = (uint32_t) sqlite3_column_int64(res, 2); @@ -683,8 +684,7 @@ void sql_health_alarm_log_load(RRDHOST *host) { } // Check if we got last_repeat field - time_t last_repeat = 0; - last_repeat = (time_t)sqlite3_column_int64(res, 27); + time_t last_repeat = (time_t)sqlite3_column_int64(res, 27); rc = dictionary_get(all_rrdcalcs, (char *) sqlite3_column_text(res, 14)); if(unlikely(rc)) { @@ -847,196 +847,161 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg) } } - param++; - rc = sqlite3_bind_blob(res, param, hash_id, sizeof(*hash_id), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, hash_id, sizeof(*hash_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->alarm, param); + rc = sqlite3_bind_string_or_null(res, cfg->alarm, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->template_key, param); + rc = sqlite3_bind_string_or_null(res, cfg->template_key, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->on, param); + rc = sqlite3_bind_string_or_null(res, cfg->on, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->classification, param); + rc = sqlite3_bind_string_or_null(res, cfg->classification, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->component, param); + rc = sqlite3_bind_string_or_null(res, cfg->component, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->type, param); + rc = sqlite3_bind_string_or_null(res, cfg->type, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->os, param); + rc = sqlite3_bind_string_or_null(res, cfg->os, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->host, param); + rc = sqlite3_bind_string_or_null(res, cfg->host, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->lookup, param); + rc = sqlite3_bind_string_or_null(res, cfg->lookup, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->every, param); + rc = sqlite3_bind_string_or_null(res, cfg->every, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->units, param); + rc = sqlite3_bind_string_or_null(res, cfg->units, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->calc, param); + rc = sqlite3_bind_string_or_null(res, cfg->calc, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->families, param); + rc = sqlite3_bind_string_or_null(res, cfg->families, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->plugin, param); + rc = sqlite3_bind_string_or_null(res, cfg->plugin, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->module, param); + rc = sqlite3_bind_string_or_null(res, cfg->module, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->charts, param); + rc = sqlite3_bind_string_or_null(res, cfg->charts, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->green, param); + rc = sqlite3_bind_string_or_null(res, cfg->green, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->red, param); + rc = sqlite3_bind_string_or_null(res, cfg->red, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->warn, param); + rc = sqlite3_bind_string_or_null(res, cfg->warn, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->crit, param); + rc = sqlite3_bind_string_or_null(res, cfg->crit, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->exec, param); + rc = sqlite3_bind_string_or_null(res, cfg->exec, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->to, param); + rc = sqlite3_bind_string_or_null(res, cfg->to, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->info, param); + rc = sqlite3_bind_string_or_null(res, cfg->info, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->delay, param); + rc = sqlite3_bind_string_or_null(res, cfg->delay, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->options, param); + rc = sqlite3_bind_string_or_null(res, cfg->options, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->repeat, param); + rc = sqlite3_bind_string_or_null(res, cfg->repeat, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->host_labels, param); + rc = sqlite3_bind_string_or_null(res, cfg->host_labels, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; if (cfg->p_db_lookup_after) { - param++; - rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_dimensions, param); + rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_dimensions, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_method, param); + rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_method, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 31, cfg->p_db_lookup_options); + rc = sqlite3_bind_int(res, ++param, (int) cfg->p_db_lookup_options); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 32, cfg->p_db_lookup_after); + rc = sqlite3_bind_int(res, ++param, (int) cfg->p_db_lookup_after); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 33, cfg->p_db_lookup_before); + rc = sqlite3_bind_int(res, ++param, (int) cfg->p_db_lookup_before); if (unlikely(rc != SQLITE_OK)) goto bind_fail; } else { - param++; - rc = sqlite3_bind_null(res, 29); + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_null(res, 30); + + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_null(res, 31); + + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_null(res, 32); + + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_null(res, 33); + + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; } - param++; - rc = sqlite3_bind_int(res, 34, cfg->p_update_every); + rc = sqlite3_bind_int(res, ++param, cfg->p_update_every); if (unlikely(rc != SQLITE_OK)) goto bind_fail; @@ -1050,7 +1015,7 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg) return 0; - bind_fail: +bind_fail: error_report("Failed to bind parameter %d to store alert hash_id, rc = %d", param, rc); rc = sqlite3_reset(res); if (unlikely(rc != SQLITE_OK)) diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 35f928ffa..607d789a5 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -64,9 +64,11 @@ enum metadata_opcode { METADATA_STORE_CLAIM_ID, METADATA_ADD_HOST_INFO, METADATA_SCAN_HOSTS, + METADATA_LOAD_HOST_CONTEXT, METADATA_MAINTENANCE, METADATA_SYNC_SHUTDOWN, METADATA_UNITTEST, + METADATA_ML_LOAD_MODELS, // leave this last // we need it to check for worker utilization METADATA_MAX_ENUMERATIONS_DEFINED @@ -154,19 +156,43 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value return 1; } -static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer) +#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;" +#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;" + +static void clean_old_chart_labels(RRDSET *st) +{ + char sql[512]; + time_t first_time_s = rrdset_first_entry_s(st); + + if (unlikely(!first_time_s)) + snprintfz(sql, 511,SQL_DELETE_CHART_LABEL); + else + snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s); + + int rc = exec_statement_with_uuid(sql, &st->chart_uuid); + if (unlikely(rc)) + error_report("METADATA: 'host:%s' Failed to clean old labels for chart %s", rrdhost_hostname(st->rrdhost), rrdset_name(st)); +} + +static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter) { size_t old_version = st->rrdlabels_last_saved_version; size_t new_version = dictionary_version(st->rrdlabels); - if(new_version != old_version) { - buffer_flush(work_buffer); - struct query_build tmp = {.sql = work_buffer, .count = 0}; - uuid_unparse_lower(st->chart_uuid, tmp.uuid_str); - rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); + if (new_version == old_version) + return 0; + + struct query_build tmp = {.sql = work_buffer, .count = 0}; + uuid_unparse_lower(st->chart_uuid, tmp.uuid_str); + rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); + int rc = db_execute(db_meta, buffer_tostring(work_buffer)); + if (likely(!rc)) { st->rrdlabels_last_saved_version = new_version; - db_execute(buffer_tostring(work_buffer)); + (*query_counter)++; } + + clean_old_chart_labels(st); + return rc; } // Migrate all hosts with hops zero to this host_uuid @@ -177,12 +203,13 @@ void migrate_localhost(uuid_t *host_uuid) rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid); if (!rc) rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid); - if (!rc) - db_execute(DELETE_MISSING_NODE_INSTANCES); - + if (!rc) { + if (unlikely(db_execute(db_meta, DELETE_MISSING_NODE_INSTANCES))) + error_report("Failed to remove deleted hosts from node instances"); + } } -static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) +static int store_claim_id(uuid_t *host_id, uuid_t *claim_id) { sqlite3_stmt *res = NULL; int rc; @@ -190,18 +217,18 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) error_report("Database has not been initialized"); - return; + return 1; } rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement store chart labels"); - return; + error_report("Failed to prepare statement to store host claim id"); + return 1; } rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to store node instance information"); + error_report("Failed to bind host_id parameter to store claim id"); goto failed; } @@ -210,17 +237,19 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) else rc = sqlite3_bind_null(res, 2); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind claim_id parameter to store node instance information"); + error_report("Failed to bind claim_id parameter to host claim id"); goto failed; } rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store node instance information, rc = %d", rc); + error_report("Failed to store host claim id rc = %d", rc); failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when storing node instance information"); + error_report("Failed to finalize the prepared statement when storing a host claim id"); + + return rc != SQLITE_DONE; } static void delete_dimension_uuid(uuid_t *dimension_uuid) @@ -252,7 +281,7 @@ skip_execution: // // Store host and host system info information in the database -static int sql_store_host_info(RRDHOST *host) +static int store_host_metadata(RRDHOST *host) { static __thread sqlite3_stmt *res = NULL; int rc, param = 0; @@ -340,7 +369,7 @@ static int sql_store_host_info(RRDHOST *host) if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc); - return !(store_rc == SQLITE_DONE); + return store_rc != SQLITE_DONE; bind_fail: error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc); rc = sqlite3_reset(res); @@ -349,7 +378,7 @@ bind_fail: return 1; } -static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data) +static void add_host_sysinfo_key_value(const char *name, const char *value, void *data) { struct query_build *lb = data; @@ -365,44 +394,43 @@ static void sql_store_host_system_info_key_value(const char *name, const char *v lb->count++; } -static BUFFER *sql_store_host_system_info(RRDHOST *host) +static bool build_host_system_info_statements(RRDHOST *host, BUFFER *work_buffer) { struct rrdhost_system_info *system_info = host->system_info; if (unlikely(!system_info)) - return NULL; - - BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + return false; + buffer_flush(work_buffer); struct query_build key_data = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(host->host_uuid, key_data.uuid_str); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data); - - return work_buffer; + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data); + + return true; } @@ -410,13 +438,10 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host) * Store a chart in the database */ -static int sql_store_chart( - uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family, - const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority, - int update_every, int chart_type, int memory_mode, long history_entries) +static int store_chart_metadata(RRDSET *st) { static __thread sqlite3_stmt *res = NULL; - int rc, param = 0; + int rc, param = 0, store_rc = 0; if (unlikely(!db_meta)) { if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) @@ -433,98 +458,83 @@ static int sql_store_chart( } } - param++; - rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &st->chart_uuid, sizeof(st->chart_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &st->rrdhost->host_uuid, sizeof(st->rrdhost->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(st->parts.type), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(st->parts.id), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; + const char *name = string2str(st->parts.name); if (name && *name) - rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC); else - rc = sqlite3_bind_null(res, 5); + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_family(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_context(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_title(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_units(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_plugin_name(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_module_name(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 12, (int) priority); + rc = sqlite3_bind_int(res, ++param, (int) st->priority); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 13, update_every); + rc = sqlite3_bind_int(res, ++param, st->update_every); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 14, chart_type); + rc = sqlite3_bind_int(res, ++param, st->chart_type); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 15, memory_mode); + rc = sqlite3_bind_int(res, ++param, st->rrd_memory_mode); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 16, (int) history_entries); + rc = sqlite3_bind_int(res, ++param, (int) st->entries); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store chart, rc = %d", rc); + store_rc = execute_insert(res); + if (unlikely(store_rc != SQLITE_DONE)) + error_report("Failed to store chart, rc = %d", store_rc); rc = sqlite3_reset(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement in chart store function, rc = %d", rc); - return 0; + return store_rc != SQLITE_DONE; bind_fail: error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc); @@ -537,9 +547,7 @@ bind_fail: /* * Store a dimension */ -static int sql_store_dimension( - uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier, - collected_number divisor, int algorithm, bool hidden) +static int store_dimension_metadata(RRDDIM *rd) { static __thread sqlite3_stmt *res = NULL; int rc, param = 0; @@ -559,35 +567,35 @@ static int sql_store_dimension( } } - rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &rd->metric_uuid, sizeof(rd->metric_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &rd->rrdset->chart_uuid, sizeof(rd->rrdset->chart_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(rd->id), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(rd->name), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, (int) multiplier); + rc = sqlite3_bind_int(res, ++param, (int) rd->multiplier); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, (int ) divisor); + rc = sqlite3_bind_int(res, ++param, (int ) rd->divisor); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, algorithm); + rc = sqlite3_bind_int(res, ++param, rd->algorithm); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - if (hidden) + if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)) rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC); else rc = sqlite3_bind_null(res, ++param); @@ -700,7 +708,6 @@ static void cleanup_health_log(void) // // EVENT LOOP STARTS HERE // -static uv_mutex_t metadata_async_lock; static void metadata_init_cmd_queue(struct metadata_wc *wc) { @@ -856,6 +863,7 @@ static void start_metadata_cleanup(uv_work_t *req) struct metadata_wc *wc = req->data; check_dimension_metadata(wc); cleanup_health_log(); + (void) sqlite3_wal_checkpoint(db_meta, NULL); worker_is_idle(); } @@ -863,9 +871,125 @@ struct scan_metadata_payload { uv_work_t request; struct metadata_wc *wc; struct completion *completion; + BUFFER *work_buffer; uint32_t max_count; }; +struct host_context_load_thread { + uv_thread_t thread; + RRDHOST *host; + bool busy; + bool finished; +}; + +static void restore_host_context(void *arg) +{ + struct host_context_load_thread *hclt = arg; + RRDHOST *host = hclt->host; + + usec_t started_ut = now_monotonic_usec(); (void)started_ut; + rrdhost_load_rrdcontext_data(host); + usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; + + rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS); + +#ifdef ENABLE_ACLK + aclk_queue_node_info(host, false); +#endif + + internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host), + (double)(ended_ut - started_ut) / USEC_PER_MS); + + __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE); +} + +// Callback after scan of hosts is done +static void after_start_host_load_context(uv_work_t *req, int status __maybe_unused) +{ + struct scan_metadata_payload *data = req->data; + freez(data); +} + +#define MAX_FIND_THREAD_RETRIES (10) + +static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait) +{ + for (size_t index = 0; index < max_thread_slots; index++) { + if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED) + || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) { + int rc = uv_thread_join(&(hclt[index].thread)); + if (rc) + error("Failed to join thread, rc = %d",rc); + __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE); + __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE); + } + } +} + +static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index) +{ + size_t retries = MAX_FIND_THREAD_RETRIES; + while (retries--) { + size_t index = 0; + while (index < max_thread_slots) { + if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) { + *found_index = index; + return true; + } + index++; + } + sleep_usec(10 * USEC_PER_MS); + } + return false; +} + +static void start_all_host_load_context(uv_work_t *req __maybe_unused) +{ + register_libuv_worker_jobs(); + + struct scan_metadata_payload *data = req->data; + UNUSED(data); + + worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD); + usec_t started_ut = now_monotonic_usec(); (void)started_ut; + + RRDHOST *host; + + size_t max_threads = MIN(get_netdata_cpus() / 2, 6); + struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt)); + + size_t thread_index; + dfe_start_reentrant(rrdhost_root_index, host) { + if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) || + !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) + continue; + + rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS); + internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host)); + + cleanup_finished_threads(hclt, max_threads, false); + bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index); + + if (unlikely(!found_slot)) { + struct host_context_load_thread hclt_sync = {.host = host}; + restore_host_context(&hclt_sync); + } + else { + __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED); + hclt[thread_index].host = host; + assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index])); + } + } + dfe_done(host); + + cleanup_finished_threads(hclt, max_threads, true); + freez(hclt); + usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; + internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS); + + worker_is_idle(); +} + // Callback after scan of hosts is done static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) { @@ -881,13 +1005,15 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) freez(data); } -static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) { +static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_transaction, BUFFER *work_buffer, size_t *query_counter) { RRDSET *st; int rc; bool more_to_do = false; uint32_t scan_count = 1; - BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + + if (use_transaction) + (void)db_execute(db_meta, "BEGIN TRANSACTION;"); rrdset_foreach_reentrant(st, host) { if (scan_count == max_count) { @@ -900,27 +1026,16 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_ rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE); scan_count++; - check_and_update_chart_labels(st, work_buffer); - - rc = sql_store_chart( - &st->chart_uuid, - &st->rrdhost->host_uuid, - string2str(st->parts.type), - string2str(st->parts.id), - string2str(st->parts.name), - rrdset_family(st), - rrdset_context(st), - rrdset_title(st), - rrdset_units(st), - rrdset_plugin_name(st), - rrdset_module_name(st), - st->priority, - st->update_every, - st->chart_type, - st->rrd_memory_mode, - st->entries); + buffer_flush(work_buffer); + rc = check_and_update_chart_labels(st, work_buffer, query_counter); + if (unlikely(rc)) + error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st)); + else + (*query_counter)++; + + rc = store_chart_metadata(st); if (unlikely(rc)) - internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id)); + error_report("METADATA: 'host:%s': Failed to store metadata for chart %s", rrdhost_hostname(host), rrdset_name(st)); } RRDDIM *rd; @@ -935,116 +1050,145 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_ else rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - rc = sql_store_dimension( - &rd->metric_uuid, - &rd->rrdset->chart_uuid, - string2str(rd->id), - string2str(rd->name), - rd->multiplier, - rd->divisor, - rd->algorithm, - rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)); - + rc = store_dimension_metadata(rd); if (unlikely(rc)) - error_report("METADATA: Failed to store dimension %s", string2str(rd->id)); + error_report("METADATA: 'host:%s': Failed to dimension metadata for chart %s. dimension %s", + rrdhost_hostname(host), rrdset_name(st), + rrddim_name(rd)); } } rrddim_foreach_done(rd); } rrdset_foreach_done(st); - buffer_free(work_buffer); + if (use_transaction) + (void)db_execute(db_meta, "COMMIT TRANSACTION;"); + return more_to_do; } +static void store_host_and_system_info(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter) +{ + bool free_work_buffer = (NULL == work_buffer); + + if (unlikely(free_work_buffer)) + work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + + if (build_host_system_info_statements(host, work_buffer)) { + int rc = db_execute(db_meta, buffer_tostring(work_buffer)); + if (unlikely(rc)) { + error_report("METADATA: 'host:%s': Failed to store host updated information in the database", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); + } + else { + if (likely(query_counter)) + (*query_counter)++; + } + } + + if (unlikely(store_host_metadata(host))) { + error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); + } + else { + if (likely(query_counter)) + (*query_counter)++; + } + + if (unlikely(free_work_buffer)) + buffer_free(work_buffer); +} + // Worker thread to scan hosts for pending metadata to store static void start_metadata_hosts(uv_work_t *req __maybe_unused) { register_libuv_worker_jobs(); RRDHOST *host; + int transaction_started = 0; struct scan_metadata_payload *data = req->data; struct metadata_wc *wc = data->wc; + BUFFER *work_buffer = data->work_buffer; usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut; internal_error(true, "METADATA: checking all hosts..."); + usec_t started_ut = now_monotonic_usec(); (void)started_ut; bool run_again = false; worker_is_busy(UV_EVENT_METADATA_STORE); if (!data->max_count) - db_execute("BEGIN TRANSACTION;"); + transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;"); + dfe_start_reentrant(rrdhost_root_index, host) { if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) continue; size_t query_counter = 0; (void)query_counter; - usec_t started_ut = now_monotonic_usec(); (void)started_ut; rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE); if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) { rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS); + int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid); - if (likely(rc == SQLITE_OK)) { - BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + if (likely(!rc)) { + query_counter++; + + buffer_flush(work_buffer); struct query_build tmp = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(host->host_uuid, tmp.uuid_str); rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp); - db_execute(buffer_tostring(work_buffer)); - buffer_free(work_buffer); - query_counter++; + rc = db_execute(db_meta, buffer_tostring(work_buffer)); + + if (unlikely(rc)) { + error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); + } + else + query_counter++; + } else { + error_report("METADATA: 'host:%s': failed to delete old host labels", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); } } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) { rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID); uuid_t uuid; - + int rc; if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid))) - store_claim_id(&host->host_uuid, &uuid); + rc = store_claim_id(&host->host_uuid, &uuid); else - store_claim_id(&host->host_uuid, NULL); - - query_counter++; - } - - if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) { - rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO); - - BUFFER *work_buffer = sql_store_host_system_info(host); - if(work_buffer) { - db_execute(buffer_tostring(work_buffer)); - buffer_free(work_buffer); - query_counter++; - } + rc = store_claim_id(&host->host_uuid, NULL); - int rc = sql_store_host_info(host); if (unlikely(rc)) - error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE); else query_counter++; } + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) { + rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO); + store_host_and_system_info(host, work_buffer, &query_counter); + } - if (data->max_count) - db_execute("BEGIN TRANSACTION;"); - if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) { + // For clarity + bool use_transaction = data->max_count; + if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) { run_again = true; rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE); internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host)); } - if (data->max_count) - db_execute("COMMIT TRANSACTION;"); - usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms", rrdhost_hostname(host), query_counter, (double)(ended_ut - started_ut) / USEC_PER_MS); } dfe_done(host); - if (!data->max_count) - db_execute("COMMIT TRANSACTION;"); + + if (!data->max_count && transaction_started) + transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;"); usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut; internal_error(true, "METADATA: checking all hosts completed in %0.2f ms", @@ -1059,7 +1203,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) static void metadata_event_loop(void *arg) { - service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register("METASYNC"); worker_register_job_name(METADATA_DATABASE_NOOP, "noop"); worker_register_job_name(METADATA_DATABASE_TIMER, "timer"); @@ -1067,6 +1210,7 @@ static void metadata_event_loop(void *arg) worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info"); worker_register_job_name(METADATA_MAINTENANCE, "maintenance"); + worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models"); int ret; uv_loop_t *loop; @@ -1076,6 +1220,7 @@ static void metadata_event_loop(void *arg) uv_work_t metadata_cleanup_worker; uv_thread_set_name_np(wc->thread, "METASYNC"); +// service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); loop = wc->loop = mallocz(sizeof(uv_loop_t)); ret = uv_loop_init(loop); if (ret) { @@ -1112,11 +1257,12 @@ static void metadata_event_loop(void *arg) int shutdown = 0; wc->row_id = 0; completion_mark_complete(&wc->init_complete); + BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + struct scan_metadata_payload *data; while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) { uuid_t *uuid; RRDHOST *host = NULL; - int rc; worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); @@ -1145,6 +1291,11 @@ static void metadata_event_loop(void *arg) case METADATA_DATABASE_TIMER: break; + case METADATA_ML_LOAD_MODELS: { + RRDDIM *rd = (RRDDIM *) cmd.param[0]; + ml_dimension_load_models(rd); + break; + } case METADATA_DEL_DIMENSION: uuid = (uuid_t *) cmd.param[0]; if (likely(dimension_can_be_deleted(uuid))) @@ -1158,9 +1309,7 @@ static void metadata_event_loop(void *arg) break; case METADATA_ADD_HOST_INFO: host = (RRDHOST *) cmd.param[0]; - rc = sql_store_host_info(host); - if (unlikely(rc)) - error_report("Failed to store host info in the database for %s", string2str(host->hostname)); + store_host_and_system_info(host, NULL, NULL); break; case METADATA_SCAN_HOSTS: if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS))) @@ -1169,10 +1318,11 @@ static void metadata_event_loop(void *arg) if (unittest_running) break; - struct scan_metadata_payload *data = mallocz(sizeof(*data)); + data = mallocz(sizeof(*data)); data->request.data = data; data->wc = wc; data->completion = cmd.completion; // Completion by the worker + data->work_buffer = work_buffer; if (unlikely(cmd.completion)) { data->max_count = 0; // 0 will process all pending updates @@ -1192,6 +1342,19 @@ static void metadata_event_loop(void *arg) metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); } break; + case METADATA_LOAD_HOST_CONTEXT:; + if (unittest_running) + break; + + data = callocz(1,sizeof(*data)); + data->request.data = data; + data->wc = wc; + if (unlikely( + uv_queue_work(loop,&data->request, start_all_host_load_context, + after_start_host_load_context))) { + freez(data); + } + break; case METADATA_MAINTENANCE: if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP))) break; @@ -1220,21 +1383,14 @@ static void metadata_event_loop(void *arg) if (!uv_timer_stop(&wc->timer_req)) uv_close((uv_handle_t *)&wc->timer_req, NULL); - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour we need to be aware if this becomes - * an issue in the future. - */ uv_close((uv_handle_t *)&wc->async, NULL); - uv_run(loop, UV_RUN_DEFAULT); - uv_cond_destroy(&wc->cmd_cond); int rc; - do { rc = uv_loop_close(loop); } while (rc != UV_EBUSY); + buffer_free(work_buffer); freez(loop); worker_unregister(); @@ -1272,6 +1428,9 @@ void metadata_sync_shutdown(void) void metadata_sync_shutdown_prepare(void) { + if (unlikely(!metasync_worker.loop)) + return; + struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -1303,8 +1462,6 @@ void metadata_sync_init(void) { struct metadata_wc *wc = &metasync_worker; - fatal_assert(0 == uv_mutex_init(&metadata_async_lock)); - memset(wc, 0, sizeof(*wc)); metadata_init_cmd_queue(wc); completion_init(&wc->init_complete); @@ -1364,6 +1521,20 @@ void metaqueue_host_update_info(RRDHOST *host) queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL); } +void metaqueue_ml_load_models(RRDDIM *rd) +{ + if (unlikely(!metasync_worker.loop)) + return; + queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL); +} + +void metadata_queue_load_host_context(RRDHOST *host) +{ + if (unlikely(!metasync_worker.loop)) + return; + queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL); +} + // // unitests // @@ -1419,7 +1590,7 @@ static void *metadata_unittest_threads(void) unittest_queue_metadata, &tu); } - uv_async_send(&metasync_worker.async); + (void) uv_async_send(&metasync_worker.async); sleep_usec(seconds_to_run * USEC_PER_SEC); __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED); diff --git a/database/sqlite/sqlite_metadata.h b/database/sqlite/sqlite_metadata.h index d578b7a8f..6b0676ee7 100644 --- a/database/sqlite/sqlite_metadata.h +++ b/database/sqlite/sqlite_metadata.h @@ -14,7 +14,9 @@ void metadata_sync_shutdown_prepare(void); void metaqueue_delete_dimension_uuid(uuid_t *uuid); void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid); void metaqueue_host_update_info(RRDHOST *host); +void metaqueue_ml_load_models(RRDDIM *rd); void migrate_localhost(uuid_t *host_uuid); +void metadata_queue_load_host_context(RRDHOST *host); // UNIT TEST int metadata_unittest(void); |