Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--  database/engine/journalfile.c  1445
1 file changed, 1162 insertions, 283 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 500dd788..de2b909c 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,132 +1,424 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-static void flush_transaction_buffer_cb(uv_fs_t* req)
+
+// DBENGINE2: Helper
+
+static void update_metric_retention_and_granularity_by_uuid(
+ struct rrdengine_instance *ctx, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s)
+{
+ if(unlikely(last_time_s > now_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
+ last_time_s = now_s;
+ }
+
+ if (unlikely(first_time_s > last_time_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
+
+ first_time_s = last_time_s;
+ }
+
+ if (unlikely(first_time_s == 0 || last_time_s == 0)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
+ }
+
+ bool added = false;
+ METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = (Word_t) ctx,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = update_every_s
+ };
+ uuid_copy(entry.uuid, *uuid);
+ metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ }
+
+ if (likely(!added))
+ mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
+
+ mrg_metric_release(main_mrg, metric);
+}
+
+static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
{
- struct generic_io_descriptor *io_descr = req->data;
- struct rrdengine_worker_config* wc = req->loop->data;
- struct rrdengine_instance *ctx = wc->ctx;
+ worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
+
+ WAL *wal = req->data;
+ struct generic_io_descriptor *io_descr = &wal->io_descr;
+ struct rrdengine_instance *ctx = io_descr->ctx;
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
- error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ ctx_io_error(ctx);
+ error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
}
uv_fs_req_cleanup(req);
- posix_memfree(io_descr->buf);
- freez(io_descr);
+ wal_release(wal);
+
+ __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
+
+ worker_is_idle();
}
/* Careful to always call this before creating a new journal file */
-void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
- struct rrdengine_instance *ctx = wc->ctx;
int ret;
struct generic_io_descriptor *io_descr;
- unsigned pos, size;
- struct rrdengine_journalfile *journalfile;
+ struct rrdengine_journalfile *journalfile = datafile->journalfile;
- if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
- return;
- }
- /* care with outstanding transactions when switching journal files */
- journalfile = ctx->datafiles.last->journalfile;
-
- io_descr = mallocz(sizeof(*io_descr));
- pos = ctx->commit_log.buf_pos;
- size = ctx->commit_log.buf_size;
- if (pos < size) {
+ io_descr = &wal->io_descr;
+ io_descr->ctx = ctx;
+ if (wal->size < wal->buf_size) {
/* simulate an empty transaction to skip the rest of the block */
- *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
+ *(uint8_t *) (wal->buf + wal->size) = STORE_PADDING;
}
- io_descr->buf = ctx->commit_log.buf;
- io_descr->bytes = size;
- io_descr->pos = journalfile->pos;
- io_descr->req.data = io_descr;
+ io_descr->buf = wal->buf;
+ io_descr->bytes = wal->buf_size;
+
+ netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ io_descr->pos = journalfile->unsafe.pos;
+ journalfile->unsafe.pos += wal->buf_size;
+ netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+
+ io_descr->req.data = wal;
+ io_descr->data = journalfile;
io_descr->completion = NULL;
- io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
- ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
- journalfile->pos, flush_transaction_buffer_cb);
+ io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
+ ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+ (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io);
fatal_assert(-1 != ret);
- journalfile->pos += RRDENG_BLOCK_SIZE;
- ctx->disk_space += RRDENG_BLOCK_SIZE;
- ctx->commit_log.buf = NULL;
- ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
- ++ctx->stats.io_write_requests;
+
+ ctx_current_disk_space_increase(ctx, wal->buf_size);
+ ctx_io_write_op_bytes(ctx, wal->buf_size);
}
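
The padding write above matters whenever the last transaction does not fill its block: a single STORE_PADDING byte right after the used bytes lets replay skip the rest of the block. A minimal standalone sketch of that idea, assuming a 4 KiB block and a stand-in sentinel value (the real STORE_PADDING and RRDENG_BLOCK_SIZE constants live in rrdengine.h):

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE   4096        /* stand-in for RRDENG_BLOCK_SIZE */
#define PAD_SENTINEL 0xFF        /* stand-in for STORE_PADDING     */

/* Prepare a block-aligned buffer for a partially filled WAL block. */
static uint8_t *wal_block_prepare(const void *payload, size_t used) {
    uint8_t *buf;
    if (used > BLOCK_SIZE || posix_memalign((void **)&buf, 4096, BLOCK_SIZE) != 0)
        return NULL;

    memset(buf, 0, BLOCK_SIZE);
    memcpy(buf, payload, used);

    if (used < BLOCK_SIZE)
        buf[used] = PAD_SENTINEL;  /* replay treats the rest of the block as empty */

    return buf;                    /* caller writes BLOCK_SIZE bytes and frees */
}
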
-void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
- struct rrdengine_instance *ctx = wc->ctx;
- int ret;
- unsigned buf_pos = 0, buf_size;
-
- fatal_assert(size);
- if (ctx->commit_log.buf) {
- unsigned remaining;
-
- buf_pos = ctx->commit_log.buf_pos;
- buf_size = ctx->commit_log.buf_size;
- remaining = buf_size - buf_pos;
- if (size > remaining) {
- /* we need a new buffer */
- wal_flush_transaction_buffer(wc);
+ (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
+ datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+ datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
+ struct journal_v2_header *j2_header = NULL;
+
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+
+ if(!journalfile->mmap.data) {
+ journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
+ if (journalfile->mmap.data == MAP_FAILED) {
+ internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2");
+ close(journalfile->mmap.fd);
+ journalfile->mmap.fd = -1;
+ journalfile->mmap.data = NULL;
+ journalfile->mmap.size = 0;
+
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ ctx_fs_error(journalfile->datafile->ctx);
+ }
+ else {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);
+
+ madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
+ madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
+ madvise_random(journalfile->mmap.data, journalfile->mmap.size);
+ madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
+
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ }
+ }
+
+ if(journalfile->mmap.data) {
+ j2_header = journalfile->mmap.data;
+
+ if (data_size)
+ *data_size = journalfile->mmap.size;
+ }
+
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+
+ return j2_header;
+}
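
journalfile_v2_mounted_data_get() maps the v2 index lazily, read-only, and immediately tells the kernel how it will be accessed; the madvise_*() calls are netdata wrappers around madvise(). A plain POSIX sketch of the same mount step, simplified in that the real code keeps the file descriptor open so it can remount later:

#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stddef.h>

/* Map a journal index read-only and hint random, on-demand access. */
static void *mount_index_readonly(const char *path, size_t *size_out) {
    int fd = open(path, O_RDONLY);
    if (fd < 0) return NULL;

    struct stat st;
    if (fstat(fd, &st) != 0 || st.st_size <= 0) { close(fd); return NULL; }

    void *map = mmap(NULL, (size_t)st.st_size, PROT_READ, MAP_SHARED, fd, 0);
    close(fd);                      /* the mapping survives the close */
    if (map == MAP_FAILED) return NULL;

    madvise(map, (size_t)st.st_size, MADV_RANDOM);    /* index lookups are random */
    madvise(map, (size_t)st.st_size, MADV_DONTNEED);  /* fault pages in on demand */

    *size_out = (size_t)st.st_size;
    return map;
}
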
+
+static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks, bool wait) {
+ bool unmounted = false;
+
+ if(!have_locks) {
+ if(!wait) {
+ if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
+ return false;
}
+ else
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+
+ if(!wait) {
+ if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ return false;
+ }
+ }
+ else
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
}
- if (NULL == ctx->commit_log.buf) {
- buf_size = ALIGN_BYTES_CEILING(size);
- ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+
+ if(!journalfile->v2.refcount) {
+ if(journalfile->mmap.data) {
+ if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
+ char path[RRDENG_PATH_MAX];
+ journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
+ error("DBENGINE: failed to unmap index file '%s'", path);
+ internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
+ ctx_fs_error(journalfile->datafile->ctx);
+ }
+ else {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
+ journalfile->mmap.data = NULL;
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED;
+ }
}
- memset(ctx->commit_log.buf, 0, buf_size);
- buf_pos = ctx->commit_log.buf_pos = 0;
- ctx->commit_log.buf_size = buf_size;
+
+ unmounted = true;
}
- ctx->commit_log.buf_pos += size;
- return ctx->commit_log.buf + buf_pos;
+ if(!have_locks) {
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ }
+
+ return unmounted;
}
-void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
-{
- (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+void journalfile_v2_data_unmount_cleanup(time_t now_s) {
+ // DO NOT WAIT ON ANY LOCK!!!
+
+ for(size_t tier = 0; tier < (size_t)storage_tiers ;tier++) {
+ struct rrdengine_instance *ctx = multidb_ctx[tier];
+ if(!ctx) continue;
+
+ struct rrdengine_datafile *datafile;
+ if(uv_rwlock_tryrdlock(&ctx->datafiles.rwlock) != 0)
+ continue;
+
+ for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
+ struct rrdengine_journalfile *journalfile = datafile->journalfile;
+
+ if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
+ continue;
+
+ bool unmount = false;
+ if (!journalfile->v2.refcount && (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED)) {
+ // this journal has no references and it is mounted
+
+ if (!journalfile->v2.not_needed_since_s)
+ journalfile->v2.not_needed_since_s = now_s;
+
+ else if (now_s - journalfile->v2.not_needed_since_s >= 120)
+ // 2 minutes have passed since last use
+ unmount = true;
+ }
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ if (unmount)
+ journalfile_v2_mounted_data_unmount(journalfile, false, false);
+ }
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+ }
+}
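
journalfile_v2_data_unmount_cleanup() runs on a latency-sensitive path, so it never blocks: every lock is a try-lock and contended journalfiles are simply skipped until the next pass. A reduced model of the idle-unmount policy, with a hypothetical entry type guarded by a pthread mutex instead of netdata's spinlocks:

#include <pthread.h>
#include <stdbool.h>
#include <time.h>

#define IDLE_UNMOUNT_AFTER_S 120   /* same 2-minute idle threshold as above */

struct jv2_entry {                 /* hypothetical stand-in for the journalfile */
    pthread_mutex_t lock;
    unsigned refcount;
    bool mounted;
    time_t not_needed_since_s;
};

/* Returns true when the caller should unmount this entry now. */
static bool idle_check(struct jv2_entry *e, time_t now_s) {
    if (pthread_mutex_trylock(&e->lock) != 0)
        return false;                       /* never wait on a lock here */

    bool unmount = false;
    if (e->refcount == 0 && e->mounted) {
        if (!e->not_needed_since_s)
            e->not_needed_since_s = now_s;  /* first time we saw it idle */
        else if (now_s - e->not_needed_since_s >= IDLE_UNMOUNT_AFTER_S)
            unmount = true;                 /* idle long enough, drop the mapping */
    }
    pthread_mutex_unlock(&e->lock);
    return unmount;
}
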
+
+struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
+ bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
+ bool do_we_need_it = false;
+
+ if(has_data) {
+ if (!wanted_first_time_s || !wanted_last_time_s ||
+ is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s,
+ wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) {
+
+ journalfile->v2.refcount++;
+
+ do_we_need_it = true;
+
+ if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted)
+ journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
+ else
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
+
+ }
+ }
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ if(do_we_need_it)
+ return journalfile_v2_mounted_data_get(journalfile, data_size);
+
+ return NULL;
+}
+
+void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
+ internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
+
+ bool unmount = false;
+
+ journalfile->v2.refcount--;
+
+ if(journalfile->v2.refcount == 0) {
+ journalfile->v2.not_needed_since_s = 0;
+
+ if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
+ unmount = true;
+ }
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ if(unmount)
+ journalfile_v2_mounted_data_unmount(journalfile, false, true);
+}
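
journalfile_v2_data_acquire() and journalfile_v2_data_release() are meant to bracket every access to the mapped index: acquire bumps the refcount (mounting the file if needed, or returning NULL when there is no usable index or it does not cover the requested time range), and the matching release may unmount it again once the last reader is gone. A sketch of a typical reader, relying only on the declarations introduced in this file:

/* Sketch: read one field out of the v2 index under acquire/release. */
static time_t index_first_time_s(struct rrdengine_journalfile *journalfile) {
    size_t size = 0;
    struct journal_v2_header *j2_header =
        journalfile_v2_data_acquire(journalfile, &size, 0, 0);   /* 0, 0: no time filter */

    if (!j2_header)
        return 0;                                  /* no v2 index for this datafile */

    time_t first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);

    journalfile_v2_data_release(journalfile);      /* last release may unmount it again */
    return first_time_s;
}
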
+
+bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
+
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ return has_data;
+}
+
+size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
+
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ size_t data_size = journalfile->mmap.size;
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+
+ return data_size;
+}
+
+void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
+ internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
+ internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile");
+
+ journalfile->mmap.fd = fd;
+ journalfile->mmap.data = journal_data;
+ journalfile->mmap.size = journal_data_size;
+ journalfile->v2.not_needed_since_s = now_monotonic_sec();
+ journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
+
+ struct journal_v2_header *j2_header = journalfile->mmap.data;
+ journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
+ journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);
+
+ journalfile_v2_mounted_data_unmount(journalfile, true, true);
+
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+}
+
+static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
+ bool has_references = false;
+
+ do {
+ if (has_references)
+ sleep_usec(10 * USEC_PER_MS);
+
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
+ if(journalfile->mmap.fd != -1)
+ close(journalfile->mmap.fd);
+
+ journalfile->mmap.fd = -1;
+ journalfile->mmap.data = NULL;
+ journalfile->mmap.size = 0;
+ journalfile->v2.first_time_s = 0;
+ journalfile->v2.last_time_s = 0;
+ journalfile->v2.flags = 0;
+ }
+ else {
+ has_references = true;
+ internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
+ }
+
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+
+ } while(has_references);
}
-void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile)
{
- journalfile->file = (uv_file)0;
- journalfile->pos = 0;
+ struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
journalfile->datafile = datafile;
+ netdata_spinlock_init(&journalfile->mmap.spinlock);
+ netdata_spinlock_init(&journalfile->v2.spinlock);
+ netdata_spinlock_init(&journalfile->unsafe.spinlock);
+ journalfile->mmap.fd = -1;
+ datafile->journalfile = journalfile;
+ return journalfile;
}
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
{
- struct rrdengine_instance *ctx = datafile->ctx;
- uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
-
- ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ uv_fs_t req;
+ ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ journalfile_v1_generate_path(datafile, path, sizeof(path));
+ error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ctx_fs_error(datafile->ctx);
}
uv_fs_req_cleanup(&req);
-
return ret;
}
-int unlink_journal_file(struct rrdengine_journalfile *journalfile)
+int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ if(journalfile_v2_data_available(journalfile)) {
+ journalfile_v2_data_unmap_permanently(journalfile);
+ return 0;
+ }
+
+ return close_uv_file(datafile, journalfile->file);
+}
+
+int journalfile_unlink(struct rrdengine_journalfile *journalfile)
{
struct rrdengine_datafile *datafile = journalfile->datafile;
struct rrdengine_instance *ctx = datafile->ctx;
@@ -134,60 +426,65 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
int ret;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
+ journalfile_v1_generate_path(datafile, path, sizeof(path));
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.journalfile_deletions;
+ __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED);
return ret;
}
-int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
+ char path_v2[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
+ journalfile_v1_generate_path(datafile, path, sizeof(path));
+ journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
+ if (journalfile->file) {
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
+ (void) close_uv_file(datafile, journalfile->file);
+ }
- ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ // This is the new journal v2 index file
+ ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.journalfile_deletions;
+ __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED);
+
+ if(journalfile_v2_data_available(journalfile))
+ journalfile_v2_data_unmap_permanently(journalfile);
return ret;
}
-int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -197,19 +494,18 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_buf_t iov;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
- fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
+ journalfile_v1_generate_path(datafile, path, sizeof(path));
+ fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
journalfile->file = file;
- ++ctx->stats.journalfile_creations;
+ __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED);
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
}
memset(superblock, 0, sizeof(*superblock));
(void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
@@ -220,25 +516,24 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
- error("uv_fs_write: %s", uv_strerror(ret));
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
+ error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
+ ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file(journalfile, datafile);
+ journalfile_destroy_unsafe(journalfile, datafile);
return ret;
}
- journalfile->pos = sizeof(*superblock);
- ctx->stats.io_write_bytes += sizeof(*superblock);
- ++ctx->stats.io_write_requests;
+ journalfile->unsafe.pos = sizeof(*superblock);
+
+ ctx_io_write_op_bytes(ctx, sizeof(*superblock));
return 0;
}
-static int check_journal_file_superblock(uv_file file)
+static int journalfile_check_superblock(uv_file file)
{
int ret;
struct rrdeng_jf_sb *superblock;
@@ -247,13 +542,13 @@ static int check_journal_file_superblock(uv_file file)
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
}
iov = uv_buf_init((void *)superblock, sizeof(*superblock));
ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- error("uv_fs_read: %s", uv_strerror(ret));
+ error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto error;
}
@@ -262,7 +557,7 @@ static int check_journal_file_superblock(uv_file file)
if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
- error("File has invalid superblock.");
+ error("DBENGINE: File has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -272,15 +567,10 @@ static int check_journal_file_superblock(uv_file file)
return ret;
}
-static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
- void *buf, unsigned max_size)
+static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
static BITMAP256 page_error_map;
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned i, count, payload_length, descr_size, valid_pages;
- struct rrdeng_page_descr *descr;
- struct extent_info *extent;
- /* persistent structures */
+ unsigned i, count, payload_length, descr_size;
struct rrdeng_jf_store_data *jf_metric_data;
jf_metric_data = buf;
@@ -288,117 +578,65 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
descr_size = sizeof(*jf_metric_data->descr) * count;
payload_length = sizeof(*jf_metric_data) + descr_size;
if (payload_length > max_size) {
- error("Corrupted transaction payload.");
+ error("DBENGINE: corrupted transaction payload.");
return;
}
- extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
- extent->offset = jf_metric_data->extent_offset;
- extent->size = jf_metric_data->extent_size;
- extent->datafile = journalfile->datafile;
- extent->next = NULL;
-
- for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ time_t now_s = max_acceptable_collected_time();
+ for (i = 0; i < count ; ++i) {
uuid_t *temp_id;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
uint8_t page_type = jf_metric_data->descr[i].type;
if (page_type > PAGE_TYPE_MAX) {
if (!bitmap256_get_bit(&page_error_map, page_type)) {
- error("Unknown page type %d encountered.", page_type);
+ error("DBENGINE: unknown page type %d encountered.", page_type);
bitmap256_set_bit(&page_error_map, page_type, 1);
}
continue;
}
- uint64_t start_time_ut = jf_metric_data->descr[i].start_time_ut;
- uint64_t end_time_ut = jf_metric_data->descr[i].end_time_ut;
- size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type];
- time_t update_every_s = (entries > 1) ? ((end_time_ut - start_time_ut) / USEC_PER_SEC / (entries - 1)) : 0;
-
- if (unlikely(start_time_ut > end_time_ut)) {
- ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut = end_time_ut;
- continue;
- }
- if (unlikely(start_time_ut == end_time_ut && entries != 1)) {
- ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut = end_time_ut;
- continue;
- }
-
- if (unlikely(!entries)) {
- ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut = end_time_ut;
- continue;
- }
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+ METRIC *metric = mrg_metric_get_and_acquire(main_mrg, temp_id, (Word_t) ctx);
- if(entries > 1 && update_every_s == 0) {
- ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut = end_time_ut;
- continue;
- }
+ struct rrdeng_extent_page_descr *descr = &jf_metric_data->descr[i];
+ VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
+ descr, now_s,
+ (metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
+ false);
- if(start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1) != end_time_ut) {
- ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut = end_time_ut;
+ if(!vd.is_valid) {
+ if(metric)
+ mrg_metric_release(main_mrg, metric);
- // let this be
- // end_time_ut = start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1);
+ continue;
}
- temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- /* First time we see the UUID */
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
- fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ bool update_metric_time = true;
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = (Word_t)ctx,
+ .first_time_s = vd.start_time_s,
+ .last_time_s = vd.end_time_s,
+ .latest_update_every_s = vd.update_every_s,
+ };
+ uuid_copy(entry.uuid, *temp_id);
+
+ bool added;
+ metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ if(added)
+ update_metric_time = false;
}
+ Word_t metric_id = mrg_metric_id(main_mrg, metric);
- descr = pg_cache_create_descr();
- descr->page_length = jf_metric_data->descr[i].page_length;
- descr->start_time_ut = start_time_ut;
- descr->end_time_ut = end_time_ut;
- descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
- descr->id = &page_index->id;
- descr->extent = extent;
- descr->type = page_type;
- extent->pages[valid_pages++] = descr;
- pg_cache_insert(ctx, page_index, descr);
+ if (update_metric_time)
+ mrg_metric_expand_retention(main_mrg, metric, vd.start_time_s, vd.end_time_s, vd.update_every_s);
- if(page_index->latest_time_ut == descr->end_time_ut)
- page_index->latest_update_every_s = descr->update_every_s;
+ pgc_open_add_hot_page(
+ (Word_t)ctx, metric_id, vd.start_time_s, vd.end_time_s, vd.update_every_s,
+ journalfile->datafile,
+ jf_metric_data->extent_offset, jf_metric_data->extent_size, jf_metric_data->descr[i].page_length);
- if(descr->update_every_s == 0)
- fatal(
- "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
- (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
- }
-
- extent->number_of_pages = valid_pages;
-
- if (likely(valid_pages))
- df_extent_insert(extent);
- else {
- freez(extent);
- ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
+ mrg_metric_release(main_mrg, metric);
}
}
@@ -407,8 +645,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
* Sets id to the current transaction id or to 0 if unknown.
* Returns size of transaction record or 0 for unknown size.
*/
-static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
- void *buf, uint64_t *id, unsigned max_size)
+static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, uint64_t *id, unsigned max_size)
{
unsigned payload_length, size_bytes;
int ret;
@@ -424,14 +662,14 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
return 0;
}
if (sizeof(*jf_header) > max_size) {
- error("Corrupted transaction record, skipping.");
+ error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
*id = jf_header->id;
payload_length = jf_header->payload_length;
size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
if (size_bytes > max_size) {
- error("Corrupted transaction record, skipping.");
+ error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
jf_trailer = buf + sizeof(*jf_header) + payload_length;
@@ -440,16 +678,16 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
ret = crc32cmp(jf_trailer->checksum, crc);
debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
if (unlikely(ret)) {
- error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
+ error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
return size_bytes;
}
switch (jf_header->type) {
case STORE_DATA:
debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
- restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
break;
default:
- error("Unknown transaction type. Skipping record.");
+ error("DBENGINE: unknown transaction type, skipping record.");
break;
}
@@ -463,10 +701,10 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
* Page cache must already be initialized.
* Returns the maximum transaction id it discovered.
*/
-static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
+static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
{
uv_file file;
- uint64_t file_size;//, data_file_size;
+ uint64_t file_size;
int ret;
uint64_t pos, pos_i, max_id, id;
unsigned size_bytes;
@@ -475,39 +713,31 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
uv_fs_t req;
file = journalfile->file;
- file_size = journalfile->pos;
- //data_file_size = journalfile->datafile->pos; TODO: utilize this?
+ file_size = journalfile->unsafe.pos;
max_id = 1;
- bool journal_is_mmapped = (journalfile->data != NULL);
- if (unlikely(!journal_is_mmapped)) {
- ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
- if (unlikely(ret))
- fatal("posix_memalign:%s", strerror(ret));
- }
- else
- buf = journalfile->data + sizeof(struct rrdeng_jf_sb);
- for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
+ ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+ if (unlikely(ret))
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
+
+ for (pos = sizeof(struct rrdeng_jf_sb); pos < file_size; pos += READAHEAD_BYTES) {
size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
- if (unlikely(!journal_is_mmapped)) {
- iov = uv_buf_init(buf, size_bytes);
- ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
- if (ret < 0) {
- error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
- uv_fs_req_cleanup(&req);
- goto skip_file;
- }
- fatal_assert(req.result >= 0);
+ iov = uv_buf_init(buf, size_bytes);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+ if (ret < 0) {
+ error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
uv_fs_req_cleanup(&req);
- ++ctx->stats.io_read_requests;
- ctx->stats.io_read_bytes += size_bytes;
+ goto skip_file;
}
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+ ctx_io_read_op_bytes(ctx, size_bytes);
- for (pos_i = 0 ; pos_i < size_bytes ; ) {
+ for (pos_i = 0; pos_i < size_bytes;) {
unsigned max_size;
max_size = pos + size_bytes - pos_i;
- ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
+ ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
if (!ret) /* TODO: support transactions bigger than 4K */
/* unknown transaction size, move on to the next block */
pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
@@ -515,73 +745,722 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
pos_i += ret;
max_id = MAX(max_id, id);
}
- if (likely(journal_is_mmapped))
- buf += size_bytes;
}
skip_file:
- if (unlikely(!journal_is_mmapped))
- posix_memfree(buf);
+ posix_memfree(buf);
return max_id;
}
-int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
- struct rrdengine_datafile *datafile)
+// Checks that the extent list checksum is valid
+static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
+{
+ UNUSED(file_size);
+ uLong crc;
+
+ struct journal_v2_header *j2_header = (void *) data_start;
+ struct journal_v2_block_trailer *journal_v2_trailer;
+
+ journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
+ if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
+ error("DBENGINE: extent list CRC32 check: FAILED");
+ return 1;
+ }
+
+ return 0;
+}
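
Every variable-length region of the v2 file (extent list, metric list, per-metric page lists, and the file as a whole) is followed by a journal_v2_block_trailer holding a zlib CRC32 of that region, and the function above is the canonical check. Stripped of the netdata crc32cmp() helper, the pattern reduces to roughly the following; the sketch assumes the trailer stores the raw 32-bit CRC value with the same byte order the writer used:

#include <zlib.h>
#include <stdint.h>
#include <string.h>
#include <stdbool.h>

/* Verify a region against a stored CRC32 trailer.
 * Assumes the trailer stores the plain 32-bit CRC as written by the producer;
 * swap bytes here if your on-disk convention differs. */
static bool region_crc_ok(const uint8_t *region, size_t len, const uint8_t *stored) {
    uLong crc = crc32(0L, Z_NULL, 0);      /* zlib running CRC32 */
    crc = crc32(crc, region, (uInt)len);

    uint32_t expected;
    memcpy(&expected, stored, sizeof(expected));
    return expected == (uint32_t)crc;
}
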
+
+// Checks that the metric list (UUIDs) checksum is valid
+static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
+{
+ UNUSED(file_size);
+ uLong crc;
+
+ struct journal_v2_header *j2_header = (void *) data_start;
+ struct journal_v2_block_trailer *journal_v2_trailer;
+
+ journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
+ if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
+ error("DBENGINE: metric list CRC32 check: FAILED");
+ return 1;
+ }
+ return 0;
+}
+
+//
+// Return
+// 0 Ok
+// 1 Invalid
+// 2 Force rebuild
+// 3 skip
+
+static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size, size_t journal_v1_file_size)
+{
+ int rc;
+ uLong crc;
+
+ struct journal_v2_header *j2_header = (void *) data_start;
+ struct journal_v2_block_trailer *journal_v2_trailer;
+
+ if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
+ return 2;
+
+ if (j2_header->magic == JOURVAL_V2_SKIP_MAGIC)
+ return 3;
+
+ // Magic failure
+ if (j2_header->magic != JOURVAL_V2_MAGIC)
+ return 1;
+
+ if (j2_header->journal_v2_file_size != journal_v2_file_size)
+ return 1;
+
+ if (journal_v1_file_size && j2_header->journal_v1_file_size != journal_v1_file_size)
+ return 1;
+
+ journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + journal_v2_file_size - sizeof(*journal_v2_trailer));
+
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));
+
+ rc = crc32cmp(journal_v2_trailer->checksum, crc);
+ if (unlikely(rc)) {
+ error("DBENGINE: file CRC32 check: FAILED");
+ return 1;
+ }
+
+ rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
+ if (rc) return 1;
+
+ rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
+ if (rc) return 1;
+
+ if (!db_engine_journal_check)
+ return 0;
+
+ // Verify complete UUID chain
+
+ struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
+
+ unsigned verified = 0;
+ unsigned entries;
+ unsigned total_pages = 0;
+
+ info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
+ for (entries = 0; entries < j2_header->metric_count; entries++) {
+
+ char uuid_str[UUID_STR_LEN];
+ uuid_unparse_lower(metric->uuid, uuid_str);
+ struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
+ struct journal_page_header local_metric_list_header = *metric_list_header;
+
+ local_metric_list_header.crc = JOURVAL_V2_MAGIC;
+
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header));
+ rc = crc32cmp(metric_list_header->checksum, crc);
+
+ if (!rc) {
+ struct journal_v2_block_trailer *journal_trailer =
+ (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list));
+
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list));
+ rc = crc32cmp(journal_trailer->checksum, crc);
+ internal_error(rc, "DBENGINE: index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
+ crc, metric_list_header->crc);
+ if (!rc) {
+ total_pages += metric_list_header->entries;
+ verified++;
+ }
+ }
+
+ metric++;
+ if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) {
+ info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
+ return 1;
+ }
+ }
+
+ if (entries != verified) {
+ info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
+ return 1;
+ }
+ info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
+
+ return 0;
+}
+
+void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) {
+ usec_t started_ut = now_monotonic_usec();
+
+ size_t data_size = 0;
+ struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, &data_size, 0, 0);
+ if(!j2_header)
+ return;
+
+ uint8_t *data_start = (uint8_t *)j2_header;
+ uint32_t entries = j2_header->metric_count;
+
+ struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
+ time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+ time_t now_s = max_acceptable_collected_time();
+ for (size_t i=0; i < entries; i++) {
+ time_t start_time_s = header_start_time_s + metric->delta_start_s;
+ time_t end_time_s = header_start_time_s + metric->delta_end_s;
+ time_t update_every_s = (metric->entries > 1) ? ((end_time_s - start_time_s) / (metric->entries - 1)) : 0;
+ update_metric_retention_and_granularity_by_uuid(
+ ctx, &metric->uuid, start_time_s, end_time_s, update_every_s, now_s);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
+ fatal_assert(uuid_compare(metric_list_header->uuid, metric->uuid) == 0);
+ fatal_assert(metric->entries == metric_list_header->entries);
+#endif
+ metric++;
+ }
+
+ journalfile_v2_data_release(journalfile);
+ usec_t ended_ut = now_monotonic_usec();
+
+ info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
+ , ctx->config.tier, journalfile->datafile->fileno
+ , (double)data_size / 1024 / 1024
+ , (double)entries / 1000
+ , ((double)(ended_ut - started_ut) / USEC_PER_MS)
+ );
+}
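
Retention is reconstructed here purely from the metric directory: the header keeps absolute start and end times in microseconds, each journal_metric_list entry keeps 32-bit second offsets from the header start plus its page count, and the effective granularity falls out of those three numbers. The same derivation as a standalone helper over hypothetical trimmed-down views of the two records:

#include <stdint.h>
#include <time.h>

#define USEC_PER_SEC 1000000ULL   /* local copy, for the sketch only */

/* Hypothetical reduced views of the on-disk records used above. */
struct v2_hdr    { uint64_t start_time_ut; };
struct v2_metric { uint32_t delta_start_s, delta_end_s, entries; };

static void metric_retention(const struct v2_hdr *h, const struct v2_metric *m,
                             time_t *first_s, time_t *last_s, time_t *update_every_s) {
    time_t base = (time_t)(h->start_time_ut / USEC_PER_SEC);
    *first_s = base + m->delta_start_s;
    *last_s  = base + m->delta_end_s;
    /* granularity can only be derived when the metric has more than one page */
    *update_every_s = (m->entries > 1) ? (*last_s - *first_s) / (time_t)(m->entries - 1) : 0;
}
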
+
+int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ int ret, fd;
+ char path_v1[RRDENG_PATH_MAX];
+ char path_v2[RRDENG_PATH_MAX];
+ struct stat statbuf;
+ size_t journal_v1_file_size = 0;
+ size_t journal_v2_file_size;
+
+ journalfile_v1_generate_path(datafile, path_v1, sizeof(path_v1));
+ ret = stat(path_v1, &statbuf);
+ if (!ret)
+ journal_v1_file_size = (uint32_t)statbuf.st_size;
+
+ journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
+ fd = open(path_v2, O_RDONLY);
+ if (fd < 0) {
+ if (errno == ENOENT)
+ return 1;
+ ctx_fs_error(ctx);
+ error("DBENGINE: failed to open '%s'", path_v2);
+ return 1;
+ }
+
+ ret = fstat(fd, &statbuf);
+ if (ret) {
+ error("DBENGINE: failed to get file information for '%s'", path_v2);
+ close(fd);
+ return 1;
+ }
+
+ journal_v2_file_size = (size_t)statbuf.st_size;
+
+ if (journal_v2_file_size < sizeof(struct journal_v2_header)) {
+ error_report("Invalid file %s. Not the expected size", path_v2);
+ close(fd);
+ return 1;
+ }
+
+ usec_t mmap_start_ut = now_monotonic_usec();
+ uint8_t *data_start = mmap(NULL, journal_v2_file_size, PROT_READ, MAP_SHARED, fd, 0);
+ if (data_start == MAP_FAILED) {
+ close(fd);
+ return 1;
+ }
+
+ info("DBENGINE: checking integrity of '%s'", path_v2);
+ usec_t validation_start_ut = now_monotonic_usec();
+ int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
+ if (unlikely(rc)) {
+ if (rc == 2)
+ error_report("File %s needs to be rebuilt", path_v2);
+ else if (rc == 3)
+ error_report("File %s will be skipped", path_v2);
+ else
+ error_report("File %s is invalid and it will be rebuilt", path_v2);
+
+ if (unlikely(munmap(data_start, journal_v2_file_size)))
+ error("DBENGINE: failed to unmap '%s'", path_v2);
+
+ close(fd);
+ return rc;
+ }
+
+ struct journal_v2_header *j2_header = (void *) data_start;
+ uint32_t entries = j2_header->metric_count;
+
+ if (unlikely(!entries)) {
+ if (unlikely(munmap(data_start, journal_v2_file_size)))
+ error("DBENGINE: failed to unmap '%s'", path_v2);
+
+ close(fd);
+ return 1;
+ }
+
+ usec_t finished_ut = now_monotonic_usec();
+
+ info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
+ "mmap: %0.2f ms, validate: %0.2f ms"
+ , path_v2
+ , (double)journal_v2_file_size / 1024 / 1024
+ , (double)entries / 1000
+ , ((double)(validation_start_ut - mmap_start_ut) / USEC_PER_MS)
+ , ((double)(finished_ut - validation_start_ut) / USEC_PER_MS)
+ );
+
+ // Initialize the journal file to be able to access the data
+ journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);
+
+ ctx_current_disk_space_increase(ctx, journal_v2_file_size);
+
+ // File is OK, load it
+ return 0;
+}
+
+struct journal_metric_list_to_sort {
+ struct jv2_metrics_info *metric_info;
+};
+
+static int journalfile_metric_compare (const void *item1, const void *item2)
+{
+ const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
+ const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
+
+ return uuid_compare(*(metric1->uuid), *(metric2->uuid));
+}
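
The UUID array is sorted before it is written out (see the qsort() further down in the migration callback) so that lookups against a mounted index can binary-search the journal_metric_list block instead of scanning it. A sketch of such a lookup over a reduced entry type:

#include <stdint.h>
#include <stdlib.h>
#include <uuid/uuid.h>

/* Reduced view of a v2 metric directory entry; the on-disk list is sorted by uuid. */
struct metric_entry {
    uuid_t   uuid;
    uint32_t entries;
    uint32_t page_offset;
};

static int metric_entry_cmp(const void *key, const void *member) {
    return uuid_compare(key, ((const struct metric_entry *)member)->uuid);
}

/* Binary search the sorted metric list for one UUID; returns NULL when absent. */
static const struct metric_entry *metric_lookup(const struct metric_entry *list,
                                                size_t count, const uuid_t uuid) {
    return bsearch(uuid, list, count, sizeof(*list), metric_entry_cmp);
}
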
+
+
+// Write list of extents for the journalfile
+void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
+{
+ Pvoid_t *PValue;
+ struct journal_extent_list *j2_extent_base = (void *) data;
+ struct jv2_extents_info *ext_info;
+
+ bool first = true;
+ Word_t pos = 0;
+ size_t count = 0;
+ while ((PValue = JudyLFirstThenNext(JudyL_extents_pos, &pos, &first))) {
+ ext_info = *PValue;
+ size_t index = ext_info->index;
+ j2_extent_base[index].file_index = 0;
+ j2_extent_base[index].datafile_offset = ext_info->pos;
+ j2_extent_base[index].datafile_size = ext_info->bytes;
+ j2_extent_base[index].pages = ext_info->number_of_pages;
+ count++;
+ }
+ return j2_extent_base + count;
+}
+
+static int journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
+{
+ if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->journal_v2_file_size - sizeof(struct journal_v2_block_trailer)))
+ return 1;
+
+ return 0;
+}
+
+void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset)
+{
+ struct journal_metric_list *metric = (void *) data;
+
+ if (journalfile_verify_space(j2_header, data, sizeof(*metric)))
+ return NULL;
+
+ uuid_copy(metric->uuid, *metric_info->uuid);
+ metric->entries = metric_info->number_of_pages;
+ metric->page_offset = pages_offset;
+ metric->delta_start_s = (uint32_t)(metric_info->first_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
+ metric->delta_end_s = (uint32_t)(metric_info->last_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
+
+ return ++metric;
+}
+
+void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset)
+{
+ struct journal_page_header *data_page_header = (void *) data;
+ uLong crc;
+
+ uuid_copy(data_page_header->uuid, *metric_info->uuid);
+ data_page_header->entries = metric_info->number_of_pages;
+ data_page_header->uuid_offset = uuid_offset; // data header OFFSET points to METRIC in the directory
+ data_page_header->crc = JOURVAL_V2_MAGIC;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header));
+ crc32set(data_page_header->checksum, crc);
+ return ++data_page_header;
+}
+
+void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
+{
+ struct journal_page_header *data_page_header = (void *) page_header;
+ struct journal_v2_block_trailer *journal_trailer = (void *) data;
+ uLong crc;
+
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list));
+ crc32set(journal_trailer->checksum, crc);
+ return ++journal_trailer;
+}
+
+void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info)
+{
+ struct journal_page_list *data_page = data;
+
+ if (journalfile_verify_space(j2_header, data, sizeof(*data_page)))
+ return NULL;
+
+ struct extent_io_data *ei = page_info->custom_data;
+
+ data_page->delta_start_s = (uint32_t) (page_info->start_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
+ data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
+ data_page->extent_index = page_info->extent_index;
+
+ data_page->update_every_s = page_info->update_every_s;
+ data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
+ data_page->type = 0;
+
+ return ++data_page;
+}
+
+// Must be recorded in metric_info->entries
+void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info)
+{
+ Pvoid_t *PValue;
+
+ struct journal_page_list *data_page = (void *)data;
+ // We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s
+ // that belong to this journal file
+ Pvoid_t JudyL_array = metric_info->JudyL_pages_by_start_time;
+
+ Word_t index_time = 0;
+ bool first = true;
+ struct jv2_page_info *page_info;
+ while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) {
+ page_info = *PValue;
+ // Write one descriptor and return the next data page location
+ data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
+ if (NULL == data_page)
+ break;
+ }
+ return data_page;
+}
+
+// Migrate the journalfile pointed by datafile
+// activate : make the new file active immediately
+// journalfile data will be set and descriptors (if deleted) will be repopulated as needed
+// startup : if the migration is done during agent startup
+// this will allow us to optimize certain things
+
+void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
+ Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
+ size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data)
+{
+ char path[RRDENG_PATH_MAX];
+ Pvoid_t *PValue;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
+ struct rrdengine_journalfile *journalfile = (struct rrdengine_journalfile *) user_data;
+ struct rrdengine_datafile *datafile = journalfile->datafile;
+ time_t min_time_s = LONG_MAX;
+ time_t max_time_s = 0;
+ struct jv2_metrics_info *metric_info;
+
+ journalfile_v2_generate_path(datafile, path, sizeof(path));
+
+ info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
+ path,
+ number_of_extents,
+ number_of_metrics,
+ number_of_pages);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ usec_t start_loading = now_monotonic_usec();
+#endif
+
+ size_t total_file_size = 0;
+ total_file_size += (sizeof(struct journal_v2_header) + JOURNAL_V2_HEADER_PADDING_SZ);
+
+ // Extents will start here
+ uint32_t extent_offset = total_file_size;
+ total_file_size += (number_of_extents * sizeof(struct journal_extent_list));
+
+ uint32_t extent_offset_trailer = total_file_size;
+ total_file_size += sizeof(struct journal_v2_block_trailer);
+
+ // UUID list will start here
+ uint32_t metrics_offset = total_file_size;
+ total_file_size += (number_of_metrics * sizeof(struct journal_metric_list));
+
+ // UUID list trailer
+ uint32_t metric_offset_trailer = total_file_size;
+ total_file_size += sizeof(struct journal_v2_block_trailer);
+
+ // descr @ time will start here
+ uint32_t pages_offset = total_file_size;
+ total_file_size += (number_of_pages * (sizeof(struct journal_page_list) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer)));
+
+ // File trailer
+ uint32_t trailer_offset = total_file_size;
+ total_file_size += sizeof(struct journal_v2_block_trailer);
+
+ int fd_v2;
+ uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2);
+ uint8_t *data = data_start;
+
+ memset(data_start, 0, extent_offset);
+
+ // Write header
+ struct journal_v2_header j2_header;
+ memset(&j2_header, 0, sizeof(j2_header));
+
+ j2_header.magic = JOURVAL_V2_MAGIC;
+ j2_header.start_time_ut = 0;
+ j2_header.end_time_ut = 0;
+ j2_header.extent_count = number_of_extents;
+ j2_header.extent_offset = extent_offset;
+ j2_header.metric_count = number_of_metrics;
+ j2_header.metric_offset = metrics_offset;
+ j2_header.page_count = number_of_pages;
+ j2_header.page_offset = pages_offset;
+ j2_header.extent_trailer_offset = extent_offset_trailer;
+ j2_header.metric_trailer_offset = metric_offset_trailer;
+ j2_header.journal_v2_file_size = total_file_size;
+ j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile);
+ j2_header.data = data_start; // Used during migration
+
+ struct journal_v2_block_trailer *journal_v2_trailer;
+
+ data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
+ internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+
+ fatal_assert(data == data_start + extent_offset_trailer);
+
+ // Calculate CRC for extents
+ journal_v2_trailer = (struct journal_v2_block_trailer *) (data_start + extent_offset_trailer);
+ uLong crc;
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list));
+ crc32set(journal_v2_trailer->checksum, crc);
+
+ internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+ // Skip the trailer, point to the metrics off
+ data += sizeof(struct journal_v2_block_trailer);
+
+ // Sanity check -- we must be at the metrics_offset
+ fatal_assert(data == data_start + metrics_offset);
+
+ // Allocate array to sort UUIDs and keep them sorted in the journal because we want to do binary search when we do lookups
+ struct journal_metric_list_to_sort *uuid_list = mallocz(number_of_metrics * sizeof(struct journal_metric_list_to_sort));
+
+ Word_t Index = 0;
+ size_t count = 0;
+ bool first_then_next = true;
+ while ((PValue = JudyLFirstThenNext(JudyL_metrics, &Index, &first_then_next))) {
+ metric_info = *PValue;
+
+ fatal_assert(count < number_of_metrics);
+ uuid_list[count++].metric_info = metric_info;
+ min_time_s = MIN(min_time_s, metric_info->first_time_s);
+ max_time_s = MAX(max_time_s, metric_info->last_time_s);
+ }
+
+ // Store in the header
+ j2_header.start_time_ut = min_time_s * USEC_PER_SEC;
+ j2_header.end_time_ut = max_time_s * USEC_PER_SEC;
+
+ qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
+ internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+
+ uint32_t resize_file_to = total_file_size;
+
+ for (Index = 0; Index < number_of_metrics; Index++) {
+ metric_info = uuid_list[Index].metric_info;
+
+ // Calculate current UUID offset from start of file. We will store this in the data page header
+ uint32_t uuid_offset = data - data_start;
+
+ // Write the UUID we are processing
+ data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
+ if (unlikely(!data))
+ break;
+
+ // Next we will write
+ // Header
+ // Detailed entries (descr @ time)
+ // Trailer (checksum)
+
+ // Keep the page_list_header, to be used for migration while the agent is running
+ metric_info->page_list_header = pages_offset;
+ // Write page header
+ void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info,
+ uuid_offset);
+
+ // Start writing descr @ time
+ void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info);
+ if (unlikely(!page_trailer))
+ break;
+
+ // Trailer (checksum)
+ uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer,
+ data_start + pages_offset);
+
+ // Calculate the start of the pages for the next descriptor
+ pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer));
+ // Verify we are at the right location
+ if (pages_offset != (uint32_t)(next_page_address - data_start)) {
+ // make sure checks fail so that we abort
+ data = data_start;
+ break;
+ }
+ }
+
+ if (data == data_start + metric_offset_trailer) {
+ internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+
+ // Calculate CRC for metrics
+ journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer);
+ crc = crc32(0L, Z_NULL, 0);
+ crc =
+ crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list));
+ crc32set(journal_v2_trailer->checksum, crc);
+ internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+
+ // Prepare to write checksum for the file
+ j2_header.data = NULL;
+ journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + trailer_offset);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (void *)&j2_header, sizeof(j2_header));
+ crc32set(journal_v2_trailer->checksum, crc);
+
+ // Write header to the file
+ memcpy(data_start, &j2_header, sizeof(j2_header));
+
+ internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+
+ info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
+
+ // msync(data_start, total_file_size, MS_SYNC);
+ journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
+
+ internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
+ ctx_current_disk_space_increase(ctx, total_file_size);
+ freez(uuid_list);
+ return;
+ }
+ else {
+ info("DBENGINE: failed to build index '%s', file will be skipped", path);
+ j2_header.data = NULL;
+ j2_header.magic = JOURVAL_V2_SKIP_MAGIC;
+ memcpy(data_start, &j2_header, sizeof(j2_header));
+ resize_file_to = sizeof(j2_header);
+ }
+
+ netdata_munmap(data_start, total_file_size);
+ freez(uuid_list);
+
+ if (likely(resize_file_to == total_file_size))
+ return;
+
+ int ret = truncate(path, (long) resize_file_to);
+ if (ret < 0) {
+ ctx_current_disk_space_increase(ctx, total_file_size);
+ ctx_fs_error(ctx);
+ error("DBENGINE: failed to resize file '%s'", path);
+ }
+ else
+ ctx_current_disk_space_increase(ctx, resize_file_to);
+}
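
The callback above sizes the whole v2 file before writing anything: header (plus padding), extent list and its trailer, the sorted metric list and its trailer, one page block per metric (page header, page list entries, trailer), and a final file trailer, each at an offset derived from the three counts it receives. The same arithmetic as a standalone helper, with the struct sizes passed in so the sketch stays self-contained:

#include <stddef.h>

/* Offsets of the v2 regions, mirroring the sizing arithmetic above.
 * The sz_* arguments stand for sizeof() of the corresponding on-disk structs;
 * sz_header is assumed to already include JOURNAL_V2_HEADER_PADDING_SZ. */
struct jv2_layout {
    size_t extent_offset, extent_trailer_offset;
    size_t metric_offset, metric_trailer_offset;
    size_t pages_offset, trailer_offset;
    size_t total_size;
};

static struct jv2_layout jv2_layout_compute(size_t extents, size_t metrics, size_t pages,
                                            size_t sz_header, size_t sz_extent, size_t sz_metric,
                                            size_t sz_page, size_t sz_page_header, size_t sz_trailer) {
    struct jv2_layout l;
    size_t pos = sz_header;

    l.extent_offset         = pos;  pos += extents * sz_extent;
    l.extent_trailer_offset = pos;  pos += sz_trailer;
    l.metric_offset         = pos;  pos += metrics * sz_metric;
    l.metric_trailer_offset = pos;  pos += sz_trailer;
    l.pages_offset          = pos;  pos += pages * (sz_page + sz_page_header + sz_trailer);
    l.trailer_offset        = pos;  pos += sz_trailer;
    l.total_size            = pos;
    return l;
}
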
+
+int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile)
{
uv_fs_t req;
uv_file file;
int ret, fd, error;
uint64_t file_size, max_id;
char path[RRDENG_PATH_MAX];
+ bool loaded_v2 = false;
+
+ // Do not try to load jv2 of the latest file
+ if (datafile->fileno != ctx_last_fileno_get(ctx))
+ loaded_v2 = journalfile_v2_load(ctx, journalfile, datafile) == 0;
- generate_journalfilepath(datafile, path, sizeof(path));
- fd = open_file_direct_io(path, O_RDWR, &file);
+ journalfile_v1_generate_path(datafile, path, sizeof(path));
+
+ fd = open_file_for_io(path, O_RDWR, &file, use_direct_io);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
+
+ if(loaded_v2)
+ return 0;
+
return fd;
}
- info("Loading journal file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
- if (ret)
- goto error;
- file_size = ALIGN_BYTES_FLOOR(file_size);
+ if (ret) {
+ error = ret;
+ goto cleanup;
+ }
- ret = check_journal_file_superblock(file);
- if (ret)
- goto error;
- ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
- ++ctx->stats.io_read_requests;
+ if(loaded_v2) {
+ journalfile->unsafe.pos = file_size;
+ error = 0;
+ goto cleanup;
+ }
+ file_size = ALIGN_BYTES_FLOOR(file_size);
+ journalfile->unsafe.pos = file_size;
journalfile->file = file;
- journalfile->pos = file_size;
- journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0);
- info("Loading journal file \"%s\" using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
- max_id = iterate_transactions(ctx, journalfile);
+ ret = journalfile_check_superblock(file);
+ if (ret) {
+ info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
+ error = ret;
+ goto cleanup;
+ }
+ ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
+
+ info("DBENGINE: loading journal file '%s'", path);
- ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
+ max_id = journalfile_iterate_transactions(ctx, journalfile);
+
+ __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
+
+ info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
+
+ bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
+ if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
+ ctx->loading.create_new_datafile_pair = false;
+ return 0;
+ }
+
+ pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
+ journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
+
+ if (is_last_file)
+ ctx->loading.create_new_datafile_pair = true;
- info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
- if (likely(journalfile->data))
- netdata_munmap(journalfile->data, file_size);
return 0;
- error:
- error = ret;
+cleanup:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
return error;
}
-
-void init_commit_log(struct rrdengine_instance *ctx)
-{
- ctx->commit_log.buf = NULL;
- ctx->commit_log.buf_pos = 0;
- ctx->commit_log.transaction_id = 1;
-}