diff options
Diffstat (limited to '')
-rw-r--r-- | database/engine/rrdengine.c | 123 |
1 files changed, 68 insertions, 55 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 7811a5eaa..ce363183d 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -351,7 +351,7 @@ static struct { static void wal_cleanup1(void) { WAL *wal = NULL; - if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock)) + if(!spinlock_trylock(&wal_globals.protected.spinlock)) return; if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) { @@ -360,7 +360,7 @@ static void wal_cleanup1(void) { wal_globals.protected.available--; } - netdata_spinlock_unlock(&wal_globals.protected.spinlock); + spinlock_unlock(&wal_globals.protected.spinlock); if(wal) { posix_memfree(wal->buf); @@ -375,7 +375,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { WAL *wal = NULL; - netdata_spinlock_lock(&wal_globals.protected.spinlock); + spinlock_lock(&wal_globals.protected.spinlock); if(likely(wal_globals.protected.available_items)) { wal = wal_globals.protected.available_items; @@ -384,7 +384,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { } uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED); - netdata_spinlock_unlock(&wal_globals.protected.spinlock); + spinlock_unlock(&wal_globals.protected.spinlock); if(unlikely(!wal)) { wal = mallocz(sizeof(WAL)); @@ -416,10 +416,10 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { void wal_release(WAL *wal) { if(unlikely(!wal)) return; - netdata_spinlock_lock(&wal_globals.protected.spinlock); + spinlock_lock(&wal_globals.protected.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); wal_globals.protected.available++; - netdata_spinlock_unlock(&wal_globals.protected.spinlock); + spinlock_unlock(&wal_globals.protected.spinlock); } // ---------------------------------------------------------------------------- @@ -459,7 +459,7 @@ void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) { } void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) { - netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); struct rrdeng_cmd *cmd = get_cmd_cb(data); if(cmd) { @@ -472,7 +472,7 @@ void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY } } - netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); } void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion, @@ -489,12 +489,12 @@ void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, v cmd->priority = priority; cmd->dequeue_cb = dequeue_cb; - netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); rrdeng_main.cmd_queue.unsafe.waiting++; if(enqueue_cb) enqueue_cb(cmd); - netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); fatal_assert(0 == uv_async_send(&rrdeng_main.async)); } @@ -532,7 +532,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { } // find an opcode to execute from the queue - netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) { cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]; if(cmd) { @@ -559,7 +559,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { cmd->dequeue_cb = NULL; } - netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); struct rrdeng_cmd ret; if(cmd) { @@ -712,9 +712,9 @@ static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __m posix_memfree(xt_io_descr->buf); extent_io_descriptor_release(xt_io_descr); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.flushed_to_open_running--; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running) // we just finished a flushing on a datafile that is not the active one @@ -733,15 +733,15 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) { if (uv_fs_request->result < 0) { ctx_io_error(ctx); - error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result)); + netdata_log_error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result)); } journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.running--; datafile->writers.flushed_to_open_running++; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); rrdeng_enq_cmd(xt_io_descr->ctx, RRDENG_OPCODE_FLUSHED_TO_OPEN, @@ -756,12 +756,12 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) { static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { bool ret = false; - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx)) ret = true; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); return ret; } @@ -773,9 +773,9 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ uv_rwlock_rdlock(&ctx->datafiles.rwlock); datafile = ctx->datafiles.first->prev; // become a writer on this datafile, to prevent it from vanishing - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.running++; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_rdunlock(&ctx->datafiles.rwlock); if(datafile_is_full(ctx, datafile)) { @@ -791,7 +791,7 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ datafile = ctx->datafiles.first->prev; uv_rwlock_rdunlock(&ctx->datafiles.rwlock); - if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0) + if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx, true) == 0) rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); @@ -801,15 +801,15 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ uv_rwlock_rdlock(&ctx->datafiles.rwlock); datafile = ctx->datafiles.first->prev; // become a writer on this datafile, to prevent it from vanishing - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.running++; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_rdunlock(&ctx->datafiles.rwlock); // release the writers on the old datafile - netdata_spinlock_lock(&old_datafile->writers.spinlock); + spinlock_lock(&old_datafile->writers.spinlock); old_datafile->writers.running--; - netdata_spinlock_unlock(&old_datafile->writers.spinlock); + spinlock_unlock(&old_datafile->writers.spinlock); } return datafile; @@ -921,11 +921,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta real_io_size = ALIGN_BYTES_CEILING(size_bytes); datafile = get_datafile_to_write_extent(ctx); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); xt_io_descr->datafile = datafile; xt_io_descr->pos = datafile->pos; datafile->pos += real_io_size; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); xt_io_descr->bytes = size_bytes; xt_io_descr->uv_fs_request.data = xt_io_descr; @@ -998,12 +998,14 @@ struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struc return next_datafile; } -void find_uuid_first_time( +time_t find_uuid_first_time( struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, struct uuid_first_time_s *uuid_first_entry_list, size_t count) { + time_t global_first_time_s = LONG_MAX; + // acquire the datafile to work with it uv_rwlock_rdlock(&ctx->datafiles.rwlock); while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION)) @@ -1011,7 +1013,7 @@ void find_uuid_first_time( uv_rwlock_rdunlock(&ctx->datafiles.rwlock); if (unlikely(!datafile)) - return; + return global_first_time_s; unsigned journalfile_count = 0; size_t binary_match = 0; @@ -1025,6 +1027,10 @@ void find_uuid_first_time( } time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + + if(journal_start_time_s < global_first_time_s) + global_first_time_s = journal_start_time_s; + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); struct uuid_first_time_s *uuid_original_entry; @@ -1137,9 +1143,13 @@ void find_uuid_first_time( without_retention, without_metric ); + + return global_first_time_s; } static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) { + time_t global_first_time_s = LONG_MAX; + if(worker) worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS); @@ -1174,7 +1184,7 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r added++; } - info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u", + netdata_log_info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u", ctx->config.tier, count, first_datafile_remaining->fileno); journalfile_v2_data_release(journalfile); @@ -1184,12 +1194,12 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r if(worker) worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION); - find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); + global_first_time_s = find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); if(worker) worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); - info("DBENGINE: updating tier %d metrics registry retention for %zu metrics", + netdata_log_info("DBENGINE: updating tier %d metrics registry retention for %zu metrics", ctx->config.tier, added); size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0; @@ -1223,6 +1233,9 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r "DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry", deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier); + if(global_first_time_s != LONG_MAX) + __atomic_store_n(&ctx->atomic.first_time_s, global_first_time_s, __ATOMIC_RELAXED); + if(worker) worker_is_idle(); } @@ -1243,7 +1256,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * datafile_got_for_deletion = datafile_acquire_for_deletion(datafile); if (!datafile_got_for_deletion) { - info("DBENGINE: waiting for data file '%s/" + netdata_log_info("DBENGINE: waiting for data file '%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION "' to be available for deletion, " "it is in use currently by %u users.", @@ -1255,7 +1268,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * } __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED); - info("DBENGINE: deleting data file '%s/" + netdata_log_info("DBENGINE: deleting data file '%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION "'.", ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); @@ -1277,26 +1290,26 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * journal_file_bytes = journalfile_current_size(journal_file); deleted_bytes = journalfile_v2_data_size_get(journal_file); - info("DBENGINE: deleting data and journal files to maintain disk quota"); + netdata_log_info("DBENGINE: deleting data and journal files to maintain disk quota"); ret = journalfile_destroy_unsafe(journal_file, datafile); if (!ret) { journalfile_v1_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: deleted journal file \"%s\".", path); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); journalfile_v2_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: deleted journal file \"%s\".", path); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); deleted_bytes += journal_file_bytes; } ret = destroy_data_file_unsafe(datafile); if (!ret) { generate_datafilepath(datafile, path, sizeof(path)); - info("DBENGINE: deleted data file \"%s\".", path); + netdata_log_info("DBENGINE: deleted data file \"%s\".", path); deleted_bytes += datafile_bytes; } freez(journal_file); freez(datafile); ctx_current_disk_space_decrease(ctx, deleted_bytes); - info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes); + netdata_log_info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes); } static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { @@ -1334,11 +1347,11 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse // find a datafile to work uv_rwlock_rdlock(&ctx->datafiles.rwlock); for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) { - if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock)) + if(!spinlock_trylock(&datafile->populate_mrg.spinlock)) continue; if(datafile->populate_mrg.populated) { - netdata_spinlock_unlock(&datafile->populate_mrg.spinlock); + spinlock_unlock(&datafile->populate_mrg.spinlock); continue; } @@ -1352,7 +1365,7 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile); datafile->populate_mrg.populated = true; - netdata_spinlock_unlock(&datafile->populate_mrg.spinlock); + spinlock_unlock(&datafile->populate_mrg.spinlock); } while(1); @@ -1376,7 +1389,7 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { if(!logged) { logged = true; - info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", + netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), (ctx->config.legacy) ? -1 : ctx->config.tier); } @@ -1444,7 +1457,7 @@ void async_cb(uv_async_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); - debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); + netdata_log_debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } #define TIMER_PERIOD_MS (1000) @@ -1496,17 +1509,17 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb continue; } - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); if(!available) { - info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); + netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); datafile = datafile->next; continue; } - info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); + netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); @@ -1623,21 +1636,21 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { static bool spawned = false; static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; - netdata_spinlock_lock(&spinlock); + spinlock_lock(&spinlock); if(!spawned) { int ret; ret = uv_loop_init(&rrdeng_main.loop); if (ret) { - error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret)); return false; } rrdeng_main.loop.data = &rrdeng_main; ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb); if (ret) { - error("DBENGINE: uv_async_init(): %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_async_init(): %s", uv_strerror(ret)); fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); return false; } @@ -1645,7 +1658,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer); if (ret) { - error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); uv_close((uv_handle_t *)&rrdeng_main.async, NULL); fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); return false; @@ -1658,7 +1671,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { spawned = true; } - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); return true; } @@ -1860,7 +1873,7 @@ void dbengine_event_loop(void* arg) { } /* cleanup operations of the event loop */ - info("DBENGINE: shutting down dbengine thread"); + netdata_log_info("DBENGINE: shutting down dbengine thread"); /* * uv_async_send after uv_close does not seem to crash in linux at the moment, |