summaryrefslogtreecommitdiffstats
path: root/src/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/database/sqlite/sqlite_aclk.c')
-rw-r--r--src/database/sqlite/sqlite_aclk.c222
1 files changed, 186 insertions, 36 deletions
diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c
index adbe4d9d3..b3f926e2f 100644
--- a/src/database/sqlite/sqlite_aclk.c
+++ b/src/database/sqlite/sqlite_aclk.c
@@ -3,7 +3,14 @@
#include "sqlite_functions.h"
#include "sqlite_aclk.h"
+void sanity_check(void) {
+ // make sure the compiler will stop on misconfigurations
+ BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
+}
+
#include "sqlite_aclk_node.h"
+#include "../aclk_query_queue.h"
+#include "../aclk_query.h"
struct aclk_sync_config_s {
uv_thread_t thread;
@@ -11,16 +18,12 @@ struct aclk_sync_config_s {
uv_timer_t timer_req;
uv_async_t async;
bool initialized;
+ mqtt_wss_client client;
+ int aclk_queries_running;
SPINLOCK cmd_queue_lock;
struct aclk_database_cmd *cmd_base;
} aclk_sync_config = { 0 };
-void sanity_check(void) {
- // make sure the compiler will stop on misconfigurations
- BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
-}
-
-#ifdef ENABLE_ACLK
static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
struct aclk_database_cmd ret = { 0 };
@@ -39,7 +42,6 @@ static struct aclk_database_cmd aclk_database_deq_cmd(void)
return ret;
}
-#endif
static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd)
{
@@ -165,14 +167,14 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
#ifdef NETDATA_INTERNAL_CHECKS
char node_str[UUID_STR_LEN] = "<none>";
- if (likely(host->node_id))
- uuid_unparse_lower(*host->node_id, node_str);
- internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\" ephemeral=%d", rrdhost_hostname(host), host->machine_guid, node_str, is_ephemeral);
+ if (likely(!UUIDiszero(host->node_id)))
+ uuid_unparse_lower(host->node_id.uuid, node_str);
+ internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\" ephemeral=%d",
+ rrdhost_hostname(host), host->machine_guid, node_str, is_ephemeral);
#endif
return 0;
}
-#ifdef ENABLE_ACLK
#define SQL_SELECT_ACLK_ALERT_TABLES \
"SELECT 'DROP '||type||' IF EXISTS '||name||';' FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table', 'trigger', 'index')"
@@ -204,8 +206,6 @@ fail:
static void invalidate_host_last_connected(nd_uuid_t *host_uuid)
{
sqlite3_stmt *res = NULL;
- if (!host_uuid)
- return;
if (!PREPARE_STATEMENT(db_meta, SQL_INVALIDATE_HOST_LAST_CONNECTED, &res))
return;
@@ -291,13 +291,92 @@ static void timer_cb(uv_timer_t *handle)
uv_update_time(handle->loop);
struct aclk_database_cmd cmd = { 0 };
- if (aclk_connected) {
+ if (aclk_online_for_alerts()) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
aclk_database_enq_cmd(&cmd);
aclk_check_node_info_and_collectors();
}
}
+struct aclk_query_payload {
+ uv_work_t request;
+ void *data;
+ struct aclk_sync_config_s *config;
+};
+
+static void after_aclk_run_query_job(uv_work_t *req, int status __maybe_unused)
+{
+ worker_is_busy(ACLK_QUERY_EXECUTE);
+ struct aclk_query_payload *payload = req->data;
+ struct aclk_sync_config_s *config = payload->config;
+ config->aclk_queries_running--;
+ freez(payload);
+}
+
+static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query)
+{
+ if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
+ error_report("Unknown query in query queue. %u", query->type);
+ return;
+ }
+
+ if (query->type == HTTP_API_V2) {
+ http_api_v2(config->client, query);
+ } else {
+ send_bin_msg(config->client, query);
+ }
+ aclk_query_free(query);
+}
+
+static void aclk_run_query_job(uv_work_t *req)
+{
+ struct aclk_query_payload *payload = req->data;
+ struct aclk_sync_config_s *config = payload->config;
+ aclk_query_t query = (aclk_query_t) payload->data;
+
+ aclk_run_query(config, query);
+}
+
+static int read_query_thread_count()
+{
+ int threads = MIN(get_netdata_cpus()/2, 6);
+ threads = MAX(threads, 2);
+ threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ if(threads < 1) {
+ netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
+ threads = 1;
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ }
+ else {
+ if (threads > libuv_worker_threads / 2) {
+ threads = MAX(libuv_worker_threads / 2, 2);
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ }
+ }
+ return threads;
+}
+
+static void node_update_timer_cb(uv_timer_t *handle)
+{
+ struct aclk_sync_cfg_t *ahc = handle->data;
+ RRDHOST *host = ahc->host;
+
+ spinlock_lock(&host->receiver_lock);
+ int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
+ spinlock_unlock(&host->receiver_lock);
+ nd_log(NDLS_ACLK, NDLP_DEBUG,"Timer: Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live);
+ aclk_host_state_update(host, live, 1);
+}
+
+static void close_callback(uv_handle_t *handle, void *data __maybe_unused)
+{
+ if (handle->type == UV_TIMER) {
+ uv_timer_stop((uv_timer_t *)handle);
+ }
+
+ uv_close(handle, NULL); // Automatically close and free the handle
+}
+
static void aclk_synchronization(void *arg)
{
struct aclk_sync_config_s *config = arg;
@@ -309,6 +388,8 @@ static void aclk_synchronization(void *arg)
worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
+ worker_register_job_name(ACLK_QUERY_EXECUTE, "query execute");
+ worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "query execute sync");
worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
uv_loop_t *loop = &config->loop;
@@ -325,7 +406,10 @@ static void aclk_synchronization(void *arg)
sql_delete_aclk_table_list();
- while (likely(service_running(SERVICE_ACLKSYNC))) {
+ int query_thread_count = read_query_thread_count();
+ netdata_log_info("Starting ACLK synchronization thread with %d parallel query threads", query_thread_count);
+
+ while (likely(service_running(SERVICE_ACLK))) {
enum aclk_database_opcode opcode;
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
@@ -334,27 +418,54 @@ static void aclk_synchronization(void *arg)
do {
struct aclk_database_cmd cmd = aclk_database_deq_cmd();
- if (unlikely(!service_running(SERVICE_ACLKSYNC)))
+ if (unlikely(!service_running(SERVICE_ACLK)))
break;
opcode = cmd.opcode;
- if(likely(opcode != ACLK_DATABASE_NOOP))
+ if(likely(opcode != ACLK_DATABASE_NOOP && opcode != ACLK_QUERY_EXECUTE))
worker_is_busy(opcode);
switch (opcode) {
- default:
case ACLK_DATABASE_NOOP:
/* the command queue was empty, do nothing */
break;
// NODE STATE
case ACLK_DATABASE_NODE_STATE:;
RRDHOST *host = cmd.param[0];
- int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
struct aclk_sync_cfg_t *ahc = host->aclk_config;
- if (unlikely(!ahc))
- create_aclk_config(host, &host->host_uuid, host->node_id);
+ if (unlikely(!ahc)) {
+ create_aclk_config(host, &host->host_id.uuid, &host->node_id.uuid);
+ ahc = host->aclk_config;
+ }
+
+ if (ahc) {
+ uint64_t schedule_time = (uint64_t)(uintptr_t)cmd.param[1];
+ if (!ahc->timer_initialized) {
+ int rc = uv_timer_init(loop, &ahc->timer);
+ if (!rc) {
+ ahc->timer_initialized = true;
+ ahc->timer.data = ahc;
+ }
+ }
+
+ if (ahc->timer_initialized) {
+ if (uv_is_active((uv_handle_t *)&ahc->timer))
+ uv_timer_stop(&ahc->timer);
+
+ ahc->timer.data = ahc;
+ int rc = uv_timer_start(&ahc->timer, node_update_timer_cb, schedule_time, 0);
+ if (!rc)
+ break; // Timer started, exit
+ }
+ }
+
+ // This is fallback if timer fails
+ spinlock_lock(&host->receiver_lock);
+ int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
+ spinlock_unlock(&host->receiver_lock);
aclk_host_state_update(host, live, 1);
+ nd_log(NDLS_ACLK, NDLP_DEBUG,"Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live);
break;
case ACLK_DATABASE_NODE_UNREGISTER:
sql_unregister_node(cmd.param[0]);
@@ -366,14 +477,49 @@ static void aclk_synchronization(void *arg)
case ACLK_DATABASE_PUSH_ALERT:
aclk_push_alert_events_for_all_hosts();
break;
+
+ case ACLK_MQTT_WSS_CLIENT:
+ config->client = (mqtt_wss_client) cmd.param[0];
+ break;
+
+ case ACLK_QUERY_EXECUTE:;
+ aclk_query_t query = (aclk_query_t)cmd.param[0];
+
+ struct aclk_query_payload *payload = NULL;
+ config->aclk_queries_running++;
+ bool execute_now = (config->aclk_queries_running > query_thread_count);
+ if (!execute_now) {
+ payload = mallocz(sizeof(*payload));
+ payload->request.data = payload;
+ payload->config = config;
+ payload->data = query;
+ execute_now = uv_queue_work(loop, &payload->request, aclk_run_query_job, after_aclk_run_query_job);
+ }
+
+ if (execute_now) {
+ worker_is_busy(ACLK_QUERY_EXECUTE_SYNC);
+ aclk_run_query(config, query);
+ freez(payload);
+ config->aclk_queries_running--;
+ }
+ break;
+
+ default:
+ break;
}
} while (opcode != ACLK_DATABASE_NOOP);
}
+ config->initialized = false;
if (!uv_timer_stop(&config->timer_req))
uv_close((uv_handle_t *)&config->timer_req, NULL);
uv_close((uv_handle_t *)&config->async, NULL);
+ uv_run(loop, UV_RUN_DEFAULT);
+
+ uv_walk(loop, (uv_walk_cb) close_callback, NULL);
+ uv_run(loop, UV_RUN_DEFAULT);
+
(void) uv_loop_close(loop);
worker_unregister();
@@ -386,13 +532,11 @@ static void aclk_synchronization_init(void)
memset(&aclk_sync_config, 0, sizeof(aclk_sync_config));
fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config));
}
-#endif
// -------------------------------------------------------------
void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused)
{
-#ifdef ENABLE_ACLK
if (!host || host->aclk_config)
return;
@@ -402,16 +546,14 @@ void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __may
uuid_unparse_lower(*node_id, wc->node_id);
host->aclk_config = wc;
- if (node_id && !host->node_id) {
- host->node_id = mallocz(sizeof(*host->node_id));
- uuid_copy(*host->node_id, *node_id);
+ if (node_id && UUIDiszero(host->node_id)) {
+ uuid_copy(host->node_id.uuid, *node_id);
}
wc->host = host;
wc->stream_alerts = false;
time_t now = now_realtime_sec();
wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now;
-#endif
}
#define SQL_FETCH_ALL_HOSTS \
@@ -447,7 +589,6 @@ void sql_aclk_sync_init(void)
// Trigger host context load for hosts that have been created
metadata_queue_load_host_context(NULL);
-#ifdef ENABLE_ACLK
if (!number_of_children)
aclk_queue_node_info(localhost, true);
@@ -460,7 +601,6 @@ void sql_aclk_sync_init(void)
aclk_synchronization_init();
netdata_log_info("ACLK sync initialization completed");
-#endif
}
static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1)
@@ -481,20 +621,30 @@ void aclk_push_alert_config(const char *node_id, const char *config_hash)
queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash));
}
-void schedule_node_info_update(RRDHOST *host __maybe_unused)
+void aclk_execute_query(aclk_query_t query)
{
-#ifdef ENABLE_ACLK
- if (unlikely(!host))
+ if (unlikely(!aclk_sync_config.initialized))
return;
- queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL);
-#endif
+
+ queue_aclk_sync_cmd(ACLK_QUERY_EXECUTE, query, NULL);
+}
+
+void aclk_query_init(mqtt_wss_client client) {
+
+ queue_aclk_sync_cmd(ACLK_MQTT_WSS_CLIENT, client, NULL);
+}
+
+void schedule_node_state_update(RRDHOST *host, uint64_t delay)
+{
+ if (unlikely(!aclk_sync_config.initialized || !host))
+ return;
+
+ queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, (void *)(uintptr_t)delay);
}
-#ifdef ENABLE_ACLK
void unregister_node(const char *machine_guid)
{
if (unlikely(!machine_guid))
return;
queue_aclk_sync_cmd(ACLK_DATABASE_NODE_UNREGISTER, strdupz(machine_guid), NULL);
}
-#endif