diff options
Diffstat (limited to 'src/database/sqlite/sqlite_aclk.c')
-rw-r--r-- | src/database/sqlite/sqlite_aclk.c | 228 |
1 files changed, 20 insertions, 208 deletions
diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index 8dc2231b4..027ee8f93 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -9,7 +9,6 @@ struct aclk_sync_config_s { uv_thread_t thread; uv_loop_t loop; uv_timer_t timer_req; - time_t cleanup_after; // Start a cleanup after this timestamp uv_async_t async; bool initialized; SPINLOCK cmd_queue_lock; @@ -24,7 +23,7 @@ void sanity_check(void) { #ifdef ENABLE_ACLK static struct aclk_database_cmd aclk_database_deq_cmd(void) { - struct aclk_database_cmd ret; + struct aclk_database_cmd ret = { 0 }; spinlock_lock(&aclk_sync_config.cmd_queue_lock); if(aclk_sync_config.cmd_base) { @@ -35,7 +34,6 @@ static struct aclk_database_cmd aclk_database_deq_cmd(void) } else { ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; } spinlock_unlock(&aclk_sync_config.cmd_queue_lock); @@ -176,70 +174,24 @@ static int create_host_callback(void *data, int argc, char **argv, char **column #ifdef ENABLE_ACLK -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id" -static int is_host_available(nd_uuid_t *host_id) -{ - sqlite3_stmt *res = NULL; - int rc = 0; - - if (!REQUIRE_DB(db_meta)) - return 1; - - if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_HOST_BY_UUID, &res)) - return 1; - - int param = 0; - SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC)); - - param = 0; - rc = sqlite3_step_monitored(res); - -done: - REPORT_BIND_FAIL(res, param); - SQLITE_FINALIZE(res); - return (rc == SQLITE_ROW); -} +#define SQL_SELECT_ACLK_ALERT_TABLES \ + "SELECT 'DROP '||type||' IF EXISTS '||name||';' FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table', 'trigger', 'index')" -// OPCODE: ACLK_DATABASE_DELETE_HOST -static void sql_delete_aclk_table_list(char *host_guid) +static void sql_delete_aclk_table_list(void) { - char uuid_str[UUID_STR_LEN]; - char host_str[UUID_STR_LEN]; - - int rc; - nd_uuid_t host_uuid; - - if (unlikely(!host_guid)) - return; - - rc = uuid_parse(host_guid, host_uuid); - freez(host_guid); - if (rc) - return; - - uuid_unparse_lower(host_uuid, host_str); - uuid_unparse_lower_fix(&host_uuid, uuid_str); - - if (is_host_available(&host_uuid)) - return; - sqlite3_stmt *res = NULL; - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); - buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index')", uuid_str); + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, NULL); - if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res)) + if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ACLK_ALERT_TABLES, &res)) goto fail; - buffer_flush(sql); - while (sqlite3_step_monitored(res) == SQLITE_ROW) buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); SQLITE_FINALIZE(res); - rc = db_execute(db_meta, buffer_tostring(sql)); + int rc = db_execute(db_meta, buffer_tostring(sql)); if (unlikely(rc)) netdata_log_error("Failed to drop unused ACLK tables"); @@ -285,53 +237,6 @@ skip: freez(machine_guid); } - -static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused) -{ - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.param[0] = strdupz((char *) argv[0]); - aclk_database_enq_cmd(&cmd); - return 0; -} - -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table')" - -static void sql_check_aclk_table_list(void) -{ - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } -} - -#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d" - -static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) -{ - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,sizeof(sql) - 1, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); - if (unlikely(db_execute(db_meta, sql))) - error_report("Failed to clean stale ACLK alert entries"); - return 0; -} - -#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table')" - -static void sql_maint_aclk_sync_database_all(void) -{ - char *err_msg = NULL; - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } -} - static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { char uuid_str[UUID_STR_LEN]; @@ -339,7 +244,7 @@ static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_un RRDHOST *host = rrdhost_find_by_guid(uuid_str); if (host != localhost) - sql_create_aclk_table(host, (nd_uuid_t *) argv[0], (nd_uuid_t *) argv[1]); + create_aclk_config(host, (nd_uuid_t *)argv[0], (nd_uuid_t *)argv[1]); return 0; } @@ -356,16 +261,7 @@ static void timer_cb(uv_timer_t *handle) uv_stop(handle->loop); uv_update_time(handle->loop); - struct aclk_sync_config_s *config = handle->data; - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - - if (config->cleanup_after < now_realtime_sec()) { - cmd.opcode = ACLK_DATABASE_CLEANUP; - aclk_database_enq_cmd(&cmd); - config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; - } - + struct aclk_database_cmd cmd = { 0 }; if (aclk_connected) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; aclk_database_enq_cmd(&cmd); @@ -373,7 +269,7 @@ static void timer_cb(uv_timer_t *handle) } } -static void aclk_synchronization(void *arg __maybe_unused) +static void aclk_synchronization(void *arg) { struct aclk_sync_config_s *config = arg; uv_thread_set_name_np("ACLKSYNC"); @@ -381,14 +277,9 @@ static void aclk_synchronization(void *arg __maybe_unused) service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); - worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); - worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); - worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint"); - worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); - worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); uv_loop_t *loop = &config->loop; @@ -401,9 +292,10 @@ static void aclk_synchronization(void *arg __maybe_unused) netdata_log_info("Starting ACLK synchronization thread"); - config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST; config->initialized = true; + sql_delete_aclk_table_list(); + while (likely(service_running(SERVICE_ACLKSYNC))) { enum aclk_database_opcode opcode; worker_is_idle(); @@ -422,26 +314,17 @@ static void aclk_synchronization(void *arg __maybe_unused) worker_is_busy(opcode); switch (opcode) { + default: case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ break; -// MAINTENANCE - case ACLK_DATABASE_CLEANUP: - // Scan all aclk_alert_ tables and cleanup as needed - sql_maint_aclk_sync_database_all(); - sql_check_aclk_table_list(); - break; - - case ACLK_DATABASE_DELETE_HOST: - sql_delete_aclk_table_list(cmd.param[0]); - break; // NODE STATE case ACLK_DATABASE_NODE_STATE:; RRDHOST *host = cmd.param[0]; int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; struct aclk_sync_cfg_t *ahc = host->aclk_config; if (unlikely(!ahc)) - sql_create_aclk_table(host, &host->host_uuid, host->node_id); + create_aclk_config(host, &host->host_uuid, host->node_id); aclk_host_state_update(host, live, 1); break; case ACLK_DATABASE_NODE_UNREGISTER: @@ -455,17 +338,7 @@ static void aclk_synchronization(void *arg __maybe_unused) case ACLK_DATABASE_PUSH_ALERT: aclk_push_alert_events_for_all_hosts(); break; - case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:; - aclk_push_alert_snapshot_event(cmd.param[0]); - break; - case ACLK_DATABASE_QUEUE_REMOVED_ALERTS: - sql_process_queue_removed_alerts_to_aclk(cmd.param[0]); - break; - default: - break; } - if (cmd.completion) - completion_mark_complete(cmd.completion); } while (opcode != ACLK_DATABASE_NOOP); } @@ -489,39 +362,11 @@ static void aclk_synchronization_init(void) // ------------------------------------------------------------- -void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused) +void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK - char uuid_str[UUID_STR_LEN]; - char host_guid[UUID_STR_LEN]; - int rc; - - uuid_unparse_lower_fix(host_uuid, uuid_str); - uuid_unparse_lower(*host_uuid, host_guid); - - char sql[ACLK_SYNC_QUERY_SIZE]; - - snprintfz(sql, sizeof(sql) - 1, TABLE_ACLK_ALERT, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid); - else { - snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT1, uuid_str, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report( - "Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid); - - snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT2, uuid_str, uuid_str); - rc = db_execute(db_meta, sql); - if (unlikely(rc)) - error_report( - "Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid); - } - if (likely(host) && unlikely(host->aclk_config)) - return; - if (unlikely(!host)) + if (!host || host->aclk_config) return; struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t)); @@ -535,8 +380,7 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __ } wc->host = host; - strcpy(wc->uuid_str, uuid_str); - wc->alert_updates = 0; + wc->stream_alerts = false; time_t now = now_realtime_sec(); wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now; #endif @@ -579,7 +423,7 @@ void sql_aclk_sync_init(void) if (!number_of_children) aclk_queue_node_info(localhost, true); - rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg); + rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES, aclk_config_parameters, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg); @@ -591,15 +435,12 @@ void sql_aclk_sync_init(void) #endif } -// Public - static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1) { struct aclk_database_cmd cmd; cmd.opcode = opcode; cmd.param[0] = (void *) param0; cmd.param[1] = (void *) param1; - cmd.completion = NULL; aclk_database_enq_cmd(&cmd); } @@ -612,35 +453,12 @@ void aclk_push_alert_config(const char *node_id, const char *config_hash) queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash)); } -void aclk_push_node_alert_snapshot(const char *node_id) -{ - if (unlikely(!aclk_sync_config.initialized)) - return; - - queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL); -} - - -void aclk_push_node_removed_alerts(const char *node_id) -{ - if (unlikely(!aclk_sync_config.initialized)) - return; - - queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL); -} - void schedule_node_info_update(RRDHOST *host __maybe_unused) { #ifdef ENABLE_ACLK if (unlikely(!host)) return; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_NODE_STATE; - cmd.param[0] = host; - cmd.completion = NULL; - aclk_database_enq_cmd(&cmd); + queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL); #endif } @@ -649,12 +467,6 @@ void unregister_node(const char *machine_guid) { if (unlikely(!machine_guid)) return; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_NODE_UNREGISTER; - cmd.param[0] = strdupz(machine_guid); - cmd.completion = NULL; - aclk_database_enq_cmd(&cmd); + queue_aclk_sync_cmd(ACLK_DATABASE_NODE_UNREGISTER, strdupz(machine_guid), NULL); } #endif |