diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 336 |
1 files changed, 154 insertions, 182 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 7e3a9b2eb..3b0c40522 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -10,10 +10,140 @@ void sanity_check(void) { BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } -const char *aclk_sync_config[] = { +static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) +{ + struct aclk_database_worker_config *wc = data; + UNUSED(argc); + UNUSED(column); - NULL, -}; + debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DELETE_HOST; + cmd.data = strdupz((char *) argv[0]); + aclk_database_enq_cmd_noblock(wc, &cmd); + return 0; +} + +#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" + +static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) +{ + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); + } +} + +static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + UNUSED(cmd); + + debug(D_ACLK, "Checking database for %s", wc->host_guid); + + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + + buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " + "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); + db_execute(buffer_tostring(sql)); + + buffer_free(sql); +} + + +#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" + +static int is_host_available(uuid_t *host_id) +{ + sqlite3_stmt *res = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) + error_report("Database has not been initialized"); + return 1; + } + + rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to select node instance information for a node"); + return 1; + } + + rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to select node instance information"); + goto failed; + } + rc = sqlite3_step_monitored(res); + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize the prepared statement when checking host existence"); + + return (rc == SQLITE_ROW); +} + +// OPCODE: ACLK_DATABASE_DELETE_HOST +void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + UNUSED(wc); + char uuid_str[GUID_LEN + 1]; + char host_str[GUID_LEN + 1]; + + int rc; + uuid_t host_uuid; + char *host_guid = (char *)cmd.data; + + if (unlikely(!host_guid)) + return; + + rc = uuid_parse(host_guid, host_uuid); + freez(host_guid); + if (rc) + return; + + uuid_unparse_lower(host_uuid, host_str); + uuid_unparse_lower_fix(&host_uuid, uuid_str); + + debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str); + + if (is_host_available(&host_uuid)) { + debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str); + return; + } + + debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str); + + sqlite3_stmt *res = NULL; + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + + buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str); + + rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to clean up aclk tables"); + goto fail; + } + buffer_flush(sql); + + while (sqlite3_step_monitored(res) == SQLITE_ROW) + buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); + + db_execute(buffer_tostring(sql)); + +fail: + buffer_free(sql); +} uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; @@ -38,7 +168,6 @@ void aclk_add_worker_thread(struct aclk_database_worker_config *wc) aclk_thread_head = wc; } uv_mutex_unlock(&aclk_async_lock); - return; } void aclk_del_worker_thread(struct aclk_database_worker_config *wc) @@ -53,7 +182,6 @@ void aclk_del_worker_thread(struct aclk_database_worker_config *wc) if (*tmp) *tmp = wc->next; uv_mutex_unlock(&aclk_async_lock); - return; } int aclk_worker_thread_exists(char *guid) @@ -199,7 +327,6 @@ void aclk_sync_exit_all() uv_mutex_unlock(&aclk_async_lock); } -#ifdef ENABLE_ACLK enum { IDX_HOST_ID, IDX_HOSTNAME, @@ -228,6 +355,8 @@ static int create_host_callback(void *data, int argc, char **argv, char **column uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); + system_info->hops = str2i((const char *) argv[IDX_HOPS]); sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); @@ -268,9 +397,9 @@ static int create_host_callback(void *data, int argc, char **argv, char **column #endif return 0; } -#endif -int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) +#ifdef ENABLE_ACLK +static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) { char uuid_str[GUID_LEN + 1]; UNUSED(data); @@ -286,10 +415,9 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]); return 0; } - +#endif void sql_aclk_sync_init(void) { -#ifdef ENABLE_ACLK char *err_msg = NULL; int rc; @@ -301,21 +429,7 @@ void sql_aclk_sync_init(void) return; } - info("SQLite aclk sync initialization"); - - for (int i = 0; aclk_sync_config[i]; i++) { - debug(D_ACLK_SYNC, "Executing %s", aclk_sync_config[i]); - rc = sqlite3_exec_monitored(db_meta, aclk_sync_config[i], 0, 0, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error aclk sync initialization setup, rc = %d (%s)", rc, err_msg); - error_report("SQLite failed statement %s", aclk_sync_config[i]); - sqlite3_free(err_msg); - return; - } - } - info("SQLite aclk sync initialization completed"); - fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); - + info("Creating archived hosts"); rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " "program_version, entries, health_enabled FROM host WHERE hops >0;", @@ -325,14 +439,16 @@ void sql_aclk_sync_init(void) sqlite3_free(err_msg); } +#ifdef ENABLE_ACLK + fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg); sqlite3_free(err_msg); } + info("ACLK sync initialization completed"); #endif - return; } static void async_cb(uv_async_t *handle) @@ -374,10 +490,9 @@ static void timer_cb(uv_timer_t* handle) #endif } -#define MAX_CMD_BATCH_SIZE (256) - -void aclk_database_worker(void *arg) +static void aclk_database_worker(void *arg) { + service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register("ACLKSYNC"); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); @@ -398,15 +513,12 @@ void aclk_database_worker(void *arg) enum aclk_database_opcode opcode; uv_timer_t timer_req; struct aclk_database_cmd cmd; - unsigned cmd_batch_size; - - //aclk_database_init_cmd_queue(wc); char threadname[NETDATA_THREAD_NAME_MAX+1]; if (wc->host) - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host)); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); else { - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->uuid_str); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str); threadname[11] = '\0'; } uv_thread_set_name_np(wc->thread, threadname); @@ -449,17 +561,13 @@ void aclk_database_worker(void *arg) uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ - cmd_batch_size = 0; do { - if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) - break; cmd = aclk_database_deq_cmd(wc); if (netdata_exit) break; opcode = cmd.opcode; - ++cmd_batch_size; if(likely(opcode != ACLK_DATABASE_NOOP)) worker_is_busy(opcode); @@ -535,7 +643,7 @@ void aclk_database_worker(void *arg) wc->host = rrdhost_find_by_guid(wc->host_guid); if (wc->host) { info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid); - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host)); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); uv_thread_set_name_np(wc->thread, threadname); wc->host->dbsync_worker = wc; if (unlikely(!wc->hostname)) @@ -584,10 +692,8 @@ void aclk_database_worker(void *arg) info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid); /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); -/* uv_mutex_destroy(&wc->cmd_mutex); */ - //fatal_assert(0 == uv_loop_close(loop)); - int rc; + int rc; do { rc = uv_loop_close(loop); } while (rc != UV_EBUSY); @@ -628,7 +734,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) uuid_unparse_lower(*host_uuid, host_guid); - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); db_execute(buffer_tostring(sql)); @@ -648,6 +754,10 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) if (likely(host)) { host->dbsync_worker = (void *)wc; wc->hostname = strdupz(rrdhost_hostname(host)); + if (node_id && !host->node_id) { + host->node_id = mallocz(sizeof(*host->node_id)); + uuid_copy(*host->node_id, *node_id); + } } else wc->hostname = get_hostname_by_node_id(wc->node_id); @@ -663,142 +773,4 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) UNUSED(host_uuid); UNUSED(node_id); #endif - return; -} - -void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(cmd); - - debug(D_ACLK, "Checking database for %s", wc->host_guid); - - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); - - buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " - "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); - db_execute(buffer_tostring(sql)); - - buffer_free(sql); - return; -} - -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" - -static int is_host_available(uuid_t *host_id) -{ - sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return 1; - } - - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to select node instance information for a node"); - return 1; - } - - rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to select node instance information"); - goto failed; - } - rc = sqlite3_step_monitored(res); - - failed: - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when checking host existence"); - - return (rc == SQLITE_ROW); -} - -// OPCODE: ACLK_DATABASE_DELETE_HOST -void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(wc); - char uuid_str[GUID_LEN + 1]; - char host_str[GUID_LEN + 1]; - - int rc; - uuid_t host_uuid; - char *host_guid = (char *)cmd.data; - - if (unlikely(!host_guid)) - return; - - rc = uuid_parse(host_guid, host_uuid); - freez(host_guid); - if (rc) - return; - - uuid_unparse_lower(host_uuid, host_str); - uuid_unparse_lower_fix(&host_uuid, uuid_str); - - debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str); - - if (is_host_available(&host_uuid)) { - debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str); - return; - } - - debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str); - - sqlite3_stmt *res = NULL; - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); - - buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str); - - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to clean up aclk tables"); - goto fail; - } - buffer_flush(sql); - - while (sqlite3_step_monitored(res) == SQLITE_ROW) - buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); - - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); - - db_execute(buffer_tostring(sql)); - -fail: - buffer_free(sql); - return; -} - -static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) -{ - struct aclk_database_worker_config *wc = data; - UNUSED(argc); - UNUSED(column); - - debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.data = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(wc, &cmd); - return 0; -} - -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" - -void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) -{ - char *err_msg = NULL; - debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } - return; -} +}
\ No newline at end of file |