diff options
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 379 |
1 files changed, 240 insertions, 139 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 9998ee54..24d3c1c6 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -1,57 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" - -// DBENGINE2: Helper - -static void update_metric_retention_and_granularity_by_uuid( - struct rrdengine_instance *ctx, uuid_t *uuid, - time_t first_time_s, time_t last_time_s, - time_t update_every_s, time_t now_s) -{ - if(unlikely(last_time_s > now_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " - "fixing last time to now", - first_time_s, last_time_s, now_s); - last_time_s = now_s; - } - - if (unlikely(first_time_s > last_time_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " - "fixing first time to last time", - first_time_s, last_time_s, now_s); - - first_time_s = last_time_s; - } - - if (unlikely(first_time_s == 0 || last_time_s == 0)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " - "using them as-is", - first_time_s, last_time_s, now_s); - } - - bool added = false; - METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx); - if (!metric) { - MRG_ENTRY entry = { - .section = (Word_t) ctx, - .first_time_s = first_time_s, - .last_time_s = last_time_s, - .latest_update_every_s = (uint32_t) update_every_s - }; - uuid_copy(entry.uuid, *uuid); - metric = mrg_metric_add_and_acquire(main_mrg, entry, &added); - } - - if (likely(!added)) - mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s); - - mrg_metric_release(main_mrg, metric); -} - static void after_extent_write_journalfile_v1_io(uv_fs_t* req) { worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB); @@ -60,12 +9,12 @@ static void after_extent_write_journalfile_v1_io(uv_fs_t* req) struct generic_io_descriptor *io_descr = &wal->io_descr; struct rrdengine_instance *ctx = io_descr->ctx; - debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); if (req->result < 0) { ctx_io_error(ctx); - error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + netdata_log_error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); } else { - debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); } uv_fs_req_cleanup(req); @@ -92,10 +41,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin io_descr->buf = wal->buf; io_descr->bytes = wal->buf_size; - netdata_spinlock_lock(&journalfile->unsafe.spinlock); + spinlock_lock(&journalfile->unsafe.spinlock); io_descr->pos = journalfile->unsafe.pos; journalfile->unsafe.pos += wal->buf_size; - netdata_spinlock_unlock(&journalfile->unsafe.spinlock); + spinlock_unlock(&journalfile->unsafe.spinlock); io_descr->req.data = wal; io_descr->data = journalfile; @@ -122,10 +71,129 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } +// ---------------------------------------------------------------------------- + +struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) { + struct rrdengine_datafile *datafile = NULL; + + rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock); + + Pvoid_t *PValue = NULL; + + if(unlikely(!s->init)) { + s->init = true; + s->last = s->wanted_start_time_s; + + PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + s->last = 0; + PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) + s->last = s->wanted_start_time_s; + } + } + + while(1) { + if (likely(!PValue)) { + PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + // cannot find anything after that point + datafile = NULL; + break; + } + } + + datafile = *PValue; + TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s, + datafile->journalfile->v2.last_time_s, + s->wanted_start_time_s, + s->wanted_end_time_s); + + if(rc == PAGE_IS_IN_RANGE) { + // this is good to return + break; + } + else if(rc == PAGE_IS_IN_THE_PAST) { + // continue to get the next + datafile = NULL; + PValue = NULL; + continue; + } + else /* PAGE_IS_IN_THE_FUTURE */ { + // we finished - no more datafiles + datafile = NULL; + PValue = NULL; + break; + } + } + + if(datafile) + s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL, + s->wanted_start_time_s, + s->wanted_end_time_s); + else + s->j2_header_acquired = NULL; + + rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock); + + return datafile; +} + +static void njfv2idx_add(struct rrdengine_datafile *datafile) { + internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s; + + do { + internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed"); + + Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if (unlikely(*PValue)) { + // already there + datafile->journalfile->njfv2idx.indexed_as++; + } + else { + *PValue = datafile; + break; + } + } while(0); + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +static void njfv2idx_remove(struct rrdengine_datafile *datafile) { + internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + + int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + (void)rc; + internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry"); + + datafile->journalfile->njfv2idx.indexed_as = 0; + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +// ---------------------------------------------------------------------------- + static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) { struct journal_v2_header *j2_header = NULL; - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!journalfile->mmap.data) { journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0); @@ -136,9 +204,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin journalfile->mmap.data = NULL; journalfile->mmap.size = 0; - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); ctx_fs_error(journalfile->datafile->ctx); } @@ -147,12 +215,21 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size); madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size); - madvise_random(journalfile->mmap.data, journalfile->mmap.size); - madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; - netdata_spinlock_unlock(&journalfile->v2.spinlock); + JOURNALFILE_FLAGS flags = journalfile->v2.flags; + spinlock_unlock(&journalfile->v2.spinlock); + + if(flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) { + // we need the entire metrics directory into memory to process it + madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory); + } + else { + // let the kernel know that we don't want read-ahead on this file + madvise_random(journalfile->mmap.data, journalfile->mmap.size); + // madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); + } } } @@ -163,7 +240,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin *data_size = journalfile->mmap.size; } - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return j2_header; } @@ -173,20 +250,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo if(!have_locks) { if(!wait) { - if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock)) + if (!spinlock_trylock(&journalfile->mmap.spinlock)) return false; } else - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!wait) { - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) { - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + if(!spinlock_trylock(&journalfile->v2.spinlock)) { + spinlock_unlock(&journalfile->mmap.spinlock); return false; } } else - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); } if(!journalfile->v2.refcount) { @@ -194,7 +271,7 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo if (munmap(journalfile->mmap.data, journalfile->mmap.size)) { char path[RRDENG_PATH_MAX]; journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path)); - error("DBENGINE: failed to unmap index file '%s'", path); + netdata_log_error("DBENGINE: failed to unmap index file '%s'", path); internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path); ctx_fs_error(journalfile->datafile->ctx); } @@ -209,8 +286,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo } if(!have_locks) { - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } return unmounted; @@ -230,7 +307,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) { struct rrdengine_journalfile *journalfile = datafile->journalfile; - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) + if(!spinlock_trylock(&journalfile->v2.spinlock)) continue; bool unmount = false; @@ -244,7 +321,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { // 2 minutes have passed since last use unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if (unmount) journalfile_v2_mounted_data_unmount(journalfile, false, false); @@ -254,7 +331,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { } struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED); @@ -276,7 +353,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(do_we_need_it) return journalfile_v2_mounted_data_get(journalfile, data_size); @@ -285,7 +362,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data"); internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile"); @@ -300,7 +377,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(unmount) journalfile_v2_mounted_data_unmount(journalfile, false, true); @@ -308,25 +385,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); return has_data; } size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); size_t data_size = journalfile->mmap.size; - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return data_size; } void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd"); internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data"); @@ -341,22 +418,27 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, struct journal_v2_header *j2_header = journalfile->mmap.data; journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC); journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC); + journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list); journalfile_v2_mounted_data_unmount(journalfile, true, true); - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + + njfv2idx_add(journalfile->datafile); } static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) { + njfv2idx_remove(journalfile->datafile); + bool has_references = false; do { if (has_references) sleep_usec(10 * USEC_PER_MS); - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) { if(journalfile->mmap.fd != -1) @@ -374,8 +456,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile * internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap..."); } - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } while(has_references); } @@ -384,9 +466,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi { struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile)); journalfile->datafile = datafile; - netdata_spinlock_init(&journalfile->mmap.spinlock); - netdata_spinlock_init(&journalfile->v2.spinlock); - netdata_spinlock_init(&journalfile->unsafe.spinlock); + spinlock_init(&journalfile->mmap.spinlock); + spinlock_init(&journalfile->v2.spinlock); + spinlock_init(&journalfile->unsafe.spinlock); journalfile->mmap.fd = -1; datafile->journalfile = journalfile; return journalfile; @@ -401,7 +483,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { journalfile_v1_generate_path(datafile, path, sizeof(path)); - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(datafile->ctx); } uv_fs_req_cleanup(&req); @@ -430,7 +512,7 @@ int journalfile_unlink(struct rrdengine_journalfile *journalfile) ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -454,7 +536,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct if (journalfile->file) { ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -464,14 +546,14 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct // This is the new journal v2 index file ret = uv_fs_unlink(NULL, &req, path_v2, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -516,7 +598,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { fatal_assert(req.result < 0); - error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); ctx_io_error(ctx); } uv_fs_req_cleanup(&req); @@ -548,7 +630,7 @@ static int journalfile_check_superblock(uv_file file) ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); uv_fs_req_cleanup(&req); goto error; } @@ -557,7 +639,7 @@ static int journalfile_check_superblock(uv_file file) if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { - error("DBENGINE: File has invalid superblock."); + netdata_log_error("DBENGINE: File has invalid superblock."); ret = UV_EINVAL; } else { ret = 0; @@ -569,7 +651,7 @@ static int journalfile_check_superblock(uv_file file) static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size) { - static BITMAP256 page_error_map; + static BITMAP256 page_error_map = BITMAP256_INITIALIZER; unsigned i, count, payload_length, descr_size; struct rrdeng_jf_store_data *jf_metric_data; @@ -578,7 +660,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, descr_size = sizeof(*jf_metric_data->descr) * count; payload_length = sizeof(*jf_metric_data) + descr_size; if (payload_length > max_size) { - error("DBENGINE: corrupted transaction payload."); + netdata_log_error("DBENGINE: corrupted transaction payload."); return; } @@ -589,7 +671,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, if (page_type > PAGE_TYPE_MAX) { if (!bitmap256_get_bit(&page_error_map, page_type)) { - error("DBENGINE: unknown page type %d encountered.", page_type); + netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type); bitmap256_set_bit(&page_error_map, page_type, 1); } continue; @@ -658,36 +740,36 @@ static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, s *id = 0; jf_header = buf; if (STORE_PADDING == jf_header->type) { - debug(D_RRDENGINE, "Skipping padding."); + netdata_log_debug(D_RRDENGINE, "Skipping padding."); return 0; } if (sizeof(*jf_header) > max_size) { - error("DBENGINE: corrupted transaction record, skipping."); + netdata_log_error("DBENGINE: corrupted transaction record, skipping."); return 0; } *id = jf_header->id; payload_length = jf_header->payload_length; size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); if (size_bytes > max_size) { - error("DBENGINE: corrupted transaction record, skipping."); + netdata_log_error("DBENGINE: corrupted transaction record, skipping."); return 0; } jf_trailer = buf + sizeof(*jf_header) + payload_length; crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); ret = crc32cmp(jf_trailer->checksum, crc); - debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); + netdata_log_debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); if (unlikely(ret)) { - error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); + netdata_log_error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); return size_bytes; } switch (jf_header->type) { case STORE_DATA: - debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + netdata_log_debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); break; default: - error("DBENGINE: unknown transaction type, skipping record."); + netdata_log_error("DBENGINE: unknown transaction type, skipping record."); break; } @@ -725,7 +807,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, iov = uv_buf_init(buf, size_bytes); ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); uv_fs_req_cleanup(&req); goto skip_file; } @@ -764,7 +846,7 @@ static int journalfile_check_v2_extent_list (void *data_start, size_t file_size) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list)); if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("DBENGINE: extent list CRC32 check: FAILED"); + netdata_log_error("DBENGINE: extent list CRC32 check: FAILED"); return 1; } @@ -784,7 +866,7 @@ static int journalfile_check_v2_metric_list(void *data_start, size_t file_size) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list)); if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("DBENGINE: metric list CRC32 check: FAILED"); + netdata_log_error("DBENGINE: metric list CRC32 check: FAILED"); return 1; } return 0; @@ -828,19 +910,19 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size rc = crc32cmp(journal_v2_trailer->checksum, crc); if (unlikely(rc)) { - error("DBENGINE: file CRC32 check: FAILED"); + netdata_log_error("DBENGINE: file CRC32 check: FAILED"); return 1; } rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size); if (rc) return 1; - rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size); - if (rc) return 1; - if (!db_engine_journal_check) return 0; + rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size); + if (rc) return 1; + // Verify complete UUID chain struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset); @@ -849,7 +931,7 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size unsigned entries; unsigned total_pages = 0; - info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count); + netdata_log_info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count); for (entries = 0; entries < j2_header->metric_count; entries++) { char uuid_str[UUID_STR_LEN]; @@ -880,16 +962,16 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size metric++; if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) { - info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified); + netdata_log_info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified); return 1; } } if (entries != verified) { - info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified); + netdata_log_info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified); return 1; } - info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages); + netdata_log_info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages); return 0; } @@ -905,15 +987,25 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st uint8_t *data_start = (uint8_t *)j2_header; uint32_t entries = j2_header->metric_count; + if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) { + journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK; + if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) { + journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE; + // needs rebuild + return; + } + } + struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset); time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + time_t global_first_time_s = header_start_time_s; time_t now_s = max_acceptable_collected_time(); for (size_t i=0; i < entries; i++) { time_t start_time_s = header_start_time_s + metric->delta_start_s; time_t end_time_s = header_start_time_s + metric->delta_end_s; - update_metric_retention_and_granularity_by_uuid( - ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s); + mrg_update_metric_retention_and_granularity_by_uuid( + main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s); metric++; } @@ -921,12 +1013,18 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st journalfile_v2_data_release(journalfile); usec_t ended_ut = now_monotonic_usec(); - info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" + netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" , ctx->config.tier, journalfile->datafile->fileno , (double)data_size / 1024 / 1024 , (double)entries / 1000 , ((double)(ended_ut - started_ut) / USEC_PER_MS) ); + + time_t old = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);; + do { + if(old <= global_first_time_s) + break; + } while(!__atomic_compare_exchange_n(&ctx->atomic.first_time_s, &old, global_first_time_s, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); } int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) @@ -949,13 +1047,13 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal if (errno == ENOENT) return 1; ctx_fs_error(ctx); - error("DBENGINE: failed to open '%s'", path_v2); + netdata_log_error("DBENGINE: failed to open '%s'", path_v2); return 1; } ret = fstat(fd, &statbuf); if (ret) { - error("DBENGINE: failed to get file information for '%s'", path_v2); + netdata_log_error("DBENGINE: failed to get file information for '%s'", path_v2); close(fd); return 1; } @@ -975,7 +1073,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal return 1; } - info("DBENGINE: checking integrity of '%s'", path_v2); + netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2); usec_t validation_start_ut = now_monotonic_usec(); int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size); if (unlikely(rc)) { @@ -987,7 +1085,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal error_report("File %s is invalid and it will be rebuilt", path_v2); if (unlikely(munmap(data_start, journal_v2_file_size))) - error("DBENGINE: failed to unmap '%s'", path_v2); + netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2); close(fd); return rc; @@ -998,7 +1096,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal if (unlikely(!entries)) { if (unlikely(munmap(data_start, journal_v2_file_size))) - error("DBENGINE: failed to unmap '%s'", path_v2); + netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2); close(fd); return 1; @@ -1006,7 +1104,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal usec_t finished_ut = now_monotonic_usec(); - info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " + netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " "mmap: %0.2f ms, validate: %0.2f ms" , path_v2 , (double)journal_v2_file_size / 1024 / 1024 @@ -1016,6 +1114,9 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal ); // Initialize the journal file to be able to access the data + + if (!db_engine_journal_check) + journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK; journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size); ctx_current_disk_space_increase(ctx, journal_v2_file_size); @@ -1179,7 +1280,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno journalfile_v2_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu", + netdata_log_info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu", path, number_of_extents, number_of_metrics, @@ -1350,7 +1451,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); - info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size); + netdata_log_info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size); // msync(data_start, total_file_size, MS_SYNC); journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size); @@ -1361,7 +1462,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno return; } else { - info("DBENGINE: failed to build index '%s', file will be skipped", path); + netdata_log_info("DBENGINE: failed to build index '%s', file will be skipped", path); j2_header.data = NULL; j2_header.magic = JOURVAL_V2_SKIP_MAGIC; memcpy(data_start, &j2_header, sizeof(j2_header)); @@ -1378,7 +1479,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno if (ret < 0) { ctx_current_disk_space_increase(ctx, total_file_size); ctx_fs_error(ctx); - error("DBENGINE: failed to resize file '%s'", path); + netdata_log_error("DBENGINE: failed to resize file '%s'", path); } else ctx_current_disk_space_increase(ctx, resize_file_to); @@ -1428,19 +1529,19 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil ret = journalfile_check_superblock(file); if (ret) { - info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path); + netdata_log_info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path); error = ret; goto cleanup; } ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb)); - info("DBENGINE: loading journal file '%s'", path); + netdata_log_info("DBENGINE: loading journal file '%s'", path); max_id = journalfile_iterate_transactions(ctx, journalfile); __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED); - info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size); + netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size); bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno); if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) { @@ -1459,7 +1560,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil cleanup: ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); |