summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c379
1 files changed, 240 insertions, 139 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 9998ee540..24d3c1c6d 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,57 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-
-// DBENGINE2: Helper
-
-static void update_metric_retention_and_granularity_by_uuid(
- struct rrdengine_instance *ctx, uuid_t *uuid,
- time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
-{
- if(unlikely(last_time_s > now_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
- "fixing last time to now",
- first_time_s, last_time_s, now_s);
- last_time_s = now_s;
- }
-
- if (unlikely(first_time_s > last_time_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
- "fixing first time to last time",
- first_time_s, last_time_s, now_s);
-
- first_time_s = last_time_s;
- }
-
- if (unlikely(first_time_s == 0 || last_time_s == 0)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
- "using them as-is",
- first_time_s, last_time_s, now_s);
- }
-
- bool added = false;
- METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
- if (!metric) {
- MRG_ENTRY entry = {
- .section = (Word_t) ctx,
- .first_time_s = first_time_s,
- .last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
- };
- uuid_copy(entry.uuid, *uuid);
- metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
- }
-
- if (likely(!added))
- mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
-
- mrg_metric_release(main_mrg, metric);
-}
-
static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
{
worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
@@ -60,12 +9,12 @@ static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
struct generic_io_descriptor *io_descr = &wal->io_descr;
struct rrdengine_instance *ctx = io_descr->ctx;
- debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
ctx_io_error(ctx);
- error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ netdata_log_error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
- debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
}
uv_fs_req_cleanup(req);
@@ -92,10 +41,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin
io_descr->buf = wal->buf;
io_descr->bytes = wal->buf_size;
- netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ spinlock_lock(&journalfile->unsafe.spinlock);
io_descr->pos = journalfile->unsafe.pos;
journalfile->unsafe.pos += wal->buf_size;
- netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+ spinlock_unlock(&journalfile->unsafe.spinlock);
io_descr->req.data = wal;
io_descr->data = journalfile;
@@ -122,10 +71,129 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str
datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
+// ----------------------------------------------------------------------------
+
+struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) {
+ struct rrdengine_datafile *datafile = NULL;
+
+ rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock);
+
+ Pvoid_t *PValue = NULL;
+
+ if(unlikely(!s->init)) {
+ s->init = true;
+ s->last = s->wanted_start_time_s;
+
+ PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ s->last = 0;
+ PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue)
+ s->last = s->wanted_start_time_s;
+ }
+ }
+
+ while(1) {
+ if (likely(!PValue)) {
+ PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ // cannot find anything after that point
+ datafile = NULL;
+ break;
+ }
+ }
+
+ datafile = *PValue;
+ TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s,
+ datafile->journalfile->v2.last_time_s,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+
+ if(rc == PAGE_IS_IN_RANGE) {
+ // this is good to return
+ break;
+ }
+ else if(rc == PAGE_IS_IN_THE_PAST) {
+ // continue to get the next
+ datafile = NULL;
+ PValue = NULL;
+ continue;
+ }
+ else /* PAGE_IS_IN_THE_FUTURE */ {
+ // we finished - no more datafiles
+ datafile = NULL;
+ PValue = NULL;
+ break;
+ }
+ }
+
+ if(datafile)
+ s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+ else
+ s->j2_header_acquired = NULL;
+
+ rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock);
+
+ return datafile;
+}
+
+static void njfv2idx_add(struct rrdengine_datafile *datafile) {
+ internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+ datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s;
+
+ do {
+ internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed");
+
+ Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ if (!PValue || PValue == PJERR)
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if (unlikely(*PValue)) {
+ // already there
+ datafile->journalfile->njfv2idx.indexed_as++;
+ }
+ else {
+ *PValue = datafile;
+ break;
+ }
+ } while(0);
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+static void njfv2idx_remove(struct rrdengine_datafile *datafile) {
+ internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+
+ int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ (void)rc;
+ internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry");
+
+ datafile->journalfile->njfv2idx.indexed_as = 0;
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+// ----------------------------------------------------------------------------
+
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
struct journal_v2_header *j2_header = NULL;
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!journalfile->mmap.data) {
journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
@@ -136,9 +204,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
journalfile->mmap.data = NULL;
journalfile->mmap.size = 0;
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
ctx_fs_error(journalfile->datafile->ctx);
}
@@ -147,12 +215,21 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
- madvise_random(journalfile->mmap.data, journalfile->mmap.size);
- madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ JOURNALFILE_FLAGS flags = journalfile->v2.flags;
+ spinlock_unlock(&journalfile->v2.spinlock);
+
+ if(flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) {
+ // we need the entire metrics directory into memory to process it
+ madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory);
+ }
+ else {
+ // let the kernel know that we don't want read-ahead on this file
+ madvise_random(journalfile->mmap.data, journalfile->mmap.size);
+ // madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
+ }
}
}
@@ -163,7 +240,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
*data_size = journalfile->mmap.size;
}
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return j2_header;
}
@@ -173,20 +250,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
if(!have_locks) {
if(!wait) {
- if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
+ if (!spinlock_trylock(&journalfile->mmap.spinlock))
return false;
}
else
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!wait) {
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ if(!spinlock_trylock(&journalfile->v2.spinlock)) {
+ spinlock_unlock(&journalfile->mmap.spinlock);
return false;
}
}
else
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
}
if(!journalfile->v2.refcount) {
@@ -194,7 +271,7 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
char path[RRDENG_PATH_MAX];
journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
- error("DBENGINE: failed to unmap index file '%s'", path);
+ netdata_log_error("DBENGINE: failed to unmap index file '%s'", path);
internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
ctx_fs_error(journalfile->datafile->ctx);
}
@@ -209,8 +286,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
}
if(!have_locks) {
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
}
return unmounted;
@@ -230,7 +307,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
struct rrdengine_journalfile *journalfile = datafile->journalfile;
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
+ if(!spinlock_trylock(&journalfile->v2.spinlock))
continue;
bool unmount = false;
@@ -244,7 +321,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
// 2 minutes have passed since last use
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if (unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, false);
@@ -254,7 +331,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
}
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
@@ -276,7 +353,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(do_we_need_it)
return journalfile_v2_mounted_data_get(journalfile, data_size);
@@ -285,7 +362,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
@@ -300,7 +377,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, true);
@@ -308,25 +385,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
return has_data;
}
size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
size_t data_size = journalfile->mmap.size;
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return data_size;
}
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
@@ -341,22 +418,27 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd,
struct journal_v2_header *j2_header = journalfile->mmap.data;
journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);
+ journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list);
journalfile_v2_mounted_data_unmount(journalfile, true, true);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
+
+ njfv2idx_add(journalfile->datafile);
}
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
+ njfv2idx_remove(journalfile->datafile);
+
bool has_references = false;
do {
if (has_references)
sleep_usec(10 * USEC_PER_MS);
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
if(journalfile->mmap.fd != -1)
@@ -374,8 +456,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *
internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
} while(has_references);
}
@@ -384,9 +466,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi
{
struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
journalfile->datafile = datafile;
- netdata_spinlock_init(&journalfile->mmap.spinlock);
- netdata_spinlock_init(&journalfile->v2.spinlock);
- netdata_spinlock_init(&journalfile->unsafe.spinlock);
+ spinlock_init(&journalfile->mmap.spinlock);
+ spinlock_init(&journalfile->v2.spinlock);
+ spinlock_init(&journalfile->unsafe.spinlock);
journalfile->mmap.fd = -1;
datafile->journalfile = journalfile;
return journalfile;
@@ -401,7 +483,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(datafile->ctx);
}
uv_fs_req_cleanup(&req);
@@ -430,7 +512,7 @@ int journalfile_unlink(struct rrdengine_journalfile *journalfile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -454,7 +536,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
if (journalfile->file) {
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -464,14 +546,14 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
// This is the new journal v2 index file
ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -516,7 +598,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
- error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -548,7 +630,7 @@ static int journalfile_check_superblock(uv_file file)
ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto error;
}
@@ -557,7 +639,7 @@ static int journalfile_check_superblock(uv_file file)
if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
- error("DBENGINE: File has invalid superblock.");
+ netdata_log_error("DBENGINE: File has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -569,7 +651,7 @@ static int journalfile_check_superblock(uv_file file)
static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
- static BITMAP256 page_error_map;
+ static BITMAP256 page_error_map = BITMAP256_INITIALIZER;
unsigned i, count, payload_length, descr_size;
struct rrdeng_jf_store_data *jf_metric_data;
@@ -578,7 +660,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
descr_size = sizeof(*jf_metric_data->descr) * count;
payload_length = sizeof(*jf_metric_data) + descr_size;
if (payload_length > max_size) {
- error("DBENGINE: corrupted transaction payload.");
+ netdata_log_error("DBENGINE: corrupted transaction payload.");
return;
}
@@ -589,7 +671,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
if (page_type > PAGE_TYPE_MAX) {
if (!bitmap256_get_bit(&page_error_map, page_type)) {
- error("DBENGINE: unknown page type %d encountered.", page_type);
+ netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type);
bitmap256_set_bit(&page_error_map, page_type, 1);
}
continue;
@@ -658,36 +740,36 @@ static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, s
*id = 0;
jf_header = buf;
if (STORE_PADDING == jf_header->type) {
- debug(D_RRDENGINE, "Skipping padding.");
+ netdata_log_debug(D_RRDENGINE, "Skipping padding.");
return 0;
}
if (sizeof(*jf_header) > max_size) {
- error("DBENGINE: corrupted transaction record, skipping.");
+ netdata_log_error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
*id = jf_header->id;
payload_length = jf_header->payload_length;
size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
if (size_bytes > max_size) {
- error("DBENGINE: corrupted transaction record, skipping.");
+ netdata_log_error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
jf_trailer = buf + sizeof(*jf_header) + payload_length;
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
ret = crc32cmp(jf_trailer->checksum, crc);
- debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
+ netdata_log_debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
if (unlikely(ret)) {
- error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
+ netdata_log_error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
return size_bytes;
}
switch (jf_header->type) {
case STORE_DATA:
- debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ netdata_log_debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
break;
default:
- error("DBENGINE: unknown transaction type, skipping record.");
+ netdata_log_error("DBENGINE: unknown transaction type, skipping record.");
break;
}
@@ -725,7 +807,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
iov = uv_buf_init(buf, size_bytes);
ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto skip_file;
}
@@ -764,7 +846,7 @@ static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("DBENGINE: extent list CRC32 check: FAILED");
+ netdata_log_error("DBENGINE: extent list CRC32 check: FAILED");
return 1;
}
@@ -784,7 +866,7 @@ static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("DBENGINE: metric list CRC32 check: FAILED");
+ netdata_log_error("DBENGINE: metric list CRC32 check: FAILED");
return 1;
}
return 0;
@@ -828,19 +910,19 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
rc = crc32cmp(journal_v2_trailer->checksum, crc);
if (unlikely(rc)) {
- error("DBENGINE: file CRC32 check: FAILED");
+ netdata_log_error("DBENGINE: file CRC32 check: FAILED");
return 1;
}
rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
if (rc) return 1;
- rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
- if (rc) return 1;
-
if (!db_engine_journal_check)
return 0;
+ rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
+ if (rc) return 1;
+
// Verify complete UUID chain
struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
@@ -849,7 +931,7 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
unsigned entries;
unsigned total_pages = 0;
- info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
+ netdata_log_info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
for (entries = 0; entries < j2_header->metric_count; entries++) {
char uuid_str[UUID_STR_LEN];
@@ -880,16 +962,16 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
metric++;
if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) {
- info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
+ netdata_log_info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
return 1;
}
}
if (entries != verified) {
- info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
+ netdata_log_info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
return 1;
}
- info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
+ netdata_log_info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
return 0;
}
@@ -905,15 +987,25 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
uint8_t *data_start = (uint8_t *)j2_header;
uint32_t entries = j2_header->metric_count;
+ if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) {
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK;
+ if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) {
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE;
+ // needs rebuild
+ return;
+ }
+ }
+
struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+ time_t global_first_time_s = header_start_time_s;
time_t now_s = max_acceptable_collected_time();
for (size_t i=0; i < entries; i++) {
time_t start_time_s = header_start_time_s + metric->delta_start_s;
time_t end_time_s = header_start_time_s + metric->delta_end_s;
- update_metric_retention_and_granularity_by_uuid(
- ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
metric++;
}
@@ -921,12 +1013,18 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
journalfile_v2_data_release(journalfile);
usec_t ended_ut = now_monotonic_usec();
- info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
+ netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
, ctx->config.tier, journalfile->datafile->fileno
, (double)data_size / 1024 / 1024
, (double)entries / 1000
, ((double)(ended_ut - started_ut) / USEC_PER_MS)
);
+
+ time_t old = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);;
+ do {
+ if(old <= global_first_time_s)
+ break;
+ } while(!__atomic_compare_exchange_n(&ctx->atomic.first_time_s, &old, global_first_time_s, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@@ -949,13 +1047,13 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
if (errno == ENOENT)
return 1;
ctx_fs_error(ctx);
- error("DBENGINE: failed to open '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to open '%s'", path_v2);
return 1;
}
ret = fstat(fd, &statbuf);
if (ret) {
- error("DBENGINE: failed to get file information for '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to get file information for '%s'", path_v2);
close(fd);
return 1;
}
@@ -975,7 +1073,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
return 1;
}
- info("DBENGINE: checking integrity of '%s'", path_v2);
+ netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2);
usec_t validation_start_ut = now_monotonic_usec();
int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
if (unlikely(rc)) {
@@ -987,7 +1085,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
error_report("File %s is invalid and it will be rebuilt", path_v2);
if (unlikely(munmap(data_start, journal_v2_file_size)))
- error("DBENGINE: failed to unmap '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2);
close(fd);
return rc;
@@ -998,7 +1096,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
if (unlikely(!entries)) {
if (unlikely(munmap(data_start, journal_v2_file_size)))
- error("DBENGINE: failed to unmap '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2);
close(fd);
return 1;
@@ -1006,7 +1104,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
usec_t finished_ut = now_monotonic_usec();
- info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
+ netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
"mmap: %0.2f ms, validate: %0.2f ms"
, path_v2
, (double)journal_v2_file_size / 1024 / 1024
@@ -1016,6 +1114,9 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
);
// Initialize the journal file to be able to access the data
+
+ if (!db_engine_journal_check)
+ journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK;
journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);
ctx_current_disk_space_increase(ctx, journal_v2_file_size);
@@ -1179,7 +1280,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
journalfile_v2_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
+ netdata_log_info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
path,
number_of_extents,
number_of_metrics,
@@ -1350,7 +1451,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
- info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
+ netdata_log_info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
// msync(data_start, total_file_size, MS_SYNC);
journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
@@ -1361,7 +1462,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
return;
}
else {
- info("DBENGINE: failed to build index '%s', file will be skipped", path);
+ netdata_log_info("DBENGINE: failed to build index '%s', file will be skipped", path);
j2_header.data = NULL;
j2_header.magic = JOURVAL_V2_SKIP_MAGIC;
memcpy(data_start, &j2_header, sizeof(j2_header));
@@ -1378,7 +1479,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
if (ret < 0) {
ctx_current_disk_space_increase(ctx, total_file_size);
ctx_fs_error(ctx);
- error("DBENGINE: failed to resize file '%s'", path);
+ netdata_log_error("DBENGINE: failed to resize file '%s'", path);
}
else
ctx_current_disk_space_increase(ctx, resize_file_to);
@@ -1428,19 +1529,19 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
ret = journalfile_check_superblock(file);
if (ret) {
- info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
+ netdata_log_info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
error = ret;
goto cleanup;
}
ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
- info("DBENGINE: loading journal file '%s'", path);
+ netdata_log_info("DBENGINE: loading journal file '%s'", path);
max_id = journalfile_iterate_transactions(ctx, journalfile);
__atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
- info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
+ netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
@@ -1459,7 +1560,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
cleanup:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);