summaryrefslogtreecommitdiffstats
path: root/database/engine/datafile.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/datafile.c')
-rw-r--r--database/engine/datafile.c335
1 files changed, 335 insertions, 0 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
new file mode 100644
index 00000000..2d17d05e
--- /dev/null
+++ b/database/engine/datafile.c
@@ -0,0 +1,335 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void df_extent_insert(struct extent_info *extent)
+{
+ struct rrdengine_datafile *datafile = extent->datafile;
+
+ if (likely(NULL != datafile->extents.last)) {
+ datafile->extents.last->next = extent;
+ }
+ if (unlikely(NULL == datafile->extents.first)) {
+ datafile->extents.first = extent;
+ }
+ datafile->extents.last = extent;
+}
+
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ if (likely(NULL != ctx->datafiles.last)) {
+ ctx->datafiles.last->next = datafile;
+ }
+ if (unlikely(NULL == ctx->datafiles.first)) {
+ ctx->datafiles.first = datafile;
+ }
+ ctx->datafiles.last = datafile;
+}
+
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_datafile *next;
+
+ next = datafile->next;
+ assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+ ctx->datafiles.first = next;
+}
+
+
+static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
+ unsigned tier, unsigned fileno)
+{
+ assert(tier == 1);
+ datafile->tier = tier;
+ datafile->fileno = fileno;
+ datafile->file = (uv_file)0;
+ datafile->pos = 0;
+ datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
+ datafile->journalfile = NULL;
+ datafile->next = NULL;
+ datafile->ctx = ctx;
+}
+
+static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+int destroy_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return 0;
+}
+
+int create_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
+ superblock->tier = 1;
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ datafile->file = file;
+ datafile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.datafile_creations;
+
+ return 0;
+}
+
+static int check_data_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
+ superblock->tier != 1) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
+static int load_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ uint64_t file_size;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ /* if (UV_ENOENT != fd) */
+ error("uv_fs_fsopen: %s", uv_strerror(fd));
+ uv_fs_req_cleanup(&req);
+ return fd;
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ info("Initializing data file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_CEILING(file_size);
+
+ ret = check_data_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
+ ++ctx->stats.io_read_requests;
+
+ datafile->file = file;
+ datafile->pos = file_size;
+
+ info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ (void) uv_fs_close(NULL, &req, file, NULL);
+ uv_fs_req_cleanup(&req);
+ return ret;
+}
+
+static int scan_data_files_cmp(const void *a, const void *b)
+{
+ struct rrdengine_datafile *file1, *file2;
+ char path1[1024], path2[1024];
+
+ file1 = *(struct rrdengine_datafile **)a;
+ file2 = *(struct rrdengine_datafile **)b;
+ generate_datafilepath(file1, path1, sizeof(path1));
+ generate_datafilepath(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of datafiles that were loaded */
+static int scan_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+ unsigned tier, no, matched_files, i,failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct rrdengine_datafile **datafiles, *datafile;
+ struct rrdengine_journalfile *journalfile;
+
+ ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+ assert(ret >= 0);
+ assert(req.result >= 0);
+ info("Found %d files in path %s", ret, ctx->dbfiles_path);
+
+ datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s\"", dent.name);
+ ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
+ if (2 == ret) {
+ info("Matched file \"%s\"", dent.name);
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, no);
+ datafiles[matched_files++] = datafile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ datafile = datafiles[i];
+ ret = load_data_file(datafile);
+ if (0 != ret) {
+ free(datafile);
+ ++failed_to_load;
+ continue;
+ }
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = load_journal_file(ctx, journalfile, datafile);
+ if (0 != ret) {
+ free(datafile);
+ free(journalfile);
+ ++failed_to_load;
+ continue;
+ }
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+ }
+ if (failed_to_load) {
+ error("%u files failed to load.", failed_to_load);
+ }
+ free(datafiles);
+
+ return matched_files - failed_to_load;
+}
+
+/* Creates a datafile and a journalfile pair */
+void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+{
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ int ret;
+
+ info("Creating new data and journal files.");
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, fileno);
+ ret = create_data_file(datafile);
+ assert(!ret);
+
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = create_journal_file(journalfile, datafile);
+ assert(!ret);
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+}
+
+/* Page cache must already be initialized. */
+int init_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+
+ ret = scan_data_files(ctx);
+ if (0 == ret) {
+ info("Data files not found, creating.");
+ create_new_datafile_pair(ctx, 1, 1);
+ }
+ return 0;
+} \ No newline at end of file