diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /database/engine/journalfile.c | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 1569 |
1 files changed, 1569 insertions, 0 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c new file mode 100644 index 00000000..9005b81c --- /dev/null +++ b/database/engine/journalfile.c @@ -0,0 +1,1569 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +static void after_extent_write_journalfile_v1_io(uv_fs_t* req) +{ + worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB); + + WAL *wal = req->data; + struct generic_io_descriptor *io_descr = &wal->io_descr; + struct rrdengine_instance *ctx = io_descr->ctx; + + netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + if (req->result < 0) { + ctx_io_error(ctx); + netdata_log_error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } else { + netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + } + + uv_fs_req_cleanup(req); + wal_release(wal); + + __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED); + + worker_is_idle(); +} + +/* Careful to always call this before creating a new journal file */ +void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop) +{ + int ret; + struct generic_io_descriptor *io_descr; + struct rrdengine_journalfile *journalfile = datafile->journalfile; + + io_descr = &wal->io_descr; + io_descr->ctx = ctx; + if (wal->size < wal->buf_size) { + /* simulate an empty transaction to skip the rest of the block */ + *(uint8_t *) (wal->buf + wal->size) = STORE_PADDING; + } + io_descr->buf = wal->buf; + io_descr->bytes = wal->buf_size; + + spinlock_lock(&journalfile->unsafe.spinlock); + io_descr->pos = journalfile->unsafe.pos; + journalfile->unsafe.pos += wal->buf_size; + spinlock_unlock(&journalfile->unsafe.spinlock); + + io_descr->req.data = wal; + io_descr->data = journalfile; + io_descr->completion = NULL; + + io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size); + ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, + (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io); + fatal_assert(-1 != ret); + + ctx_current_disk_space_increase(ctx, wal->buf_size); + ctx_io_write_op_bytes(ctx, wal->buf_size); +} + +void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2, + datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); +} + +void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintfz(str, maxlen - 1, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, + datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); +} + +// ---------------------------------------------------------------------------- + +struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) { + struct rrdengine_datafile *datafile = NULL; + + rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock); + + Pvoid_t *PValue = NULL; + + if(unlikely(!s->init)) { + s->init = true; + s->last = s->wanted_start_time_s; + + PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + s->last = 0; + PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) + s->last = s->wanted_start_time_s; + } + } + + while(1) { + if (likely(!PValue)) { + PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + // cannot find anything after that point + datafile = NULL; + break; + } + } + + datafile = *PValue; + TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s, + datafile->journalfile->v2.last_time_s, + s->wanted_start_time_s, + s->wanted_end_time_s); + + if(rc == PAGE_IS_IN_RANGE) { + // this is good to return + break; + } + else if(rc == PAGE_IS_IN_THE_PAST) { + // continue to get the next + datafile = NULL; + PValue = NULL; + continue; + } + else /* PAGE_IS_IN_THE_FUTURE */ { + // we finished - no more datafiles + datafile = NULL; + PValue = NULL; + break; + } + } + + if(datafile) + s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL, + s->wanted_start_time_s, + s->wanted_end_time_s); + else + s->j2_header_acquired = NULL; + + rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock); + + return datafile; +} + +static void njfv2idx_add(struct rrdengine_datafile *datafile) { + internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s; + + do { + internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed"); + + Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if (unlikely(*PValue)) { + // already there + datafile->journalfile->njfv2idx.indexed_as++; + } + else { + *PValue = datafile; + break; + } + } while(1); + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +static void njfv2idx_remove(struct rrdengine_datafile *datafile) { + internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + + int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + (void)rc; + internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry"); + + datafile->journalfile->njfv2idx.indexed_as = 0; + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +// ---------------------------------------------------------------------------- + +static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) { + struct journal_v2_header *j2_header = NULL; + + spinlock_lock(&journalfile->mmap.spinlock); + + if(!journalfile->mmap.data) { + journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0); + if (journalfile->mmap.data == MAP_FAILED) { + internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2"); + close(journalfile->mmap.fd); + journalfile->mmap.fd = -1; + journalfile->mmap.data = NULL; + journalfile->mmap.size = 0; + + spinlock_lock(&journalfile->v2.spinlock); + journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED); + spinlock_unlock(&journalfile->v2.spinlock); + + ctx_fs_error(journalfile->datafile->ctx); + } + else { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED); + + madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size); + madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size); + + spinlock_lock(&journalfile->v2.spinlock); + journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; + JOURNALFILE_FLAGS flags = journalfile->v2.flags; + spinlock_unlock(&journalfile->v2.spinlock); + + if(flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) { + // we need the entire metrics directory into memory to process it + madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory); + } + else { + // let the kernel know that we don't want read-ahead on this file + madvise_random(journalfile->mmap.data, journalfile->mmap.size); + // madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); + } + } + } + + if(journalfile->mmap.data) { + j2_header = journalfile->mmap.data; + + if (data_size) + *data_size = journalfile->mmap.size; + } + + spinlock_unlock(&journalfile->mmap.spinlock); + + return j2_header; +} + +static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks, bool wait) { + bool unmounted = false; + + if(!have_locks) { + if(!wait) { + if (!spinlock_trylock(&journalfile->mmap.spinlock)) + return false; + } + else + spinlock_lock(&journalfile->mmap.spinlock); + + if(!wait) { + if(!spinlock_trylock(&journalfile->v2.spinlock)) { + spinlock_unlock(&journalfile->mmap.spinlock); + return false; + } + } + else + spinlock_lock(&journalfile->v2.spinlock); + } + + if(!journalfile->v2.refcount) { + if(journalfile->mmap.data) { + if (munmap(journalfile->mmap.data, journalfile->mmap.size)) { + char path[RRDENG_PATH_MAX]; + journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path)); + netdata_log_error("DBENGINE: failed to unmap index file '%s'", path); + internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path); + ctx_fs_error(journalfile->datafile->ctx); + } + else { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED); + journalfile->mmap.data = NULL; + journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED; + } + } + + unmounted = true; + } + + if(!have_locks) { + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + } + + return unmounted; +} + +void journalfile_v2_data_unmount_cleanup(time_t now_s) { + // DO NOT WAIT ON ANY LOCK!!! + + for(size_t tier = 0; tier < (size_t)storage_tiers ;tier++) { + struct rrdengine_instance *ctx = multidb_ctx[tier]; + if(!ctx) continue; + + struct rrdengine_datafile *datafile; + if(uv_rwlock_tryrdlock(&ctx->datafiles.rwlock) != 0) + continue; + + for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) { + struct rrdengine_journalfile *journalfile = datafile->journalfile; + + if(!spinlock_trylock(&journalfile->v2.spinlock)) + continue; + + bool unmount = false; + if (!journalfile->v2.refcount && (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED)) { + // this journal has no references and it is mounted + + if (!journalfile->v2.not_needed_since_s) + journalfile->v2.not_needed_since_s = now_s; + + else if (now_s - journalfile->v2.not_needed_since_s >= 120) + // 2 minutes have passed since last use + unmount = true; + } + spinlock_unlock(&journalfile->v2.spinlock); + + if (unmount) + journalfile_v2_mounted_data_unmount(journalfile, false, false); + } + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + } +} + +struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) { + spinlock_lock(&journalfile->v2.spinlock); + + bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); + bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED); + bool do_we_need_it = false; + + if(has_data) { + if (!wanted_first_time_s || !wanted_last_time_s || + is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s, + wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) { + + journalfile->v2.refcount++; + + do_we_need_it = true; + + if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted) + journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION; + else + journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION; + + } + } + spinlock_unlock(&journalfile->v2.spinlock); + + if(do_we_need_it) + return journalfile_v2_mounted_data_get(journalfile, data_size); + + return NULL; +} + +void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { + spinlock_lock(&journalfile->v2.spinlock); + + internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data"); + internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile"); + + bool unmount = false; + + journalfile->v2.refcount--; + + if(journalfile->v2.refcount == 0) { + journalfile->v2.not_needed_since_s = 0; + + if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) + unmount = true; + } + spinlock_unlock(&journalfile->v2.spinlock); + + if(unmount) + journalfile_v2_mounted_data_unmount(journalfile, false, true); +} + +bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) { + + spinlock_lock(&journalfile->v2.spinlock); + bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); + spinlock_unlock(&journalfile->v2.spinlock); + + return has_data; +} + +size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) { + + spinlock_lock(&journalfile->mmap.spinlock); + size_t data_size = journalfile->mmap.size; + spinlock_unlock(&journalfile->mmap.spinlock); + + return data_size; +} + +void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) { + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); + + internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd"); + internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data"); + internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile"); + + journalfile->mmap.fd = fd; + journalfile->mmap.data = journal_data; + journalfile->mmap.size = journal_data_size; + journalfile->v2.not_needed_since_s = now_monotonic_sec(); + journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; + + struct journal_v2_header *j2_header = journalfile->mmap.data; + journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC); + journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC); + journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list); + + journalfile_v2_mounted_data_unmount(journalfile, true, true); + + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + + njfv2idx_add(journalfile->datafile); +} + +static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) { + njfv2idx_remove(journalfile->datafile); + + bool has_references = false; + + do { + if (has_references) + sleep_usec(10 * USEC_PER_MS); + + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); + + if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) { + if(journalfile->mmap.fd != -1) + close(journalfile->mmap.fd); + + journalfile->mmap.fd = -1; + journalfile->mmap.data = NULL; + journalfile->mmap.size = 0; + journalfile->v2.first_time_s = 0; + journalfile->v2.last_time_s = 0; + journalfile->v2.flags = 0; + } + else { + has_references = true; + internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap..."); + } + + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + + } while(has_references); +} + +struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile) +{ + struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile)); + journalfile->datafile = datafile; + spinlock_init(&journalfile->mmap.spinlock); + spinlock_init(&journalfile->v2.spinlock); + spinlock_init(&journalfile->unsafe.spinlock); + journalfile->mmap.fd = -1; + datafile->journalfile = journalfile; + return journalfile; +} + +static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) +{ + int ret; + char path[RRDENG_PATH_MAX]; + + uv_fs_t req; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + journalfile_v1_generate_path(datafile, path, sizeof(path)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(datafile->ctx); + } + uv_fs_req_cleanup(&req); + return ret; +} + +int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + if(journalfile_v2_data_available(journalfile)) { + journalfile_v2_data_unmap_permanently(journalfile); + return 0; + } + + return close_uv_file(datafile, journalfile->file); +} + +int journalfile_unlink(struct rrdengine_journalfile *journalfile) +{ + struct rrdengine_datafile *datafile = journalfile->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + journalfile_v1_generate_path(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED); + + return ret; +} + +int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + char path_v2[RRDENG_PATH_MAX]; + + journalfile_v1_generate_path(datafile, path, sizeof(path)); + journalfile_v2_generate_path(datafile, path_v2, sizeof(path)); + + if (journalfile->file) { + ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + (void) close_uv_file(datafile, journalfile->file); + } + + // This is the new journal v2 index file + ret = uv_fs_unlink(NULL, &req, path_v2, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED); + + if(journalfile_v2_data_available(journalfile)) + journalfile_v2_data_unmap_permanently(journalfile); + + return ret; +} + +int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + char path[RRDENG_PATH_MAX]; + + journalfile_v1_generate_path(datafile, path, sizeof(path)); + fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io); + if (fd < 0) { + ctx_fs_error(ctx); + return fd; + } + journalfile->file = file; + __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED); + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + } + memset(superblock, 0, sizeof(*superblock)); + (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ); + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + ctx_io_error(ctx); + } + uv_fs_req_cleanup(&req); + posix_memfree(superblock); + if (ret < 0) { + journalfile_destroy_unsafe(journalfile, datafile); + return ret; + } + + journalfile->unsafe.pos = sizeof(*superblock); + + ctx_io_write_op_bytes(ctx, sizeof(*superblock)); + + return 0; +} + +static int journalfile_check_superblock(uv_file file) +{ + int ret; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { + netdata_log_error("DBENGINE: File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + posix_memfree(superblock); + return ret; +} + +static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size) +{ + static BITMAP256 page_error_map = BITMAP256_INITIALIZER; + unsigned i, count, payload_length, descr_size; + struct rrdeng_jf_store_data *jf_metric_data; + + jf_metric_data = buf; + count = jf_metric_data->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + if (payload_length > max_size) { + netdata_log_error("DBENGINE: corrupted transaction payload."); + return; + } + + time_t now_s = max_acceptable_collected_time(); + for (i = 0; i < count ; ++i) { + uuid_t *temp_id; + uint8_t page_type = jf_metric_data->descr[i].type; + + if (page_type > PAGE_TYPE_MAX) { + if (!bitmap256_get_bit(&page_error_map, page_type)) { + netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type); + bitmap256_set_bit(&page_error_map, page_type, 1); + } + continue; + } + + temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; + METRIC *metric = mrg_metric_get_and_acquire(main_mrg, temp_id, (Word_t) ctx); + + struct rrdeng_extent_page_descr *descr = &jf_metric_data->descr[i]; + VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr( + descr, now_s, + (metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0, + false); + + if(!vd.is_valid) { + if(metric) + mrg_metric_release(main_mrg, metric); + + continue; + } + + bool update_metric_time = true; + if (!metric) { + MRG_ENTRY entry = { + .uuid = temp_id, + .section = (Word_t)ctx, + .first_time_s = vd.start_time_s, + .last_time_s = vd.end_time_s, + .latest_update_every_s = (uint32_t) vd.update_every_s, + }; + + bool added; + metric = mrg_metric_add_and_acquire(main_mrg, entry, &added); + if(added) + update_metric_time = false; + } + Word_t metric_id = mrg_metric_id(main_mrg, metric); + + if (update_metric_time) + mrg_metric_expand_retention(main_mrg, metric, vd.start_time_s, vd.end_time_s, vd.update_every_s); + + pgc_open_add_hot_page( + (Word_t)ctx, metric_id, vd.start_time_s, vd.end_time_s, vd.update_every_s, + journalfile->datafile, + jf_metric_data->extent_offset, jf_metric_data->extent_size, jf_metric_data->descr[i].page_length); + + mrg_metric_release(main_mrg, metric); + } +} + +/* + * Replays transaction by interpreting up to max_size bytes from buf. + * Sets id to the current transaction id or to 0 if unknown. + * Returns size of transaction record or 0 for unknown size. + */ +static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, uint64_t *id, unsigned max_size) +{ + unsigned payload_length, size_bytes; + int ret; + /* persistent structures */ + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + *id = 0; + jf_header = buf; + if (STORE_PADDING == jf_header->type) { + netdata_log_debug(D_RRDENGINE, "Skipping padding."); + return 0; + } + if (sizeof(*jf_header) > max_size) { + netdata_log_error("DBENGINE: corrupted transaction record, skipping."); + return 0; + } + *id = jf_header->id; + payload_length = jf_header->payload_length; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + if (size_bytes > max_size) { + netdata_log_error("DBENGINE: corrupted transaction record, skipping."); + return 0; + } + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + ret = crc32cmp(jf_trailer->checksum, crc); + netdata_log_debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); + if (unlikely(ret)) { + netdata_log_error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); + return size_bytes; + } + switch (jf_header->type) { + case STORE_DATA: + netdata_log_debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + netdata_log_error("DBENGINE: unknown transaction type, skipping record."); + break; + } + + return size_bytes; +} + + +#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256) +/* + * Iterates journal file transactions and populates the page cache. + * Page cache must already be initialized. + * Returns the maximum transaction id it discovered. + */ +static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) +{ + uv_file file; + uint64_t file_size; + int ret; + uint64_t pos, pos_i, max_id, id; + unsigned size_bytes; + void *buf; + uv_buf_t iov; + uv_fs_t req; + + file = journalfile->file; + file_size = journalfile->unsafe.pos; + + max_id = 1; + ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES); + if (unlikely(ret)) + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + + for (pos = sizeof(struct rrdeng_jf_sb); pos < file_size; pos += READAHEAD_BYTES) { + size_bytes = MIN(READAHEAD_BYTES, file_size - pos); + iov = uv_buf_init(buf, size_bytes); + ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto skip_file; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + ctx_io_read_op_bytes(ctx, size_bytes); + + for (pos_i = 0; pos_i < size_bytes;) { + unsigned max_size; + + max_size = pos + size_bytes - pos_i; + ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size); + if (!ret) /* TODO: support transactions bigger than 4K */ + /* unknown transaction size, move on to the next block */ + pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE); + else + pos_i += ret; + max_id = MAX(max_id, id); + } + } +skip_file: + posix_memfree(buf); + return max_id; +} + +// Checks that the extent list checksum is valid +static int journalfile_check_v2_extent_list (void *data_start, size_t file_size) +{ + UNUSED(file_size); + uLong crc; + + struct journal_v2_header *j2_header = (void *) data_start; + struct journal_v2_block_trailer *journal_v2_trailer; + + journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list)); + if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { + netdata_log_error("DBENGINE: extent list CRC32 check: FAILED"); + return 1; + } + + return 0; +} + +// Checks that the metric list (UUIDs) checksum is valid +static int journalfile_check_v2_metric_list(void *data_start, size_t file_size) +{ + UNUSED(file_size); + uLong crc; + + struct journal_v2_header *j2_header = (void *) data_start; + struct journal_v2_block_trailer *journal_v2_trailer; + + journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list)); + if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { + netdata_log_error("DBENGINE: metric list CRC32 check: FAILED"); + return 1; + } + return 0; +} + +// +// Return +// 0 Ok +// 1 Invalid +// 2 Force rebuild +// 3 skip + +static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size, size_t journal_v1_file_size) +{ + int rc; + uLong crc; + + struct journal_v2_header *j2_header = (void *) data_start; + struct journal_v2_block_trailer *journal_v2_trailer; + + if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC) + return 2; + + if (j2_header->magic == JOURVAL_V2_SKIP_MAGIC) + return 3; + + // Magic failure + if (j2_header->magic != JOURVAL_V2_MAGIC) + return 1; + + if (j2_header->journal_v2_file_size != journal_v2_file_size) + return 1; + + if (journal_v1_file_size && j2_header->journal_v1_file_size != journal_v1_file_size) + return 1; + + journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + journal_v2_file_size - sizeof(*journal_v2_trailer)); + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (void *) j2_header, sizeof(*j2_header)); + + rc = crc32cmp(journal_v2_trailer->checksum, crc); + if (unlikely(rc)) { + netdata_log_error("DBENGINE: file CRC32 check: FAILED"); + return 1; + } + + rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size); + if (rc) return 1; + + if (!db_engine_journal_check) + return 0; + + rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size); + if (rc) return 1; + + // Verify complete UUID chain + + struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset); + + unsigned verified = 0; + unsigned entries; + unsigned total_pages = 0; + + netdata_log_info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count); + for (entries = 0; entries < j2_header->metric_count; entries++) { + + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower(metric->uuid, uuid_str); + struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset); + struct journal_page_header local_metric_list_header = *metric_list_header; + + local_metric_list_header.crc = JOURVAL_V2_MAGIC; + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header)); + rc = crc32cmp(metric_list_header->checksum, crc); + + if (!rc) { + struct journal_v2_block_trailer *journal_trailer = + (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list)); + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list)); + rc = crc32cmp(journal_trailer->checksum, crc); + internal_error(rc, "DBENGINE: index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset, + crc, metric_list_header->crc); + if (!rc) { + total_pages += metric_list_header->entries; + verified++; + } + } + + metric++; + if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) { + netdata_log_info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified); + return 1; + } + } + + if (entries != verified) { + netdata_log_info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified); + return 1; + } + netdata_log_info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages); + + return 0; +} + +void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) { + usec_t started_ut = now_monotonic_usec(); + + size_t data_size = 0; + struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, &data_size, 0, 0); + if(!j2_header) + return; + + uint8_t *data_start = (uint8_t *)j2_header; + uint32_t entries = j2_header->metric_count; + + if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) { + journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK; + if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) { + journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE; + // needs rebuild + return; + } + } + + struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset); + time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + time_t global_first_time_s = header_start_time_s; + time_t now_s = max_acceptable_collected_time(); + for (size_t i=0; i < entries; i++) { + time_t start_time_s = header_start_time_s + metric->delta_start_s; + time_t end_time_s = header_start_time_s + metric->delta_end_s; + + mrg_update_metric_retention_and_granularity_by_uuid( + main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s); + + metric++; + } + + journalfile_v2_data_release(journalfile); + usec_t ended_ut = now_monotonic_usec(); + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" + , ctx->config.tier, journalfile->datafile->fileno + , (double)data_size / 1024 / 1024 + , (double)entries / 1000 + , ((double)(ended_ut - started_ut) / USEC_PER_MS) + ); + + time_t old = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);; + do { + if(old <= global_first_time_s) + break; + } while(!__atomic_compare_exchange_n(&ctx->atomic.first_time_s, &old, global_first_time_s, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); +} + +int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + int ret, fd; + char path_v1[RRDENG_PATH_MAX]; + char path_v2[RRDENG_PATH_MAX]; + struct stat statbuf; + size_t journal_v1_file_size = 0; + size_t journal_v2_file_size; + + journalfile_v1_generate_path(datafile, path_v1, sizeof(path_v1)); + ret = stat(path_v1, &statbuf); + if (!ret) + journal_v1_file_size = (uint32_t)statbuf.st_size; + + journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2)); + fd = open(path_v2, O_RDONLY); + if (fd < 0) { + if (errno == ENOENT) + return 1; + ctx_fs_error(ctx); + netdata_log_error("DBENGINE: failed to open '%s'", path_v2); + return 1; + } + + ret = fstat(fd, &statbuf); + if (ret) { + netdata_log_error("DBENGINE: failed to get file information for '%s'", path_v2); + close(fd); + return 1; + } + + journal_v2_file_size = (size_t)statbuf.st_size; + + if (journal_v2_file_size < sizeof(struct journal_v2_header)) { + error_report("Invalid file %s. Not the expected size", path_v2); + close(fd); + return 1; + } + + usec_t mmap_start_ut = now_monotonic_usec(); + uint8_t *data_start = mmap(NULL, journal_v2_file_size, PROT_READ, MAP_SHARED, fd, 0); + if (data_start == MAP_FAILED) { + close(fd); + return 1; + } + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: checking integrity of '%s'", path_v2); + + usec_t validation_start_ut = now_monotonic_usec(); + int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size); + if (unlikely(rc)) { + if (rc == 2) + error_report("File %s needs to be rebuilt", path_v2); + else if (rc == 3) + error_report("File %s will be skipped", path_v2); + else + error_report("File %s is invalid and it will be rebuilt", path_v2); + + if (unlikely(munmap(data_start, journal_v2_file_size))) + netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2); + + close(fd); + return rc; + } + + struct journal_v2_header *j2_header = (void *) data_start; + uint32_t entries = j2_header->metric_count; + + if (unlikely(!entries)) { + if (unlikely(munmap(data_start, journal_v2_file_size))) + netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2); + + close(fd); + return 1; + } + + usec_t finished_ut = now_monotonic_usec(); + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " + "mmap: %0.2f ms, validate: %0.2f ms" + , path_v2 + , (double)journal_v2_file_size / 1024 / 1024 + , (double)entries / 1000 + , ((double)(validation_start_ut - mmap_start_ut) / USEC_PER_MS) + , ((double)(finished_ut - validation_start_ut) / USEC_PER_MS) + ); + + // Initialize the journal file to be able to access the data + + if (!db_engine_journal_check) + journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK; + journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size); + + ctx_current_disk_space_increase(ctx, journal_v2_file_size); + + // File is OK load it + return 0; +} + +struct journal_metric_list_to_sort { + struct jv2_metrics_info *metric_info; +}; + +static int journalfile_metric_compare (const void *item1, const void *item2) +{ + const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info; + const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info; + + return memcmp(metric1->uuid, metric2->uuid, sizeof(uuid_t)); +} + + +// Write list of extents for the journalfile +void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data) +{ + Pvoid_t *PValue; + struct journal_extent_list *j2_extent_base = (void *) data; + struct jv2_extents_info *ext_info; + + bool first = true; + Word_t pos = 0; + size_t count = 0; + while ((PValue = JudyLFirstThenNext(JudyL_extents_pos, &pos, &first))) { + ext_info = *PValue; + size_t index = ext_info->index; + j2_extent_base[index].file_index = 0; + j2_extent_base[index].datafile_offset = ext_info->pos; + j2_extent_base[index].datafile_size = ext_info->bytes; + j2_extent_base[index].pages = ext_info->number_of_pages; + count++; + } + return j2_extent_base + count; +} + +static int journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes) +{ + if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->journal_v2_file_size - sizeof(struct journal_v2_block_trailer))) + return 1; + + return 0; +} + +void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset) +{ + struct journal_metric_list *metric = (void *) data; + + if (journalfile_verify_space(j2_header, data, sizeof(*metric))) + return NULL; + + uuid_copy(metric->uuid, *metric_info->uuid); + metric->entries = metric_info->number_of_pages; + metric->page_offset = pages_offset; + metric->delta_start_s = (uint32_t)(metric_info->first_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC)); + metric->delta_end_s = (uint32_t)(metric_info->last_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC)); + metric->update_every_s = 0; + + return ++metric; +} + +void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset) +{ + struct journal_page_header *data_page_header = (void *) data; + uLong crc; + + uuid_copy(data_page_header->uuid, *metric_info->uuid); + data_page_header->entries = metric_info->number_of_pages; + data_page_header->uuid_offset = uuid_offset; // data header OFFSET poings to METRIC in the directory + data_page_header->crc = JOURVAL_V2_MAGIC; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header)); + crc32set(data_page_header->checksum, crc); + return ++data_page_header; +} + +void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header) +{ + struct journal_page_header *data_page_header = (void *) page_header; + struct journal_v2_block_trailer *journal_trailer = (void *) data; + uLong crc; + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list)); + crc32set(journal_trailer->checksum, crc); + return ++journal_trailer; +} + +void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info) +{ + struct journal_page_list *data_page = data; + + if (journalfile_verify_space(j2_header, data, sizeof(*data_page))) + return NULL; + + struct extent_io_data *ei = page_info->custom_data; + + data_page->delta_start_s = (uint32_t) (page_info->start_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC); + data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC); + data_page->extent_index = page_info->extent_index; + + data_page->update_every_s = (uint32_t) page_info->update_every_s; + data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length); + data_page->type = 0; + + return ++data_page; +} + +// Must be recorded in metric_info->entries +static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, + struct journal_metric_list *current_metric) +{ + Pvoid_t *PValue; + + struct journal_page_list *data_page = (void *)data; + // We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s + // that belong to this journal file + Pvoid_t JudyL_array = metric_info->JudyL_pages_by_start_time; + + Word_t index_time = 0; + bool first = true; + struct jv2_page_info *page_info; + uint32_t update_every_s = 0; + while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) { + page_info = *PValue; + // Write one descriptor and return the next data page location + data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info); + update_every_s = (uint32_t) page_info->update_every_s; + if (NULL == data_page) + break; + } + current_metric->update_every_s = update_every_s; + return data_page; +} + +// Migrate the journalfile pointed by datafile +// activate : make the new file active immediately +// journafile data will be set and descriptors (if deleted) will be repopulated as needed +// startup : if the migration is done during agent startup +// this will allow us to optimize certain things + +void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused, + Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos, + size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data) +{ + char path[RRDENG_PATH_MAX]; + Pvoid_t *PValue; + struct rrdengine_instance *ctx = (struct rrdengine_instance *) section; + struct rrdengine_journalfile *journalfile = (struct rrdengine_journalfile *) user_data; + struct rrdengine_datafile *datafile = journalfile->datafile; + time_t min_time_s = LONG_MAX; + time_t max_time_s = 0; + struct jv2_metrics_info *metric_info; + + journalfile_v2_generate_path(datafile, path, sizeof(path)); + + netdata_log_info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu", + path, + number_of_extents, + number_of_metrics, + number_of_pages); + +#ifdef NETDATA_INTERNAL_CHECKS + usec_t start_loading = now_monotonic_usec(); +#endif + + size_t total_file_size = 0; + total_file_size += (sizeof(struct journal_v2_header) + JOURNAL_V2_HEADER_PADDING_SZ); + + // Extents will start here + uint32_t extent_offset = total_file_size; + total_file_size += (number_of_extents * sizeof(struct journal_extent_list)); + + uint32_t extent_offset_trailer = total_file_size; + total_file_size += sizeof(struct journal_v2_block_trailer); + + // UUID list will start here + uint32_t metrics_offset = total_file_size; + total_file_size += (number_of_metrics * sizeof(struct journal_metric_list)); + + // UUID list trailer + uint32_t metric_offset_trailer = total_file_size; + total_file_size += sizeof(struct journal_v2_block_trailer); + + // descr @ time will start here + uint32_t pages_offset = total_file_size; + total_file_size += (number_of_pages * (sizeof(struct journal_page_list) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer))); + + // File trailer + uint32_t trailer_offset = total_file_size; + total_file_size += sizeof(struct journal_v2_block_trailer); + + int fd_v2; + uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2); + uint8_t *data = data_start; + + memset(data_start, 0, extent_offset); + + // Write header + struct journal_v2_header j2_header; + memset(&j2_header, 0, sizeof(j2_header)); + + j2_header.magic = JOURVAL_V2_MAGIC; + j2_header.start_time_ut = 0; + j2_header.end_time_ut = 0; + j2_header.extent_count = number_of_extents; + j2_header.extent_offset = extent_offset; + j2_header.metric_count = number_of_metrics; + j2_header.metric_offset = metrics_offset; + j2_header.page_count = number_of_pages; + j2_header.page_offset = pages_offset; + j2_header.extent_trailer_offset = extent_offset_trailer; + j2_header.metric_trailer_offset = metric_offset_trailer; + j2_header.journal_v2_file_size = total_file_size; + j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile); + j2_header.data = data_start; // Used during migration + + struct journal_v2_block_trailer *journal_v2_trailer; + + data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset); + internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + + fatal_assert(data == data_start + extent_offset_trailer); + + // Calculate CRC for extents + journal_v2_trailer = (struct journal_v2_block_trailer *) (data_start + extent_offset_trailer); + uLong crc; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list)); + crc32set(journal_v2_trailer->checksum, crc); + + internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + // Skip the trailer, point to the metrics off + data += sizeof(struct journal_v2_block_trailer); + + // Sanity check -- we must be at the metrics_offset + fatal_assert(data == data_start + metrics_offset); + + // Allocate array to sort UUIDs and keep them sorted in the journal because we want to do binary search when we do lookups + struct journal_metric_list_to_sort *uuid_list = mallocz(number_of_metrics * sizeof(struct journal_metric_list_to_sort)); + + Word_t Index = 0; + size_t count = 0; + bool first_then_next = true; + while ((PValue = JudyLFirstThenNext(JudyL_metrics, &Index, &first_then_next))) { + metric_info = *PValue; + + fatal_assert(count < number_of_metrics); + uuid_list[count++].metric_info = metric_info; + min_time_s = MIN(min_time_s, metric_info->first_time_s); + max_time_s = MAX(max_time_s, metric_info->last_time_s); + } + + // Store in the header + j2_header.start_time_ut = min_time_s * USEC_PER_SEC; + j2_header.end_time_ut = max_time_s * USEC_PER_SEC; + + qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare); + internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + + uint32_t resize_file_to = total_file_size; + + for (Index = 0; Index < number_of_metrics; Index++) { + metric_info = uuid_list[Index].metric_info; + + // Calculate current UUID offset from start of file. We will store this in the data page header + uint32_t uuid_offset = data - data_start; + + struct journal_metric_list *current_metric = (void *) data; + // Write the UUID we are processing + data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset); + if (unlikely(!data)) + break; + + // Next we will write + // Header + // Detailed entries (descr @ time) + // Trailer (checksum) + + // Keep the page_list_header, to be used for migration when where agent is running + metric_info->page_list_header = pages_offset; + // Write page header + void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info, + uuid_offset); + + // Start writing descr @ time + void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info, current_metric); + if (unlikely(!page_trailer)) + break; + + // Trailer (checksum) + uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer, + data_start + pages_offset); + + // Calculate start of the pages start for next descriptor + pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer)); + // Verify we are at the right location + if (pages_offset != (uint32_t)(next_page_address - data_start)) { + // make sure checks fail so that we abort + data = data_start; + break; + } + } + + if (data == data_start + metric_offset_trailer) { + internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + + // Calculate CRC for metrics + journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer); + crc = crc32(0L, Z_NULL, 0); + crc = + crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list)); + crc32set(journal_v2_trailer->checksum, crc); + internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + + // Prepare to write checksum for the file + j2_header.data = NULL; + journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + trailer_offset); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (void *)&j2_header, sizeof(j2_header)); + crc32set(journal_v2_trailer->checksum, crc); + + // Write header to the file + memcpy(data_start, &j2_header, sizeof(j2_header)); + + internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + + netdata_log_info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size); + + // msync(data_start, total_file_size, MS_SYNC); + journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size); + + internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); + ctx_current_disk_space_increase(ctx, total_file_size); + freez(uuid_list); + return; + } + else { + netdata_log_info("DBENGINE: failed to build index '%s', file will be skipped", path); + j2_header.data = NULL; + j2_header.magic = JOURVAL_V2_SKIP_MAGIC; + memcpy(data_start, &j2_header, sizeof(j2_header)); + resize_file_to = sizeof(j2_header); + } + + netdata_munmap(data_start, total_file_size); + freez(uuid_list); + + if (likely(resize_file_to == total_file_size)) + return; + + int ret = truncate(path, (long) resize_file_to); + if (ret < 0) { + ctx_current_disk_space_increase(ctx, total_file_size); + ctx_fs_error(ctx); + netdata_log_error("DBENGINE: failed to resize file '%s'", path); + } + else + ctx_current_disk_space_increase(ctx, resize_file_to); +} + +int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile) +{ + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size, max_id; + char path[RRDENG_PATH_MAX]; + bool loaded_v2 = false; + + // Do not try to load jv2 of the latest file + if (datafile->fileno != ctx_last_fileno_get(ctx)) + loaded_v2 = journalfile_v2_load(ctx, journalfile, datafile) == 0; + + journalfile_v1_generate_path(datafile, path, sizeof(path)); + + fd = open_file_for_io(path, O_RDWR, &file, use_direct_io); + if (fd < 0) { + ctx_fs_error(ctx); + + if(loaded_v2) + return 0; + + return fd; + } + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) { + error = ret; + goto cleanup; + } + + if(loaded_v2) { + journalfile->unsafe.pos = file_size; + error = 0; + goto cleanup; + } + + file_size = ALIGN_BYTES_FLOOR(file_size); + journalfile->unsafe.pos = file_size; + journalfile->file = file; + + ret = journalfile_check_superblock(file); + if (ret) { + netdata_log_info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path); + error = ret; + goto cleanup; + } + ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb)); + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: loading journal file '%s'", path); + + max_id = journalfile_iterate_transactions(ctx, journalfile); + + __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED); + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal file '%s' loaded (size:%" PRIu64 ").", path, file_size); + + bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno); + if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) { + ctx->loading.create_new_datafile_pair = false; + return 0; + } + + pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, + journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); + + if (is_last_file) + ctx->loading.create_new_datafile_pair = true; + + return 0; + +cleanup: + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + return error; +} |