diff options
Diffstat (limited to 'database/engine/datafile.c')
-rw-r--r-- | database/engine/datafile.c | 224 |
1 files changed, 157 insertions, 67 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 2d17d05e..8ef4ed59 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_ datafile->ctx = ctx; } -static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); } +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + + int destroy_data_file(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.datafile_deletions; - return 0; + return ret; } int create_data_file(struct rrdengine_datafile *datafile) @@ -97,21 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile) int ret, fd; struct rrdeng_df_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, - S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif + datafile->file = file; + ++ctx->stats.datafile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -125,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_data_file(datafile); + return ret; + } - datafile->file = file; datafile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.datafile_creations; return 0; } @@ -182,25 +205,17 @@ static int load_data_file(struct rrdengine_datafile *datafile) struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); - uv_fs_req_cleanup(&req); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } - assert(req.result >= 0); - file = req.result; - uv_fs_req_cleanup(&req); -#ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); - fcntl(fd, F_NOCACHE, 1); -#endif info("Initializing data file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); @@ -221,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile) return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } static int scan_data_files_cmp(const void *a, const void *b) { struct rrdengine_datafile *file1, *file2; - char path1[1024], path2[1024]; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; file1 = *(struct rrdengine_datafile **)a; file2 = *(struct rrdengine_datafile **)b; @@ -238,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b) return strcmp(path1, path2); } -/* Returns number of datafiles that were loaded */ +/* Returns number of datafiles that were loaded or < 0 on error */ static int scan_data_files(struct rrdengine_instance *ctx) { int ret; @@ -249,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx) struct rrdengine_journalfile *journalfile; ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); - assert(ret >= 0); - assert(req.result >= 0); + if (ret < 0) { + assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } info("Found %d files in path %s", ret, ctx->dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s\"", dent.name); + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { - info("Matched file \"%s\"", dent.name); + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, no); datafiles[matched_files++] = datafile; @@ -266,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx) } uv_fs_req_cleanup(&req); + if (0 == matched_files) { + freez(datafiles); + return 0; + } if (matched_files == MAX_DATAFILES) { error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); } qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + /* TODO: change this when tiering is implemented */ + ctx->last_fileno = datafiles[matched_files - 1]->fileno; + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { datafile = datafiles[i]; ret = load_data_file(datafile); if (0 != ret) { - free(datafile); + freez(datafile); ++failed_to_load; - continue; + break; } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { - free(datafile); - free(journalfile); + close_data_file(datafile); + freez(datafile); + freez(journalfile); ++failed_to_load; - continue; + break; } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; } + freez(datafiles); if (failed_to_load) { - error("%u files failed to load.", failed_to_load); + error("%u datafiles failed to load.", failed_to_load); + finalize_data_files(ctx); + return UV_EIO; } - free(datafiles); - return matched_files - failed_to_load; + return matched_files; } /* Creates a datafile and a journalfile pair */ -void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) { struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; int ret; + char path[RRDENG_PATH_MAX]; - info("Creating new data and journal files."); + info("Creating new data and journal files in path %s", ctx->dbfiles_path); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, fileno); ret = create_data_file(datafile); - assert(!ret); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Created data file \"%s\".", path); + } else { + goto error_after_datafile; + } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = create_journal_file(journalfile, datafile); - assert(!ret); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Created journal file \"%s\".", path); + } else { + goto error_after_journalfile; + } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; + + return 0; + +error_after_journalfile: + destroy_data_file(datafile); + freez(journalfile); +error_after_datafile: + freez(datafile); + return ret; } -/* Page cache must already be initialized. */ +/* Page cache must already be initialized. + * Return 0 on success. + */ int init_data_files(struct rrdengine_instance *ctx) { int ret; ret = scan_data_files(ctx); - if (0 == ret) { - info("Data files not found, creating."); - create_new_datafile_pair(ctx, 1, 1); + if (ret < 0) { + error("Failed to scan path \"%s\".", ctx->dbfiles_path); + return ret; + } else if (0 == ret) { + info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); + ret = create_new_datafile_pair(ctx, 1, 1); + if (ret) { + error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + return ret; + } + ctx->last_fileno = 1; } + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + struct rrdengine_datafile *datafile, *next_datafile; + struct rrdengine_journalfile *journalfile; + struct extent_info *extent, *next_extent; + + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { + journalfile = datafile->journalfile; + next_datafile = datafile->next; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + next_extent = extent->next; + freez(extent); + } + close_journal_file(journalfile, datafile); + close_data_file(datafile); + freez(journalfile); + freez(datafile); + + } }
\ No newline at end of file |