diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /database/sqlite/sqlite_aclk.c | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 908 |
1 files changed, 379 insertions, 529 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 3b0c40522..a33e09f5d 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -5,58 +5,176 @@ #include "sqlite_aclk_node.h" +struct aclk_sync_config_s { + uv_thread_t thread; + uv_loop_t loop; + uv_timer_t timer_req; + time_t cleanup_after; // Start a cleanup after this timestamp + uv_async_t async; + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + bool initialized; + volatile unsigned queue_size; + struct aclk_database_cmdqueue cmd_queue; +} aclk_sync_config = { 0 }; + + void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } -static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) + +int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd) { - struct aclk_database_worker_config *wc = data; - UNUSED(argc); - UNUSED(column); + unsigned queue_size; - debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.data = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(wc, &cmd); + /* wait for free space in queue */ + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + return 1; + } + + fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); + /* enqueue command */ + aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; + aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.tail + 1 : 0; + aclk_sync_config.queue_size = queue_size + 1; + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); return 0; } -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" - -static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) +static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) { - char *err_msg = NULL; - debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex); } + fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); + /* enqueue command */ + aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; + aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.tail + 1 : 0; + aclk_sync_config.queue_size = queue_size + 1; + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + + /* wake up event loop */ + int rc = uv_async_send(&aclk_sync_config.async); + if (unlikely(rc)) + debug(D_ACLK_SYNC, "Failed to wake up event loop"); } -static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +enum { + IDX_HOST_ID, + IDX_HOSTNAME, + IDX_REGISTRY, + IDX_UPDATE_EVERY, + IDX_OS, + IDX_TIMEZONE, + IDX_TAGS, + IDX_HOPS, + IDX_MEMORY_MODE, + IDX_ABBREV_TIMEZONE, + IDX_UTC_OFFSET, + IDX_PROGRAM_NAME, + IDX_PROGRAM_VERSION, + IDX_ENTRIES, + IDX_HEALTH_ENABLED, +}; + +static int create_host_callback(void *data, int argc, char **argv, char **column) { - UNUSED(cmd); + int *number_of_chidren = data; + UNUSED(argc); + UNUSED(column); - debug(D_ACLK, "Checking database for %s", wc->host_guid); + char guid[UUID_STR_LEN]; + uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); - buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " - "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); - db_execute(buffer_tostring(sql)); + system_info->hops = str2i((const char *) argv[IDX_HOPS]); - buffer_free(sql); + sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); + + RRDHOST *host = rrdhost_find_or_create( + (const char *) argv[IDX_HOSTNAME] + , (const char *) argv[IDX_REGISTRY] + , guid + , (const char *) argv[IDX_OS] + , (const char *) argv[IDX_TIMEZONE] + , (const char *) argv[IDX_ABBREV_TIMEZONE] + , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0) + , (const char *) argv[IDX_TAGS] + , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") + , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") + , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 + , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0 + , default_rrd_memory_mode + , 0 // health + , 0 // rrdpush enabled + , NULL //destination + , NULL // api key + , NULL // send charts matching + , false // rrdpush_enable_replication + , 0 // rrdpush_seconds_to_replicate + , 0 // rrdpush_replication_step + , system_info + , 1 + ); + if (likely(host)) + host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); + + (*number_of_chidren)++; + +#ifdef NETDATA_INTERNAL_CHECKS + char node_str[UUID_STR_LEN] = "<none>"; + if (likely(host->node_id)) + uuid_unparse_lower(*host->node_id, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); +#endif + return 0; } +#ifdef ENABLE_ACLK +static struct aclk_database_cmd aclk_database_deq_cmd(void) +{ + struct aclk_database_cmd ret; + unsigned queue_size; -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + queue_size = aclk_sync_config.queue_size; + if (queue_size == 0) { + memset(&ret, 0, sizeof(ret)); + ret.opcode = ACLK_DATABASE_NOOP; + ret.completion = NULL; + + } else { + /* dequeue command */ + ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head]; + if (queue_size == 1) { + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; + } else { + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.head + 1 : 0; + } + aclk_sync_config.queue_size = queue_size - 1; + /* wake up producers */ + uv_cond_signal(&aclk_sync_config.cmd_cond); + } + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + return ret; +} + +#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" static int is_host_available(uuid_t *host_id) { sqlite3_stmt *res = NULL; @@ -76,7 +194,7 @@ static int is_host_available(uuid_t *host_id) rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to select node instance information"); + error_report("Failed to bind host_id parameter to check host existence"); goto failed; } rc = sqlite3_step_monitored(res); @@ -89,15 +207,13 @@ failed: } // OPCODE: ACLK_DATABASE_DELETE_HOST -void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +static void sql_delete_aclk_table_list(char *host_guid) { - UNUSED(wc); - char uuid_str[GUID_LEN + 1]; - char host_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; + char host_str[UUID_STR_LEN]; int rc; uuid_t host_uuid; - char *host_guid = (char *)cmd.data; if (unlikely(!host_guid)) return; @@ -139,273 +255,67 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); - db_execute(buffer_tostring(sql)); + rc = db_execute(db_meta, buffer_tostring(sql)); + if (unlikely(rc)) + error("Failed to drop unused ACLK tables"); fail: buffer_free(sql); } -uv_mutex_t aclk_async_lock; -struct aclk_database_worker_config *aclk_thread_head = NULL; - -int claimed() +static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused) { - int rc; - rrdhost_aclk_state_lock(localhost); - rc = (localhost->aclk_state.claimed_id != NULL); - rrdhost_aclk_state_unlock(localhost); - return rc; -} - -void aclk_add_worker_thread(struct aclk_database_worker_config *wc) -{ - if (unlikely(!wc)) - return; - - uv_mutex_lock(&aclk_async_lock); - if (unlikely(!wc->host)) { - wc->next = aclk_thread_head; - aclk_thread_head = wc; - } - uv_mutex_unlock(&aclk_async_lock); + debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DELETE_HOST; + cmd.param[0] = strdupz((char *) argv[0]); + aclk_database_enq_cmd_noblock(&cmd); + return 0; } -void aclk_del_worker_thread(struct aclk_database_worker_config *wc) -{ - if (unlikely(!wc)) - return; - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config **tmp = &aclk_thread_head; - while (*tmp && (*tmp) != wc) - tmp = &(*tmp)->next; - if (*tmp) - *tmp = wc->next; - uv_mutex_unlock(&aclk_async_lock); -} +#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" -int aclk_worker_thread_exists(char *guid) +static void sql_check_aclk_table_list(void) { - int rc = 0; - uv_mutex_lock(&aclk_async_lock); - - struct aclk_database_worker_config *tmp = aclk_thread_head; - - while (tmp && !rc) { - rc = strcmp(tmp->uuid_str, guid) == 0; - tmp = tmp->next; + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); } - uv_mutex_unlock(&aclk_async_lock); - return rc; } -void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc) -{ - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - wc->queue_size = 0; - fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); - fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); -} +#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;" -int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) +static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) { - uv_mutex_unlock(&wc->cmd_mutex); - return 1; - } - - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); + char sql[512]; + snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); + if (unlikely(db_execute(db_meta, sql))) + error_report("Failed to clean stale ACLK alert entries"); return 0; } -void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) -{ - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if (wc->is_shutting_down) { - uv_mutex_unlock(&wc->cmd_mutex); - return; - } - while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); - - /* wake up event loop */ - int rc = uv_async_send(&wc->async); - if (unlikely(rc)) - debug(D_ACLK_SYNC, "Failed to wake up event loop"); -} - -struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_config* wc) -{ - struct aclk_database_cmd ret; - unsigned queue_size; - uv_mutex_lock(&wc->cmd_mutex); - queue_size = wc->queue_size; - if (queue_size == 0 || wc->is_shutting_down) { - memset(&ret, 0, sizeof(ret)); - ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; - if (wc->is_shutting_down) - uv_cond_signal(&wc->cmd_cond); - } else { - /* dequeue command */ - ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; - if (queue_size == 1) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - } else { - wc->cmd_queue.head = wc->cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.head + 1 : 0; - } - wc->queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&wc->cmd_cond); - } - uv_mutex_unlock(&wc->cmd_mutex); - - return ret; -} +#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');" -struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id) +static void sql_maint_aclk_sync_database_all(void) { - if (unlikely(!node_id)) - return NULL; - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config *wc = aclk_thread_head; - - while (wc) { - if (!strcmp(wc->node_id, node_id)) - break; - wc = wc->next; + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); } - uv_mutex_unlock(&aclk_async_lock); - - return (wc); } -void aclk_sync_exit_all() -{ - rrd_rdlock(); - RRDHOST *host; - rrdhost_foreach_read(host) { - struct aclk_database_worker_config *wc = host->dbsync_worker; - if (wc) { - wc->is_shutting_down = 1; - (void) aclk_database_deq_cmd(wc); - uv_cond_signal(&wc->cmd_cond); - } - } - rrd_unlock(); - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config *wc = aclk_thread_head; - while (wc) { - wc->is_shutting_down = 1; - wc = wc->next; - } - uv_mutex_unlock(&aclk_async_lock); -} - -enum { - IDX_HOST_ID, - IDX_HOSTNAME, - IDX_REGISTRY, - IDX_UPDATE_EVERY, - IDX_OS, - IDX_TIMEZONE, - IDX_TAGS, - IDX_HOPS, - IDX_MEMORY_MODE, - IDX_ABBREV_TIMEZONE, - IDX_UTC_OFFSET, - IDX_PROGRAM_NAME, - IDX_PROGRAM_VERSION, - IDX_ENTRIES, - IDX_HEALTH_ENABLED, -}; - -static int create_host_callback(void *data, int argc, char **argv, char **column) -{ - UNUSED(data); - UNUSED(argc); - UNUSED(column); - - char guid[UUID_STR_LEN]; - uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); - - struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); - __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); - - system_info->hops = str2i((const char *) argv[IDX_HOPS]); - - sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); - - RRDHOST *host = rrdhost_find_or_create( - (const char *) argv[IDX_HOSTNAME] - , (const char *) argv[IDX_REGISTRY] - , guid - , (const char *) argv[IDX_OS] - , (const char *) argv[IDX_TIMEZONE] - , (const char *) argv[IDX_ABBREV_TIMEZONE] - , argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET]) : 0 - , (const char *) argv[IDX_TAGS] - , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") - , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") - , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 - , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0 - , default_rrd_memory_mode - , 0 // health - , 0 // rrdpush enabled - , NULL //destination - , NULL // api key - , NULL // send charts matching - , false // rrdpush_enable_replication - , 0 // rrdpush_seconds_to_replicate - , 0 // rrdpush_replication_step - , system_info - , 1 - ); - if (likely(host)) - host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); - -#ifdef NETDATA_INTERNAL_CHECKS - char node_str[UUID_STR_LEN] = "<none>"; - if (likely(host->node_id)) - uuid_unparse_lower(*host->node_id, node_str); - internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); -#endif - return 0; -} - -#ifdef ENABLE_ACLK -static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) +static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { char uuid_str[GUID_LEN + 1]; - UNUSED(data); - UNUSED(argc); - UNUSED(column); - uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); RRDHOST *host = rrdhost_find_by_guid(uuid_str); @@ -415,156 +325,81 @@ static int aclk_start_sync_thread(void *data, int argc, char **argv, char **colu sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]); return 0; } -#endif -void sql_aclk_sync_init(void) -{ - char *err_msg = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { - return; - } - error_report("Database has not been initialized"); - return; - } - - info("Creating archived hosts"); - rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " - "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " - "program_version, entries, health_enabled FROM host WHERE hops >0;", - create_host_callback, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); - sqlite3_free(err_msg); - } - -#ifdef ENABLE_ACLK - fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); - rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " - "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg); - sqlite3_free(err_msg); - } - info("ACLK sync initialization completed"); -#endif -} static void async_cb(uv_async_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); - debug(D_ACLK_SYNC, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } #define TIMER_PERIOD_MS (1000) -static void timer_cb(uv_timer_t* handle) +static void timer_cb(uv_timer_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); -#ifdef ENABLE_ACLK - struct aclk_database_worker_config *wc = handle->data; + struct aclk_sync_config_s *config = handle->data; struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_TIMER; - aclk_database_enq_cmd_noblock(wc, &cmd); time_t now = now_realtime_sec(); - if (wc->cleanup_after && wc->cleanup_after < now) { + if (config->cleanup_after && config->cleanup_after < now) { cmd.opcode = ACLK_DATABASE_CLEANUP; - if (!aclk_database_enq_cmd_noblock(wc, &cmd)) - wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; + if (!aclk_database_enq_cmd_noblock(&cmd)) + config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } if (aclk_connected) { - if (wc->alert_updates && !wc->pause_alert_updates) { - cmd.opcode = ACLK_DATABASE_PUSH_ALERT; - cmd.count = ACLK_MAX_ALERT_UPDATES; - aclk_database_enq_cmd_noblock(wc, &cmd); - } + cmd.opcode = ACLK_DATABASE_PUSH_ALERT; + aclk_database_enq_cmd_noblock(&cmd); + + aclk_check_node_info_and_collectors(); } -#endif } -static void aclk_database_worker(void *arg) +static void aclk_synchronization(void *arg __maybe_unused) { - service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); + struct aclk_sync_config_s *config = arg; + uv_thread_set_name_np(config->thread, "ACLKSYNC"); worker_register("ACLKSYNC"); + service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); + worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); - worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); - worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); - worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info"); - worker_register_job_name(ACLK_DATABASE_NODE_COLLECTORS, "node collectors"); + worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); - struct aclk_database_worker_config *wc = arg; - uv_loop_t *loop; - int ret; - enum aclk_database_opcode opcode; - uv_timer_t timer_req; - struct aclk_database_cmd cmd; + uv_loop_t *loop = &config->loop; + fatal_assert(0 == uv_loop_init(loop)); + fatal_assert(0 == uv_async_init(loop, &config->async, async_cb)); - char threadname[NETDATA_THREAD_NAME_MAX+1]; - if (wc->host) - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); - else { - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str); - threadname[11] = '\0'; - } - uv_thread_set_name_np(wc->thread, threadname); - - loop = wc->loop = mallocz(sizeof(uv_loop_t)); - ret = uv_loop_init(loop); - if (ret) { - error("uv_loop_init(): %s", uv_strerror(ret)); - goto error_after_loop_init; - } - loop->data = wc; + fatal_assert(0 == uv_timer_init(loop, &config->timer_req)); + config->timer_req.data = config; + fatal_assert(0 == uv_timer_start(&config->timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); - ret = uv_async_init(wc->loop, &wc->async, async_cb); - if (ret) { - error("uv_async_init(): %s", uv_strerror(ret)); - goto error_after_async_init; - } - wc->async.data = wc; - - ret = uv_timer_init(loop, &timer_req); - if (ret) { - error("uv_timer_init(): %s", uv_strerror(ret)); - goto error_after_timer_init; - } - timer_req.data = wc; - fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); - - wc->node_info_send = 1; - info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, (unsigned long int) sizeof(*wc)); - - memset(&cmd, 0, sizeof(cmd)); + info("Starting ACLK synchronization thread"); - wc->startup_time = now_realtime_sec(); - wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST; + config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST; + config->initialized = true; - debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); - - while (likely(!netdata_exit)) { + while (likely(service_running(SERVICE_ACLKSYNC))) { + enum aclk_database_opcode opcode; worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ do { - cmd = aclk_database_deq_cmd(wc); + struct aclk_database_cmd cmd = aclk_database_deq_cmd(); - if (netdata_exit) + if (unlikely(!service_running(SERVICE_ACLKSYNC))) break; opcode = cmd.opcode; @@ -576,201 +411,216 @@ static void aclk_database_worker(void *arg) case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ break; - // MAINTENANCE case ACLK_DATABASE_CLEANUP: - debug(D_ACLK_SYNC, "Database cleanup for %s", wc->host_guid); - - if (wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST + 2 < now_realtime_sec() && claimed() && aclk_connected) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - cmd.completion = NULL; - (void) aclk_database_enq_cmd_noblock(wc, &cmd); - } - - sql_maint_aclk_sync_database(wc, cmd); - if (wc->host == localhost) - sql_check_aclk_table_list(wc); + // Scan all aclk_alert_ tables and cleanup as needed + sql_maint_aclk_sync_database_all(); + sql_check_aclk_table_list(); break; case ACLK_DATABASE_DELETE_HOST: - debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data); - sql_delete_aclk_table_list(wc, cmd); + sql_delete_aclk_table_list(cmd.param[0]); + break; +// NODE STATE + case ACLK_DATABASE_NODE_STATE:; + RRDHOST *host = cmd.param[0]; + int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; + struct aclk_sync_host_config *ahc = host->aclk_sync_host_config; + if (unlikely(!ahc)) + sql_create_aclk_table(host, &host->host_uuid, host->node_id); + aclk_host_state_update(host, live); break; - // ALERTS case ACLK_DATABASE_PUSH_ALERT_CONFIG: - debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid); - aclk_push_alert_config_event(wc, cmd); + aclk_push_alert_config_event(cmd.param[0], cmd.param[1]); break; case ACLK_DATABASE_PUSH_ALERT: - debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid); - aclk_push_alert_event(wc, cmd); - break; - case ACLK_DATABASE_ALARM_HEALTH_LOG: - debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid); - aclk_push_alarm_health_log(wc, cmd); + aclk_push_alert_events_for_all_hosts(); break; - case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT: - debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid); - aclk_push_alert_snapshot_event(wc, cmd); + case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:; + aclk_push_alert_snapshot_event(cmd.param[0]); break; case ACLK_DATABASE_QUEUE_REMOVED_ALERTS: - debug(D_ACLK_SYNC, "Queueing removed alerts for node %s", wc->host_guid); - sql_process_queue_removed_alerts_to_aclk(wc, cmd); - break; - -// NODE OPERATIONS - case ACLK_DATABASE_NODE_INFO: - debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str); - sql_build_node_info(wc, cmd); - break; - case ACLK_DATABASE_NODE_COLLECTORS: - debug(D_ACLK_SYNC,"Sending node collectors info for %s", wc->uuid_str); - sql_build_node_collectors(wc); - break; -#ifdef ENABLE_ACLK - -// NODE_INSTANCE DETECTION - case ACLK_DATABASE_ORPHAN_HOST: - wc->host = NULL; - wc->is_orphan = 1; - aclk_add_worker_thread(wc); - break; -#endif - case ACLK_DATABASE_TIMER: - if (unlikely(localhost && !wc->host && !wc->is_orphan)) { - if (claimed()) { - wc->host = rrdhost_find_by_guid(wc->host_guid); - if (wc->host) { - info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid); - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); - uv_thread_set_name_np(wc->thread, threadname); - wc->host->dbsync_worker = wc; - if (unlikely(!wc->hostname)) - wc->hostname = strdupz(rrdhost_hostname(wc->host)); - aclk_del_worker_thread(wc); - wc->node_info_send = 1; - } - } - } - if (wc->node_info_send && localhost && claimed() && aclk_connected) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - cmd.completion = NULL; - wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); - } - if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) { - cmd.opcode = ACLK_DATABASE_NODE_COLLECTORS; - cmd.completion = NULL; - wc->node_collectors_send = aclk_database_enq_cmd_noblock(wc, &cmd); - } - if (localhost == wc->host) - (void) sqlite3_wal_checkpoint(db_meta, NULL); + sql_process_queue_removed_alerts_to_aclk(cmd.param[0]); break; default: debug(D_ACLK_SYNC, "%s: default.", __func__); break; } if (cmd.completion) - aclk_complete(cmd.completion); + completion_mark_complete(cmd.completion); } while (opcode != ACLK_DATABASE_NOOP); } - if (!uv_timer_stop(&timer_req)) - uv_close((uv_handle_t *)&timer_req, NULL); - - /* cleanup operations of the event loop */ - //info("Shutting down ACLK sync event loop for %s", wc->host_guid); + if (!uv_timer_stop(&config->timer_req)) + uv_close((uv_handle_t *)&config->timer_req, NULL); - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&wc->async, NULL); - uv_run(loop, UV_RUN_DEFAULT); + uv_close((uv_handle_t *)&config->async, NULL); +// uv_close((uv_handle_t *)&config->async_exit, NULL); + uv_cond_destroy(&config->cmd_cond); + (void) uv_loop_close(loop); - info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid); - /* TODO: don't let the API block by waiting to enqueue commands */ - uv_cond_destroy(&wc->cmd_cond); - - int rc; - do { - rc = uv_loop_close(loop); - } while (rc != UV_EBUSY); - - freez(loop); + worker_unregister(); + service_exits(); + info("ACLK SYNC: Shutting down ACLK synchronization event loop"); +} - rrd_rdlock(); - if (likely(wc->host)) - wc->host->dbsync_worker = NULL; - freez(wc->hostname); - freez(wc); - rrd_unlock(); +static void aclk_synchronization_init(void) +{ + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; + aclk_sync_config.queue_size = 0; + fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond)); + fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex)); - worker_unregister(); - return; - -error_after_timer_init: - uv_close((uv_handle_t *)&wc->async, NULL); -error_after_async_init: - fatal_assert(0 == uv_loop_close(loop)); -error_after_loop_init: - freez(loop); - worker_unregister(); + fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config)); } +#endif // ------------------------------------------------------------- -void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) +void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK char uuid_str[GUID_LEN + 1]; char host_guid[GUID_LEN + 1]; + int rc; uuid_unparse_lower_fix(host_uuid, uuid_str); - - if (aclk_worker_thread_exists(uuid_str)) - return; - uuid_unparse_lower(*host_uuid, host_guid); - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + char sql[ACLK_SYNC_QUERY_SIZE]; - buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, INDEX_ACLK_ALERT, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - - buffer_free(sql); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, TABLE_ACLK_ALERT, uuid_str); + rc = db_execute(db_meta, sql); + if (unlikely(rc)) + error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid); + else { + snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT, uuid_str, uuid_str); + rc = db_execute(db_meta, sql); + if (unlikely(rc)) + error_report("Failed to create ACLK alert table index for host %s", host ? string2str(host->hostname) : host_guid); + } + if (likely(host) && unlikely(host->aclk_sync_host_config)) + return; - if (likely(host) && unlikely(host->dbsync_worker)) + if (unlikely(!host)) return; - struct aclk_database_worker_config *wc = callocz(1, sizeof(struct aclk_database_worker_config)); + struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config)); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); - if (likely(host)) { - host->dbsync_worker = (void *)wc; - wc->hostname = strdupz(rrdhost_hostname(host)); - if (node_id && !host->node_id) { - host->node_id = mallocz(sizeof(*host->node_id)); - uuid_copy(*host->node_id, *node_id); - } + + host->aclk_sync_host_config = (void *)wc; + if (node_id && !host->node_id) { + host->node_id = mallocz(sizeof(*host->node_id)); + uuid_copy(*host->node_id, *node_id); } - else - wc->hostname = get_hostname_by_node_id(wc->node_id); + wc->host = host; strcpy(wc->uuid_str, uuid_str); - strcpy(wc->host_guid, host_guid); wc->alert_updates = 0; - aclk_database_init_cmd_queue(wc); - aclk_add_worker_thread(wc); - fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc)); -#else - UNUSED(host); - UNUSED(host_uuid); - UNUSED(node_id); + time_t now = now_realtime_sec(); + wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now; #endif -}
\ No newline at end of file +} + +#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \ + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ + "program_version, entries, health_enabled FROM host WHERE hops >0;" + +#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ + "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " +void sql_aclk_sync_init(void) +{ + char *err_msg = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { + return; + } + error_report("Database has not been initialized"); + return; + } + + info("Creating archived hosts"); + int number_of_children = 0; + rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_HOSTS, create_host_callback, &number_of_children, &err_msg); + + if (rc != SQLITE_OK) { + error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + + info("Created %d archived hosts", number_of_children); + // Trigger host context load for hosts that have been created + metadata_queue_load_host_context(NULL); + +#ifdef ENABLE_ACLK + if (!number_of_children) + aclk_queue_node_info(localhost, true); + + rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg); + + if (rc != SQLITE_OK) { + error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + aclk_synchronization_init(); + + info("ACLK sync initialization completed"); +#endif +} + +// Public + +static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1) +{ + struct aclk_database_cmd cmd; + cmd.opcode = opcode; + cmd.param[0] = (void *) param0; + cmd.param[1] = (void *) param1; + cmd.completion = NULL; + aclk_database_enq_cmd(&cmd); +} + +// Public +void aclk_push_alert_config(const char *node_id, const char *config_hash) +{ + if (unlikely(!aclk_sync_config.initialized)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash)); +} + +void aclk_push_node_alert_snapshot(const char *node_id) +{ + if (unlikely(!aclk_sync_config.initialized)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL); +} + + +void aclk_push_node_removed_alerts(const char *node_id) +{ + if (unlikely(!aclk_sync_config.initialized)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL); +} + +void schedule_node_info_update(RRDHOST *host __maybe_unused) +{ +#ifdef ENABLE_ACLK + if (unlikely(!host)) + return; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_NODE_STATE; + cmd.param[0] = host; + cmd.completion = NULL; + aclk_database_enq_cmd(&cmd); +#endif +} |