summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-06-09 04:52:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-06-09 04:52:39 +0000
commit89f3604407aff8f4cb2ed958252c61e23c767e24 (patch)
tree7fbf408102cab051557d38193524d8c6e991d070 /database/sqlite/sqlite_aclk.c
parentAdding upstream version 1.34.1. (diff)
downloadnetdata-89f3604407aff8f4cb2ed958252c61e23c767e24.tar.xz
netdata-89f3604407aff8f4cb2ed958252c61e23c767e24.zip
Adding upstream version 1.35.0.upstream/1.35.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r--database/sqlite/sqlite_aclk.c118
1 files changed, 111 insertions, 7 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 989328097..950856d9a 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -10,6 +10,11 @@
#include "../../aclk/aclk.h"
#endif
+void sanity_check(void) {
+ // make sure the compiler will stop on misconfigurations
+ BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
+}
+
const char *aclk_sync_config[] = {
"CREATE TABLE IF NOT EXISTS dimension_delete (dimension_id blob, dimension_name text, chart_type_id text, "
"dim_id blob, chart_id blob, host_id blob, date_created);",
@@ -29,6 +34,28 @@ const char *aclk_sync_config[] = {
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
+int retention_running = 0;
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void stop_retention_run()
+{
+ uv_mutex_lock(&aclk_async_lock);
+ retention_running = 0;
+ uv_mutex_unlock(&aclk_async_lock);
+}
+
+static int request_retention_run()
+{
+ int rc = 0;
+ uv_mutex_lock(&aclk_async_lock);
+ if (unlikely(retention_running))
+ rc = 1;
+ else
+ retention_running = 1;
+ uv_mutex_unlock(&aclk_async_lock);
+ return rc;
+}
+#endif
int claimed()
{
@@ -313,9 +340,6 @@ static void timer_cb(uv_timer_t* handle)
if (aclk_use_new_cloud_arch && aclk_connected) {
if (wc->rotation_after && wc->rotation_after < now) {
- cmd.opcode = ACLK_DATABASE_NODE_INFO;
- aclk_database_enq_cmd_noblock(wc, &cmd);
-
cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
@@ -339,7 +363,7 @@ static void timer_cb(uv_timer_t* handle)
}
}
- if (wc->alert_updates) {
+ if (wc->alert_updates && !wc->pause_alert_updates) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
cmd.count = ACLK_MAX_ALERT_UPDATES;
aclk_database_enq_cmd_noblock(wc, &cmd);
@@ -348,10 +372,65 @@ static void timer_cb(uv_timer_t* handle)
#endif
}
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void after_send_retention(uv_work_t *req, int status)
+{
+ struct aclk_database_worker_config *wc = req->data;
+ (void)status;
+ stop_retention_run();
+ wc->retention_running = 0;
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_DIM_DELETION;
+ if (aclk_database_enq_cmd_noblock(wc, &cmd))
+ info("Failed to queue a dimension deletion message");
+
+ cmd.opcode = ACLK_DATABASE_NODE_INFO;
+ if (aclk_database_enq_cmd_noblock(wc, &cmd))
+ info("Failed to queue a node update info message");
+}
+
+
+static void send_retention(uv_work_t *req)
+{
+ struct aclk_database_worker_config *wc = req->data;
+
+ if (unlikely(wc->is_shutting_down))
+ return;
+
+ aclk_update_retention(wc);
+}
+#endif
+
#define MAX_CMD_BATCH_SIZE (256)
void aclk_database_worker(void *arg)
{
+ worker_register("ACLKSYNC");
+ worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add");
+ worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add");
+ worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push");
+ worker_register_job_name(ACLK_DATABASE_PUSH_CHART_CONFIG, "chart conf push");
+ worker_register_job_name(ACLK_DATABASE_RESET_CHART, "chart reset");
+ worker_register_job_name(ACLK_DATABASE_CHART_ACK, "chart ack");
+ worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check");
+ worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete");
+ worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan");
+#endif
+ worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log");
+ worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
+ worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
+ worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info");
+ worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
+ worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
+ worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
+ worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check");
+ worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
+
struct aclk_database_worker_config *wc = arg;
uv_loop_t *loop;
int ret;
@@ -401,6 +480,7 @@ void aclk_database_worker(void *arg)
memset(&cmd, 0, sizeof(cmd));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ uv_work_t retention_work;
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
@@ -412,7 +492,9 @@ void aclk_database_worker(void *arg)
wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
+
while (likely(!netdata_exit)) {
+ worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
/* wait for commands */
@@ -427,6 +509,10 @@ void aclk_database_worker(void *arg)
opcode = cmd.opcode;
++cmd_batch_size;
+
+ if(likely(opcode != ACLK_DATABASE_NOOP))
+ worker_is_busy(opcode);
+
switch (opcode) {
case ACLK_DATABASE_NOOP:
/* the command queue was empty, do nothing */
@@ -439,6 +525,7 @@ void aclk_database_worker(void *arg)
if (wc->host == localhost)
sql_check_aclk_table_list(wc);
break;
+
case ACLK_DATABASE_DELETE_HOST:
debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data);
sql_delete_aclk_table_list(wc, cmd);
@@ -504,9 +591,21 @@ void aclk_database_worker(void *arg)
aclk_process_dimension_deletion(wc, cmd);
break;
case ACLK_DATABASE_UPD_RETENTION:
+ if (unlikely(wc->retention_running))
+ break;
+
+ if (unlikely(request_retention_run())) {
+ wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY;
+ break;
+ }
+
debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
- aclk_update_retention(wc, cmd);
- aclk_process_dimension_deletion(wc, cmd);
+ retention_work.data = wc;
+ wc->retention_running = 1;
+ if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) {
+ wc->retention_running = 0;
+ stop_retention_run();
+ }
break;
// NODE_INSTANCE DETECTION
@@ -535,6 +634,8 @@ void aclk_database_worker(void *arg)
cmd.completion = NULL;
wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
}
+ if (localhost == wc->host)
+ (void) sqlite3_wal_checkpoint(db_meta, NULL);
break;
default:
debug(D_ACLK_SYNC, "%s: default.", __func__);
@@ -577,6 +678,8 @@ void aclk_database_worker(void *arg)
wc->host->dbsync_worker = NULL;
freez(wc);
rrd_unlock();
+
+ worker_unregister();
return;
error_after_timer_init:
@@ -585,6 +688,7 @@ error_after_async_init:
fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
+ worker_unregister();
}
// -------------------------------------------------------------
@@ -628,7 +732,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
db_execute(buffer_tostring(sql));
buffer_flush(sql);
- buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str, uuid_str, uuid_str);
+ buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);