diff options
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 39 |
1 files changed, 31 insertions, 8 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index fac680aa0..9fecc48ff 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -52,7 +52,7 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) io_descr->iov = uv_buf_init((void *)io_descr->buf, size); ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, journalfile->pos, flush_transaction_buffer_cb); - assert (-1 != ret); + fatal_assert(-1 != ret); journalfile->pos += RRDENG_BLOCK_SIZE; ctx->disk_space += RRDENG_BLOCK_SIZE; ctx->commit_log.buf = NULL; @@ -64,9 +64,9 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s { struct rrdengine_instance *ctx = wc->ctx; int ret; - unsigned buf_pos, buf_size; + unsigned buf_pos = 0, buf_size; - assert(size); + fatal_assert(size); if (ctx->commit_log.buf) { unsigned remaining; @@ -125,6 +125,29 @@ int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengi return ret; } +int unlink_journal_file(struct rrdengine_journalfile *journalfile) +{ + struct rrdengine_datafile *datafile = journalfile->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; @@ -194,7 +217,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - assert(req.result < 0); + fatal_assert(req.result < 0); error("uv_fs_write: %s", uv_strerror(ret)); ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); @@ -232,7 +255,7 @@ static int check_journal_file_superblock(uv_file file) uv_fs_req_cleanup(&req); goto error; } - assert(req.result >= 0); + fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || @@ -275,7 +298,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden for (i = 0, valid_pages = 0 ; i < count ; ++i) { uuid_t *temp_id; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; if (PAGE_METRICS != jf_metric_data->descr[i].type) { error("Unknown page type encountered."); @@ -293,7 +316,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden /* First time we see the UUID */ uv_rwlock_wrlock(&pg_cache->metrics_index.lock); PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); - assert(NULL == *PValue); /* TODO: figure out concurrency model */ + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(temp_id); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; @@ -408,7 +431,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde fatal("uv_fs_read: %s", uv_strerror(ret)); /*uv_fs_req_cleanup(&req);*/ } - assert(req.result >= 0); + fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); ctx->stats.io_read_bytes += size_bytes; ++ctx->stats.io_read_requests; |