diff options
Diffstat (limited to 'src/database/sqlite/sqlite_aclk.c')
-rw-r--r-- | src/database/sqlite/sqlite_aclk.c | 222 |
1 files changed, 186 insertions, 36 deletions
diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index adbe4d9d3..b3f926e2f 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -3,7 +3,14 @@ #include "sqlite_functions.h" #include "sqlite_aclk.h" +void sanity_check(void) { + // make sure the compiler will stop on misconfigurations + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); +} + #include "sqlite_aclk_node.h" +#include "../aclk_query_queue.h" +#include "../aclk_query.h" struct aclk_sync_config_s { uv_thread_t thread; @@ -11,16 +18,12 @@ struct aclk_sync_config_s { uv_timer_t timer_req; uv_async_t async; bool initialized; + mqtt_wss_client client; + int aclk_queries_running; SPINLOCK cmd_queue_lock; struct aclk_database_cmd *cmd_base; } aclk_sync_config = { 0 }; -void sanity_check(void) { - // make sure the compiler will stop on misconfigurations - BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); -} - -#ifdef ENABLE_ACLK static struct aclk_database_cmd aclk_database_deq_cmd(void) { struct aclk_database_cmd ret = { 0 }; @@ -39,7 +42,6 @@ static struct aclk_database_cmd aclk_database_deq_cmd(void) return ret; } -#endif static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) { @@ -165,14 +167,14 @@ static int create_host_callback(void *data, int argc, char **argv, char **column #ifdef NETDATA_INTERNAL_CHECKS char node_str[UUID_STR_LEN] = "<none>"; - if (likely(host->node_id)) - uuid_unparse_lower(*host->node_id, node_str); - internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\" ephemeral=%d", rrdhost_hostname(host), host->machine_guid, node_str, is_ephemeral); + if (likely(!UUIDiszero(host->node_id))) + uuid_unparse_lower(host->node_id.uuid, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\" ephemeral=%d", + rrdhost_hostname(host), host->machine_guid, node_str, is_ephemeral); #endif return 0; } -#ifdef ENABLE_ACLK #define SQL_SELECT_ACLK_ALERT_TABLES \ "SELECT 'DROP '||type||' IF EXISTS '||name||';' FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table', 'trigger', 'index')" @@ -204,8 +206,6 @@ fail: static void invalidate_host_last_connected(nd_uuid_t *host_uuid) { sqlite3_stmt *res = NULL; - if (!host_uuid) - return; if (!PREPARE_STATEMENT(db_meta, SQL_INVALIDATE_HOST_LAST_CONNECTED, &res)) return; @@ -291,13 +291,92 @@ static void timer_cb(uv_timer_t *handle) uv_update_time(handle->loop); struct aclk_database_cmd cmd = { 0 }; - if (aclk_connected) { + if (aclk_online_for_alerts()) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; aclk_database_enq_cmd(&cmd); aclk_check_node_info_and_collectors(); } } +struct aclk_query_payload { + uv_work_t request; + void *data; + struct aclk_sync_config_s *config; +}; + +static void after_aclk_run_query_job(uv_work_t *req, int status __maybe_unused) +{ + worker_is_busy(ACLK_QUERY_EXECUTE); + struct aclk_query_payload *payload = req->data; + struct aclk_sync_config_s *config = payload->config; + config->aclk_queries_running--; + freez(payload); +} + +static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query) +{ + if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { + error_report("Unknown query in query queue. %u", query->type); + return; + } + + if (query->type == HTTP_API_V2) { + http_api_v2(config->client, query); + } else { + send_bin_msg(config->client, query); + } + aclk_query_free(query); +} + +static void aclk_run_query_job(uv_work_t *req) +{ + struct aclk_query_payload *payload = req->data; + struct aclk_sync_config_s *config = payload->config; + aclk_query_t query = (aclk_query_t) payload->data; + + aclk_run_query(config, query); +} + +static int read_query_thread_count() +{ + int threads = MIN(get_netdata_cpus()/2, 6); + threads = MAX(threads, 2); + threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + if(threads < 1) { + netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); + threads = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + } + else { + if (threads > libuv_worker_threads / 2) { + threads = MAX(libuv_worker_threads / 2, 2); + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + } + } + return threads; +} + +static void node_update_timer_cb(uv_timer_t *handle) +{ + struct aclk_sync_cfg_t *ahc = handle->data; + RRDHOST *host = ahc->host; + + spinlock_lock(&host->receiver_lock); + int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; + spinlock_unlock(&host->receiver_lock); + nd_log(NDLS_ACLK, NDLP_DEBUG,"Timer: Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live); + aclk_host_state_update(host, live, 1); +} + +static void close_callback(uv_handle_t *handle, void *data __maybe_unused) +{ + if (handle->type == UV_TIMER) { + uv_timer_stop((uv_timer_t *)handle); + } + + uv_close(handle, NULL); // Automatically close and free the handle +} + static void aclk_synchronization(void *arg) { struct aclk_sync_config_s *config = arg; @@ -309,6 +388,8 @@ static void aclk_synchronization(void *arg) worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); + worker_register_job_name(ACLK_QUERY_EXECUTE, "query execute"); + worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "query execute sync"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); uv_loop_t *loop = &config->loop; @@ -325,7 +406,10 @@ static void aclk_synchronization(void *arg) sql_delete_aclk_table_list(); - while (likely(service_running(SERVICE_ACLKSYNC))) { + int query_thread_count = read_query_thread_count(); + netdata_log_info("Starting ACLK synchronization thread with %d parallel query threads", query_thread_count); + + while (likely(service_running(SERVICE_ACLK))) { enum aclk_database_opcode opcode; worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); @@ -334,27 +418,54 @@ static void aclk_synchronization(void *arg) do { struct aclk_database_cmd cmd = aclk_database_deq_cmd(); - if (unlikely(!service_running(SERVICE_ACLKSYNC))) + if (unlikely(!service_running(SERVICE_ACLK))) break; opcode = cmd.opcode; - if(likely(opcode != ACLK_DATABASE_NOOP)) + if(likely(opcode != ACLK_DATABASE_NOOP && opcode != ACLK_QUERY_EXECUTE)) worker_is_busy(opcode); switch (opcode) { - default: case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ break; // NODE STATE case ACLK_DATABASE_NODE_STATE:; RRDHOST *host = cmd.param[0]; - int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; struct aclk_sync_cfg_t *ahc = host->aclk_config; - if (unlikely(!ahc)) - create_aclk_config(host, &host->host_uuid, host->node_id); + if (unlikely(!ahc)) { + create_aclk_config(host, &host->host_id.uuid, &host->node_id.uuid); + ahc = host->aclk_config; + } + + if (ahc) { + uint64_t schedule_time = (uint64_t)(uintptr_t)cmd.param[1]; + if (!ahc->timer_initialized) { + int rc = uv_timer_init(loop, &ahc->timer); + if (!rc) { + ahc->timer_initialized = true; + ahc->timer.data = ahc; + } + } + + if (ahc->timer_initialized) { + if (uv_is_active((uv_handle_t *)&ahc->timer)) + uv_timer_stop(&ahc->timer); + + ahc->timer.data = ahc; + int rc = uv_timer_start(&ahc->timer, node_update_timer_cb, schedule_time, 0); + if (!rc) + break; // Timer started, exit + } + } + + // This is fallback if timer fails + spinlock_lock(&host->receiver_lock); + int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; + spinlock_unlock(&host->receiver_lock); aclk_host_state_update(host, live, 1); + nd_log(NDLS_ACLK, NDLP_DEBUG,"Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live); break; case ACLK_DATABASE_NODE_UNREGISTER: sql_unregister_node(cmd.param[0]); @@ -366,14 +477,49 @@ static void aclk_synchronization(void *arg) case ACLK_DATABASE_PUSH_ALERT: aclk_push_alert_events_for_all_hosts(); break; + + case ACLK_MQTT_WSS_CLIENT: + config->client = (mqtt_wss_client) cmd.param[0]; + break; + + case ACLK_QUERY_EXECUTE:; + aclk_query_t query = (aclk_query_t)cmd.param[0]; + + struct aclk_query_payload *payload = NULL; + config->aclk_queries_running++; + bool execute_now = (config->aclk_queries_running > query_thread_count); + if (!execute_now) { + payload = mallocz(sizeof(*payload)); + payload->request.data = payload; + payload->config = config; + payload->data = query; + execute_now = uv_queue_work(loop, &payload->request, aclk_run_query_job, after_aclk_run_query_job); + } + + if (execute_now) { + worker_is_busy(ACLK_QUERY_EXECUTE_SYNC); + aclk_run_query(config, query); + freez(payload); + config->aclk_queries_running--; + } + break; + + default: + break; } } while (opcode != ACLK_DATABASE_NOOP); } + config->initialized = false; if (!uv_timer_stop(&config->timer_req)) uv_close((uv_handle_t *)&config->timer_req, NULL); uv_close((uv_handle_t *)&config->async, NULL); + uv_run(loop, UV_RUN_DEFAULT); + + uv_walk(loop, (uv_walk_cb) close_callback, NULL); + uv_run(loop, UV_RUN_DEFAULT); + (void) uv_loop_close(loop); worker_unregister(); @@ -386,13 +532,11 @@ static void aclk_synchronization_init(void) memset(&aclk_sync_config, 0, sizeof(aclk_sync_config)); fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config)); } -#endif // ------------------------------------------------------------- void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused) { -#ifdef ENABLE_ACLK if (!host || host->aclk_config) return; @@ -402,16 +546,14 @@ void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __may uuid_unparse_lower(*node_id, wc->node_id); host->aclk_config = wc; - if (node_id && !host->node_id) { - host->node_id = mallocz(sizeof(*host->node_id)); - uuid_copy(*host->node_id, *node_id); + if (node_id && UUIDiszero(host->node_id)) { + uuid_copy(host->node_id.uuid, *node_id); } wc->host = host; wc->stream_alerts = false; time_t now = now_realtime_sec(); wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now; -#endif } #define SQL_FETCH_ALL_HOSTS \ @@ -447,7 +589,6 @@ void sql_aclk_sync_init(void) // Trigger host context load for hosts that have been created metadata_queue_load_host_context(NULL); -#ifdef ENABLE_ACLK if (!number_of_children) aclk_queue_node_info(localhost, true); @@ -460,7 +601,6 @@ void sql_aclk_sync_init(void) aclk_synchronization_init(); netdata_log_info("ACLK sync initialization completed"); -#endif } static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1) @@ -481,20 +621,30 @@ void aclk_push_alert_config(const char *node_id, const char *config_hash) queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash)); } -void schedule_node_info_update(RRDHOST *host __maybe_unused) +void aclk_execute_query(aclk_query_t query) { -#ifdef ENABLE_ACLK - if (unlikely(!host)) + if (unlikely(!aclk_sync_config.initialized)) return; - queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL); -#endif + + queue_aclk_sync_cmd(ACLK_QUERY_EXECUTE, query, NULL); +} + +void aclk_query_init(mqtt_wss_client client) { + + queue_aclk_sync_cmd(ACLK_MQTT_WSS_CLIENT, client, NULL); +} + +void schedule_node_state_update(RRDHOST *host, uint64_t delay) +{ + if (unlikely(!aclk_sync_config.initialized || !host)) + return; + + queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, (void *)(uintptr_t)delay); } -#ifdef ENABLE_ACLK void unregister_node(const char *machine_guid) { if (unlikely(!machine_guid)) return; queue_aclk_sync_cmd(ACLK_DATABASE_NODE_UNREGISTER, strdupz(machine_guid), NULL); } -#endif |