summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c117
1 files changed, 69 insertions, 48 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 44d8461db..30eaa0ec6 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(io_descr->buf);
- free(io_descr);
+ freez(io_descr);
}
/* Careful to always call this before creating a new journal file */
@@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
@@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->datafile = datafile;
}
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
- exit(ret);
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- return 0;
+ return ret;
}
int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@@ -143,21 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
int ret, fd;
struct rrdeng_jf_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
+ journalfile->file = file;
+ ++ctx->stats.journalfile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -170,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_journal_file(journalfile, datafile);
+ return ret;
+ }
- journalfile->file = file;
journalfile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.journalfile_creations;
return 0;
}
@@ -226,7 +247,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
{
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned i, count, payload_length, descr_size, valid_pages;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
struct extent_info *extent;
/* persistent structures */
struct rrdeng_jf_store_data *jf_metric_data;
@@ -271,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
@@ -406,25 +429,17 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
{
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size, max_id;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
- uv_fs_req_cleanup(&req);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
info("Loading journal file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
@@ -449,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
void init_commit_log(struct rrdengine_instance *ctx)