diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 118 |
1 files changed, 111 insertions, 7 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 989328097..950856d9a 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -10,6 +10,11 @@ #include "../../aclk/aclk.h" #endif +void sanity_check(void) { + // make sure the compiler will stop on misconfigurations + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); +} + const char *aclk_sync_config[] = { "CREATE TABLE IF NOT EXISTS dimension_delete (dimension_id blob, dimension_name text, chart_type_id text, " "dim_id blob, chart_id blob, host_id blob, date_created);", @@ -29,6 +34,28 @@ const char *aclk_sync_config[] = { uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; +int retention_running = 0; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void stop_retention_run() +{ + uv_mutex_lock(&aclk_async_lock); + retention_running = 0; + uv_mutex_unlock(&aclk_async_lock); +} + +static int request_retention_run() +{ + int rc = 0; + uv_mutex_lock(&aclk_async_lock); + if (unlikely(retention_running)) + rc = 1; + else + retention_running = 1; + uv_mutex_unlock(&aclk_async_lock); + return rc; +} +#endif int claimed() { @@ -313,9 +340,6 @@ static void timer_cb(uv_timer_t* handle) if (aclk_use_new_cloud_arch && aclk_connected) { if (wc->rotation_after && wc->rotation_after < now) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - aclk_database_enq_cmd_noblock(wc, &cmd); - cmd.opcode = ACLK_DATABASE_UPD_RETENTION; if (!aclk_database_enq_cmd_noblock(wc, &cmd)) wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL; @@ -339,7 +363,7 @@ static void timer_cb(uv_timer_t* handle) } } - if (wc->alert_updates) { + if (wc->alert_updates && !wc->pause_alert_updates) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; cmd.count = ACLK_MAX_ALERT_UPDATES; aclk_database_enq_cmd_noblock(wc, &cmd); @@ -348,10 +372,65 @@ static void timer_cb(uv_timer_t* handle) #endif } + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +void after_send_retention(uv_work_t *req, int status) +{ + struct aclk_database_worker_config *wc = req->data; + (void)status; + stop_retention_run(); + wc->retention_running = 0; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DIM_DELETION; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a dimension deletion message"); + + cmd.opcode = ACLK_DATABASE_NODE_INFO; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a node update info message"); +} + + +static void send_retention(uv_work_t *req) +{ + struct aclk_database_worker_config *wc = req->data; + + if (unlikely(wc->is_shutting_down)) + return; + + aclk_update_retention(wc); +} +#endif + #define MAX_CMD_BATCH_SIZE (256) void aclk_database_worker(void *arg) { + worker_register("ACLKSYNC"); + worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add"); + worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add"); + worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push"); + worker_register_job_name(ACLK_DATABASE_PUSH_CHART_CONFIG, "chart conf push"); + worker_register_job_name(ACLK_DATABASE_RESET_CHART, "chart reset"); + worker_register_job_name(ACLK_DATABASE_CHART_ACK, "chart ack"); + worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check"); + worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete"); + worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); +#endif + worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); + worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); + worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); + worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); + worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); + worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); + struct aclk_database_worker_config *wc = arg; uv_loop_t *loop; int ret; @@ -401,6 +480,7 @@ void aclk_database_worker(void *arg) memset(&cmd, 0, sizeof(cmd)); #ifdef ENABLE_NEW_CLOUD_PROTOCOL + uv_work_t retention_work; sql_get_last_chart_sequence(wc); wc->chart_payload_count = sql_get_pending_count(wc); if (!wc->chart_payload_count) @@ -412,7 +492,9 @@ void aclk_database_worker(void *arg) wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); + while (likely(!netdata_exit)) { + worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ @@ -427,6 +509,10 @@ void aclk_database_worker(void *arg) opcode = cmd.opcode; ++cmd_batch_size; + + if(likely(opcode != ACLK_DATABASE_NOOP)) + worker_is_busy(opcode); + switch (opcode) { case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ @@ -439,6 +525,7 @@ void aclk_database_worker(void *arg) if (wc->host == localhost) sql_check_aclk_table_list(wc); break; + case ACLK_DATABASE_DELETE_HOST: debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data); sql_delete_aclk_table_list(wc, cmd); @@ -504,9 +591,21 @@ void aclk_database_worker(void *arg) aclk_process_dimension_deletion(wc, cmd); break; case ACLK_DATABASE_UPD_RETENTION: + if (unlikely(wc->retention_running)) + break; + + if (unlikely(request_retention_run())) { + wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY; + break; + } + debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str); - aclk_update_retention(wc, cmd); - aclk_process_dimension_deletion(wc, cmd); + retention_work.data = wc; + wc->retention_running = 1; + if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) { + wc->retention_running = 0; + stop_retention_run(); + } break; // NODE_INSTANCE DETECTION @@ -535,6 +634,8 @@ void aclk_database_worker(void *arg) cmd.completion = NULL; wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); } + if (localhost == wc->host) + (void) sqlite3_wal_checkpoint(db_meta, NULL); break; default: debug(D_ACLK_SYNC, "%s: default.", __func__); @@ -577,6 +678,8 @@ void aclk_database_worker(void *arg) wc->host->dbsync_worker = NULL; freez(wc); rrd_unlock(); + + worker_unregister(); return; error_after_timer_init: @@ -585,6 +688,7 @@ error_after_async_init: fatal_assert(0 == uv_loop_close(loop)); error_after_loop_init: freez(loop); + worker_unregister(); } // ------------------------------------------------------------- @@ -628,7 +732,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) db_execute(buffer_tostring(sql)); buffer_flush(sql); - buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str, uuid_str, uuid_str); + buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); db_execute(buffer_tostring(sql)); buffer_flush(sql); |