summaryrefslogtreecommitdiffstats
path: root/database/engine/datafile.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2019-07-08 20:14:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2019-07-08 20:14:42 +0000
commit4f88e1a9be89a257fd6ed3045703db6e900027ee (patch)
tree518eb3c3aa1dce9ea281d02e0fd3cc01a9e7913f /database/engine/datafile.c
parentAdding upstream version 1.15.0. (diff)
downloadnetdata-4f88e1a9be89a257fd6ed3045703db6e900027ee.tar.xz
netdata-4f88e1a9be89a257fd6ed3045703db6e900027ee.zip
Adding upstream version 1.16.0.upstream/1.16.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/datafile.c')
-rw-r--r--database/engine/datafile.c224
1 files changed, 157 insertions, 67 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 2d17d05e4..8ef4ed599 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->ctx = ctx;
}
-static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
+int close_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+
int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.datafile_deletions;
- return 0;
+ return ret;
}
int create_data_file(struct rrdengine_datafile *datafile)
@@ -97,21 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile)
int ret, fd;
struct rrdeng_df_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
+ datafile->file = file;
+ ++ctx->stats.datafile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -125,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_data_file(datafile);
+ return ret;
+ }
- datafile->file = file;
datafile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.datafile_creations;
return 0;
}
@@ -182,25 +205,17 @@ static int load_data_file(struct rrdengine_datafile *datafile)
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
- uv_fs_req_cleanup(&req);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
- assert(req.result >= 0);
- file = req.result;
- uv_fs_req_cleanup(&req);
-#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
- fcntl(fd, F_NOCACHE, 1);
-#endif
info("Initializing data file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
@@ -221,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile)
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
static int scan_data_files_cmp(const void *a, const void *b)
{
struct rrdengine_datafile *file1, *file2;
- char path1[1024], path2[1024];
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
file1 = *(struct rrdengine_datafile **)a;
file2 = *(struct rrdengine_datafile **)b;
@@ -238,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b)
return strcmp(path1, path2);
}
-/* Returns number of datafiles that were loaded */
+/* Returns number of datafiles that were loaded or < 0 on error */
static int scan_data_files(struct rrdengine_instance *ctx)
{
int ret;
@@ -249,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx)
struct rrdengine_journalfile *journalfile;
ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
- assert(ret >= 0);
- assert(req.result >= 0);
+ if (ret < 0) {
+ assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
info("Found %d files in path %s", ret, ctx->dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
- info("Scanning file \"%s\"", dent.name);
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
- info("Matched file \"%s\"", dent.name);
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -266,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
uv_fs_req_cleanup(&req);
+ if (0 == matched_files) {
+ freez(datafiles);
+ return 0;
+ }
if (matched_files == MAX_DATAFILES) {
error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
}
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ /* TODO: change this when tiering is implemented */
+ ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
datafile = datafiles[i];
ret = load_data_file(datafile);
if (0 != ret) {
- free(datafile);
+ freez(datafile);
++failed_to_load;
- continue;
+ break;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
- free(datafile);
- free(journalfile);
+ close_data_file(datafile);
+ freez(datafile);
+ freez(journalfile);
++failed_to_load;
- continue;
+ break;
}
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
}
+ freez(datafiles);
if (failed_to_load) {
- error("%u files failed to load.", failed_to_load);
+ error("%u datafiles failed to load.", failed_to_load);
+ finalize_data_files(ctx);
+ return UV_EIO;
}
- free(datafiles);
- return matched_files - failed_to_load;
+ return matched_files;
}
/* Creates a datafile and a journalfile pair */
-void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
{
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
int ret;
+ char path[RRDENG_PATH_MAX];
- info("Creating new data and journal files.");
+ info("Creating new data and journal files in path %s", ctx->dbfiles_path);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, fileno);
ret = create_data_file(datafile);
- assert(!ret);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Created data file \"%s\".", path);
+ } else {
+ goto error_after_datafile;
+ }
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = create_journal_file(journalfile, datafile);
- assert(!ret);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Created journal file \"%s\".", path);
+ } else {
+ goto error_after_journalfile;
+ }
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
+
+ return 0;
+
+error_after_journalfile:
+ destroy_data_file(datafile);
+ freez(journalfile);
+error_after_datafile:
+ freez(datafile);
+ return ret;
}
-/* Page cache must already be initialized. */
+/* Page cache must already be initialized.
+ * Return 0 on success.
+ */
int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
ret = scan_data_files(ctx);
- if (0 == ret) {
- info("Data files not found, creating.");
- create_new_datafile_pair(ctx, 1, 1);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ } else if (0 == ret) {
+ info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
+ ret = create_new_datafile_pair(ctx, 1, 1);
+ if (ret) {
+ error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ }
+ ctx->last_fileno = 1;
}
+
return 0;
+}
+
+void finalize_data_files(struct rrdengine_instance *ctx)
+{
+ struct rrdengine_datafile *datafile, *next_datafile;
+ struct rrdengine_journalfile *journalfile;
+ struct extent_info *extent, *next_extent;
+
+ for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
+ journalfile = datafile->journalfile;
+ next_datafile = datafile->next;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
+ next_extent = extent->next;
+ freez(extent);
+ }
+ close_journal_file(journalfile, datafile);
+ close_data_file(datafile);
+ freez(journalfile);
+ freez(datafile);
+
+ }
} \ No newline at end of file