summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c39
1 files changed, 31 insertions, 8 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index fac680aa0..9fecc48ff 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -52,7 +52,7 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
journalfile->pos, flush_transaction_buffer_cb);
- assert (-1 != ret);
+ fatal_assert(-1 != ret);
journalfile->pos += RRDENG_BLOCK_SIZE;
ctx->disk_space += RRDENG_BLOCK_SIZE;
ctx->commit_log.buf = NULL;
@@ -64,9 +64,9 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
{
struct rrdengine_instance *ctx = wc->ctx;
int ret;
- unsigned buf_pos, buf_size;
+ unsigned buf_pos = 0, buf_size;
- assert(size);
+ fatal_assert(size);
if (ctx->commit_log.buf) {
unsigned remaining;
@@ -125,6 +125,29 @@ int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengi
return ret;
}
+int unlink_journal_file(struct rrdengine_journalfile *journalfile)
+{
+ struct rrdengine_datafile *datafile = journalfile->datafile;
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return ret;
+}
+
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
@@ -194,7 +217,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- assert(req.result < 0);
+ fatal_assert(req.result < 0);
error("uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
@@ -232,7 +255,7 @@ static int check_journal_file_superblock(uv_file file)
uv_fs_req_cleanup(&req);
goto error;
}
- assert(req.result >= 0);
+ fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
@@ -275,7 +298,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
for (i = 0, valid_pages = 0 ; i < count ; ++i) {
uuid_t *temp_id;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
if (PAGE_METRICS != jf_metric_data->descr[i].type) {
error("Unknown page type encountered.");
@@ -293,7 +316,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
/* First time we see the UUID */
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
- assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
@@ -408,7 +431,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
fatal("uv_fs_read: %s", uv_strerror(ret));
/*uv_fs_req_cleanup(&req);*/
}
- assert(req.result >= 0);
+ fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
ctx->stats.io_read_bytes += size_bytes;
++ctx->stats.io_read_requests;