diff options
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 117 |
1 files changed, 69 insertions, 48 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 44d8461db..30eaa0ec6 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req) uv_fs_req_cleanup(req); free(io_descr->buf); - free(io_descr); + freez(io_descr); } /* Careful to always call this before creating a new journal file */ @@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } -static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); @@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->datafile = datafile; } +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); - exit(ret); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; - return 0; + return ret; } int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) @@ -143,21 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng int ret, fd; struct rrdeng_jf_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, - S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif + journalfile->file = file; + ++ctx->stats.journalfile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -170,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_journal_file(journalfile, datafile); + return ret; + } - journalfile->file = file; journalfile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.journalfile_creations; return 0; } @@ -226,7 +247,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden { struct page_cache *pg_cache = &ctx->pg_cache; unsigned i, count, payload_length, descr_size, valid_pages; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; struct extent_info *extent; /* persistent structures */ struct rrdeng_jf_store_data *jf_metric_data; @@ -271,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } @@ -406,25 +429,17 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi { uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size, max_id; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); - uv_fs_req_cleanup(&req); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif info("Loading journal file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); @@ -449,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } void init_commit_log(struct rrdengine_instance *ctx) |